// Brandon Rodriguez authored 5be6d618
// Main.c 20.70 KiB
/**
* Brandon Rodriguez
* CS 3240
* 11-15-17
* a4 (Assignment 5)
*/
/**
* Description:
* Implement threads to solve a problem.
*
* In this "real world scenario" problem, originally a master file had a bunch of data.
* Somehow, someone "messed up real good" and now the master file no longer exists.
* Furthermore, all the data from the master file technically exists, but it's
* fragmented into smaller subfiles, and out of order.
*
* This program needs to be able to correct this issue, rebuilding the master file
* and sorting it as well. And it needs to use threads so that it doesn't take
* forever to accomplish (assuming the master file was originally really really big).
*/
/**
* Known Issues:
* This appears to have a race condition, somehow.
* Unfortunately, I don't seem to understand threading enough to troubleshoot it.
*
* But sometimes when running, all memory is cleared according to valgrind and
* everything is fine.
* Other times, valgrind reports errors on program end. There will arbitrarily be
* exactly "1,614 bytes in 4 blocks" that are lost. This appears to generally be
* consistent, regardless of how many files are in the selected folder (IE, how
* many threads are created) or what the contents of said files are.
* The actual error description seems to point to the imported pthread code,
* which means the issue is something to do with my implementation of threading.
*/
#define _BSD_SOURCE
// Import headers.
#include <ctype.h>
#include <dirent.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/stat.h>
#include "apue.h"
#include "HelperHeader.h"
// Define Vars.
// Size used for the working-directory buffer, the per-line read buffer,
// and passed to copy_string_with_buffer() when building file paths.
#define BUFFER_SIZE 4096
// Variables.
/**
 * One record read from an input file.
 *
 * thread_read_file() fills the fields from the comma separated tokens of a
 * line, in the order they are declared here. The four strings are heap
 * copies that main() releases with free(); db_index is the key the merge
 * routines compare when ordering records.
 */
typedef struct {
    char *user_name;   /* first token of the line */
    char *password;    /* second token of the line */
    char *blood_type;  /* third token of the line */
    char *domain_name; /* fourth token of the line */
    int db_index;      /* fifth token, converted with atoi(); sort key */
} data_struct;
/**
 * A counted array of records.
 *
 * Used both as the value each reader thread hands back to main() and as
 * the container for the merged result (full_dataset).
 */
typedef struct {
    int array_count;         /* number of valid entries in data_array */
    data_struct *data_array; /* heap array holding the records */
} thread_return_struct;
// Number of input files found by open_folder(); main() joins this many
// threads and merge_to_file() merges this many datasets.
int dir_file_counter;
// Working directory as reported by getcwd() in main(); used as the prefix
// for every file path the program builds. Freed by main().
char* absolute_path;
// One thread handle per input file. Allocated in open_folder(), joined and
// freed in main().
pthread_t* thread_array;
// Merged, sorted records of all files. Allocated in merge_to_file(), read
// by merge_to_master(), freed in main().
thread_return_struct* full_dataset;
// Method Declaration.
// NOTE(review): these declarations have empty parameter lists, so the
// compiler does not check argument counts or types at the call sites.
int change_directory(); // Safely changes directory to indicated folder.
void open_folder(); // Opens indicated folder.
void* thread_read_file(); // Reads given file and reorganizes data.
data_struct* sort_data_array(); // Begins sorting of read-in file.
data_struct* merge_sort(); // First half of merge sort.
data_struct* merge_array(); // Second half of merge sort.
void merge_to_file(); // Merges all thread data into one to write to file.
thread_return_struct* merge_to_master(); // Merges thread data together.
void write_to_file(); // Writes a single data line to file.
/**
 * Program's main.
 * Initializes and runs program.
 *
 * Expects exactly one argument: the directory holding the files to merge.
 * Changes into that directory, lets open_folder() start one reader thread
 * per file, joins every thread, passes the collected results to
 * merge_to_file() and finally releases all memory.
 *
 * Returns 0 on success and -1 when the argument count is wrong, the
 * directory cannot be entered, or a required allocation fails.
 */
