// Main.c
/**
 * Brandon Rodriguez
 * CS 3240
 * 11-15-17
 * a4 (Assignment 5)
 */


/**
 * Description:
 *  Implement threads to solve a problem.
 *
 *  In this "real world scenario" problem, originally a master file had a bunch of data.
 *  Somehow, someone "messed up real good" and now the master file no longer exists.
 *  Furthermore, all the data from the master file technically exists, but it's
 *  fragmented into smaller subfiles, and out of order.
 *
 *  This program needs to be able to correct this issue, rebuilding the master file
 *  and sorting it as well. And it needs to use threads so that it doesn't take
 *  forever to accomplish (assuming the master file was originally really really big).
 */


/**
 * Known Issues:
 *  This appears to have a race condition, somehow.
 *  Unfortunately, I don't seem to understand threading enough to troubleshoot it.
 *
 *  But sometimes when running, all memory is cleared according to valgrind and
 *  everything is fine.
 *  Other times, valgrind reports errors on program end. There will arbitrarily be
 *  exactly "1,614 bytes in 4 blocks" that are lost. This appears to generally be
 *  consistent, regardless of how many files are in the selected folder (IE, how
 *  many threads are created) or what the contents of said files are.
 *  The actual error description seems to point to the imported pthread code,
 *  which means the issue is something to do with my implementation of threading.
 */


#define _BSD_SOURCE

// Import headers.
#include <ctype.h>
#include <dirent.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/stat.h>
#include "apue.h"
#include "HelperHeader.h"


// Define Vars.
#define BUFFER_SIZE 4096


// Variables.
// One record parsed from a single comma-separated line of an input file.
// The four strings are separately allocated copies that main() releases with free().
typedef struct {
    // First comma-separated field of the line.
    char* user_name;
    // Second comma-separated field of the line.
    char* password;
    // Third comma-separated field of the line.
    char* blood_type;
    // Fourth comma-separated field of the line.
    char* domain_name;
    // Fifth field converted with atoi(); the only key compared when sorting and merging.
    int db_index;
} data_struct;
// A counted array of records. Each reader thread returns one of these, and the
// merged master dataset (full_dataset) uses the same shape.
typedef struct {
    // Number of records stored in data_array.
    int array_count;
    // The records themselves; the sort and merge routines order them by db_index.
    data_struct* data_array;
} thread_return_struct;

// Number of input files counted by open_folder(); main() joins this many threads.
int dir_file_counter;
// Working directory reported by getcwd() after the chdir; prefix of every path built here.
char* absolute_path;
// One pthread_t per input file; allocated in open_folder(), freed in main().
pthread_t* thread_array;
// Merged, sorted records of all threads; created in merge_to_file(), read by
// merge_to_master(), and freed in main().
thread_return_struct* full_dataset;


// Method Declaration.
// Full prototypes let the compiler check argument counts and types at every call.
int change_directory(char* folder_location);    // Safely changes directory to indicated folder.
void open_folder(void);                         // Opens indicated folder.
void* thread_read_file(void* file_location);    // Reads given file and reorganizes data.
data_struct* sort_data_array(data_struct* data_array, int array_size);  // Begins sorting of read-in file.
// merge_sort keeps an unprototyped declaration for now: add its parameter list
// (data_struct*, int, int, int) once every call site in this file passes all four
// arguments, because a prototype turns any mismatched call into a compile error.
data_struct* merge_sort();                      // First half of merge sort.
data_struct* merge_array(data_struct* data_array, int low_int, int mid_int, int high_int, int array_size);  // Second half of merge sort.
void merge_to_file(thread_return_struct** thread_arrays);               // Merges all thread data into one to write to file.
thread_return_struct* merge_to_master(thread_return_struct* merge_dataset);    // Merges thread data together.
void write_to_file(data_struct data_array);     // Writes a single data line to file.


/**
 * Program's main.
 * Initializes and runs program.
 */
