diff --git a/documents/references.md b/documents/references.md
index 9053dea2f28c8e4d80c44b3678aad1c1b55d821f..8c8d4adb1d6e42623d91a2cbc3fa409536b26f50 100644
--- a/documents/references.md
+++ b/documents/references.md
@@ -14,6 +14,13 @@ Most parallelization logic is from the book "Introduction to Parallel Programmin
 #### Getting Processor Count
 <https://stackoverflow.com/a/47520857>
 
+#### Checking if MPI Message is Present
+<https://stackoverflow.com/a/38039050>
+
+#### MPI Non-blocking Messages
+<https://www.clustermonkey.net/MPI/the-joys-of-asynchronous-communication.html>
+<http://www.mathcs.emory.edu/~cheung/Courses/561/Syllabus/92-MPI/async.html>
+
 ### Data Types
 
 #### Various C Data Types
diff --git a/src/main.c b/src/main.c
index 0b34381cd77039d977d49541a14cd8940eae64cf..e0df9b81dc7b816259c47a6f82f8a01838c817ea 100644
--- a/src/main.c
+++ b/src/main.c
@@ -32,8 +32,7 @@ void run_program(); // Main program logic. Handles various Load Balance
 // Global Variables.
 int total_processors;
 int process_rank;
-int seconds_per_index;
-int indexes_per_load;
+int seconds_per_load;
 int total_loads;
 
 
@@ -62,12 +61,11 @@ int main(int argc, char* argv[]) {
     if (process_rank == 0) {
         printf("System & Program Variables:\n");
         printf("    %i Total Loads\n", total_loads);
-        printf("    %i Indexes Per Load\n", indexes_per_load);
-        printf("    %i Seconds Per Index\n", seconds_per_index);
+        printf("    %i Seconds Per Load\n", seconds_per_load);
         printf("\n");
-        printf("    Total Expected work = TotalLoads * IndexesPerLoad * SecondsPerIndex\n");
-        printf("                        = %i * %i * %i\n", total_loads, indexes_per_load, seconds_per_index);
-        int seconds_active_work = total_loads * indexes_per_load * seconds_per_index;
+        printf("    Total Expected work = TotalLoads * SecondsPerLoad\n");
+        printf("                        = %i * %i\n", total_loads, seconds_per_load);
+        int seconds_active_work = total_loads * seconds_per_load;
         printf("                        = %i Seconds Active Work\n", seconds_active_work);
         printf("                        = %i Minutes Active Work\n", (seconds_active_work / 60));
         printf("\n");
@@ -103,25 +101,14 @@
  */
 void validate_args(int argc, char* argv[]) {
-    if (argc == 4) {
+    if (argc == 3) {
         // Expected number of params. Validate passed args.
 
-        // Validate "seconds_per_index" value. Should be between 1 and 10.
-        seconds_per_index = strtol(argv[1], NULL, 10);
-        if ((seconds_per_index < 1) || (seconds_per_index > 10)) {
+        // Validate "seconds_per_load" value. Should be between 1 and 10.
+        seconds_per_load = strtol(argv[1], NULL, 10);
+        if ((seconds_per_load < 1) || (seconds_per_load > 10)) {
             if (process_rank == 0) {
-                printf("Arg1 (seconds_per_index) should be int between 1 and 10.\n");
-                printf("\n");
-                printf("Terminating program.\n");
-            }
-            exit(1);
-        }
-
-        // Validate "indexes_per_load" value. Should be between 100 and 10,000.
-        indexes_per_load = strtol(argv[2], NULL, 10);
-        if ((indexes_per_load < 10) || (indexes_per_load > 10000)) {
-            if (process_rank == 0) {
-                printf("Arg2 (indexes_per_load) should be int between 10 and 10,000.\n");
+                printf("Arg1 (seconds_per_load) should be int between 1 and 10.\n");
                 printf("\n");
                 printf("Terminating program.\n");
             }
@@ -129,20 +116,20 @@ void validate_args(int argc, char* argv[]) {
         // Validate "total_loads" value. Should be between 100 and 10,000.
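+        // Note that strtol() with base 10 returns 0 when the argument is not a valid number, so
+        // non-numeric input simply falls outside the accepted range below and is rejected.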
-        total_loads = strtol(argv[3], NULL, 10);
+        total_loads = strtol(argv[2], NULL, 10);
         if ((total_loads < 10) || (total_loads > 10000)) {
             if (process_rank == 0) {
-                printf("Arg3 (total_loads) should be int between 10 and 10,000.\n");
+                printf("Arg2 (total_loads) should be int between 10 and 10,000.\n");
                 printf("\n");
                 printf("Terminating program.\n");
             }
             exit(1);
         }
 
-    } else if (argc > 4) {
+    } else if (argc > 3) {
         // Too many args. Error.
         if (process_rank == 0) {
-            printf("Too many args passed. Got %i. Expected 3.\n", (argc - 1));
+            printf("Too many args passed. Got %i. Expected 2.\n", (argc - 1));
             printf("\n");
             printf("Terminating program.\n");
         }
@@ -151,7 +138,7 @@ void validate_args(int argc, char* argv[]) {
     } else {
         // Too few args. Error.
         if (process_rank == 0) {
-            printf("Too few args passed. Got %i. Expected 3.\n", (argc - 1));
+            printf("Too few args passed. Got %i. Expected 2.\n", (argc - 1));
             printf("\n");
             printf("Terminating program.\n");
         }
@@ -189,13 +176,12 @@ void run_program() {
     MPI_Barrier(MPI_COMM_WORLD);
 
     thread_struct* thread_args_ptr = initialize_thread_struct(
-        total_processors, seconds_per_index, indexes_per_load, total_loads, process_rank
+        total_processors, seconds_per_load, total_loads, process_rank, 0
     );
 
     run_arr(thread_args_ptr);
 
-    // Test load logic with no parallelization. Debugging only.
-    // simulate_load("main");
+    free_thread_struct(thread_args_ptr);
 
     // Wait for all processes to synchronize.
     MPI_Barrier(MPI_COMM_WORLD);
diff --git a/src/simulate_loads.c b/src/simulate_loads.c
index d0cbff066f5bb947b72e8ac6c89c6511e48b4641..7597f75a4dfe1796b7daaedf95e157c00f13b537 100644
--- a/src/simulate_loads.c
+++ b/src/simulate_loads.c
@@ -12,11 +12,12 @@
 #endif
 
 
-void display_status(thread_struct* thread_args_ptr, int process_num, int process_load_status, int init_run) {
+/**
+ * Displays the provided worker status through the master process.
+ */
+void main_display_status(thread_struct* thread_args_ptr, int process_num, int process_load_status, int init_run) {
     int index;
 
-    // printf("init_run: %i\n", init_run);
-
     // Travel to desired terminal row. Skip if first loop through thread displaying.
     if (init_run == 0) {
         for (index = 0; index < (thread_args_ptr->total_processors - process_num); index ++) {
@@ -40,6 +41,133 @@
 }
 
 
+/**
+ * Notifies all worker processes that all work has been completed and it is okay to terminate.
+ */
+void main_terminate_workers(thread_struct* thread_args_ptr) {
+    int program_end_tag = 1;
+    int index;
+    MPI_Request request_var;    // Handle returned by MPI_Isend; would normally be completed later via MPI_Wait/MPI_Test.
+
+    for (index = 1; index < thread_args_ptr->total_processors; index++) {
+        MPI_Isend(&program_end_tag, 1, MPI_INT, index, program_end_tag, MPI_COMM_WORLD, &request_var);
+        // printf("Sending termination message to process %i.\n", index);
+        // MPI_Send(&program_end_tag, 1, MPI_INT, index, program_end_tag, MPI_COMM_WORLD);
+        // printf("Process %i received termination message.\n", index);
+    }
+
+}
+
+
+/**
+ * Sends the worker's rank and current load status to the master process.
+ */
+void worker_send_status(int process_rank, int process_load_status) {
+    int* send_array = calloc(2, sizeof(int));
+    send_array[0] = process_rank;
+    send_array[1] = process_load_status;
+    MPI_Send(send_array, 2, MPI_INT, 0, 0, MPI_COMM_WORLD);
+    free(send_array);
+}
+
+
+/**
+ * Checks whether the main process has signaled that all work is done and the worker may terminate.
+ *
+ * :return: 0 if received okay from main process to end. Otherwise 1 to continue working.
+ */
+int worker_check_status() {
+    int msg_status_flag = 0;
+    int temp = 0;
+    int main_process = 0;
+    int program_end_tag = 1;
+
+    // Check for a pending message from the main process.
+    // Should only occur if main is telling the child process that all work has completed.
+    MPI_Iprobe(main_process, program_end_tag, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE);
+    if (msg_status_flag == 1) {
+        // Message present. Officially grab message so master can unblock.
+        MPI_Recv(&temp, 1, MPI_INT, main_process, program_end_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+        return 0;
+    }
+    return 1;
+}
+
+
+/**
+ * Attempts to receive work from the given "possible donor" process (the donor is determined by the load balancing scheme).
+ *
+ * Does so by sending a message to the "possible donor" process to initiate the request.
+ * Once the donor gets around to checking messages, it sends back either:
+ *  * -1, indicating the donor has no work to give.
+ *  * A positive integer: the amount of work being handed to the begging process.
+ */
+int worker_send_request(thread_struct* thread_args_ptr, int possible_donor) {
+    int donor_response = 0;
+
+    // Initialize the interaction by sending a "begging message" for work.
+    MPI_Send(&donor_response, 1, MPI_INT, possible_donor, 0, MPI_COMM_WORLD);
+
+    // If we got this far, the send has unblocked. The "possible donor" process should now be sending a response.
+    MPI_Recv(&donor_response, 1, MPI_INT, possible_donor, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+
+    // Check what the response was and handle accordingly.
+    if (donor_response > 0) {
+        // Donor sent a workload to this process.
+        thread_args_ptr->remaining_loads = donor_response;
+        return 1;
+    } else {
+        // Donor sent a reject message.
+        return 0;
+    }
+}
+
+
+/**
+ * Checks if the worker has any pending work requests from other processes.
+ * If anything is pending, every worker process is checked for a waiting request.
+ *
+ * To handle a request, either splits the current workload in half (the asking process gets the smaller
+ * portion if the count is odd) or sends back a negative number if there is no work to give.
+ */
+void worker_handle_request(thread_struct* thread_args_ptr) {
+    int msg_status_flag = 0;
+    int work_send_value = 0;
+    int temp = 0;
+    int index;
+
+    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE);
+    if (msg_status_flag == 1) {
+
+        // One or more messages present. Loop through and check for a message from each processor.
+        for (index = 1; index < thread_args_ptr->total_processors; index++) {
+            msg_status_flag = 0;
+            MPI_Iprobe(index, MPI_ANY_TAG, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE);
+            if (msg_status_flag == 1) {
+                // Message present from the processor at the corresponding index. Handle it.
+
+                // First, properly "receive" the pending message, so the asking worker can unblock.
+                MPI_Recv(&temp, 1, MPI_INT, index, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+
+                // Check if this (donating) worker has work to split and send.
+                if (thread_args_ptr->remaining_loads > 1) {
+                    // Worker has at least two loads, so it can split and send. Do so.
+                    work_send_value = thread_args_ptr->remaining_loads / 2;
+                    thread_args_ptr->remaining_loads = work_send_value + (thread_args_ptr->remaining_loads % 2);
+                    MPI_Send(&work_send_value, 1, MPI_INT, index, 0, MPI_COMM_WORLD);
+                } else {
+                    // Worker has exactly 0 or 1 loads. Not enough to send. Reject the request instead.
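+                    // worker_send_request() treats any response that is not a positive integer as a rejection.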
+                    work_send_value = -1;
+                    MPI_Send(&work_send_value, 1, MPI_INT, index, 0, MPI_COMM_WORLD);
+                }
+
+            }
+        }
+
+    }
+}
+
+
 /**
  * Logic to run "Asynchronous Round Robin" load scheme.
  */
@@ -81,17 +209,40 @@ void run_arr(thread_struct* thread_args_ptr) {
  * Main processor logic for "Asynchronous Round Robin" load scheme.
  */
 void main_arr(thread_struct* thread_args_ptr) {
-    int index = 1;
-    int main_loop_bool = thread_args_ptr->total_loads;
+    int index;
+    int main_loop_bool = 1;
     int init_run = 1;
+    int msg_status_flag;
+    int msg_received = 0;
+    int* recv_array;
 
-    // Get messages from all worker processors.
-    while (main_loop_bool >= 0) {
-
-        int* recv_array = calloc(3, sizeof(int));
+    // Initialize console display.
+    for (index = 1; index < thread_args_ptr->total_processors; index++) {
+        recv_array = calloc(3, sizeof(int));
         MPI_Recv(recv_array, 2, MPI_INT, index, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-        display_status(thread_args_ptr, recv_array[0], recv_array[1], init_run);
+        main_display_status(thread_args_ptr, recv_array[0], recv_array[1], init_run);
         free(recv_array);
+    }
+
+    // Continually grab messages from all worker processors until done.
+    // Each iteration checks one worker process for a status message; a full pass covers every worker.
+    // If no messages are found for two consecutive passes, all work is assumed complete and the workers are terminated.
+    index = 1;
+    init_run = 0;
+    while (main_loop_bool == 1) {
+        msg_status_flag = 0;
+
+        // Check if a message is present from the respective process.
+        MPI_Iprobe(index, MPI_ANY_TAG, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE);
+        if (msg_status_flag == 1) {
+            msg_received = 1;
+
+            // Message present. Retrieve and handle.
+            recv_array = calloc(3, sizeof(int));
+            MPI_Recv(recv_array, 2, MPI_INT, index, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+            main_display_status(thread_args_ptr, recv_array[0], recv_array[1], init_run);
+            free(recv_array);
+        }
 
         // Increment for next thread.
         index = (index + 1) % thread_args_ptr->total_processors;
@@ -100,9 +251,25 @@
         if (index == 0) {
             index += 1;
 
-            init_run = 0;
-            main_loop_bool -= 1;
-            sleep(1);
+            // Check if any messages were received this pass.
+            if (msg_received == 0) {
+                // No messages received. Run one more pass to double check.
+                msg_received = -1;
+                sleep(2);
+            } else if (msg_received == -1) {
+                // No messages received for two passes in a row. Terminate.
+                main_terminate_workers(thread_args_ptr);
+                main_loop_bool = 0;
+            } else {
+                // One or more messages received this pass. Reset value.
+                msg_received = 0;
+            }
+
+            // Sleep the number of seconds provided at the terminal by the user.
+            // This is meant to simulate "work": it forces each worker process to block for this amount of
+            // time per load before continuing on. In a real program, the worker would be doing something
+            // useful in this time instead of just blocking.
+            sleep(thread_args_ptr->seconds_per_load);
         }
     }
 
@@ -114,20 +281,38 @@
  */
 void worker_arr(thread_struct* thread_args_ptr) {
     int process_rank = thread_args_ptr->thread_num;
-    int load_num = thread_args_ptr->total_loads;
+    int arr_counter = thread_args_ptr->thread_num;
+    int main_loop_bool = 1;
+    int index;
+    int msg_status_flag;
+
+    // Initialize process "1" to hold the full set of loads. All others start with none.
+    if (process_rank == 1) {
+        thread_args_ptr->remaining_loads = thread_args_ptr->total_loads;
+    } else {
+        thread_args_ptr->remaining_loads = 0;
+    }
+
+    // Main loop bool. Continues until the main process says all work is done.
+    while (main_loop_bool == 1) {
+
+        // Loop until a load count message of "0" has been sent.
+        while (thread_args_ptr->remaining_loads >= 0) {
 
-    // Loop all worker processes until load count message of "0" has been sent.
-    while (load_num >= 0) {
+            // Send message for current load.
+            worker_send_status(process_rank, thread_args_ptr->remaining_loads);
+
+            // Check for message requests from other processes.
+            worker_handle_request(thread_args_ptr);
+
+            // Decrement load number.
+            thread_args_ptr->remaining_loads -= 1;
+        }
 
-        // Sent message for current load.
-        int* send_array = calloc(2, sizeof(int));
-        send_array[0] = process_rank;
-        send_array[1] = load_num;
-        MPI_Send(send_array, 2, MPI_INT, 0, 0, MPI_COMM_WORLD);
-        free(send_array);
+        // If we made it this far, the worker no longer has work. First, beg a fellow worker for work.
 
-        // Decriment load number.
-        load_num -= 1;
+        // Then check whether the program as a whole is done running.
+        main_loop_bool = worker_check_status();
     }
 }
diff --git a/src/simulate_loads.h b/src/simulate_loads.h
index 8c663eb4345db732076c55308fe0749adfe892b8..3d08d5ef27ca4e18fbfe4422ac5d3b966b548aa0 100644
--- a/src/simulate_loads.h
+++ b/src/simulate_loads.h
@@ -13,7 +13,12 @@
 
 
 // Function Prototypes.
-void display_status();
+void main_display_status();
+void main_terminate_workers();
+void worker_send_status();
+int worker_check_status();
+int worker_send_request();
+void worker_handle_request();
 void run_arr();     // Encapsulated logic for "Asynchronous Round Robin" load scheme.
 void main_arr();
 void worker_arr();
diff --git a/src/structs.c b/src/structs.c
index 78f79aba91110e827daf697b7b39ddce6f534813..c1a05520f01324fec34e6a9d5adb53e917143ad1 100644
--- a/src/structs.c
+++ b/src/structs.c
@@ -16,10 +16,10 @@
  */
 thread_struct* initialize_thread_struct(
     int total_processors,
-    int seconds_per_index,
-    int indexes_per_load,
+    int seconds_per_load,
     int total_loads,
-    int thread_num
+    int thread_num,
+    int remaining_loads
 ) {
 
     // Create struct.
@@ -27,10 +27,10 @@ thread_struct* initialize_thread_struct(
 
     // Populate fields.
     new_struct->total_processors = total_processors;
-    new_struct->seconds_per_index = seconds_per_index;
-    new_struct->indexes_per_load = indexes_per_load;
+    new_struct->seconds_per_load = seconds_per_load;
     new_struct->total_loads = total_loads;
     new_struct->thread_num = thread_num;
+    new_struct->remaining_loads = remaining_loads;
 
     return new_struct;
 }
diff --git a/src/structs.h b/src/structs.h
index 61f60f166c2c2fc365624dedf5c1aa99aa8839af..0d71d006d4abd214160058ad4cb951a8bc0eb2ba 100644
--- a/src/structs.h
+++ b/src/structs.h
@@ -18,12 +18,12 @@
 typedef struct {
     // "Global" thread values.
     int total_processors;
-    int seconds_per_index;
-    int indexes_per_load;
+    int seconds_per_load;
     int total_loads;
 
     // "Local" thread values.
     int thread_num;
+    int remaining_loads;
 } thread_struct;
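A loose end in the diff above: worker_send_request() is defined and prototyped, but nothing in this diff calls it yet, so worker_arr() currently stops at the "beg a fellow worker for work" comment. One possible way to wire that step in, reusing the arr_counter that worker_arr() already declares, is sketched below. The helper name and its placement alongside the other worker_* functions in simulate_loads.c are assumptions for illustration, not part of the change itself.

/**
 * Illustrative sketch only: walk the other worker ranks round robin style, starting after
 * *arr_counter_ptr, and beg each via worker_send_request() until a donor hands over work
 * or every other worker has been asked once.
 */
void worker_beg_for_work(thread_struct* thread_args_ptr, int* arr_counter_ptr) {
    int index;
    int possible_donor;

    for (index = 1; index < thread_args_ptr->total_processors; index++) {
        // Advance the round robin pointer, cycling through worker ranks 1..(total_processors - 1).
        *arr_counter_ptr = (*arr_counter_ptr % (thread_args_ptr->total_processors - 1)) + 1;
        possible_donor = *arr_counter_ptr;

        if (possible_donor == thread_args_ptr->thread_num) {
            continue;   // Never beg ourselves.
        }
        if (worker_send_request(thread_args_ptr, possible_donor) == 1) {
            return;     // Donor sent work; worker_send_request() already updated remaining_loads.
        }
    }
}

worker_arr() would then call worker_beg_for_work(thread_args_ptr, &arr_counter); at the begging comment, keeping each process's round robin pointer alive between begging rounds, which is the property that distinguishes asynchronous round robin from a single global round robin counter.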