From f9e60b636f82ccd77e90aaccfae07c05cbb1b87f Mon Sep 17 00:00:00 2001 From: Brandon Rodriguez <brodriguez8774@gmail.com> Date: Sat, 10 Mar 2018 16:38:17 -0500 Subject: [PATCH] Implement full thread-safe bounded buffer --- Main.c | 8 ++++---- buffer.c | 42 +++++++++++++++++++++++++----------------- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/Main.c b/Main.c index 9d2b125..ef92d47 100644 --- a/Main.c +++ b/Main.c @@ -36,8 +36,8 @@ // Constant Defines. -#define SLEEP_MIN 3 -#define SLEEP_MAX 10 +#define SLEEP_MIN 2 +#define SLEEP_MAX 3 // Variable Declaration. @@ -214,10 +214,10 @@ void* consumer(void* max_time) { sleep(sleep_actual); item = calloc(1, sizeof(int)); - if ((*item = remove_item(item)) != 0) { + if ((remove_item(item)) != 0) { fprintf(stderr, "Consumer failed to properly remove item.\n"); } else { - printf("Consumer removed item %d.\n", *item); + printf("Consumer removed item with value %d.\n", *item); } free(item); } diff --git a/buffer.c b/buffer.c index d9d9084..730ff4b 100644 --- a/buffer.c +++ b/buffer.c @@ -16,7 +16,7 @@ // Variable Declaration. -buffer_item buffer[BUFFER_SIZE]; +volatile buffer_item buffer[BUFFER_SIZE]; volatile int empty_lock; // Holds number of empty buffer slots. volatile int full_lock; // Holds number of used buffer slots. volatile int mutex_lock; // Limits accessing of buffer. 0 if currently accessed, 1 if available. @@ -41,17 +41,20 @@ void intialize_item_buffer() { int insert_item(buffer_item item) { int return_int = 0; - // printf("Producer is attempting to add item.\n"); - + // Check for empty semaphore. while((empty_lock == 0) || (semaphore_lock == 0)) usleep(full_lock); // Wait for open buffer slot, as determined by empty lock. wait_semaphore(&empty_lock); + // Check for mutex. while(mutex_lock == 0) usleep(mutex_lock); // Wait for mutex lock. wait_mutex(&mutex_lock); - // Add item. + + // Actually add item. + buffer[full_lock] = item; + // Error checking for mutex. if (mutex_lock == 1) { @@ -67,10 +70,8 @@ int insert_item(buffer_item item) { return_int = 1; } else { signal_semaphore(&full_lock); - // printf("Full lock is now: %d\n", full_lock); } - return return_int; } @@ -81,19 +82,22 @@ int insert_item(buffer_item item) { int remove_item(buffer_item* save_location) { int return_int = 0; - // printf("Consumer is attempting to remove item.\n"); - + // Check for full semaphore. while((full_lock == 0) || (semaphore_lock == 0)) - // printf("Waiting on 'Full Lock': %d\n", full_lock); usleep(empty_lock); // Wait for filled buffer slot, as determined by full lock. wait_semaphore(&full_lock); + // Check for mutex. while(mutex_lock == 0) - // printf("Waiting on 'Mutex Lock': %d\n", mutex_lock); usleep(mutex_lock); // Wait for mutex lock. wait_mutex(&mutex_lock); - // Remove item. + + // Actually remove item. + *save_location = buffer[full_lock]; + // printf("Removing %d at index %d\n", *save_location, full_lock); + buffer[full_lock] = 0; + // Error checking for mutex. if (mutex_lock == 1) { @@ -111,8 +115,6 @@ int remove_item(buffer_item* save_location) { signal_semaphore(&empty_lock); } - // Consume item. - return return_int; } @@ -139,6 +141,7 @@ void signal_mutex(volatile int* lock) { * Decriment provided semaphore for locking purposes. */ void wait_semaphore(int* semaphore) { + while ((*semaphore <= 0) || (semaphore_lock == 0)) ; // Doublecheck for semaphore locks. If 0 or error, wait again. @@ -147,9 +150,11 @@ void wait_semaphore(int* semaphore) { usleep(semaphore_lock); // Wait for semaphore lock. wait_mutex(&semaphore_lock); - printf("Wait Semaphore value (before) is %d\n", *semaphore); + + // printf("Wait Semaphore value (before) is %d\n", *semaphore); *semaphore = *semaphore - 1; - printf("Wait Semaphore value (after) is %d\n", *semaphore); + // printf("Wait Semaphore value (after) is %d\n", *semaphore); + // Allow for accessing of semaphore section. signal_mutex(&semaphore_lock); @@ -160,6 +165,7 @@ void wait_semaphore(int* semaphore) { * Increment provided semaphore for locking purposes. */ void signal_semaphore(int* semaphore) { + while ((*semaphore >= BUFFER_SIZE) || (semaphore_lock == 0)) ; // Doublecheck for semaphore locks. If max or error, wait again. @@ -168,9 +174,11 @@ void signal_semaphore(int* semaphore) { usleep(semaphore_lock); // Wait for semaphore lock. wait_mutex(&semaphore_lock); - printf("Signal Semaphore value (before) is %d\n", *semaphore); + + // printf("Signal Semaphore value (before) is %d\n", *semaphore); *semaphore = *semaphore + 1; - printf("Signal Semaphore value (after) is %d\n", *semaphore); + // printf("Signal Semaphore value (after) is %d\n", *semaphore); + // Allow for accessing of semaphore section. signal_mutex(&semaphore_lock); -- GitLab