diff --git a/src/load_balance_general.c b/src/load_balance_general.c index 1ab6b8b5c3f91024f2e58a783bead8f1369e7a98..d791ca9524b8a3622d2850975745850772ff0ec6 100644 --- a/src/load_balance_general.c +++ b/src/load_balance_general.c @@ -17,6 +17,21 @@ */ void main_display_status(thread_struct* thread_args_ptr, int process_num, int process_load_status, int init_run) { int index; + char* log_msg = calloc(256, sizeof(char)); + + log(""); + log("FUNCTION main_display_status()"); + sprintf(log_msg, " process_num: %i", process_num); + log(log_msg); + free(log_msg); log_msg = calloc(256, sizeof(char)); + sprintf(log_msg, " process_load_status: %i", process_load_status); + log(log_msg); + free(log_msg); log_msg = calloc(256, sizeof(char)); + sprintf(log_msg, " init_run: %i", init_run); + log(log_msg); + free(log_msg); + log(""); + fflush(stdout); // Travel to desired terminal row. Skip if first loop through thread displaying. if (init_run == 0) { @@ -49,6 +64,11 @@ void main_terminate_workers(thread_struct* thread_args_ptr) { int index; MPI_Request request_var; // What is this for?? Why is it required? + log(""); + log("FUNCTION main_terminate_workers()"); + log(""); + fflush(stdout); + for (index = 1; index < thread_args_ptr->total_processors; index++) { MPI_Isend(&program_end_tag, 1, MPI_INT, index, program_end_tag, MPI_COMM_WORLD, &request_var); // printf("Sending termination message to process %i.\n", index); @@ -64,9 +84,20 @@ void main_terminate_workers(thread_struct* thread_args_ptr) { */ void worker_send_status(int process_rank, int process_load_status) { int* send_array = calloc(2, sizeof(int)); + char* log_msg = calloc(256, sizeof(char)); + + log(""); + log("FUNCTION worker_send_status()"); + sprintf(log_msg, " process_load_status: %i", process_load_status); + log(log_msg); + free(log_msg); + log(""); + fflush(stdout); + send_array[0] = process_rank; send_array[1] = process_load_status; MPI_Send(send_array, 2, MPI_INT, 0, 0, MPI_COMM_WORLD); + free(send_array); } @@ -82,6 +113,11 @@ int worker_check_status() { int main_process = 0; int program_end_tag = 1; + log(""); + log("FUNCTION worker_check_status()"); + log(""); + fflush(stdout); + // Check for pending message from main process. // Should only ocurr if main is telling child process that all work has completed. MPI_Iprobe(main_process, program_end_tag, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); @@ -104,12 +140,32 @@ int worker_check_status() { */ int worker_send_request(thread_struct* thread_args_ptr, int possible_donor) { int donor_response = 0; + int msg_status_flag = 0; + char* log_msg = calloc(256, sizeof(char)); + + log(""); + log("FUNCTION worker_send_request()"); + sprintf(log_msg, " possible_donor: %i", possible_donor); + log(log_msg); + free(log_msg); + log(""); + fflush(stdout); + + // Validate donor value. + if (possible_donor >= thread_args_ptr->total_processors || possible_donor == 0) { + // Invalid. Exit with error. + exit(2); + } - // Initialilze interaction by sending a "begging message" for work. - MPI_Send(&donor_response, 1, MPI_INT, possible_donor, 0, MPI_COMM_WORLD); + // Ensure thread isn't currently sending a message to this thread. If so, abort. + MPI_Iprobe(possible_donor, MPI_ANY_TAG, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag != 1) { + // Initialilze interaction by sending a "begging message" for work. + MPI_Send(&donor_response, 1, MPI_INT, possible_donor, 0, MPI_COMM_WORLD); - // If got this far, then we have unblocked. Means that "possible donor" process is sending a response. - MPI_Recv(&donor_response, 1, MPI_INT, possible_donor, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + // If got this far, then we have unblocked. Means that "possible donor" process is sending a response. + MPI_Recv(&donor_response, 1, MPI_INT, possible_donor, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } // Check what response was and handle accoringly. if (donor_response > 0) { @@ -136,6 +192,11 @@ void worker_handle_request(thread_struct* thread_args_ptr) { int temp = 0; int index; + log(""); + log("FUNCTION worker_handle_request()"); + log(""); + fflush(stdout); + MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); if (msg_status_flag == 1) { diff --git a/src/load_balance_schemes.c b/src/load_balance_schemes.c index f4b0572d4605f64a15ffcd67712f172386f8d225..b19139626998856df4c3370efdd264a18a5a6529 100644 --- a/src/load_balance_schemes.c +++ b/src/load_balance_schemes.c @@ -40,12 +40,12 @@ void arr_run(thread_struct* thread_args_ptr) { // Wait for all processes to synchronize. // Also sleep for a second to allow time for console output to process. - MPI_Barrier(MPI_COMM_WORLD); - sleep(1); + // MPI_Barrier(MPI_COMM_WORLD); + // sleep(1); if (process_rank == 0) { + printf("Finished ARR load scheme.\n"); printf("\n"); - printf("ARR load scheme. Finished.\n"); } } @@ -61,6 +61,8 @@ void arr_main(thread_struct* thread_args_ptr) { int msg_recieved = 0; int* recv_array; + log("Starting arr_main."); + // Initialize console display. for (index = 1; index < thread_args_ptr->total_processors; index++) { recv_array = calloc(3, sizeof(int)); @@ -118,6 +120,8 @@ void arr_main(thread_struct* thread_args_ptr) { } } + + log("Finished ARR main."); } @@ -128,8 +132,8 @@ void arr_worker(thread_struct* thread_args_ptr) { int process_rank = thread_args_ptr->thread_num; int arr_counter = thread_args_ptr->thread_num; int main_loop_bool = 1; - int index; - int msg_status_flag; + + log("Starting GRR worker."); // Initialize process "1" to have full loads. All others start with none. if (process_rank == 1) { @@ -155,10 +159,24 @@ void arr_worker(thread_struct* thread_args_ptr) { } // If we made it this far, then worker no longer has work. First beg fellow worker for work. + arr_counter = (arr_counter + 1) % thread_args_ptr->total_processors; + while (arr_counter == thread_args_ptr->thread_num || arr_counter == 0) { + arr_counter = (arr_counter + 1) % thread_args_ptr->total_processors; + } + // worker_send_request(thread_args_ptr, arr_counter); + + // Reject any work requests from other processors. + worker_handle_request(thread_args_ptr); - // Then check if program as a whole is done running. + // Check if program as a whole is done running. main_loop_bool = worker_check_status(); + + if (main_loop_bool == 1) { + sleep(1); + } } + + log("Finished ARR worker."); } @@ -167,12 +185,8 @@ void arr_worker(thread_struct* thread_args_ptr) { */ void grr_run(int total_processors) { printf("Starting GRR load scheme."); - printf("\n"); - - // pthread_t* thread_pool = NULL; - printf("\n"); - printf("GRR load scheme. Finished."); + printf("Finished GRR load scheme."); } @@ -180,7 +194,9 @@ void grr_run(int total_processors) { * Main processor logic for "Global Round Robin" load scheme. */ void grr_main(thread_struct* thread_args_ptr) { + log("Starting GRR main."); + log("Finished GRR main."); } @@ -188,8 +204,9 @@ void grr_main(thread_struct* thread_args_ptr) { * Worker processor logic for "Global Round Robin" load scheme. */ void grr_worker(thread_struct* thread_args_ptr) { + log("Starting GRR worker."); - + log("Finished GRR worker."); } @@ -197,13 +214,9 @@ void grr_worker(thread_struct* thread_args_ptr) { * Entrypoint for logic to run "Random Poling" load scheme. */ void rp_run(int total_processors) { - printf("Starting RP load scheme."); - printf("\n"); - - // pthread_t* thread_pool = NULL; + log("Starting RP load scheme."); - printf("\n"); - printf("RP load scheme. Finished."); + log("Finished RP load scheme."); } @@ -211,7 +224,9 @@ void rp_run(int total_processors) { * Main processor logic for "Random Poling" load scheme. */ void rp_main(thread_struct* thread_args_ptr) { + log("Starting RP main."); + log("Finished RP main."); } @@ -219,7 +234,9 @@ void rp_main(thread_struct* thread_args_ptr) { * Worker processor logic for "Random Poling" load scheme. */ void rp_worker(thread_struct* thread_args_ptr) { + log("Starting RP worker."); + log("Finished RP worker."); } @@ -228,13 +245,9 @@ void rp_worker(thread_struct* thread_args_ptr) { * Entrypoint for logic to run "Nearest Neighbor" load scheme. */ void nn_run(int total_processors) { - printf("Starting NN load scheme."); - printf("\n"); + log("Starting NN load scheme."); - // pthread_t* thread_pool = NULL; - - printf("\n"); - printf("NN load scheme. Finished."); + log("Finished NN load scheme."); } @@ -242,7 +255,9 @@ void nn_run(int total_processors) { * Main processor logic for "Nearest Neighbor" load scheme. */ void nn_main(thread_struct* thread_args_ptr) { + log("Starting NN main."); + log("Finished NN main."); } @@ -251,6 +266,7 @@ void nn_main(thread_struct* thread_args_ptr) { * Worker processor logic for "Nearest Neighbor" load scheme. */ void nn_worker(thread_struct* thread_args_ptr) { + log("Starting NN worker."); - + log("Finished NN worker."); } diff --git a/src/main.c b/src/main.c index 3adfe93b2bee53b1e5d8c58b255e7ce04ccce7ec..2d33259ebaaee1087910177d9feaf820eeffec86 100644 --- a/src/main.c +++ b/src/main.c @@ -179,12 +179,12 @@ void run_program() { total_processors, seconds_per_load, total_loads, process_rank ); - // arr_run(thread_args_ptr); + arr_run(thread_args_ptr); free_thread_struct(thread_args_ptr); // Wait for all processes to synchronize. - MPI_Barrier(MPI_COMM_WORLD); + // MPI_Barrier(MPI_COMM_WORLD); if (process_rank == 0) { printf("\n"); diff --git a/src/structs.c b/src/structs.c index 1b9e26eb016f241e57a3de345b9f3b567839f27f..92950339d54aa5f371e48fdfad95ce2f4daaaeb0 100644 --- a/src/structs.c +++ b/src/structs.c @@ -24,6 +24,12 @@ void _log(char const* file, long line, char const* message) { // Log to file. fprintf(thread_args_ptr->log_file, "[%s:%ld] %s\n", file, line, message); + + // char* msg_buffer = calloc(1024, sizeof(char)); + // sprintf(msg_buffer, "[%s:%ld] %s\n", file, line, message); + // MPI_File* log_file = thread_args_ptr->log_file; + // MPI_File_write(&log_file, msg_buffer, 1024, MPI_CHAR, MPI_STATUS_IGNORE); + // free(msg_buffer); } @@ -58,6 +64,8 @@ thread_struct* initialize_thread_struct( char* log_location = calloc(256, sizeof(char)); sprintf(log_location, "logging/thread_%i.log", thread_num); FILE* file_ptr = fopen(log_location, "w"); + // MPI_File* file_ptr = malloc(sizeof(MPI_File)); + // MPI_File_open(MPI_COMM_SELF, log_location, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, file_ptr); // Check for errors. if (file_ptr == NULL) { @@ -83,6 +91,8 @@ void free_thread_struct(thread_struct* thread_args_ptr) { log("Destroying arg_struct."); fclose(thread_args_ptr->log_file); + // MPI_File_close(thread_args_ptr->log_file); + // free(thread_args_ptr->log_file); free(thread_args_ptr->log_file_location); free(thread_args_ptr); } diff --git a/src/structs.h b/src/structs.h index 408626da0aedefb69bdfc1bb2f2e647ce0ce2085..cf3b2d55cffc2f30ecb7cf4d7ff799b031f31c67 100644 --- a/src/structs.h +++ b/src/structs.h @@ -5,6 +5,7 @@ // System Import Headers. #include <ctype.h> +#include <mpi.h> #include <pthread.h> #include <stdio.h> #include <stdlib.h> @@ -28,6 +29,7 @@ typedef struct { int remaining_loads; char* log_file_location; FILE* log_file; + // MPI_File* log_file; } thread_struct;