int main(int argc, char* argv[]) {
    int index;
    int return_int;
    int struct_number;
    thread_return_struct* return_struct;
    thread_return_struct** thread_arrays;

    // Check for valid args.
    if (argc < 2) {
        err_msg("Must enter a directory path.");
        return -1;
    } else if (argc > 2) {
        err_msg("Must enter exacly one directory path.");
        return -1;
    } else {

        return_int = change_directory(argv[1]);
        if (return_int == 0) {
            // Get absolute path.
            absolute_path = calloc(BUFFER_SIZE, sizeof(char*));
            if (getcwd(absolute_path, BUFFER_SIZE) == NULL) {
                err_sys("Failed to get absolute path.");
            }

            // Open folder and create appropriate number of threads.
            open_folder();

            thread_arrays = calloc(dir_file_counter, sizeof(*return_struct));

            // Iterate through all threads and grab returned value.
            for (index = 0; index < dir_file_counter; index++) {
                pthread_join(thread_array[index], (void**) &return_struct);

                // thread_arrays[index] = return_struct;
                thread_arrays[index] = calloc(1, sizeof(thread_return_struct));
                thread_arrays[index]->array_count = return_struct->array_count;
                thread_arrays[index]->data_array = return_struct->data_array;
                free(return_struct);
            }

            // Take thread data and merge + write to file.
            merge_to_file(thread_arrays);

            printf("\nAll operations complete.\n");
            printf("Sorted data can found at %s/sorted.yay\n", absolute_path);

            // Merging done. Free memory.
            for (index = 0; index < dir_file_counter; index++) {
                for (struct_number = 0; struct_number < thread_arrays[index]->array_count; struct_number++) {
                    free(thread_arrays[index]->data_array[struct_number].user_name);
                    free(thread_arrays[index]->data_array[struct_number].password);
                    free(thread_arrays[index]->data_array[struct_number].blood_type);
                    free(thread_arrays[index]->data_array[struct_number].domain_name);
                }
                free(thread_arrays[index]->data_array);
                free(thread_arrays[index]);
            }
            free(thread_arrays);
            free(absolute_path);
            free(thread_array);
            free(full_dataset->data_array);
            free(full_dataset);
        }
    }

    return 0;
}


/**
 * Validates a path and then makes it the process's working directory.
 *
 * The path must pass three checks, in this order, before chdir() is tried:
 * lstat() succeeds, the entry is a directory (lstat() does not follow a
 * symbolic link, so a link to a directory is rejected), and access() grants
 * execute permission. Every failure is reported through err_sys()/err_msg().
 *
 * Returns 0 on success or -1 on failure.
 */
int change_directory(char* folder_location) {
    struct stat path_info;
    int status;

    status = lstat(folder_location, &path_info);
    if (status < 0) {
        err_sys("Failed to stat file with err %d", status);
        return -1;
    }

    // Anything other than a directory cannot be entered.
    if (!S_ISDIR(path_info.st_mode)) {
        err_msg("Provided path is not a directory.\n");
        return -1;
    }

    // Execute permission on a directory is what allows changing into it.
    if (access(folder_location, X_OK) != 0) {
        err_msg("No execute permission. Cannot change into directory.\n");
        return -1;
    }

    status = chdir(folder_location);
    if (status < 0) {
        err_sys("Failed to change directory with err %d", status);
        return -1;
    }

    return 0;
}


/**
 * Decides whether a directory entry is an input file that should be merged.
 *
 * entry_name is resolved relative to the current working directory, which the
 * program has already changed to absolute_path.
 *
 * Returns a newly allocated "<absolute_path>/<entry_name>" string (the caller
 * releases it with free()) when the entry is a regular file other than the
 * output file, or NULL when the entry must be skipped.
 */
static char* get_mergeable_path(const char* entry_name, const char* save_file_path) {
    int return_int;
    char* file_path;
    struct stat stat_buffer;

    return_int = lstat(entry_name, &stat_buffer);
    if (return_int < 0) {
        err_sys("Failed to stat file with err %d", return_int);
        return NULL;    // Never inspect a stat buffer that was not filled in.
    }
    if (!S_ISREG(stat_buffer.st_mode)) {
        return NULL;
    }

    // NOTE(review): assumes copy_string_with_buffer() leaves room for the two
    // strcat() calls below; its implementation is not visible here.
    file_path = copy_string_with_buffer(absolute_path, BUFFER_SIZE);
    strcat(file_path, "/");
    strcat(file_path, entry_name);

    // The output file lives in the same directory and must not be read back in.
    if (strcmp(file_path, save_file_path) == 0) {
        free(file_path);
        return NULL;
    }
    return file_path;
}