int main(int argc, char* argv[]) {
    int index;
    int struct_number;
    size_t slot_count;
    void* thread_result;
    thread_return_struct* return_struct;
    thread_return_struct** thread_arrays;

    // Check for valid args.
    if (argc < 2) {
        err_msg("Must enter a directory path.");
        return -1;
    }
    if (argc > 2) {
        err_msg("Must enter exacly one directory path.");
        return -1;
    }

    // change_directory() reports its own errors; only propagate the failure
    // so the exit status does not claim success.
    if (change_directory(argv[1]) != 0) {
        return -1;
    }

    // Get absolute path. getcwd() writes at most BUFFER_SIZE bytes.
    absolute_path = calloc(BUFFER_SIZE, sizeof(char));
    if (absolute_path == NULL) {
        err_sys("Failed to allocate path buffer.");
        return -1;
    }
    if (getcwd(absolute_path, BUFFER_SIZE) == NULL) {
        err_sys("Failed to get absolute path.");
        return -1;
    }

    // Open folder and create appropriate number of threads.
    open_folder();

    // One pointer slot per thread. Ask for at least one slot so an empty
    // directory does not turn into a zero-size allocation.
    slot_count = (dir_file_counter > 0) ? (size_t) dir_file_counter : 1;
    thread_arrays = calloc(slot_count, sizeof(*thread_arrays));
    if (thread_arrays == NULL) {
        err_sys("Failed to allocate thread result table.");
        return -1;
    }

    // Iterate through all threads and grab returned value.
    for (index = 0; index < dir_file_counter; index++) {
        thread_result = NULL;
        if (pthread_join(thread_array[index], &thread_result) != 0) {
            err_msg("Failed to join thread %d.", index);
            thread_result = NULL;
        }
        return_struct = thread_result;
        if (return_struct == NULL) {
            // Substitute an empty dataset so merging and cleanup stay valid.
            return_struct = calloc(1, sizeof(*return_struct));
            if (return_struct == NULL) {
                err_sys("Failed to allocate empty thread result.");
                return -1;
            }
        }
        // Take ownership of the struct the thread allocated.
        thread_arrays[index] = return_struct;
    }

    // Take thread data and merge + write to file.
    merge_to_file(thread_arrays);
    printf("\nAll operations complete.\n");
    printf("Sorted data can found at %s/sorted.yay\n", absolute_path);

    // Merging done. Free memory. The strings are owned by the per-thread
    // arrays; full_dataset only holds shallow copies of the records.
    for (index = 0; index < dir_file_counter; index++) {
        for (struct_number = 0; struct_number < thread_arrays[index]->array_count; struct_number++) {
            free(thread_arrays[index]->data_array[struct_number].user_name);
            free(thread_arrays[index]->data_array[struct_number].password);
            free(thread_arrays[index]->data_array[struct_number].blood_type);
            free(thread_arrays[index]->data_array[struct_number].domain_name);
        }
        free(thread_arrays[index]->data_array);
        free(thread_arrays[index]);
    }
    free(thread_arrays);
    free(absolute_path);
    free(thread_array);
    if (full_dataset != NULL) {
        free(full_dataset->data_array);
        free(full_dataset);
    }
    return 0;
}
/**
 * Changes the working directory to folder_location after validating it.
 *
 * The path must stat successfully, must itself be a directory (lstat() is
 * used, so the link is inspected rather than its target) and must be
 * searchable by the caller.
 *
 * Returns 0 on success or -1 on failure.
 */
int change_directory(char* folder_location) {
    struct stat path_info;
    int status;

    status = lstat(folder_location, &path_info);
    if (status < 0) {
        err_sys("Failed to stat file with err %d", status);
        return -1;
    }

    // Reject anything that is not a directory.
    if (!S_ISDIR(path_info.st_mode)) {
        err_msg("Provided path is not a directory.\n");
        return -1;
    }

    // Execute permission is what allows entering a directory.
    if (access(folder_location, X_OK) != 0) {
        err_msg("No execute permission. Cannot change into directory.\n");
        return -1;
    }

    status = chdir(folder_location);
    if (status < 0) {
        err_sys("Failed to change directory with err %d", status);
        return -1;
    }

    return 0;
}
/**
 * Opens the working directory for scanning.
 *
 * Read permission is checked before opendir() so no handle is left open
 * when the directory is not readable.
 *
 * Returns the open stream, or NULL when the directory cannot be read.
 */
static DIR* open_folder_stream(void) {
    DIR* dir_pointer;

    if (access(absolute_path, R_OK) != 0) {
        err_msg("Directory does not have read access. Cannot view files.");
        return NULL;
    }
    dir_pointer = opendir(absolute_path);
    if (dir_pointer == NULL) {
        err_msg("Failed to open directory.");
    }
    return dir_pointer;
}

