diff --git a/src/load_balance_general.c b/src/load_balance_general.c index cc2376423bb8703be11c55582d5cad2b20eae1dd..b82bd70282013e9a09615295effe10e9af3d936b 100644 --- a/src/load_balance_general.c +++ b/src/load_balance_general.c @@ -12,6 +12,36 @@ #endif +/** + * Checks for status updates from given worker process. + * + * :param thread_args_ptr: Struct for processor values. + * :param worker_id: Process id of worker to check. + * :return: Bool indicating if message was acquired from worker or not. + */ +int main_check_worker_status(thread_struct* thread_args_ptr, int worker_id) { + int msg_received = 0; + int msg_status_flag = 0; + int* recv_array; + + // Check if message present from respective process. + MPI_Iprobe(worker_id, tag_status_update, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + while (msg_status_flag == 1) { + msg_received = 1; + + // Message present. Retrieve and handle. + recv_array = calloc(3, sizeof(int)); + MPI_Recv(recv_array, 2, MPI_INT, worker_id, tag_status_update, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + main_display_status(thread_args_ptr, recv_array[0], recv_array[1], 0); + free(recv_array); + + MPI_Iprobe(worker_id, tag_status_update, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + } + + return msg_received; +} + + /** * Displays provided worker data through master process. * diff --git a/src/load_balance_general.h b/src/load_balance_general.h index 64601b0c49b13c325fb23f4eecb7e644d2dc44b9..45db3b6601e326ece5e99190d56ae113fdf2f09d 100644 --- a/src/load_balance_general.h +++ b/src/load_balance_general.h @@ -13,6 +13,7 @@ // Function Prototypes. +int main_check_worker_status(); void main_display_status(); void main_terminate_workers(); void worker_send_status(); diff --git a/src/load_balance_schemes.c b/src/load_balance_schemes.c index d80e4ea8436c1a36db4a711a87a547aa488b9434..55cdc77c1990f2864f75847ebdd540200cb13967 100644 --- a/src/load_balance_schemes.c +++ b/src/load_balance_schemes.c @@ -60,10 +60,8 @@ void arr_run(thread_struct* thread_args_ptr) { */ void arr_main(thread_struct* thread_args_ptr) { int index; - int main_loop_bool = 1; - int init_run = 1; - int msg_status_flag; - int msg_recieved = 0; + int main_loop_bool = 3; + int msg_received = 0; int* recv_array; log("Starting arr_main."); @@ -72,7 +70,7 @@ void arr_main(thread_struct* thread_args_ptr) { for (index = 1; index < thread_args_ptr->total_processors; index++) { recv_array = calloc(3, sizeof(int)); MPI_Recv(recv_array, 2, MPI_INT, index, tag_status_update, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - main_display_status(thread_args_ptr, recv_array[0], recv_array[1], init_run); + main_display_status(thread_args_ptr, recv_array[0], recv_array[1], 1); free(recv_array); } @@ -80,22 +78,11 @@ void arr_main(thread_struct* thread_args_ptr) { // For each loop, it checks all worker processes for a message about work being done. // If no messages are found for two consecutive loops in a row, then assumes all work is complete and terminates. index = 1; - init_run = 0; - while (main_loop_bool == 1) { - msg_status_flag = 0; - - // Check if message present from respective process. - MPI_Iprobe(index, tag_status_update, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); - while (msg_status_flag == 1) { - msg_recieved = 1; + while (main_loop_bool > 0) { - // Message present. Retrieve and handle. - recv_array = calloc(3, sizeof(int)); - MPI_Recv(recv_array, 2, MPI_INT, index, tag_status_update, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - main_display_status(thread_args_ptr, recv_array[0], recv_array[1], init_run); - free(recv_array); - - MPI_Iprobe(index, tag_status_update, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + // Check status of worker process. + if (main_check_worker_status(thread_args_ptr, index)) { + msg_received = 1; } // Increment for next thread. @@ -106,17 +93,19 @@ void arr_main(thread_struct* thread_args_ptr) { index += 1; // Check if messages were recieved this run. - if (msg_recieved == 0) { - // No messages recieved. Run one last loop to double check. - msg_recieved = -1; - sleep(2); - } else if (msg_recieved == -1) { - // No messages recieved for two loops in a row. Terminate. - main_terminate_workers(thread_args_ptr); - main_loop_bool = 0; + if (msg_received == 0) { + // No messages recieved. + main_loop_bool -= 1; + + // Send termination request if loop bool hit 0. + // Otherwise loop a few more times to give requests time to propagate. + if (main_loop_bool <= 0) { + main_terminate_workers(thread_args_ptr); + } + } else { - // One or more messages recieved this loop. Reset value. - msg_recieved = 0; + // One or more messages recieved this loop. Reset main loop value. + main_loop_bool = 3; } // Sleep number of seconds provided by user terminal. @@ -124,6 +113,7 @@ void arr_main(thread_struct* thread_args_ptr) { // before continuing on. In a real program, the worker would be doing something useful in this time, // instead of just blocking. sleep(thread_args_ptr->seconds_per_load); + msg_received = 0; } } @@ -143,7 +133,6 @@ void arr_worker(thread_struct* thread_args_ptr) { int main_loop_bool = 1; int msg_status_flag; int request_flag = -1; - int index; int work_value = 0; int send_status = 1;