/**
 * Opens indicated folder and hands files off to threads.
 *
 * Pass one counts the regular files in absolute_path (excluding sorted.yay)
 * and prints each path. Pass two starts one thread_read_file() thread per file
 * and stores its handle in thread_array.
 *
 * On return, dir_file_counter holds the number of threads actually started,
 * so every slot in thread_array[0 .. dir_file_counter - 1] can be joined even
 * if the directory changed between the passes or a thread failed to start.
 * Each started thread owns the path string it was given.
 */
void open_folder() {
    const size_t min_stacksize = 8388608;   // 8 MiB floor used before doubling.
    int index;
    int return_int;
    char* file_path;
    char* save_file_path;
    struct dirent* dir_struct;
    DIR* dir_pointer;
    pthread_attr_t attr;
    size_t stacksize;

    printf("Locating files to merge...\n");

    // Save path of file to save to.
    save_file_path = copy_string_with_buffer(absolute_path, BUFFER_SIZE);
    strcat(save_file_path, "/");
    strcat(save_file_path, "sorted.yay");

    // Iterate through directory first time, to count number of files to open/threads to make.
    dir_file_counter = 0;
    dir_pointer = opendir(absolute_path);

    if (access(absolute_path, R_OK) != 0) {
        err_msg("Directory does not have read access. Cannot view files.");
    } else if (dir_pointer != NULL) {
        while ((dir_struct = readdir(dir_pointer)) != NULL) {
            file_path = get_mergeable_path(dir_struct->d_name, save_file_path);
            if (file_path != NULL) {
                printf("Path: %s\n", file_path);
                dir_file_counter++;
                free(file_path);
            }
        }
    }
    // Close the stream on every path, including the permission-failure one.
    if (dir_pointer != NULL && closedir(dir_pointer) < 0) {
        err_msg("Failed to properly close directory.");
    }

    // Prepare to set up threads.
    thread_array = calloc(dir_file_counter, sizeof(pthread_t));
    if (thread_array == NULL && dir_file_counter > 0) {
        err_sys("Failed to allocate thread array.");
    }
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr, &stacksize);
    if (stacksize < min_stacksize) {
        stacksize = min_stacksize;
    }
    pthread_attr_setstacksize(&attr, (stacksize * 2));

    printf("Reading in files and creating sorting threads...\n");

    // Iterate through directory again. This time, actually create threads and hand off files.
    index = 0;
    if (dir_file_counter > 0) {
        dir_pointer = opendir(absolute_path);
        if (dir_pointer == NULL) {
            err_msg("Failed to open directory at %s", absolute_path);
        } else {
            // Never start more threads than thread_array has slots, even if
            // files were added to the directory after the counting pass.
            while (index < dir_file_counter && (dir_struct = readdir(dir_pointer)) != NULL) {
                file_path = get_mergeable_path(dir_struct->d_name, save_file_path);
                if (file_path == NULL) {
                    continue;
                }

                // The thread takes ownership of file_path and frees it itself.
                return_int = pthread_create(&thread_array[index], &attr, thread_read_file, (void*) file_path);
                if (return_int != 0) {
                    // pthread_create returns the error number instead of setting errno.
                    err_msg("Failed to create thread for %s: %s", file_path, strerror(return_int));
                    free(file_path);
                } else {
                    index++;
                }
            }
            if (closedir(dir_pointer) < 0) {
                err_msg("Failed to properly close directory.");
            }
        }
    }

    // Only threads that really exist may be joined later.
    dir_file_counter = index;

    pthread_attr_destroy(&attr);
    free(save_file_path);
}


/**
 * Uses thread to read file value.
 */