/**
 * Decides whether a directory entry is an input file.
 *
 * Returns the entry's absolute path (heap allocated, caller frees) when the
 * entry is a regular file other than the output file, otherwise NULL.
 */
static char* open_folder_entry_path(const char* entry_name, const char* save_file_path) {
    int return_int;
    char* entry_path;
    struct stat stat_buffer;

    // Entry names are relative; this works because main() changed into the
    // directory before calling open_folder().
    return_int = lstat(entry_name, &stat_buffer);
    if (return_int < 0) {
        err_sys("Failed to stat file with err %d", return_int);
        return NULL;
    }
    if (!S_ISREG(stat_buffer.st_mode)) {
        return NULL;
    }

    // Get absolute file path.
    entry_path = copy_string_with_buffer(absolute_path, BUFFER_SIZE);
    strcat(entry_path, "/");
    strcat(entry_path, entry_name);

    // Ensure that file is not save file.
    if (strcmp(entry_path, save_file_path) == 0) {
        free(entry_path);
        return NULL;
    }
    return entry_path;
}

/**
 * Opens indicated folder and hands files off to threads.
 *
 * Scans the working directory twice: the first pass counts the input files
 * (printing each path) so thread_array can be sized, the second pass starts
 * one thread_read_file() thread per file. Each thread receives its own heap
 * copy of the file path, which the thread frees.
 *
 * On return dir_file_counter holds the number of threads that were actually
 * started, so the caller joins exactly those.
 */
void open_folder() {
    int created;
    int return_int;
    char* temp_string;
    char* thread_path;
    char* save_file_path;
    struct dirent* dir_struct;
    DIR* dir_pointer;
    pthread_attr_t attr;
    size_t stacksize;
    const size_t min_stacksize = 8388608; // 8 MiB.

    printf("Locating files to merge...\n");

    // Save path of file to save to.
    save_file_path = copy_string_with_buffer(absolute_path, BUFFER_SIZE);
    strcat(save_file_path, "/");
    strcat(save_file_path, "sorted.yay");

    // Iterate through directory first time, to count number of files to open/threads to make.
    dir_file_counter = 0;
    dir_pointer = open_folder_stream();
    if (dir_pointer != NULL) {
        while ((dir_struct = readdir(dir_pointer)) != NULL) {
            temp_string = open_folder_entry_path(dir_struct->d_name, save_file_path);
            if (temp_string != NULL) {
                printf("Path: %s\n", temp_string);
                dir_file_counter++;
                free(temp_string);
            }
        }
        if (closedir(dir_pointer) < 0) {
            err_msg("Failed to properly close directory.");
        }
    }

    // Prepare to set up threads.
    thread_array = calloc(dir_file_counter, sizeof(pthread_t));
    if (thread_array == NULL && dir_file_counter > 0) {
        err_sys("Failed to allocate thread array.");
        dir_file_counter = 0;
    }

    // Give each thread twice the larger of the default stack size and 8 MiB.
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr, &stacksize);
    if (stacksize < min_stacksize) {
        stacksize = min_stacksize;
    }
    pthread_attr_setstacksize(&attr, (stacksize * 2));

    printf("Reading in files and creating sorting threads...\n");

    // Iterate through directory again. This time, actually create threads and hand off files.
    created = 0;
    dir_pointer = open_folder_stream();
    if (dir_pointer != NULL) {
        // Stop once every counted slot is used: files that appeared after
        // the first pass must not overflow thread_array.
        while (created < dir_file_counter && (dir_struct = readdir(dir_pointer)) != NULL) {
            temp_string = open_folder_entry_path(dir_struct->d_name, save_file_path);
            if (temp_string == NULL) {
                continue;
            }
            thread_path = copy_string(temp_string);
            free(temp_string);

            return_int = pthread_create(&thread_array[created], &attr, thread_read_file, (void*) thread_path);
            if (return_int != 0) {
                // The thread never ran, so its path argument is still ours.
                err_msg("Failed to create thread with err %d", return_int);
                free(thread_path);
                continue;
            }
            created++;
        }
        if (closedir(dir_pointer) < 0) {
            err_msg("Failed to properly close directory.");
        }
    }

    pthread_attr_destroy(&attr);

    // Only threads that were really started may be joined later.
    dir_file_counter = created;

    free(save_file_path);
}
/**
 * Thread entry point: reads one file, parses its records and sorts them.
 *
 * file_location is a heap allocated path string; this function frees it.
 * Every line is expected to hold five comma separated fields in the order
 * user name, password, blood type, domain name, db index. Lines with fewer
 * fields are skipped.
 *
 * Returns a heap allocated thread_return_struct holding the records sorted
 * by db_index. The caller (via pthread_join) owns the struct, its
 * data_array and the strings inside each record.
 */
