/**
 * Brandon Rodriguez
 * CS 3240
 * 11-15-17
 * a4 (Assignment 5)
 */


/**
 * Description:
 *  Implement threads to solve a problem.
 *
 *  In this "real world scenario" problem, originally a master file had a bunch of data.
 *  Somehow, someone "messed up real good" and now the master file no longer exists.
 *  Furthermore, all the data from the master file technically exists, but it's
 *  fragmented into smaller subfiles, and out of order.
 *
 *  This program needs to be able to correct this issue, rebuilding the master file
 *  and sorting it as well. And it needs to use threads so that it doesn't take
 *  forever to accomplish (assuming the master file was originally really really big).
 */


/**
 * Known Issues:
 *  This appears to have a race condition, somehow.
 *  Unfortunately, I don't seem to understand threading enough to troubleshoot it.
 *
 *  But sometimes when running, all memory is cleared according to valgrind and
 *  everything is fine.
 *  Other times, valgrind reports errors on program end. There will arbitrarily be
 *  exactly "1,614 bytes in 4 blocks" that are lost. This appears to generally be
 *  consistent, regardless of how many files are in the selected folder (IE, how
 *  many threads are created) or what the contents of said files are.
 *  The actual error description seems to point to the imported pthread code,
 *  which means the issue is something to do with my implementation of threading.
 */


#define _BSD_SOURCE


// Import headers.
#include <ctype.h>
#include <dirent.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/stat.h>
#include "apue.h"
#include "HelperHeader.h"


// Define Vars.
#define BUFFER_SIZE 4096