void* thread_read_file(void* file_location) {
    int result_int;
    int struct_max;
    int struct_number;
    char* line_buffer = calloc(BUFFER_SIZE, sizeof(char*));
    char* token;
    char* save_pointer;
    FILE* read_file;
    data_struct* data_array;
    thread_return_struct* return_struct;

    struct_max = 1024;
    struct_number = -1;
    data_array = calloc(struct_max, sizeof(data_struct));

    // Open file.
    read_file = fopen(file_location, "r");
    if (read_file == NULL) {
        err_sys("Failed to open file at %s", file_location);
    }
    free(file_location);

    // Read file line by line and assign values to struct.
    while(fgets(line_buffer, BUFFER_SIZE, read_file) != NULL) {
        line_buffer = remove_newline(line_buffer);
        struct_number++;

        // Check that there is still more space in data_array.
        if (struct_number >= struct_max) {
            struct_max = struct_max * 2;
            data_array = realloc(data_array, struct_max);
        }

        // Actually parse line to array.
        // First get username.
        token = strtok_r(line_buffer, ",", &save_pointer);
        data_array[struct_number].user_name = copy_string(token);

        // Get password.
        token = strtok_r(NULL, ",", &save_pointer);
        data_array[struct_number].password = copy_string(token);

        // Get blood type.
        token = strtok_r(NULL, ",", &save_pointer);
        data_array[struct_number].blood_type = copy_string(token);

        // Get domain name.
        token = strtok_r(NULL, ",", &save_pointer);
        data_array[struct_number].domain_name = copy_string(token);

        // Get db index.
        token = strtok_r(NULL, ",", &save_pointer);
        data_array[struct_number].db_index = atoi(token);
    }

    data_array = merge_sort(data_array, 0, struct_number, (struct_number) + 1);

    return_struct = calloc(1, sizeof(thread_return_struct));
    return_struct->array_count = (struct_number + 1);
    return_struct->data_array = data_array;

    // Close file and exit out.
    result_int = fclose(read_file);
    if (result_int != 0) {
        err_msg("Failed to close file properly.");
    }

    free(line_buffer);
    pthread_exit((void*) return_struct);
}


/**
 * Sorts a whole record array by db_index.
 *
 * data_array:  array of array_size records; it is sorted in place.
 * array_size:  number of records; 0 is allowed and leaves the array untouched.
 *
 * Returns the same pointer that was passed in. Ownership does not change, so
 * the caller keeps exactly one allocation to free.
 */
data_struct* sort_data_array(data_struct* data_array, int array_size) {
    // merge_sort expects an inclusive upper index plus the total element count.
    return merge_sort(data_array, 0, array_size - 1, array_size);
}


/**
 * Recursive half of the merge sort.
 *
 * Sorts data_array[low_int .. high_int] (both ends inclusive) by splitting the
 * range in the middle, sorting each half, and letting merge_array() combine
 * them. array_size is passed through to merge_array() unchanged.
 *
 * Returns the pointer handed back by the last step, or data_array itself when
 * the range holds at most one record.
 */
data_struct* merge_sort(data_struct* data_array, int low_int, int high_int, int array_size) {
    // A range with zero or one record is already in order.
    if (low_int < high_int) {
        int split_int = (low_int + high_int) / 2;

        data_array = merge_sort(data_array, low_int, split_int, array_size);
        data_array = merge_sort(data_array, split_int + 1, high_int, array_size);
        data_array = merge_array(data_array, low_int, split_int, high_int, array_size);
    }

    return data_array;
}


/**
 * Second half of data array merge sort.
 *
 * Merges the two already-sorted runs data_array[low_int .. mid_int] and
 * data_array[mid_int + 1 .. high_int] (all bounds inclusive) back into
 * data_array[low_int .. high_int], ordered by db_index. When two records have
 * equal db_index, the one from the right-hand run is placed first.
 *
 * array_size is accepted for interface compatibility but not needed: only the
 * merged range is copied, so each merge costs time and memory proportional to
 * that range instead of to the whole array.
 *
 * Returns data_array. Terminates the process if the scratch buffer cannot be
 * allocated.
 */