void* thread_read_file(void* file_location) {
    enum { FIELD_TOTAL = 5 };
    int result_int;
    int field_count;
    int struct_max = 1024;
    int struct_count = 0;
    char* line_buffer;
    char* token;
    char* save_pointer;
    char* fields[FIELD_TOTAL];
    FILE* read_file;
    data_struct* data_array;
    data_struct* grown_array;
    thread_return_struct* return_struct;

    line_buffer = calloc(BUFFER_SIZE, sizeof(char));
    data_array = calloc(struct_max, sizeof(data_struct));
    return_struct = calloc(1, sizeof(thread_return_struct));
    if (line_buffer == NULL || data_array == NULL || return_struct == NULL) {
        err_sys("Failed to allocate memory to read file at %s", (char*) file_location);
        // Only reached if err_sys() returns.
        free(line_buffer);
        free(data_array);
        free(return_struct);
        free(file_location);
        return NULL;
    }

    // Open file.
    read_file = fopen(file_location, "r");
    if (read_file == NULL) {
        err_sys("Failed to open file at %s", (char*) file_location);
    }
    free(file_location);

    // Read file line by line and assign values to struct.
    while (read_file != NULL && fgets(line_buffer, BUFFER_SIZE, read_file) != NULL) {
        line_buffer = remove_newline(line_buffer);

        // Split the line first so an incomplete record is never stored.
        field_count = 0;
        save_pointer = NULL;
        token = strtok_r(line_buffer, ",", &save_pointer);
        while (token != NULL && field_count < FIELD_TOTAL) {
            fields[field_count] = token;
            field_count++;
            if (field_count < FIELD_TOTAL) {
                token = strtok_r(NULL, ",", &save_pointer);
            }
        }
        if (field_count < FIELD_TOTAL) {
            // Blank lines are ignored silently; partial records are reported.
            if (field_count > 0) {
                err_msg("Skipping line with fewer than 5 fields.");
            }
            continue;
        }

        // Check that there is still more space in data_array.
        if (struct_count >= struct_max) {
            // realloc() takes a size in bytes, not an element count.
            grown_array = realloc(data_array, (size_t) struct_max * 2 * sizeof(data_struct));
            if (grown_array == NULL) {
                err_sys("Failed to grow record array.");
                break;
            }
            data_array = grown_array;
            struct_max = struct_max * 2;
        }

        // Actually parse line to array.
        data_array[struct_count].user_name = copy_string(fields[0]);
        data_array[struct_count].password = copy_string(fields[1]);
        data_array[struct_count].blood_type = copy_string(fields[2]);
        data_array[struct_count].domain_name = copy_string(fields[3]);
        data_array[struct_count].db_index = atoi(fields[4]);
        struct_count++;
    }

    // Sort the records of this file by db_index.
    data_array = merge_sort(data_array, 0, struct_count - 1, struct_count);
    return_struct->array_count = struct_count;
    return_struct->data_array = data_array;

    // Close file and exit out.
    if (read_file != NULL) {
        result_int = fclose(read_file);
        if (result_int != 0) {
            err_msg("Failed to close file properly.");
        }
    }
    free(line_buffer);

    // Returning from the start routine hands the value to pthread_join()
    // exactly as pthread_exit() would.
    return return_struct;
}
/**
 * Sorts a record array by db_index.
 *
 * The array is sorted in place with merge_sort(); no copy is made and the
 * caller keeps ownership of data_array.
 *
 * data_array: records to sort (may be NULL when array_size is 0).
 * array_size: number of records in data_array.
 *
 * Returns data_array, now in sorted order.
 */
