diff --git a/documents/references.md b/documents/references.md index c19eb8bdb8293a08c621bfd9370fe33c3c809c4b..563bc86f913c973a022dd7c9c613205c4e10658f 100644 --- a/documents/references.md +++ b/documents/references.md @@ -25,6 +25,12 @@ Most parallelization logic is from the book "Introduction to Parallel Programmin * <http://www.oscer.ou.edu/ncsiworkshop2012intropar_mpiblockingnonblocking_murphy_20120803.pdf> * <https://stackoverflow.com/questions/17582900/difference-between-mpi-send-and-mpi-ssend> +### MPI Async Functions +* Irecv - <https://www.open-mpi.org/doc/v3.1/man3/MPI_Irecv.3.php> +* Isend - <https://www.open-mpi.org/doc/v3.1/man3/MPI_Isend.3.php> +* Msg Buffer not Updating - <https://stackoverflow.com/a/42122170> +* MPI_Test - <https://mpi.deino.net/mpi_functions/MPI_Test.html> + ### Data Types #### Various C Data Types @@ -61,6 +67,3 @@ Most parallelization logic is from the book "Introduction to Parallel Programmin #### Constants Between Files * <https://stackoverflow.com/a/15531238> * <https://stackoverflow.com/a/5499530> - - - diff --git a/src/load_balance_general.c b/src/load_balance_general.c index 0cdd9eca3eb24fe149b56bc7d42d08de5b4e0361..8e836d45a4c935021c4bb10248bd058ad0397208 100644 --- a/src/load_balance_general.c +++ b/src/load_balance_general.c @@ -22,14 +22,11 @@ void main_display_status(thread_struct* thread_args_ptr, int process_num, int pr log(""); log("FUNCTION main_display_status()"); sprintf(log_msg, " process_num: %i", process_num); - log(log_msg); - free(log_msg); log_msg = calloc(256, sizeof(char)); + log(log_msg); free(log_msg); log_msg = calloc(256, sizeof(char)); sprintf(log_msg, " process_load_status: %i", process_load_status); - log(log_msg); - free(log_msg); log_msg = calloc(256, sizeof(char)); + log(log_msg); free(log_msg); log_msg = calloc(256, sizeof(char)); sprintf(log_msg, " init_run: %i", init_run); - log(log_msg); - free(log_msg); + log(log_msg); free(log_msg); fflush(stdout); // Travel to desired terminal row. Skip if first loop through thread displaying. @@ -52,6 +49,10 @@ void main_display_status(thread_struct* thread_args_ptr, int process_num, int pr // } // terminal_line_start(); // } + + log("EXIT FUNCTION main_terminate_workers()"); + log(""); + fflush(stdout); } @@ -73,6 +74,9 @@ void main_terminate_workers(thread_struct* thread_args_ptr) { // printf("Process %i received termination message.\n", index); } + log("EXIT FUNCTION main_terminate_workers()"); + log(""); + fflush(stdout); } @@ -86,8 +90,7 @@ void worker_send_status(int process_rank, int process_load_status) { log(""); log("FUNCTION worker_send_status()"); sprintf(log_msg, " process_load_status: %i", process_load_status); - log(log_msg); - free(log_msg); + log(log_msg); free(log_msg); fflush(stdout); send_array[0] = process_rank; @@ -95,6 +98,10 @@ void worker_send_status(int process_rank, int process_load_status) { MPI_Ssend(send_array, 2, MPI_INT, 0, tag_status_update, MPI_COMM_WORLD); free(send_array); + + log("EXIT FUNCTION worker_send_status()"); + log(""); + fflush(stdout); } @@ -120,6 +127,11 @@ int worker_check_status() { MPI_Recv(&temp, 1, MPI_INT, main_process, tag_main_termination, MPI_COMM_WORLD, MPI_STATUS_IGNORE); return 0; } + + log("EXIT FUNCTION worker_check_status()"); + log(""); + fflush(stdout); + return 1; } @@ -128,20 +140,17 @@ int worker_check_status() { * Attempts to receive work from given "possible donor" process (donor is determined by load balancing schema). * * Does so by sending message to "possible donor" process, to initiate request. - * Once it gets around to message checking, donor will then send back a message of: - * * -1 indicating donor does not have work to give. - * * Positive integer of work being passed back to begging process. */ -int worker_send_request(thread_struct* thread_args_ptr, int possible_donor) { - int donor_response = 0; +void worker_send_request(thread_struct* thread_args_ptr, int possible_donor) { int msg_status_flag = 0; char* log_msg = calloc(256, sizeof(char)); + int req_buffer = 0; + MPI_Request request_var; log(""); log("FUNCTION worker_send_request()"); sprintf(log_msg, " possible_donor: %i", possible_donor); - log(log_msg); - free(log_msg); + log(log_msg); free(log_msg); fflush(stdout); // Validate donor value. @@ -164,26 +173,14 @@ int worker_send_request(thread_struct* thread_args_ptr, int possible_donor) { MPI_Iprobe(possible_donor, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); if (msg_status_flag != 1) { // Initialilze interaction by sending a "begging message" for work. - MPI_Ssend(&donor_response, 1, MPI_INT, possible_donor, tag_work_request, MPI_COMM_WORLD); - - // If got this far, then we have unblocked. Means that "possible donor" process is sending a response. - MPI_Recv(&donor_response, 1, MPI_INT, possible_donor, tag_work_request, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Isend(&req_buffer, 1, MPI_INT, possible_donor, tag_work_request, MPI_COMM_WORLD, &request_var); } } } - // Check what response was and handle accoringly. - if (donor_response > 0) { - // Donor sent workload to process. - thread_args_ptr->remaining_loads = donor_response; - - // Work has been provided. Immediately attempt to update main processor of change. - worker_send_status(thread_args_ptr->thread_num, thread_args_ptr->remaining_loads); - return 1; - } else { - // Donor sent reject message. - return 0; - } + log("EXIT FUNCTION worker_send_request()"); + log(""); + fflush(stdout); } @@ -199,6 +196,7 @@ void worker_handle_request(thread_struct* thread_args_ptr) { int work_send_value = 0; int temp = 0; int index; + MPI_Request request_var; // What is this for?? Why is it required? log(""); log("FUNCTION worker_handle_request()"); @@ -232,7 +230,7 @@ void worker_handle_request(thread_struct* thread_args_ptr) { thread_args_ptr->remaining_loads = work_send_value + (thread_args_ptr->remaining_loads % 2); sprintf(msg, " new personal load value: %i", thread_args_ptr->remaining_loads); log(msg); free(msg); - MPI_Ssend(&work_send_value, 1, MPI_INT, index, tag_work_request, MPI_COMM_WORLD); + MPI_Isend(&work_send_value, 1, MPI_INT, index, tag_work_response, MPI_COMM_WORLD, &request_var); // Load has been split. Immediately attempt to update main processor of change. worker_send_status(thread_args_ptr->thread_num, thread_args_ptr->remaining_loads); @@ -243,11 +241,15 @@ void worker_handle_request(thread_struct* thread_args_ptr) { sprintf(msg, " rejecting request with current load of %i", thread_args_ptr->remaining_loads); log(msg); free(msg); - MPI_Ssend(&work_send_value, 1, MPI_INT, index, tag_work_request, MPI_COMM_WORLD); + MPI_Isend(&work_send_value, 1, MPI_INT, index, tag_work_response, MPI_COMM_WORLD, &request_var); } } } } + + log("EXIT FUNCTION worker_handle_request()"); + log(""); + fflush(stdout); } diff --git a/src/load_balance_general.h b/src/load_balance_general.h index 426296583ce20aa49a53dbbe11d9b4604ad9eaa8..64601b0c49b13c325fb23f4eecb7e644d2dc44b9 100644 --- a/src/load_balance_general.h +++ b/src/load_balance_general.h @@ -17,5 +17,5 @@ void main_display_status(); void main_terminate_workers(); void worker_send_status(); int worker_check_status(); -int worker_send_request(); +void worker_send_request(); void worker_handle_request(); diff --git a/src/load_balance_schemes.c b/src/load_balance_schemes.c index e74ae36b249ac0fba349a4ac8190d975ad879710..5b26ceeed22d56869c0ab01c06181e98c325d6f5 100644 --- a/src/load_balance_schemes.c +++ b/src/load_balance_schemes.c @@ -133,8 +133,11 @@ void arr_worker(thread_struct* thread_args_ptr) { int arr_counter = thread_args_ptr->thread_num; int main_loop_bool = 1; int msg_status_flag; + int request_flag = -1; + int index; + int work_value = 0; - log("Starting GRR worker."); + log("Starting ARR worker."); // Initialize process "1" to have full loads. All others start with none. if (process_rank == 1) { @@ -143,24 +146,33 @@ void arr_worker(thread_struct* thread_args_ptr) { thread_args_ptr->remaining_loads = 0; } + // Initialize output by immediately sending status to main process. + worker_send_status(process_rank, thread_args_ptr->remaining_loads); + // Main loop bool. Continues until main process says all work is done. while (main_loop_bool == 1) { // Loop all worker processes until load count message of "0" has been sent. - while (thread_args_ptr->remaining_loads >= 0) { + while (thread_args_ptr->remaining_loads > 0) { // Send message for current load. worker_send_status(process_rank, thread_args_ptr->remaining_loads); // Check for message requests from other processes. - worker_handle_request(thread_args_ptr); + msg_status_flag = 0; + MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag == 1) { + // At least one message pending. Handle before proceeding. + worker_handle_request(thread_args_ptr); + } // Decriment load number. thread_args_ptr->remaining_loads -= 1; } // If we made it this far, then worker no longer has work. - // Start every step at this point by double checking for messages first. + + // Check for pending work requests. msg_status_flag = 0; MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); if (msg_status_flag == 1) { @@ -168,25 +180,37 @@ void arr_worker(thread_struct* thread_args_ptr) { worker_handle_request(thread_args_ptr); } - // Update personal ARR counter. - arr_counter = (arr_counter + 1) % thread_args_ptr->total_processors; - while (arr_counter == thread_args_ptr->thread_num || arr_counter == 0) { + + if (request_flag == -1) { + // Update personal ARR counter. arr_counter = (arr_counter + 1) % thread_args_ptr->total_processors; - } + while (arr_counter == thread_args_ptr->thread_num || arr_counter == 0) { + arr_counter = (arr_counter + 1) % thread_args_ptr->total_processors; + } - msg_status_flag = 0; - MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); - if (msg_status_flag == 1) { - // At least one message pending. Handle before proceeding. - worker_handle_request(thread_args_ptr); + // Beg fellow worker for work. + worker_send_request(thread_args_ptr, arr_counter); + request_flag = arr_counter; + } else { + msg_status_flag = 0; + MPI_Iprobe(request_flag, tag_work_response, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag == 1) { + MPI_Recv(&work_value, 1, MPI_INT, request_flag, tag_work_response, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + // Update personal load value if non-reject message provided. + if (work_value > 0) { + thread_args_ptr->remaining_loads = work_value; + } + + // Reset flag to allow begging for work again in the future. + request_flag = -1; + } } - // Beg fellow worker for work. - worker_send_request(thread_args_ptr, arr_counter); - // Check if program as a whole is done running. main_loop_bool = worker_check_status(); + // Check again for pending work requests. msg_status_flag = 0; MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); if (msg_status_flag == 1) { @@ -194,11 +218,19 @@ void arr_worker(thread_struct* thread_args_ptr) { worker_handle_request(thread_args_ptr); } + // Check status of overall work. if (main_loop_bool == 1) { + // Sleep to simulate "doing work". sleep(1); } } + // Work is complete. Stick around briefly to ensure all pending message requests are handled. + for (index = 0; index < 5; index++) { + worker_handle_request(thread_args_ptr); + sleep(1); + } + log("Finished ARR worker."); } diff --git a/src/main.c b/src/main.c index 38d117f2c7a6cc6aebb14ed2e8715ce7bfe8affb..28f6f194c5a9e124e717f833a5de98f1abfc8b45 100644 --- a/src/main.c +++ b/src/main.c @@ -5,6 +5,7 @@ * * 1 - Termination message from main. * * 2 - Work request message. * * 3 - Status update message. + * * 4 - Work response message. */ diff --git a/src/structs.c b/src/structs.c index 958d591cd1ca5e3c035c790efa4733d06b53a4d7..b2baae546f2f78145b58043ebf617e0da8c0b82e 100644 --- a/src/structs.c +++ b/src/structs.c @@ -14,6 +14,7 @@ const int tag_main_termination = 1; const int tag_work_request = 2; const int tag_status_update = 3; +const int tag_work_response = 4; thread_struct* thread_args_ptr; diff --git a/src/structs.h b/src/structs.h index a31d38b926f311163dccbaba87b4be43c5d02458..020b154cbb1dd4358ab0a97a0c48ca564ca7300f 100644 --- a/src/structs.h +++ b/src/structs.h @@ -18,6 +18,7 @@ extern const int tag_main_termination; extern const int tag_work_request; extern const int tag_status_update; +extern const int tag_work_response; /**