data_struct* merge_array(data_struct* data_array, int low_int, int mid_int, int high_int, int array_size) {
    int index;
    int span_int;
    int left_int;
    int right_int;
    int left_end_int;
    int right_end_int;
    data_struct* temp_array;

    (void) array_size;

    // Zero or one record: nothing to merge.
    span_int = high_int - low_int + 1;
    if (span_int <= 1) {
        return data_array;
    }

    // Duplicate only the range being merged; indexes below are relative to low_int.
    temp_array = malloc((size_t) span_int * sizeof(data_struct));
    if (temp_array == NULL) {
        perror("Failed to allocate merge buffer");
        exit(EXIT_FAILURE);
    }
    memcpy(temp_array, &data_array[low_int], (size_t) span_int * sizeof(data_struct));

    left_int = 0;
    left_end_int = mid_int - low_int;
    right_int = left_end_int + 1;
    right_end_int = span_int - 1;

    // Loop until all index have been iterated.
    for (index = low_int; index <= high_int; index++) {
        if (left_int > left_end_int) {
            // Left side is done but right is not.
            data_array[index] = temp_array[right_int];
            right_int++;
        } else if (right_int > right_end_int) {
            // Right side is done but left is not.
            data_array[index] = temp_array[left_int];
            left_int++;
        } else if (temp_array[left_int].db_index < temp_array[right_int].db_index) {
            // Left less than right.
            data_array[index] = temp_array[left_int];
            left_int++;
        } else {
            // Right less than or equal to left.
            data_array[index] = temp_array[right_int];
            right_int++;
        }
    }

    free(temp_array);
    return data_array;
}


/**
 * Merges all thread data into one struct.
 * Then saves this by writing to file.
 *
 * thread_arrays holds dir_file_counter sorted datasets, one per input file.
 * The output file "<absolute_path>/sorted.yay" is created or emptied first;
 * the datasets are then folded into full_dataset one at a time, and every
 * merged record is appended to the file through write_to_file().
 *
 * Works with zero datasets too: full_dataset then stays empty and the output
 * file is left empty. full_dataset and its data_array remain allocated for
 * the caller to free; the records inside are shallow copies that share their
 * strings with thread_arrays.
 */
void merge_to_file(thread_return_struct** thread_arrays) {
    int index;
    char* temp_string;
    FILE* reset_file;
    thread_return_struct* temp_dataset;

    printf("Merging array data together...\n");

    // Get file path of file to save.
    temp_string = copy_string_with_buffer(absolute_path, BUFFER_SIZE);
    strcat(temp_string, "/");
    strcat(temp_string, "sorted.yay");

    // Creates new file if does not exist. If already present, then clears file of all data.
    // NOTE(review): the fclose() below relies on err_sys() not returning; confirm in apue.h.
    reset_file = fopen(temp_string, "w");
    if (reset_file == NULL) {
        err_sys("Failed to open file at %s", temp_string);
    }
    fclose(reset_file);
    free(temp_string);

    // calloc leaves array_count at 0 and data_array at NULL: an empty master set.
    // No placeholder array is needed, because merge_to_master() never reads
    // records from a master whose count is 0 and free(NULL) is a no-op.
    full_dataset = calloc(1, sizeof(thread_return_struct));
    if (full_dataset == NULL) {
        err_sys("Failed to allocate master dataset.");
    }

    // For each dataset from directory, merge into full_dataset struct.
    for (index = 0; index < dir_file_counter; index++) {
        temp_dataset = merge_to_master(thread_arrays[index]);

        // Swap the freshly merged array in and drop the previous one.
        free(full_dataset->data_array);
        full_dataset->array_count = temp_dataset->array_count;
        full_dataset->data_array = temp_dataset->data_array;
        free(temp_dataset);
    }

    printf("Saving sorted data to file...\n");

    // For each record stored in the full_dataset struct, write to file.
    for (index = 0; index < full_dataset->array_count; index++) {
        write_to_file(full_dataset->data_array[index]);
    }

}