data_struct* sort_data_array(data_struct* data_array, int array_size) {
    // Nothing to do for empty or single-element input.
    if (data_array == NULL || array_size < 2) {
        return data_array;
    }
    // merge_sort() takes inclusive bounds, so the upper bound is size - 1.
    return merge_sort(data_array, 0, array_size - 1, array_size);
}
/**
 * Recursive half of the merge sort.
 *
 * Sorts the inclusive index range [low_int, high_int] of data_array by
 * splitting it at the midpoint, sorting both halves and combining them
 * with merge_array(). array_size is passed through to merge_array().
 *
 * Returns the array pointer produced by the last step (data_array itself
 * when the range holds fewer than two elements).
 */
data_struct* merge_sort(data_struct* data_array, int low_int, int high_int, int array_size) {
    // A range of zero or one element is already sorted.
    if (low_int < high_int) {
        const int split_int = (low_int + high_int) / 2;

        data_array = merge_sort(data_array, low_int, split_int, array_size);
        data_array = merge_sort(data_array, (split_int + 1), high_int, array_size);
        data_array = merge_array(data_array, low_int, split_int, high_int, array_size);
    }
    return data_array;
}
/**
 * Merging half of the merge sort.
 *
 * Combines the two adjacent sorted runs [low_int, mid_int] and
 * [mid_int + 1, high_int] of data_array into one run ordered by db_index.
 * When two records have the same db_index the one from the right-hand run
 * is placed first.
 *
 * Only the merged range is copied into scratch memory, so the work per call
 * is proportional to the range rather than to the whole array. array_size
 * is accepted for interface compatibility and is not needed.
 *
 * Returns data_array (merged in place). Terminates the process if scratch
 * memory cannot be allocated.
 */
data_struct* merge_array(data_struct* data_array, int low_int, int mid_int, int high_int, int array_size) {
    int index;
    int span_int;
    int left_int;
    int left_end;
    int right_int;
    int right_end;
    data_struct* temp_array;

    (void) array_size;

    // Without two non-empty runs there is nothing to merge.
    if (mid_int < low_int || mid_int >= high_int) {
        return data_array;
    }

    // Duplicate just the range being merged; scratch offsets start at 0.
    span_int = high_int - low_int + 1;
    temp_array = calloc((size_t) span_int, sizeof(*temp_array));
    if (temp_array == NULL) {
        perror("Failed to allocate merge buffer");
        exit(EXIT_FAILURE);
    }
    memcpy(temp_array, data_array + low_int, (size_t) span_int * sizeof(*temp_array));

    left_int = 0;
    left_end = mid_int - low_int;
    right_int = left_end + 1;
    right_end = span_int - 1;

    // Loop until all index have been iterated.
    for (index = low_int; index <= high_int; index++) {
        if (left_int > left_end) {
            // Left side is done but right is not.
            data_array[index] = temp_array[right_int];
            right_int++;
        } else if (right_int > right_end) {
            // Right side is done but left is not.
            data_array[index] = temp_array[left_int];
            left_int++;
        } else if (temp_array[left_int].db_index < temp_array[right_int].db_index) {
            // Left less than right.
            data_array[index] = temp_array[left_int];
            left_int++;
        } else {
            // Right less than or equal to left.
            data_array[index] = temp_array[right_int];
            right_int++;
        }
    }

    free(temp_array);
    return data_array;
}
/**
 * Merges all thread data into one struct.
 * Then saves this by writing to file.
 *
 * thread_arrays holds dir_file_counter sorted datasets. The output file
 * sorted.yay in the working directory is created or truncated first, the
 * datasets are folded one by one into the global full_dataset with
 * merge_to_master(), and every merged record is then appended with
 * write_to_file(). An empty thread_arrays produces an empty output file.
 *
 * full_dataset is left allocated for the caller to free.
 */
