diff --git a/documents/references.md b/documents/references.md index f8a407eb959c93b01dbaa9b55e1cac1e23edc0a6..8e423c7699a84e13addd66f21002f8df72365bc9 100644 --- a/documents/references.md +++ b/documents/references.md @@ -73,3 +73,7 @@ Most parallelization logic is from the book "Introduction to Parallel Programmin #### Display Percent Sign in Printf <https://stackoverflow.com/a/1860164> + + +#### Generating a Random Number +<https://stackoverflow.com/questions/822323/how-to-generate-a-random-int-in-c> diff --git a/src/load_balance_schemes.c b/src/load_balance_schemes.c index 55cdc77c1990f2864f75847ebdd540200cb13967..edac0e05a98cc190c94eb32903df8f2c30746e75 100644 --- a/src/load_balance_schemes.c +++ b/src/load_balance_schemes.c @@ -64,7 +64,7 @@ void arr_main(thread_struct* thread_args_ptr) { int msg_received = 0; int* recv_array; - log("Starting arr_main."); + log("Starting ARR main."); // Initialize console display. for (index = 1; index < thread_args_ptr->total_processors; index++) { @@ -240,7 +240,7 @@ void arr_worker(thread_struct* thread_args_ptr) { * * :param thread_args_ptr: Struct for processor values. */ -void grr_run(int total_processors) { +void grr_run(thread_struct* thread_args_ptr) { printf("Starting GRR load scheme."); printf("Finished GRR load scheme."); @@ -276,10 +276,38 @@ void grr_worker(thread_struct* thread_args_ptr) { * * :param thread_args_ptr: Struct for processor values. */ -void rp_run(int total_processors) { - log("Starting RP load scheme."); +void rp_run(thread_struct* thread_args_ptr) { + int process_rank = thread_args_ptr->thread_num; + + if (process_rank == 0) { + printf("Starting RP load scheme.\n"); + printf("\n"); + } + + // Wait for all processes to synchronize. + // Also sleep for a second to allow time for console output to process. + MPI_Barrier(MPI_COMM_WORLD); + sleep(1); - log("Finished RP load scheme."); + // Handle based on process number. + if (process_rank > 0) { + // Child process. Handles all the work. + rp_worker(thread_args_ptr); + } else { + // Main process. Handles communication to terminal. + rp_main(thread_args_ptr); + } + + // Wait for all processes to synchronize. + // Also sleep for a second to allow time for console output to process. + MPI_Barrier(MPI_COMM_WORLD); + sleep(1); + + if (process_rank == 0) { + printf("\n"); + printf("Finished RP load scheme.\n"); + printf("\n"); + } } @@ -289,8 +317,65 @@ void rp_run(int total_processors) { * :param thread_args_ptr: Struct for processor values. */ void rp_main(thread_struct* thread_args_ptr) { + int index; + int main_loop_bool = 3; + int msg_received = 0; + int* recv_array; + log("Starting RP main."); + // Initialize console display. + for (index = 1; index < thread_args_ptr->total_processors; index++) { + recv_array = calloc(3, sizeof(int)); + MPI_Recv(recv_array, 2, MPI_INT, index, tag_status_update, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + main_display_status(thread_args_ptr, recv_array[0], recv_array[1], 1); + free(recv_array); + } + + // Continually grab messages from all worker processors until done. + // For each loop, it checks all worker processes for a message about work being done. + // If no messages are found for two consecutive loops in a row, then assumes all work is complete and terminates. + index = 1; + while (main_loop_bool > 0) { + + // Check status of worker process. + if (main_check_worker_status(thread_args_ptr, index)) { + msg_received = 1; + } + + // Increment for next thread. + index = (index + 1) % thread_args_ptr->total_processors; + + // Check if past last processor. + if (index == 0) { + index += 1; + + // Check if messages were recieved this run. + if (msg_received == 0) { + // No messages recieved. + main_loop_bool -= 1; + + // Send termination request if loop bool hit 0. + // Otherwise loop a few more times to give requests time to propagate. + if (main_loop_bool <= 0) { + main_terminate_workers(thread_args_ptr); + } + + } else { + // One or more messages recieved this loop. Reset main loop value. + main_loop_bool = 3; + } + + // Sleep number of seconds provided by user terminal. + // Meant to simulate "work", as it forces each worker process to block for this amount of time each load, + // before continuing on. In a real program, the worker would be doing something useful in this time, + // instead of just blocking. + sleep(thread_args_ptr->seconds_per_load); + msg_received = 0; + } + + } + log("Finished RP main."); } @@ -301,8 +386,112 @@ void rp_main(thread_struct* thread_args_ptr) { * :param thread_args_ptr: Struct for processor values. */ void rp_worker(thread_struct* thread_args_ptr) { + int process_rank = thread_args_ptr->thread_num; + int rp_counter; + int main_loop_bool = 1; + int msg_status_flag; + int request_flag = -1; + int work_value = 0; + int send_status = 1; + log("Starting RP worker."); + // Initialize random number generator. + srand(time(NULL)); + + // Initialize process "1" to have full loads. All others start with none. + if (process_rank == 1) { + thread_args_ptr->remaining_loads = thread_args_ptr->total_loads; + } else { + thread_args_ptr->remaining_loads = 0; + } + + // Initialize output by immediately sending status to main process. + worker_send_status(process_rank, thread_args_ptr->remaining_loads); + + // Main loop bool. Continues until main process says all work is done. + while (main_loop_bool == 1) { + + // Loop all worker processes until load count message of "0" has been sent. + while (thread_args_ptr->remaining_loads > 0) { + + // Send message for current load. + worker_send_status(process_rank, thread_args_ptr->remaining_loads); + + // Check for message requests from other processes. + msg_status_flag = 0; + MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag == 1) { + // At least one message pending. Handle before proceeding. + worker_handle_request(thread_args_ptr); + } + + // Decriment load number. + thread_args_ptr->remaining_loads -= 1; + send_status = 1; + } + + // If we made it this far, then worker no longer has work. + + // Send notification message to main, to show value of 0. + if (send_status == 1) { + worker_send_status(process_rank, thread_args_ptr->remaining_loads); + send_status = 0; + } + + // Check for pending work requests. + msg_status_flag = 0; + MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag == 1) { + // At least one message pending. Handle before proceeding. + worker_handle_request(thread_args_ptr); + } + + + if (request_flag == -1) { + // Update personal RP counter. + rp_counter = -1; + while (rp_counter < 1 || rp_counter >= thread_args_ptr->total_processors) { + rp_counter = rand() % thread_args_ptr->total_processors; + } + + // Beg fellow worker for work. + worker_send_request(thread_args_ptr, rp_counter); + request_flag = rp_counter; + } else { + msg_status_flag = 0; + MPI_Iprobe(request_flag, tag_work_response, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag == 1) { + MPI_Recv(&work_value, 1, MPI_INT, request_flag, tag_work_response, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + // Update personal load value if non-reject message provided. + if (work_value > 0) { + thread_args_ptr->remaining_loads = work_value; + } + + // Reset flag to allow begging for work again in the future. + request_flag = -1; + } + } + + // Check if program as a whole is done running. + main_loop_bool = worker_check_status(); + + // Check again for pending work requests. + msg_status_flag = 0; + MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag == 1) { + // At least one message pending. Handle before proceeding. + worker_handle_request(thread_args_ptr); + } + + // Check status of overall work. + if (main_loop_bool == 1) { + // Sleep to simulate "doing work". + sleep(1); + } + } + log("Finished RP worker."); } @@ -313,7 +502,7 @@ void rp_worker(thread_struct* thread_args_ptr) { * * :param thread_args_ptr: Struct for processor values. */ -void nn_run(int total_processors) { +void nn_run(thread_struct* thread_args_ptr) { log("Starting NN load scheme."); log("Finished NN load scheme."); diff --git a/src/main.c b/src/main.c index 82f621771bc115a3b4eed8faf6a674db0efab91b..b68ee207048b44284e828843d786ce4c7cbbe51b 100644 --- a/src/main.c +++ b/src/main.c @@ -199,7 +199,10 @@ void run_program() { total_processors, seconds_per_load, total_loads, process_rank ); - arr_run(thread_args_ptr); + // arr_run(thread_args_ptr); + // grr_run(thread_args_ptr); + rp_run(thread_args_ptr); + // nn_run(thread_args_ptr); free_thread_struct(thread_args_ptr);