diff --git a/documents/references.md b/documents/references.md index 45127340e64b83ea6815007103408e2e3fce761d..c19eb8bdb8293a08c621bfd9370fe33c3c809c4b 100644 --- a/documents/references.md +++ b/documents/references.md @@ -18,8 +18,12 @@ Most parallelization logic is from the book "Introduction to Parallel Programmin <https://stackoverflow.com/a/38039050> #### MPI Non-blocking Messages -<https://www.clustermonkey.net/MPI/the-joys-of-asynchronous-communication.html> -<http://www.mathcs.emory.edu/~cheung/Courses/561/Syllabus/92-MPI/async.html> +* <https://www.clustermonkey.net/MPI/the-joys-of-asynchronous-communication.html> +* <http://www.mathcs.emory.edu/~cheung/Courses/561/Syllabus/92-MPI/async.html> + +### Determining Seeming Inconsistencies in MPI Synchronization +* <http://www.oscer.ou.edu/ncsiworkshop2012intropar_mpiblockingnonblocking_murphy_20120803.pdf> +* <https://stackoverflow.com/questions/17582900/difference-between-mpi-send-and-mpi-ssend> ### Data Types @@ -30,6 +34,11 @@ Most parallelization logic is from the book "Introduction to Parallel Programmin <https://www.tutorialspoint.com/c_standard_library/limits_h.htm> +### Makefiles +#### Passing Command Line Args +<https://stackoverflow.com/a/47008498> + + ### Other #### strtol Function <https://www.tutorialspoint.com/c_standard_library/c_function_strtol.htm> @@ -38,8 +47,8 @@ Most parallelization logic is from the book "Introduction to Parallel Programmin <https://www.tutorialspoint.com/cprogramming/c_pointer_to_pointer.htm> #### Moving Cursor in Terminal -<https://stackoverflow.com/a/35190285> -<https://tldp.org/HOWTO/Bash-Prompt-HOWTO/x361.html> +* <https://stackoverflow.com/a/35190285> +* <https://tldp.org/HOWTO/Bash-Prompt-HOWTO/x361.html> #### Logging * Create Dir - <https://stackoverflow.com/a/7430262> @@ -49,7 +58,9 @@ Most parallelization logic is from the book "Introduction to Parallel Programmin ** <https://stackoverflow.com/a/8884408> * Function Macros - <https://gcc.gnu.org/onlinedocs/cpp/Function-like-Macros.html#Function-like-Macros> +#### Constants Between Files +* <https://stackoverflow.com/a/15531238> +* <https://stackoverflow.com/a/5499530> + + -### Makefiles -#### Passing Command Line Args -<https://stackoverflow.com/a/47008498> diff --git a/src/load_balance_general.c b/src/load_balance_general.c index d791ca9524b8a3622d2850975745850772ff0ec6..c52514c571dda75c153cab21f9271637f7bc2823 100644 --- a/src/load_balance_general.c +++ b/src/load_balance_general.c @@ -48,7 +48,7 @@ void main_display_status(thread_struct* thread_args_ptr, int process_num, int pr // Travel back to original terminal location. Skip if first loop through thread displaying. if (init_run == 0) { - for (index = 0; index < (thread_args_ptr->total_processors - process_num); index ++) { + for (index = 0; index < (thread_args_ptr->total_processors - process_num - 1); index ++) { terminal_line_down(); } terminal_line_start(); @@ -60,7 +60,6 @@ void main_display_status(thread_struct* thread_args_ptr, int process_num, int pr * Sends message to all worker processes that all work has been completed, and it's okay to terminate now. */ void main_terminate_workers(thread_struct* thread_args_ptr) { - int program_end_tag = 1; int index; MPI_Request request_var; // What is this for?? Why is it required? @@ -70,9 +69,9 @@ void main_terminate_workers(thread_struct* thread_args_ptr) { fflush(stdout); for (index = 1; index < thread_args_ptr->total_processors; index++) { - MPI_Isend(&program_end_tag, 1, MPI_INT, index, program_end_tag, MPI_COMM_WORLD, &request_var); + MPI_Isend(&index, 1, MPI_INT, index, tag_main_termination, MPI_COMM_WORLD, &request_var); // printf("Sending termination message to process %i.\n", index); - // MPI_Send(&program_end_tag, 1, MPI_INT, index, program_end_tag, MPI_COMM_WORLD); + // MPI_Ssend(&index, 1, MPI_INT, index, tag_main_termination, MPI_COMM_WORLD); // printf("Process %i received termination message.\n", index); } @@ -96,7 +95,7 @@ void worker_send_status(int process_rank, int process_load_status) { send_array[0] = process_rank; send_array[1] = process_load_status; - MPI_Send(send_array, 2, MPI_INT, 0, 0, MPI_COMM_WORLD); + MPI_Ssend(send_array, 2, MPI_INT, 0, tag_status_update, MPI_COMM_WORLD); free(send_array); } @@ -111,7 +110,6 @@ int worker_check_status() { int msg_status_flag = 0; int temp = 0; int main_process = 0; - int program_end_tag = 1; log(""); log("FUNCTION worker_check_status()"); @@ -120,10 +118,10 @@ int worker_check_status() { // Check for pending message from main process. // Should only ocurr if main is telling child process that all work has completed. - MPI_Iprobe(main_process, program_end_tag, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + MPI_Iprobe(main_process, tag_main_termination, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); if (msg_status_flag == 1) { // Message present. Officially grab message so master can unblock. - MPI_Recv(&temp, 1, MPI_INT, main_process, program_end_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&temp, 1, MPI_INT, main_process, tag_main_termination, MPI_COMM_WORLD, MPI_STATUS_IGNORE); return 0; } return 1; @@ -158,13 +156,13 @@ int worker_send_request(thread_struct* thread_args_ptr, int possible_donor) { } // Ensure thread isn't currently sending a message to this thread. If so, abort. - MPI_Iprobe(possible_donor, MPI_ANY_TAG, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + MPI_Iprobe(possible_donor, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); if (msg_status_flag != 1) { // Initialilze interaction by sending a "begging message" for work. - MPI_Send(&donor_response, 1, MPI_INT, possible_donor, 0, MPI_COMM_WORLD); + MPI_Ssend(&donor_response, 1, MPI_INT, possible_donor, tag_work_request, MPI_COMM_WORLD); // If got this far, then we have unblocked. Means that "possible donor" process is sending a response. - MPI_Recv(&donor_response, 1, MPI_INT, possible_donor, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&donor_response, 1, MPI_INT, possible_donor, tag_work_request, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } // Check what response was and handle accoringly. @@ -197,29 +195,29 @@ void worker_handle_request(thread_struct* thread_args_ptr) { log(""); fflush(stdout); - MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); if (msg_status_flag == 1) { // One or more messages present. Loop through and check for message from all processors. for (index = 1; index < thread_args_ptr->total_processors; index++) { msg_status_flag = 0; - MPI_Iprobe(index, MPI_ANY_TAG, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + MPI_Iprobe(index, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); if (msg_status_flag == 1) { // Message present from processor at corresponding index. Handle. // First, properly "receive" pending message, so asking worker can unblock. - MPI_Recv(&temp, 1, MPI_INT, index, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&temp, 1, MPI_INT, index, tag_work_request, MPI_COMM_WORLD, MPI_STATUS_IGNORE); // Check if donating worker has work to split and send. if (thread_args_ptr->remaining_loads > 1) { // Worker has at least one load to split and send. Do so. work_send_value = thread_args_ptr->remaining_loads / 2; thread_args_ptr->remaining_loads = work_send_value + (thread_args_ptr->remaining_loads % 2); - MPI_Send(&work_send_value, 1, MPI_INT, index, 0, MPI_COMM_WORLD); + MPI_Ssend(&work_send_value, 1, MPI_INT, index, tag_work_request, MPI_COMM_WORLD); } else { // Worker has exactly 0 or 1 loads. Not enough to send. Reject request instead. work_send_value = -1; - MPI_Send(&work_send_value, 1, MPI_INT, index, 0, MPI_COMM_WORLD); + MPI_Ssend(&work_send_value, 1, MPI_INT, index, tag_work_request, MPI_COMM_WORLD); } } diff --git a/src/load_balance_schemes.c b/src/load_balance_schemes.c index b19139626998856df4c3370efdd264a18a5a6529..e74ae36b249ac0fba349a4ac8190d975ad879710 100644 --- a/src/load_balance_schemes.c +++ b/src/load_balance_schemes.c @@ -66,7 +66,7 @@ void arr_main(thread_struct* thread_args_ptr) { // Initialize console display. for (index = 1; index < thread_args_ptr->total_processors; index++) { recv_array = calloc(3, sizeof(int)); - MPI_Recv(recv_array, 2, MPI_INT, index, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(recv_array, 2, MPI_INT, index, tag_status_update, MPI_COMM_WORLD, MPI_STATUS_IGNORE); main_display_status(thread_args_ptr, recv_array[0], recv_array[1], init_run); free(recv_array); } @@ -80,13 +80,13 @@ void arr_main(thread_struct* thread_args_ptr) { msg_status_flag = 0; // Check if message present from respective process. - MPI_Iprobe(index, MPI_ANY_TAG, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + MPI_Iprobe(index, tag_status_update, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); if (msg_status_flag == 1) { msg_recieved = 1; // Message present. Retrieve and handle. recv_array = calloc(3, sizeof(int)); - MPI_Recv(recv_array, 2, MPI_INT, index, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(recv_array, 2, MPI_INT, index, tag_status_update, MPI_COMM_WORLD, MPI_STATUS_IGNORE); main_display_status(thread_args_ptr, recv_array[0], recv_array[1], init_run); free(recv_array); } @@ -132,6 +132,7 @@ void arr_worker(thread_struct* thread_args_ptr) { int process_rank = thread_args_ptr->thread_num; int arr_counter = thread_args_ptr->thread_num; int main_loop_bool = 1; + int msg_status_flag; log("Starting GRR worker."); @@ -158,19 +159,41 @@ void arr_worker(thread_struct* thread_args_ptr) { thread_args_ptr->remaining_loads -= 1; } - // If we made it this far, then worker no longer has work. First beg fellow worker for work. + // If we made it this far, then worker no longer has work. + // Start every step at this point by double checking for messages first. + msg_status_flag = 0; + MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag == 1) { + // At least one message pending. Handle before proceeding. + worker_handle_request(thread_args_ptr); + } + + // Update personal ARR counter. arr_counter = (arr_counter + 1) % thread_args_ptr->total_processors; while (arr_counter == thread_args_ptr->thread_num || arr_counter == 0) { arr_counter = (arr_counter + 1) % thread_args_ptr->total_processors; } - // worker_send_request(thread_args_ptr, arr_counter); - // Reject any work requests from other processors. - worker_handle_request(thread_args_ptr); + msg_status_flag = 0; + MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag == 1) { + // At least one message pending. Handle before proceeding. + worker_handle_request(thread_args_ptr); + } + + // Beg fellow worker for work. + worker_send_request(thread_args_ptr, arr_counter); // Check if program as a whole is done running. main_loop_bool = worker_check_status(); + msg_status_flag = 0; + MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag == 1) { + // At least one message pending. Handle before proceeding. + worker_handle_request(thread_args_ptr); + } + if (main_loop_bool == 1) { sleep(1); } diff --git a/src/main.c b/src/main.c index 2d33259ebaaee1087910177d9feaf820eeffec86..38d117f2c7a6cc6aebb14ed2e8715ce7bfe8affb 100644 --- a/src/main.c +++ b/src/main.c @@ -1,5 +1,10 @@ /** * An implementation of various Load Balancing schemes, to examine the concepts of Isoefficiency. + * + * MPI Tags: + * * 1 - Termination message from main. + * * 2 - Work request message. + * * 3 - Status update message. */ diff --git a/src/structs.c b/src/structs.c index 92950339d54aa5f371e48fdfad95ce2f4daaaeb0..02a7e3833395b3241536294326937b629fc9206c 100644 --- a/src/structs.c +++ b/src/structs.c @@ -11,6 +11,9 @@ // Global Variables. +const int tag_main_termination = 1; +const int tag_work_request = 2; +const int tag_status_update = 3; thread_struct* thread_args_ptr; @@ -20,7 +23,7 @@ thread_struct* thread_args_ptr; void _log(char const* file, long line, char const* message) { // Print to console (debugging only). - printf(" P%3i [%s:%ld] %s\n", thread_args_ptr->thread_num, file, line, message); + // printf(" P%3i [%s:%ld] %s\n", thread_args_ptr->thread_num, file, line, message); // Log to file. fprintf(thread_args_ptr->log_file, "[%s:%ld] %s\n", file, line, message); diff --git a/src/structs.h b/src/structs.h index cf3b2d55cffc2f30ecb7cf4d7ff799b031f31c67..a31d38b926f311163dccbaba87b4be43c5d02458 100644 --- a/src/structs.h +++ b/src/structs.h @@ -14,6 +14,12 @@ #include <sys/stat.h> +// Global Variables. +extern const int tag_main_termination; +extern const int tag_work_request; +extern const int tag_status_update; + + /** * Data for a single thread runtime. */