void merge_to_file(thread_return_struct** thread_arrays) {
    int index;
    char* temp_string;
    FILE* reset_file;
    thread_return_struct* temp_dataset;

    printf("Merging array data together...\n");

    // Get file path of file to save.
    temp_string = copy_string_with_buffer(absolute_path, BUFFER_SIZE);
    strcat(temp_string, "/");
    strcat(temp_string, "sorted.yay");

    // Creates new file if does not exist. If already present, then clears file of all data.
    reset_file = fopen(temp_string, "w");
    if (reset_file == NULL) {
        err_sys("Failed to open file at %s", temp_string);
    } else {
        fclose(reset_file);
    }
    free(temp_string);

    // Start from an empty master set: array_count 0 and no data_array.
    // This does not look at thread_arrays[0], which may not exist.
    full_dataset = calloc(1, sizeof(*full_dataset));
    if (full_dataset == NULL) {
        err_sys("Failed to allocate merged dataset.");
        return;
    }

    // For each dataset from directory, merge into full_dataset struct.
    for (index = 0; index < dir_file_counter; index++) {
        temp_dataset = merge_to_master(thread_arrays[index]);
        // The merged array replaces the previous one; the records are
        // shallow copies, so only the array itself is released.
        free(full_dataset->data_array);
        full_dataset->array_count = temp_dataset->array_count;
        full_dataset->data_array = temp_dataset->data_array;
        free(temp_dataset);
    }

    printf("Saving sorted data to file...\n");

    // For each record stored in the full_dataset struct, write to file.
    for (index = 0; index < full_dataset->array_count; index++) {
        write_to_file(full_dataset->data_array[index]);
    }
}
/**
 * Merges a sorted dataset with the global full_dataset.
 *
 * Both inputs must already be ordered by db_index; neither is modified.
 * Records are copied by value, so the strings inside them stay shared with
 * the inputs. When two records have the same db_index the one from
 * merge_dataset is placed first.
 *
 * Returns a newly allocated thread_return_struct whose data_array holds all
 * records of both inputs in sorted order (data_array is NULL when both
 * inputs are empty). The caller owns the struct and its array. Terminates
 * the process if memory cannot be allocated.
 */
thread_return_struct* merge_to_master(thread_return_struct* merge_dataset) {
    int index;
    int master_int = 0;
    int child_int = 0;
    int new_dataset_size;
    thread_return_struct* temp_dataset;

    // Calculate size of new dataset.
    new_dataset_size = (full_dataset->array_count + merge_dataset->array_count);

    // Exactly one result struct is needed, whatever the record count is.
    temp_dataset = calloc(1, sizeof(*temp_dataset));
    if (temp_dataset == NULL) {
        perror("Failed to allocate merged dataset");
        exit(EXIT_FAILURE);
    }
    if (new_dataset_size > 0) {
        temp_dataset->data_array = calloc((size_t) new_dataset_size, sizeof(*temp_dataset->data_array));
        if (temp_dataset->data_array == NULL) {
            perror("Failed to allocate merged records");
            exit(EXIT_FAILURE);
        }
    }

    // Actually merge given struct into full_dataset struct.
    for (index = 0; index < new_dataset_size; index++) {
        if (master_int >= full_dataset->array_count) {
            // Master dataset is done.
            temp_dataset->data_array[index] = merge_dataset->data_array[child_int];
            child_int++;
        } else if (child_int >= merge_dataset->array_count) {
            // Merge dataset is done.
            temp_dataset->data_array[index] = full_dataset->data_array[master_int];
            master_int++;
        } else if (full_dataset->data_array[master_int].db_index < merge_dataset->data_array[child_int].db_index) {
            // Both not done and master is smaller.
            temp_dataset->data_array[index] = full_dataset->data_array[master_int];
            master_int++;
        } else {
            temp_dataset->data_array[index] = merge_dataset->data_array[child_int];
            child_int++;
        }
    }

    temp_dataset->array_count = new_dataset_size;
    return temp_dataset;
}
/**
 * Writes a single data line to file.
 *
 * Appends the record to sorted.yay in the working directory as
 * "user_name,password,blood_type,domain_name,db_index" followed by a
 * newline. The line is formatted directly into the stream, so db_index is
 * written in full whatever its number of digits.
 *
 * data_array: the record to write (a single record, passed by value).
 */
void write_to_file(data_struct data_array) {
    char* temp_string;
    FILE* write_file;

    // Build the path of the output file.
    temp_string = copy_string_with_buffer(absolute_path, BUFFER_SIZE);
    strcat(temp_string, "/");
    strcat(temp_string, "sorted.yay");

    write_file = fopen(temp_string, "a");
    if (write_file == NULL) {
        err_sys("Failed to open file at %s", temp_string);
    } else {
        if (fprintf(write_file, "%s,%s,%s,%s,%d\n",
                    data_array.user_name,
                    data_array.password,
                    data_array.blood_type,
                    data_array.domain_name,
                    data_array.db_index) < 0) {
            err_sys("Failed to write to file at %s", temp_string);
        }
        // fclose() flushes the stream, so a failure here means lost data.
        if (fclose(write_file) != 0) {
            err_sys("Failed to close file at %s", temp_string);
        }
    }

    free(temp_string);
}