diff --git a/makefile b/makefile index f9954558ada25cb91cc498533e162e843427cd02..de4f269c5fc1ddfbcb72879cbfb5337ca780fde3 100644 --- a/makefile +++ b/makefile @@ -28,7 +28,8 @@ all: $(TARGET) # Compile target if dependencies update. Then run. run: $(TARGET) - @$(EXE) -n $(CORES) --oversubscribe ./$(TARGET) $(ARGS) +# @$(EXE) -n $(CORES) --oversubscribe ./$(TARGET) $(ARGS) + @$(EXE) -n 4 --oversubscribe ./$(TARGET) $(ARGS) # Compile target if dependencies update. Then run. diff --git a/src/load_balance_general.c b/src/load_balance_general.c index c52514c571dda75c153cab21f9271637f7bc2823..0cdd9eca3eb24fe149b56bc7d42d08de5b4e0361 100644 --- a/src/load_balance_general.c +++ b/src/load_balance_general.c @@ -30,29 +30,28 @@ void main_display_status(thread_struct* thread_args_ptr, int process_num, int pr sprintf(log_msg, " init_run: %i", init_run); log(log_msg); free(log_msg); - log(""); fflush(stdout); // Travel to desired terminal row. Skip if first loop through thread displaying. - if (init_run == 0) { - for (index = 0; index < (thread_args_ptr->total_processors - process_num); index ++) { - terminal_line_up(); - } - } - // Clear row. - terminal_line_erase(); - terminal_line_start(); + // if (init_run == 0) { + // for (index = 0; index < (thread_args_ptr->total_processors - process_num); index ++) { + // terminal_line_up(); + // } + // } + // // Clear row. + // terminal_line_erase(); + // terminal_line_start(); // Display message. printf("Thread %5i: %5i / %5i Loads Remaining\n", process_num, process_load_status, thread_args_ptr->total_loads); // Travel back to original terminal location. Skip if first loop through thread displaying. - if (init_run == 0) { - for (index = 0; index < (thread_args_ptr->total_processors - process_num - 1); index ++) { - terminal_line_down(); - } - terminal_line_start(); - } + // if (init_run == 0) { + // for (index = 0; index < (thread_args_ptr->total_processors - process_num - 1); index ++) { + // terminal_line_down(); + // } + // terminal_line_start(); + // } } @@ -65,7 +64,6 @@ void main_terminate_workers(thread_struct* thread_args_ptr) { log(""); log("FUNCTION main_terminate_workers()"); - log(""); fflush(stdout); for (index = 1; index < thread_args_ptr->total_processors; index++) { @@ -90,7 +88,6 @@ void worker_send_status(int process_rank, int process_load_status) { sprintf(log_msg, " process_load_status: %i", process_load_status); log(log_msg); free(log_msg); - log(""); fflush(stdout); send_array[0] = process_rank; @@ -113,7 +110,6 @@ int worker_check_status() { log(""); log("FUNCTION worker_check_status()"); - log(""); fflush(stdout); // Check for pending message from main process. @@ -146,7 +142,6 @@ int worker_send_request(thread_struct* thread_args_ptr, int possible_donor) { sprintf(log_msg, " possible_donor: %i", possible_donor); log(log_msg); free(log_msg); - log(""); fflush(stdout); // Validate donor value. @@ -158,17 +153,32 @@ int worker_send_request(thread_struct* thread_args_ptr, int possible_donor) { // Ensure thread isn't currently sending a message to this thread. If so, abort. MPI_Iprobe(possible_donor, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); if (msg_status_flag != 1) { - // Initialilze interaction by sending a "begging message" for work. - MPI_Ssend(&donor_response, 1, MPI_INT, possible_donor, tag_work_request, MPI_COMM_WORLD); - // If got this far, then we have unblocked. Means that "possible donor" process is sending a response. - MPI_Recv(&donor_response, 1, MPI_INT, possible_donor, tag_work_request, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + sleep(1); + + // Ensure that there is still possible work to do. + if (worker_check_status() == 1) { + + // Double check that donor does not have waiting request. To try to prevent race conditions. + // (I don't know how to properly "mutex lock" out entirely separate processes, when they can't share memory) + MPI_Iprobe(possible_donor, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); + if (msg_status_flag != 1) { + // Initialilze interaction by sending a "begging message" for work. + MPI_Ssend(&donor_response, 1, MPI_INT, possible_donor, tag_work_request, MPI_COMM_WORLD); + + // If got this far, then we have unblocked. Means that "possible donor" process is sending a response. + MPI_Recv(&donor_response, 1, MPI_INT, possible_donor, tag_work_request, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } + } } // Check what response was and handle accoringly. if (donor_response > 0) { // Donor sent workload to process. thread_args_ptr->remaining_loads = donor_response; + + // Work has been provided. Immediately attempt to update main processor of change. + worker_send_status(thread_args_ptr->thread_num, thread_args_ptr->remaining_loads); return 1; } else { // Donor sent reject message. @@ -192,7 +202,6 @@ void worker_handle_request(thread_struct* thread_args_ptr) { log(""); log("FUNCTION worker_handle_request()"); - log(""); fflush(stdout); MPI_Iprobe(MPI_ANY_SOURCE, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); @@ -200,6 +209,9 @@ void worker_handle_request(thread_struct* thread_args_ptr) { // One or more messages present. Loop through and check for message from all processors. for (index = 1; index < thread_args_ptr->total_processors; index++) { + char* msg = calloc(256, sizeof(char)); + sprintf(msg, " Checking for request from process %i", index); + log(msg); free(msg); msg_status_flag = 0; MPI_Iprobe(index, tag_work_request, MPI_COMM_WORLD, &msg_status_flag, MPI_STATUS_IGNORE); if (msg_status_flag == 1) { @@ -211,12 +223,26 @@ void worker_handle_request(thread_struct* thread_args_ptr) { // Check if donating worker has work to split and send. if (thread_args_ptr->remaining_loads > 1) { // Worker has at least one load to split and send. Do so. + char* msg = calloc(256, sizeof(char)); + sprintf(msg, " orig load: %i", thread_args_ptr->remaining_loads); + log(msg); free(msg); msg = calloc(256, sizeof(char)); work_send_value = thread_args_ptr->remaining_loads / 2; + sprintf(msg, " sending value: %i", work_send_value); + log(msg); free(msg); msg = calloc(256, sizeof(char)); thread_args_ptr->remaining_loads = work_send_value + (thread_args_ptr->remaining_loads % 2); + sprintf(msg, " new personal load value: %i", thread_args_ptr->remaining_loads); + log(msg); free(msg); MPI_Ssend(&work_send_value, 1, MPI_INT, index, tag_work_request, MPI_COMM_WORLD); + + // Load has been split. Immediately attempt to update main processor of change. + worker_send_status(thread_args_ptr->thread_num, thread_args_ptr->remaining_loads); } else { // Worker has exactly 0 or 1 loads. Not enough to send. Reject request instead. work_send_value = -1; + char* msg = calloc(256, sizeof(char)); + sprintf(msg, " rejecting request with current load of %i", thread_args_ptr->remaining_loads); + log(msg); + free(msg); MPI_Ssend(&work_send_value, 1, MPI_INT, index, tag_work_request, MPI_COMM_WORLD); } diff --git a/src/structs.c b/src/structs.c index 02a7e3833395b3241536294326937b629fc9206c..958d591cd1ca5e3c035c790efa4733d06b53a4d7 100644 --- a/src/structs.c +++ b/src/structs.c @@ -23,7 +23,7 @@ thread_struct* thread_args_ptr; void _log(char const* file, long line, char const* message) { // Print to console (debugging only). - // printf(" P%3i [%s:%ld] %s\n", thread_args_ptr->thread_num, file, line, message); + printf(" P%3i [%s:%ld] %s\n", thread_args_ptr->thread_num, file, line, message); // Log to file. fprintf(thread_args_ptr->log_file, "[%s:%ld] %s\n", file, line, message);