diff --git a/src/load_balance_schemes.c b/src/load_balance_schemes.c index edac0e05a98cc190c94eb32903df8f2c30746e75..033b1c5a8950996fcf3b56d849a41cd667badbb0 100644 --- a/src/load_balance_schemes.c +++ b/src/load_balance_schemes.c @@ -503,9 +503,37 @@ void rp_worker(thread_struct* thread_args_ptr) { * :param thread_args_ptr: Struct for processor values. */ void nn_run(thread_struct* thread_args_ptr) { - log("Starting NN load scheme."); + int process_rank = thread_args_ptr->thread_num; + + if (process_rank == 0) { + printf("Starting NN load scheme.\n"); + printf("\n"); + } + + // Wait for all processes to synchronize. + // Also sleep for a second to allow time for console output to process. + MPI_Barrier(MPI_COMM_WORLD); + sleep(1); - log("Finished NN load scheme."); + // Handle based on process number. + if (process_rank > 0) { + // Child process. Handles all the work. + nn_worker(thread_args_ptr); + } else { + // Main process. Handles communication to terminal. + nn_main(thread_args_ptr); + } + + // Wait for all processes to synchronize. + // Also sleep for a second to allow time for console output to process. + MPI_Barrier(MPI_COMM_WORLD); + sleep(1); + + if (process_rank == 0) { + printf("\n"); + printf("Finished NN load scheme.\n"); + printf("\n"); + } } @@ -515,12 +543,102 @@ void nn_run(thread_struct* thread_args_ptr) { * :param thread_args_ptr: Struct for processor values. */ void nn_main(thread_struct* thread_args_ptr) { + int index; + int main_loop_bool = 3; + int msg_received = 0; + int* recv_array; + log("Starting NN main."); + // Initialize console display. + for (index = 1; index < thread_args_ptr->total_processors; index++) { + recv_array = calloc(3, sizeof(int)); + MPI_Recv(recv_array, 2, MPI_INT, index, tag_status_update, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + main_display_status(thread_args_ptr, recv_array[0], recv_array[1], 1); + free(recv_array); + } + + // Continually grab messages from all worker processors until done. + // For each loop, it checks all worker processes for a message about work being done. + // If no messages are found for two consecutive loops in a row, then assumes all work is complete and terminates. + index = 1; + while (main_loop_bool > 0) { + + // Check status of worker process. + if (main_check_worker_status(thread_args_ptr, index)) { + msg_received = 1; + } + + // Increment for next thread. + index = (index + 1) % thread_args_ptr->total_processors; + + // Check if past last processor. + if (index == 0) { + index += 1; + + // Check if messages were recieved this run. + if (msg_received == 0) { + // No messages recieved. + main_loop_bool -= 1; + + // Send termination request if loop bool hit 0. + // Otherwise loop a few more times to give requests time to propagate. + if (main_loop_bool <= 0) { + main_terminate_workers(thread_args_ptr); + } + + } else { + // One or more messages recieved this loop. Reset main loop value. + main_loop_bool = 3; + } + + // Sleep number of seconds provided by user terminal. + // Meant to simulate "work", as it forces each worker process to block for this amount of time each load, + // before continuing on. In a real program, the worker would be doing something useful in this time, + // instead of just blocking. + sleep(thread_args_ptr->seconds_per_load); + msg_received = 0; + } + + } + log("Finished NN main."); } +// // Determine "neighbor" range, based on thread number. +// // Range is [ process_id +- 10%_of_total_processes ]. If hits 0 or max, then loops around. +// // If only a few processes exist, then at minimum checks processor directly above and processor directly below. +// ten_percent = thread_args_ptr->total_processors / 10; +// if (ten_percent < 1) { +// ten_percent = 1; +// } + +// range_low = process_rank - ten_percent; +// range_high = process_rank + ten_percent; + +// if (range_low < 1) { +// range_low = range_low + thread_args_ptr->total_processors - 1; +// overflow_range = 1; +// } +// if (range_high >= thread_args_ptr->total_processors) { +// range_high = range_high - thread_args_ptr->total_processors + 1; +// overflow_range = 1; +// } + +// char* log_msg = calloc(256, sizeof(char)); +// sprintf(log_msg, "processor_num: %i", process_rank); +// log(log_msg); free(log_msg); +// log_msg = calloc(256, sizeof(char)); +// sprintf(log_msg, "ten_percent: %i", ten_percent); +// log(log_msg); free(log_msg); +// log_msg = calloc(256, sizeof(char)); +// sprintf(log_msg, "range_low: %i", range_low); +// log(log_msg); free(log_msg); +// log_msg = calloc(256, sizeof(char)); +// sprintf(log_msg, "range_high: %i", range_high); +// log(log_msg); free(log_msg); + /** * Worker processor logic for "Nearest Neighbor" load scheme. @@ -528,7 +646,124 @@ void nn_main(thread_struct* thread_args_ptr) { * :param thread_args_ptr: Struct for processor values. */ void nn_worker(thread_struct* thread_args_ptr) { + int process_rank = thread_args_ptr->thread_num; + int nn_counter; + int main_loop_bool = 1; + int msg_status_flag; + int request_flag = -1; + int work_value = 0; + int send_status = 1; + int ten_percent; + int nn_range; + log("Starting NN worker."); + // Initialize random number generator. + srand(time(NULL)); + + // Determine "neighbor" range, based on thread number. + // Range is [ process_id +- 10%_of_total_processes ]. If hits 0 or max, then loops around. + ten_percent = thread_args_ptr->total_processors / 10; + if (ten_percent < 1) { + ten_percent = 1; + } + nn_range = ten_percent * 2; + + // Initialize process "1" to have full loads. All others start with none. + if (process_rank == 1) { + thread_args_ptr->remaining_loads = thread_args_ptr->total_loads; + } else { + thread_args_ptr->remaining_loads = 0; + } + + // Initialize output by immediately sending status to main process. + worker_send_status(process_rank, thread_args_ptr->remaining_loads); + + // Main loop bool. Continues until main process says all work is done. + while (main_loop_bool == 1) { + + // Loop all worker processes until load count message of "0" has been sent. + while (thread_args_ptr->remaining_loads > 0) { + + // Send message for current load. + worker_send_status(process_rank, thread_args_ptr->remaining_loads); + + // Check for message requests from other processes. + msg_status_flag = 0; + MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag == 1) { + // At least one message pending. Handle before proceeding. + worker_handle_request(thread_args_ptr); + } + + // Decriment load number. + thread_args_ptr->remaining_loads -= 1; + send_status = 1; + } + + // If we made it this far, then worker no longer has work. + + // Send notification message to main, to show value of 0. + if (send_status == 1) { + worker_send_status(process_rank, thread_args_ptr->remaining_loads); + send_status = 0; + } + + // Check for pending work requests. + msg_status_flag = 0; + MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag == 1) { + // At least one message pending. Handle before proceeding. + worker_handle_request(thread_args_ptr); + } + + if (request_flag == -1) { + // Update personal NN counter. + nn_counter = thread_args_ptr->thread_num; + while (nn_counter == thread_args_ptr->thread_num) { + nn_counter = rand() % nn_range; + nn_counter = nn_counter + thread_args_ptr->thread_num + 1; + if (nn_counter > thread_args_ptr->total_processors - 1) { + nn_counter = (nn_counter % thread_args_ptr->total_processors) + 1; + } + } + + // Beg fellow worker for work. + worker_send_request(thread_args_ptr, nn_counter); + request_flag = nn_counter; + } else { + msg_status_flag = 0; + MPI_Iprobe(request_flag, tag_work_response, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag == 1) { + MPI_Recv(&work_value, 1, MPI_INT, request_flag, tag_work_response, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + // Update personal load value if non-reject message provided. + if (work_value > 0) { + thread_args_ptr->remaining_loads = work_value; + } + + // Reset flag to allow begging for work again in the future. + request_flag = -1; + } + } + + // Check if program as a whole is done running. + main_loop_bool = worker_check_status(); + + // Check again for pending work requests. + msg_status_flag = 0; + MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag == 1) { + // At least one message pending. Handle before proceeding. + worker_handle_request(thread_args_ptr); + } + + // Check status of overall work. + if (main_loop_bool == 1) { + // Sleep to simulate "doing work". + sleep(1); + } + } + log("Finished NN worker."); } diff --git a/src/main.c b/src/main.c index b68ee207048b44284e828843d786ce4c7cbbe51b..3c4ff9bd77c3913c2ad95cb239fdeaed78e216f0 100644 --- a/src/main.c +++ b/src/main.c @@ -201,8 +201,8 @@ void run_program() { // arr_run(thread_args_ptr); // grr_run(thread_args_ptr); - rp_run(thread_args_ptr); - // nn_run(thread_args_ptr); + // rp_run(thread_args_ptr); + nn_run(thread_args_ptr); free_thread_struct(thread_args_ptr);