diff --git a/src/connectors/core.py b/src/connectors/core.py index 5cb9b34cfb253b010da796e173abb15ec0f6f99a..dae43324fdeefbedf6dd4d29c8ee00ced6fbae9a 100644 --- a/src/connectors/core.py +++ b/src/connectors/core.py @@ -270,15 +270,22 @@ class BaseTables(): results = self._base.query.execute(query) logger.info('results: {0}'.format(results)) - def create(self, table_name): + def create(self, table_name, table_columns): """ Creates new table with provided name. :param table_name: Desired name of new table. + :param table_columns: Column values for new table. """ # First, check that provided name is valid format. if not self._base.validate.table_name(table_name): raise ValueError('Invalid table name of "{0}".'.format(table_name)) + # Check that provided columns are valid format. + orig_table_columns = table_columns + table_columns = self._base.validate.table_columns(table_columns) + if table_columns is None: + raise ValueError('Invalid table columns of "{0}"'.format(orig_table_columns)) + # Get list of valid tables. available_tables = self._get() @@ -288,8 +295,8 @@ class BaseTables(): raise ValueError('Table with name "{0}" already exists'.format(table_name)) # Create new table. - raise NotImplemented('Function needs column-definition handling.') - query = 'CREATE TABLE {0};'.format(table_name) + # raise NotImplemented('Function needs column-definition handling.') + query = 'CREATE TABLE {0} {1};'.format(table_name, table_columns) self._base.query.execute(query) logger.info('Created table "{0}".'.format(table_name)) @@ -347,3 +354,16 @@ class BaseValidate(): """ # For now, always return as valid. return True + + def table_columns(self, columns): + """ + Validates that provided column values match expected syntax. + :param columns: Str or dict of columns to validate. + :return: True if columns are valid | False otherwise. + """ + # Add parenthesis if either side is missing them. + if columns[0] != '(' or columns[-1] != ')': + columns = '(' + columns + ')' + + # For now, always return as valid. + return columns diff --git a/tests/connectors/core.py b/tests/connectors/core.py new file mode 100644 index 0000000000000000000000000000000000000000..88e2f1efbef7b54e11697038663b9de86a92cd1a --- /dev/null +++ b/tests/connectors/core.py @@ -0,0 +1,118 @@ +""" +Tests for core connector logic. +""" + +# System Imports. +import unittest + +# User Imports. +from config import mysql_config, sqlite_config +from src.connectors import MySqlConnector, SqliteConnector + + +class TestConnectorCoreTables(unittest.TestCase): + """ + Tests + """ + @classmethod + def setUpClass(cls): + # Run parent setup logic. + super().setUpClass() + + cls.test_database_name = 'python__db_connector__mysql_unittest__{0}'.format(cls.__name__) + + # Initialize db connection objects. + cls.mysql_connector = MySqlConnector( + mysql_config['host'], + mysql_config['port'], + mysql_config['user'], + mysql_config['password'], + mysql_config['name'], + debug=True, + ) + + # Check that expected database exists. + results = cls.mysql_connector.database.show() + if cls.test_database_name not in results: + cls.mysql_connector.database.create(cls.test_database_name) + print('Created database "{0}".'.format(cls.test_database_name)) + + # Select desired database for usage. + cls.mysql_connector.database.use(cls.test_database_name) + + # Check that database has no tables. + results = cls.mysql_connector.tables.show() + if len(results) > 0: + for result in results: + print('Clearing existing tables for "{0}".'.format(cls.test_database_name)) + cls.mysql_connector.tables.drop(result) + + @classmethod + def tearDownClass(cls): + # Run parent setup logic. + super().tearDownClass() + + # Remove expected database. + cls.mysql_connector.database.drop(cls.test_database_name) + + def setUp(self): + # Run parent setup logic. + super().setUp() + + def test__create_table___col_str(self): + """ + Tests that connector object properly creates new tables, via str of column data. + """ + table_name = 'test_create_table' + + # Check that expected table DOES NOT yet exist in database. + results = self.mysql_connector.tables.show() + self.assertNotIn(table_name, results) + + # Attempt to generate table. + self.mysql_connector.tables.create(table_name, 'id INT(11) NOT NULL AUTO_INCREMENT, PRIMARY KEY (id)') + + # Check that expected table now exists in database. + results = self.mysql_connector.tables.show() + self.assertIn(table_name, results) + + # def test__create_table___col_dict(self): + # """ + # Tests that connector object properly creates new tables, via dict of column data. + # """ + # table_name = 'test_create_table' + # + # # Check that expected table DOES NOT yet exist in database. + # results = self.mysql_connector.tables.show() + # self.assertNotIn(table_name, results) + # + # # Attempt to generate table. + # self.mysql_connector.tables.create(table_name, 'id INT(11) NOT NULL AUTO_INCREMENT, PRIMARY KEY (id)') + # + # # Check that expected table now exists in database. + # results = self.mysql_connector.tables.show() + # self.assertIn(table_name, results) + + def test__drop_table(self): + """ + Tests that connector object properly drops tables. + """ + table_name = 'test_drop_table' + + # Check that expected table DOES NOT yet exist in database. + results = self.mysql_connector.tables.show() + self.assertNotIn(table_name, results) + + # Attempt to generate table. + self.mysql_connector.tables.create(table_name, 'id INT(11) NOT NULL AUTO_INCREMENT, PRIMARY KEY (id)') + + # Check that expected table now exists in database. + results = self.mysql_connector.tables.show() + self.assertIn(table_name, results) + + # Attempt to remove table. + self.mysql_connector.tables.drop(table_name) + + # Check that expected table was removed. + results = self.mysql_connector.tables.show() + self.assertNotIn(table_name, results)