// Variables.
typedef struct { char* user_name; char* password; char* blood_type; char* domain_name; int db_index; } data_struct; typedef struct { int array_count; data_struct* data_array; } thread_return_struct; int dir_file_counter; char* absolute_path; pthread_t* thread_array; thread_return_struct* full_dataset; // Method Declaration. int change_directory(); // Safely changes directory to inicated folder. void open_folder(); // Opens indicated folder. void* thread_read_file(); // Reads given file and reorganizes data. data_struct* sort_data_array(); // Begins sorting of read-in file. data_struct* merge_sort(); // First half of merge sort. data_struct* merge_array(); // Second half of merge sort. void merge_to_file(); // Merges all thread data into one to write to file. thread_return_struct* merge_to_master(); // Merges thread data together. void write_to_file(); // Writes a single data line to file. /** * Program's main. * Initializes and runs program. */ int main(int argc, char* argv[]) { int index; int return_int; int struct_number; thread_return_struct* return_struct; thread_return_struct** thread_arrays; // Check for valid args. if (argc < 2) { err_msg("Must enter a directory path."); return -1; } else if (argc > 2) { err_msg("Must enter exacly one directory path."); return -1; } else { return_int = change_directory(argv[1]); if (return_int == 0) { // Get absolute path. absolute_path = calloc(BUFFER_SIZE, sizeof(char*)); if (getcwd(absolute_path, BUFFER_SIZE) == NULL) { err_sys("Failed to get absolute path."); } // Open folder and create appropriate number of threads. open_folder(); thread_arrays = calloc(dir_file_counter, sizeof(*return_struct)); // Iterate through all threads and grab returned value. 
for (index = 0; index < dir_file_counter; index++) { pthread_join(thread_array[index], (void**) &return_struct); // thread_arrays[index] = return_struct; thread_arrays[index] = calloc(1, sizeof(thread_return_struct)); thread_arrays[index]->array_count = return_struct->array_count; thread_arrays[index]->data_array = return_struct->data_array; free(return_struct); } // Take thread data and merge + write to file. merge_to_file(thread_arrays); printf("\nAll operations complete.\n"); printf("Sorted data can found at %s/sorted.yay\n", absolute_path); // Merging done. Free memory. for (index = 0; index < dir_file_counter; index++) { for (struct_number = 0; struct_number < thread_arrays[index]->array_count; struct_number++) { free(thread_arrays[index]->data_array[struct_number].user_name); free(thread_arrays[index]->data_array[struct_number].password); free(thread_arrays[index]->data_array[struct_number].blood_type); free(thread_arrays[index]->data_array[struct_number].domain_name); } free(thread_arrays[index]->data_array); free(thread_arrays[index]); } free(thread_arrays); free(absolute_path); free(thread_array); free(full_dataset->data_array); free(full_dataset); } } return 0; } /** * Safely changes directory by first checking path value and permissions. * * Returns 0 on success or -1 on failure. */ int change_directory(char* folder_location) { int return_int; struct stat stat_buffer; return_int =lstat(folder_location, &stat_buffer); if (return_int <0) { err_sys("Failed to stat file with err %d", return_int); return -1; } // First, ensure that it is, infact, a directory. if (S_ISDIR(stat_buffer.st_mode)) { // Next, check permissions. if (access(folder_location, X_OK) == 0) { // Change into directory. return_int = chdir(folder_location); if (return_int < 0) { err_sys("Failed to change directory with err %d", return_int); return -1; } } else { // No execute permission. err_msg("No execute permission. Cannot change into directory.\n"); return -1; } } else { // Not a dir. 
err_msg("Provided path is not a directory.\n"); return -1; } return 0; } /** * Opens indicated folder and hands files off to threads. */ void open_folder() { int index; int return_int; char* temp_string; char* save_file_path; struct dirent* dir_struct; struct stat stat_buffer; DIR* dir_pointer; pthread_attr_t attr; size_t stacksize; printf("Locating files to merge...\n"); // Save path of file to save to. save_file_path = copy_string_with_buffer(absolute_path, BUFFER_SIZE); strcat(save_file_path, "/"); strcat(save_file_path, "sorted.yay"); // Iterate through directory first time, to count number of files to open/threads to make. dir_file_counter = 0; dir_pointer = opendir(absolute_path); // Check that directory has read permissions. if (access(absolute_path, R_OK) == 0) { // Loop until no more files in directory. while (dir_pointer != NULL) { if ((dir_struct = readdir(dir_pointer)) != NULL) { // New file found. Check if standard file type. return_int = lstat(dir_struct->d_name, &stat_buffer); if (return_int < 0) { err_sys("Failed to stat file with err %d", return_int); } if (S_ISREG(stat_buffer.st_mode)) { // Get absolute file path. temp_string = copy_string_with_buffer(absolute_path, BUFFER_SIZE); strcat(temp_string, "/"); strcat(temp_string, dir_struct->d_name); // Ensure that file is not save file. if (strcmp(temp_string, save_file_path) != 0) { printf("Path: %s\n", temp_string); dir_file_counter++; } free(temp_string); } } else { // End of files in directory. Closing stream. return_int = closedir(dir_pointer); if (return_int < 0) { err_msg("Failed to properly close directory."); } dir_pointer = NULL; } } } else { err_msg("Directory does not have read access. Cannot view files."); } // Prepare to set up threads. 
thread_array = calloc(dir_file_counter, sizeof(pthread_t)); index = 0; pthread_attr_init(&attr); pthread_attr_getstacksize(&attr, &stacksize); if (stacksize < 8388608) { stacksize = 8388608; } pthread_attr_setstacksize(&attr, (stacksize * 2)); printf("Reading in files and creating sorting threads...\n"); // Iterate through directory again. This time, actually create threads and hand off files. dir_pointer = opendir(absolute_path); // Check that directory has read permissions. if (access(absolute_path, R_OK) == 0) { // Loop until no more files in directory. while (dir_pointer != NULL) { if ((dir_struct = readdir(dir_pointer)) != NULL) { // New file found. Check if standard file type. return_int = lstat(dir_struct->d_name, &stat_buffer); if (return_int < 0) { err_sys("Failed to stat file with err %d", return_int); } if (S_ISREG(stat_buffer.st_mode)) { // Get absolute file path. temp_string = copy_string_with_buffer(absolute_path, BUFFER_SIZE); strcat(temp_string, "/"); strcat(temp_string, dir_struct->d_name); // Actually create threads. if (strcmp(temp_string, save_file_path) != 0) { // printf("Path: %s\n", temp_string); pthread_create(&thread_array[index], &attr, thread_read_file, (void*) copy_string(temp_string)); index++; } free(temp_string); } } else { // End of files in directory. Closing stream. return_int = closedir(dir_pointer); if (return_int < 0) { err_msg("Failed to properly close directory."); } dir_pointer = NULL; } } } else { err_msg("Directory does not have read access. Cannot view files."); } free(save_file_path); } /** * Uses thread to read file value. */ void* thread_read_file(void* file_location) { int result_int; int struct_max; int struct_number; char* line_buffer = calloc(BUFFER_SIZE, sizeof(char*)); char* token; char* save_pointer; FILE* read_file; data_struct* data_array; thread_return_struct* return_struct; struct_max = 1024; struct_number = -1; data_array = calloc(struct_max, sizeof(data_struct)); // Open file. 
read_file = fopen(file_location, "r"); if (read_file == NULL) { err_sys("Failed to open file at %s", file_location); } free(file_location); // Read file line by line and assign values to struct. while(fgets(line_buffer, BUFFER_SIZE, read_file) != NULL) { line_buffer = remove_newline(line_buffer); struct_number++; // Check that there is still more space in data_array. if (struct_number >= struct_max) { struct_max = struct_max * 2; data_array = realloc(data_array, struct_max); } // Actually parse line to array. // First get username. token = strtok_r(line_buffer, ",", &save_pointer); data_array[struct_number].user_name = copy_string(token); // Get password. token = strtok_r(NULL, ",", &save_pointer); data_array[struct_number].password = copy_string(token); // Get blood type. token = strtok_r(NULL, ",", &save_pointer); data_array[struct_number].blood_type = copy_string(token); // Get domain name. token = strtok_r(NULL, ",", &save_pointer); data_array[struct_number].domain_name = copy_string(token); // Get db index. token = strtok_r(NULL, ",", &save_pointer); data_array[struct_number].db_index = atoi(token); } data_array = merge_sort(data_array, 0, struct_number, (struct_number) + 1); return_struct = calloc(1, sizeof(thread_return_struct)); return_struct->array_count = (struct_number + 1); return_struct->data_array = data_array; // Close file and exit out. result_int = fclose(read_file); if (result_int != 0) { err_msg("Failed to close file properly."); } free(line_buffer); pthread_exit((void*) return_struct); } /** * Begins sorting of single read-in file's data. * * Returns struct array of sorted data. */ data_struct* sort_data_array(data_struct* data_array, int array_size) { data_struct* temp_array = calloc(array_size, sizeof(data_struct)); temp_array = merge_sort(data_array, 0, array_size); free(data_array); return temp_array; } /** * First half of data array merge sort. * * Returns struct array of sorted data. 
*/ data_struct* merge_sort(data_struct* data_array, int low_int, int high_int, int array_size) { int mid_int; // Base case. If equal or less, back out of recursion. if (high_int <= low_int) { return data_array; } // Recursively divide and merge. mid_int = (low_int + high_int) / 2; data_array = merge_sort(data_array, low_int, mid_int, array_size); data_array = merge_sort(data_array, (mid_int + 1), high_int, array_size); return merge_array(data_array, low_int, mid_int, high_int, array_size); } /** * Second half of data array merge sort. * * Returns sorted section of data array. */ data_struct* merge_array(data_struct* data_array, int low_int, int mid_int, int high_int, int array_size) { int index; int left_int = low_int; int right_int = mid_int + 1; data_struct* temp_array = calloc(array_size, sizeof(data_struct)); // Duplicate array. for (index = low_int; index < array_size; index++) { temp_array[index] = data_array[index]; } // Loop until all index have been iterated. for (index = low_int; index < (high_int + 1); index++) { // Check if left side is done but right is not. if (left_int > mid_int) { data_array[index] = temp_array[right_int]; right_int++; } else { // Check if right side is done but left is not. if (right_int > high_int) { data_array[index] = temp_array[left_int]; left_int++; } else { // Both sides not done. Compare both values. if (temp_array[left_int].db_index < temp_array[right_int].db_index) { // Left less than right. data_array[index] = temp_array[left_int]; left_int++; } else { // Right less than left. data_array[index] = temp_array[right_int]; right_int++; } } } } free(temp_array); return data_array; } /** * Merges all thread data into one struct. * Then saves this by writing to file. */ void merge_to_file(thread_return_struct** thread_arrays) { int index; char* temp_string; FILE* reset_file; thread_return_struct* temp_dataset; printf("Merging array data together...\n"); // Get file path of file to save. 
temp_string = copy_string_with_buffer(absolute_path, BUFFER_SIZE); strcat(temp_string, "/"); strcat(temp_string, "sorted.yay"); // Creates new file if does not exist. If already present, then clears file of all data. reset_file = fopen(temp_string, "w"); if (reset_file == NULL) { err_sys("Failed to open file at %s", temp_string); } fclose(reset_file); free(temp_string); full_dataset = calloc(1, sizeof(thread_return_struct)); full_dataset->data_array = calloc(thread_arrays[0]->array_count, sizeof(thread_return_struct)); // For each dataset from directory, merge into full_dataset struct. for (index = 0; index < dir_file_counter; index++) { temp_dataset = merge_to_master(thread_arrays[index]); free(full_dataset->data_array); full_dataset->array_count = temp_dataset->array_count; full_dataset->data_array = temp_dataset->data_array; free(temp_dataset); } printf("Saving sorted data to file...\n"); // For each record stored in the full_dataset struct, write to file. for (index = 0; index < full_dataset->array_count; index++) { write_to_file(full_dataset->data_array[index]); } } /** * Merge given subset of data into into the larger datset struct. * Keeps data in sorted order. * * Returns new full set of sorted data. */ thread_return_struct* merge_to_master(thread_return_struct* merge_dataset) { int index; int master_int = 0; int child_int = 0; int new_dataset_size; thread_return_struct* temp_dataset; // Calculate size of new dataset. new_dataset_size = (full_dataset->array_count + merge_dataset->array_count); // Create duplicate temp array.. temp_dataset = calloc(new_dataset_size, sizeof(thread_return_struct)); temp_dataset->data_array = calloc(new_dataset_size, sizeof(data_struct)); // Actually merge given given struct into full_dataset struct. for (index = 0; index < new_dataset_size; index++) { // Check if master dataset is done. 
if (master_int >= full_dataset->array_count) { temp_dataset->data_array[index] = merge_dataset->data_array[child_int]; child_int++; } else { // Check if merge dataset is done. if (child_int >= merge_dataset->array_count) { temp_dataset->data_array[index] = full_dataset->data_array[master_int]; master_int++; } else { // Both not done. Compare values. if (full_dataset->data_array[master_int].db_index < merge_dataset->data_array[child_int].db_index) { temp_dataset->data_array[index] = full_dataset->data_array[master_int]; master_int++; } else { temp_dataset->data_array[index] = merge_dataset->data_array[child_int]; child_int++; } } } } temp_dataset->array_count = new_dataset_size; return temp_dataset; } /** * Writes a single data line to file. */ void write_to_file(data_struct data_array) { int index; int write_buffer_size; int newline_present_bool; char* temp_string = calloc(20, sizeof(char*)); FILE* write_file; // Determine size of write buffer to create. snprintf(temp_string, 10, "%d", data_array.db_index); write_buffer_size = (strlen(data_array.user_name) + 1); write_buffer_size += (strlen(data_array.password) + 1); write_buffer_size += (strlen(data_array.blood_type) + 1); write_buffer_size += (strlen(data_array.domain_name) + 1); write_buffer_size += (strlen(temp_string)); write_buffer_size += 2; // Create and populate write buffer. char* write_buffer = calloc(write_buffer_size, sizeof(char*)); write_buffer = strcat(write_buffer, data_array.user_name); write_buffer = strcat(write_buffer, ","); write_buffer = strcat(write_buffer, data_array.password); write_buffer = strcat(write_buffer, ","); write_buffer = strcat(write_buffer, data_array.blood_type); write_buffer = strcat(write_buffer, ","); write_buffer = strcat(write_buffer, data_array.domain_name); write_buffer = strcat(write_buffer, ","); write_buffer = strcat(write_buffer, temp_string); free(temp_string); // Ensure file has newline char. 
index = 0; newline_present_bool = 0; while ((newline_present_bool == 0) && index < write_buffer_size) { if (write_buffer[index] == '\0') { if (write_buffer[index - 1] != '\n') { write_buffer[index] = '\n'; } newline_present_bool = 1; } index++; } // Save write buffer to file. temp_string = copy_string_with_buffer(absolute_path, BUFFER_SIZE); strcat(temp_string, "/"); strcat(temp_string, "sorted.yay"); write_file = fopen(temp_string, "a"); if (write_file == NULL) { err_sys("Failed to open file at %s", temp_string); } fputs(write_buffer, write_file); fclose(write_file); free(write_buffer); free(temp_string); }