From a05d0fbba60135bb815be854b28a8241aa3e1bd9 Mon Sep 17 00:00:00 2001
From: Brandon Rodriguez <brodriguez8774@gmail.com>
Date: Thu, 20 Jan 2022 22:44:53 -0500
Subject: [PATCH] Expand table connection logic, and create initial associated
 unittests

---
 src/connectors/core.py   |  26 ++++++++-
 tests/connectors/core.py | 118 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 141 insertions(+), 3 deletions(-)
 create mode 100644 tests/connectors/core.py

diff --git a/src/connectors/core.py b/src/connectors/core.py
index 5cb9b34..dae4332 100644
--- a/src/connectors/core.py
+++ b/src/connectors/core.py
@@ -270,15 +270,22 @@ class BaseTables():
         results = self._base.query.execute(query)
         logger.info('results: {0}'.format(results))
 
-    def create(self, table_name):
+    def create(self, table_name, table_columns):
         """
         Creates new table with provided name.
         :param table_name: Desired name of new table.
+        :param table_columns: Column values for new table.
         """
         # First, check that provided name is valid format.
         if not self._base.validate.table_name(table_name):
             raise ValueError('Invalid table name of "{0}".'.format(table_name))
 
+        # Check that provided columns are valid format.
+        orig_table_columns = table_columns
+        table_columns = self._base.validate.table_columns(table_columns)
+        if table_columns is None:
+            raise ValueError('Invalid table columns of "{0}"'.format(orig_table_columns))
+
         # Get list of valid tables.
         available_tables = self._get()
 
@@ -288,8 +295,8 @@ class BaseTables():
             raise ValueError('Table with name "{0}" already exists'.format(table_name))
 
         # Create new table.
-        raise NotImplemented('Function needs column-definition handling.')
-        query = 'CREATE TABLE {0};'.format(table_name)
+        # raise NotImplemented('Function needs column-definition handling.')
+        query = 'CREATE TABLE {0} {1};'.format(table_name, table_columns)
         self._base.query.execute(query)
         logger.info('Created table "{0}".'.format(table_name))
 
@@ -347,3 +354,16 @@ class BaseValidate():
         """
         # For now, always return as valid.
         return True
+
+    def table_columns(self, columns):
+        """
+        Validates that provided column values match expected syntax.
+        :param columns: Str or dict of columns to validate.
+        :return: True if columns are valid | False otherwise.
+        """
+        # Add parenthesis if either side is missing them.
+        if columns[0] != '(' or columns[-1] != ')':
+            columns = '(' + columns + ')'
+
+        # For now, always return as valid.
+        return columns
diff --git a/tests/connectors/core.py b/tests/connectors/core.py
new file mode 100644
index 0000000..88e2f1e
--- /dev/null
+++ b/tests/connectors/core.py
@@ -0,0 +1,118 @@
+"""
+Tests for core connector logic.
+"""
+
+# System Imports.
+import unittest
+
+# User Imports.
+from config import mysql_config, sqlite_config
+from src.connectors import MySqlConnector, SqliteConnector
+
+
+class TestConnectorCoreTables(unittest.TestCase):
+    """
+    Tests
+    """
+    @classmethod
+    def setUpClass(cls):
+        # Run parent setup logic.
+        super().setUpClass()
+
+        cls.test_database_name = 'python__db_connector__mysql_unittest__{0}'.format(cls.__name__)
+
+        # Initialize db connection objects.
+        cls.mysql_connector = MySqlConnector(
+            mysql_config['host'],
+            mysql_config['port'],
+            mysql_config['user'],
+            mysql_config['password'],
+            mysql_config['name'],
+            debug=True,
+        )
+
+        # Check that expected database exists.
+        results = cls.mysql_connector.database.show()
+        if cls.test_database_name not in results:
+            cls.mysql_connector.database.create(cls.test_database_name)
+            print('Created database "{0}".'.format(cls.test_database_name))
+
+        # Select desired database for usage.
+        cls.mysql_connector.database.use(cls.test_database_name)
+
+        # Check that database has no tables.
+        results = cls.mysql_connector.tables.show()
+        if len(results) > 0:
+            for result in results:
+                print('Clearing existing tables for "{0}".'.format(cls.test_database_name))
+                cls.mysql_connector.tables.drop(result)
+
+    @classmethod
+    def tearDownClass(cls):
+        # Run parent setup logic.
+        super().tearDownClass()
+
+        # Remove expected database.
+        cls.mysql_connector.database.drop(cls.test_database_name)
+
+    def setUp(self):
+        # Run parent setup logic.
+        super().setUp()
+
+    def test__create_table___col_str(self):
+        """
+        Tests that connector object properly creates new tables, via str of column data.
+        """
+        table_name = 'test_create_table'
+
+        # Check that expected table DOES NOT yet exist in database.
+        results = self.mysql_connector.tables.show()
+        self.assertNotIn(table_name, results)
+
+        # Attempt to generate table.
+        self.mysql_connector.tables.create(table_name, 'id INT(11) NOT NULL AUTO_INCREMENT, PRIMARY KEY (id)')
+
+        # Check that expected table now exists in database.
+        results = self.mysql_connector.tables.show()
+        self.assertIn(table_name, results)
+
+    # def test__create_table___col_dict(self):
+    #     """
+    #     Tests that connector object properly creates new tables, via dict of column data.
+    #     """
+    #     table_name = 'test_create_table'
+    #
+    #     # Check that expected table DOES NOT yet exist in database.
+    #     results = self.mysql_connector.tables.show()
+    #     self.assertNotIn(table_name, results)
+    #
+    #     # Attempt to generate table.
+    #     self.mysql_connector.tables.create(table_name, 'id INT(11) NOT NULL AUTO_INCREMENT, PRIMARY KEY (id)')
+    #
+    #     # Check that expected table now exists in database.
+    #     results = self.mysql_connector.tables.show()
+    #     self.assertIn(table_name, results)
+
+    def test__drop_table(self):
+        """
+        Tests that connector object properly drops tables.
+        """
+        table_name = 'test_drop_table'
+
+        # Check that expected table DOES NOT yet exist in database.
+        results = self.mysql_connector.tables.show()
+        self.assertNotIn(table_name, results)
+
+        # Attempt to generate table.
+        self.mysql_connector.tables.create(table_name, 'id INT(11) NOT NULL AUTO_INCREMENT, PRIMARY KEY (id)')
+
+        # Check that expected table now exists in database.
+        results = self.mysql_connector.tables.show()
+        self.assertIn(table_name, results)
+
+        # Attempt to remove table.
+        self.mysql_connector.tables.drop(table_name)
+
+        # Check that expected table was removed.
+        results = self.mysql_connector.tables.show()
+        self.assertNotIn(table_name, results)
-- 
GitLab