/**
 * Merge given subset of data into the larger dataset struct.
 * Keeps data in sorted order.
 *
 * Both full_dataset and merge_dataset must already be ordered by db_index.
 * Neither input is modified or freed. The result holds shallow copies of their
 * records, so the strings stay owned by whoever owns the inputs. When two
 * records have equal db_index, the one from merge_dataset is placed first.
 *
 * Returns a newly allocated struct with a newly allocated data_array; the
 * caller frees both. data_array is NULL when the combined size is 0.
 * Terminates the process if memory cannot be allocated.
 */
thread_return_struct* merge_to_master(thread_return_struct* merge_dataset) {
    int index;
    int master_int = 0;
    int child_int = 0;
    int new_dataset_size;
    thread_return_struct* temp_dataset;

    // Calculate size of new dataset.
    new_dataset_size = (full_dataset->array_count + merge_dataset->array_count);

    // Exactly one wrapper struct is needed, whatever the number of records.
    temp_dataset = calloc(1, sizeof(thread_return_struct));
    if (temp_dataset == NULL) {
        perror("Failed to allocate merged dataset");
        exit(EXIT_FAILURE);
    }

    // Skip the record array for an empty result: calloc(0, ...) may return NULL,
    // which would otherwise be mistaken for an allocation failure.
    if (new_dataset_size > 0) {
        temp_dataset->data_array = calloc(new_dataset_size, sizeof(data_struct));
        if (temp_dataset->data_array == NULL) {
            perror("Failed to allocate merged records");
            exit(EXIT_FAILURE);
        }
    }

    // Actually merge given struct into full_dataset struct.
    for (index = 0; index < new_dataset_size; index++) {
        if (master_int >= full_dataset->array_count) {
            // Master dataset is done.
            temp_dataset->data_array[index] = merge_dataset->data_array[child_int];
            child_int++;
        } else if (child_int >= merge_dataset->array_count) {
            // Merge dataset is done.
            temp_dataset->data_array[index] = full_dataset->data_array[master_int];
            master_int++;
        } else if (full_dataset->data_array[master_int].db_index < merge_dataset->data_array[child_int].db_index) {
            // Both not done and master record is smaller.
            temp_dataset->data_array[index] = full_dataset->data_array[master_int];
            master_int++;
        } else {
            // Both not done and merge record is smaller or equal.
            temp_dataset->data_array[index] = merge_dataset->data_array[child_int];
            child_int++;
        }
    }

    temp_dataset->array_count = new_dataset_size;
    return temp_dataset;
}


/**
 * Writes a single data line to file.
 *
 * Appends "user_name,password,blood_type,domain_name,db_index" followed by a
 * newline to "<absolute_path>/sorted.yay". The file is opened in append mode
 * and closed again on every call. A NULL string field is written as an empty
 * field.
 *
 * Failures to build the path, write the line, or close the file are reported
 * with err_msg(); a failure to open the file is reported with err_sys().
 */
void write_to_file(data_struct data_array) {
    // Room for the directory path plus "/sorted.yay" and the terminator.
    char file_path[BUFFER_SIZE + 16];
    int path_length;
    FILE* write_file;

    path_length = snprintf(file_path, sizeof(file_path), "%s/sorted.yay", absolute_path);
    if (path_length < 0 || (size_t) path_length >= sizeof(file_path)) {
        err_msg("Output file path is too long.");
        return;
    }

    write_file = fopen(file_path, "a");
    if (write_file == NULL) {
        err_sys("Failed to open file at %s", file_path);
        return;
    }

    // fprintf formats db_index at full width, so large or negative indexes are
    // never cut short, and the trailing newline is always part of the record.
    if (fprintf(write_file, "%s,%s,%s,%s,%d\n",
                data_array.user_name != NULL ? data_array.user_name : "",
                data_array.password != NULL ? data_array.password : "",
                data_array.blood_type != NULL ? data_array.blood_type : "",
                data_array.domain_name != NULL ? data_array.domain_name : "",
                data_array.db_index) < 0) {
        err_msg("Failed to write record to %s", file_path);
    }

    // fclose flushes buffered output, so its result matters for a written file.
    if (fclose(write_file) != 0) {
        err_msg("Failed to close file properly.");
    }
}