diff --git a/src/connectors/core/query.py b/src/connectors/core/query.py index 730ca3417528e68c899c2605d151fb4bdad564d8..800e0386ffaac426842c08c92e079ba77b61e712 100644 --- a/src/connectors/core/query.py +++ b/src/connectors/core/query.py @@ -46,3 +46,95 @@ class BaseQuery(): if results is None: results = [] return results + + def select(self, table_name, select_clause=None): + """""" + # Check that provided table name is valid format. + if not self._base.validate.table_name(table_name): + raise ValueError('Invalid table name of "{0}".'.format(table_name)) + + # Check that provided SELECT clause is valid format. + if select_clause is None: + select_clause = '*' + if not self._base.validate.table_name(select_clause): + raise ValueError('Invalid SELECT clause of "{0}".'.format(select_clause)) + + # Select record. + query = 'SELECT {0} FROM {1};'.format(select_clause, table_name) + results = self.execute(query) + logger.info('{0}'.format(query)) + logger.info('{0}'.format(results)) + + return results + + def insert(self, table_name, values_clause, columns_clause=None): + """""" + # Check that provided table name is valid format. + if not self._base.validate.table_name(table_name): + raise ValueError('Invalid table name of "{0}".'.format(table_name)) + + # Check that provided COLUMNS clause is valid format. + if columns_clause is None: + columns_clause = '' + columns_clause = str(columns_clause).strip() + if not self._base.validate.columns_clause(columns_clause): + raise ValueError('Invalid COLUMNS clause of "{0}".'.format(columns_clause)) + + # Check that provided VALUES clause is valid format. + if not self._base.validate.values_clause(values_clause): + raise ValueError('Invalid VALUES clause of "{0}".'.format(values_clause)) + + # Insert record. + query = """ + INSERT INTO {0}{1} + VALUES {2}; + """.format(table_name, columns_clause, values_clause) + results = self.execute(query) + logger.info('{0}'.format(query)) + logger.info('{0}'.format(results)) + + return results + + def update(self, table_name, values_clause, where_clause): + """""" + # Check that provided table name is valid format. + if not self._base.validate.table_name(table_name): + raise ValueError('Invalid table name of "{0}".'.format(table_name)) + + # Check that provided VALUES clause is valid format. + if not self._base.validate.values_clause(values_clause): + raise ValueError('Invalid VALUES clause of "{0}".'.format(values_clause)) + + # Check that provided WHERE clause is valid format. + if not self._base.validate.columns_clause(where_clause): + raise ValueError('Invalid WHERE clause of "{0}".'.format(where_clause)) + + # Update record. + query = """ + UPDATE {0} + SET {1} + WHERE {2}; + """.format(table_name, values_clause, where_clause) + results = self.execute(query) + logger.info('{0}'.format(query)) + logger.info('{0}'.format(results)) + + return results + + def delete(self, table_name, where_clause): + """""" + # Check that provided table name is valid format. + if not self._base.validate.table_name(table_name): + raise ValueError('Invalid table name of "{0}".'.format(table_name)) + + # Check that provided WHERE clause is valid format. + if not self._base.validate.columns_clause(where_clause): + raise ValueError('Invalid WHERE clause of "{0}".'.format(where_clause)) + + # Delete record. + query = 'DELETE FROM {0} WHERE {1};'.format(table_name, where_clause) + results = self.execute(query) + logger.info('{0}'.format(query)) + logger.info('{0}'.format(results)) + + return results diff --git a/src/connectors/core/validate.py b/src/connectors/core/validate.py index 62d40df8d3900c1186965f67b260cd7223c06bfa..e0fe376c8d5a02b693fc2ea3ed2fd86654fc1809 100644 --- a/src/connectors/core/validate.py +++ b/src/connectors/core/validate.py @@ -32,16 +32,18 @@ class BaseValidate(): """ Validates that provided database name uses set of acceptable characters. :param name: Potential name of database to validate. - :return: True if name is valid | False otherwise. + :return: True if valid | False otherwise. """ # For now, always return as valid. return True + # region Name Validation + def table_name(self, name): """ Validates that provided table name uses set of acceptable characters. :param name: Potential name of table to validate. - :return: True if name is valid | False otherwise. + :return: True if valid | False otherwise. """ # For now, always return as valid. return True @@ -99,3 +101,54 @@ class BaseValidate(): # For now, always return as valid. return columns + + # endregion Name Validation + + # region Clause Validation + + def select_clause(self, clause): + """ + Validates that provided clause follows acceptable format. + :param clause: SELECT clause to validate. + :return: True if valid | False otherwise. + """ + # For now, always return as valid. + return True + + def columns_clause(self, clause): + """ + Validates that provided clause follows acceptable format. + :param clause: COLUMNS clause to validate. + :return: True if valid | False otherwise. + """ + # For now, always return as valid. + return True + + def values_clause(self, clause): + """ + Validates that provided clause follows acceptable format. + :param clause: VALUES clause to validate. + :return: True if valid | False otherwise. + """ + # For now, always return as valid. + return True + + def where_clause(self, clause): + """ + Validates that provided clause follows acceptable format. + :param clause: WHERE clause to validate. + :return: True if valid | False otherwise. + """ + # For now, always return as valid. + return True + + def order_by_clause(self, clause): + """ + Validates that provided clause follows acceptable format. + :param clause: ORDER_BY clause to validate. + :return: True if valid | False otherwise. + """ + # For now, always return as valid. + return True + + # endregion Clause Validation diff --git a/tests/connectors/mysql/test_query.py b/tests/connectors/mysql/test_query.py index f2b5fa32648462d1e04a79bcc2d7550e285a6f09..061e9f9114f455fe8b95de75acb84b2a0f9d3742 100644 --- a/tests/connectors/mysql/test_query.py +++ b/tests/connectors/mysql/test_query.py @@ -3,14 +3,13 @@ Tests for "query" logic of "MySQL" DB Connector class. """ # System Imports. -import unittest +import MySQLdb # User Imports. -from config import mysql_config, sqlite_config -from src.connectors import MysqlDbConnector, PostgresqlDbConnector, SqliteDbConnector +from .test_core import TestMysqlDatabaseParent -class TestMysqlQuery(unittest.TestCase): +class TestMysqlQuery(TestMysqlDatabaseParent): """ Tests "MySQL" DB Connector class query logic. """ @@ -18,3 +17,213 @@ class TestMysqlQuery(unittest.TestCase): def setUpClass(cls): # Run parent setup logic. super().setUpClass() + + # Initialize database for tests. + db_name = 'python__db_connector__test_queries' + cls.connector.database.create(db_name) + cls.connector.database.use(db_name) + + # Define default table columns. + cls._columns_query = """( + id INT NOT NULL AUTO_INCREMENT, + name VARCHAR(100), + description VARCHAR(100), + PRIMARY KEY ( id ) + )""" + + def test__select__success(self): + """ + Test `SELECT` query when table does not exist. + """ + table_name = 'test_queries__select__success' + + # Verify table exists. + try: + self.connector.query.execute('CREATE TABLE {0}{1};'.format(table_name, self._columns_query)) + except MySQLdb.OperationalError: + # Table already exists, as we want. + pass + + with self.subTest('SELECT query when table has no records'): + # Run test query. + results = self.connector.tables.show() + + # Verify no records returned. + self.assertGreaterEqual(len(results), 0) + self.assertIn(table_name, results) + + with self.subTest('SHOW query when table has records'): + # Run test query. + row = (1, 'test_name_1', 'test_desc_1') + self.connector.query.execute('INSERT INTO {0} VALUES {1};'.format(table_name, row)) + results = self.connector.query.select(table_name) + + # Verify one record returned. + self.assertEqual(len(results), 1) + self.assertIn(row, results) + + # Run test query. + row = (2, 'test_name_2', 'test_desc_2') + self.connector.query.execute('INSERT INTO {0} VALUES {1};'.format(table_name, row)) + results = self.connector.query.select(table_name) + + # Verify two records returned. + self.assertEqual(len(results), 2) + self.assertIn(row, results) + + # Works for 0, 1, and 2. Assume works for all further n+1 values. + + def test__insert__success(self): + """ + Test `INSERT` query when table does not exist. + """ + table_name = 'test_queries__insert__success' + + # Verify table exists. + try: + self.connector.query.execute('CREATE TABLE {0}{1};'.format(table_name, self._columns_query)) + except MySQLdb.OperationalError: + # Table already exists, as we want. + pass + + # Verify starting state. + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + self.assertEqual(len(results), 0) + + # Run test query. + row = (1, 'test_name_1', 'test_desc_1') + self.connector.query.insert(table_name, row) + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + + # Verify one record returned. + self.assertEqual(len(results), 1) + self.assertIn(row, results) + + # Run test query. + row = (2, 'test_name_2', 'test_desc_2') + self.connector.query.insert(table_name, row) + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + + # Verify two records returned. + self.assertEqual(len(results), 2) + self.assertIn(row, results) + + # Works for 0, 1, and 2. Assume works for all further n+1 values. + + def test__update__success(self): + """ + Test `UPDATE` query when table does not exist. + """ + table_name = 'test_queries__update__success' + + # Verify table exists. + try: + self.connector.query.execute('CREATE TABLE {0}{1};'.format(table_name, self._columns_query)) + except MySQLdb.OperationalError: + # Table already exists, as we want. + pass + + # Initialize state. + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + self.assertEqual(len(results), 0) + row_1 = (1, 'test_name_1', 'test_desc_1') + self.connector.query.execute('INSERT INTO {0} VALUES {1};'.format(table_name, row_1)) + row_2 = (2, 'test_name_2', 'test_desc_2') + self.connector.query.execute('INSERT INTO {0} VALUES {1};'.format(table_name, row_2)) + row_3 = (3, 'test_name_3', 'test_desc_3') + self.connector.query.execute('INSERT INTO {0} VALUES {1};'.format(table_name, row_3)) + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + self.assertEqual(len(results), 3) + self.assertIn(row_1, results) + self.assertIn(row_2, results) + self.assertIn(row_3, results) + + with self.subTest('With WHERE clause'): + # Update row 2 and verify change. + self.connector.query.update(table_name, 'name = "updated name"', 'id = 2') + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + old_row_2 = row_2 + row_2 = (2, 'updated name', 'test_desc_2') + self.assertEqual(len(results), 3) + self.assertIn(row_1, results) + self.assertIn(row_2, results) + self.assertIn(row_3, results) + self.assertNotIn(old_row_2, results) + + # Update row 3 and verify change. + self.connector.query.update(table_name, 'description = "testing aaa"', 'description = "test_desc_3"') + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + old_row_3 = row_3 + row_3 = (3, 'test_name_3', 'testing aaa') + self.assertEqual(len(results), 3) + self.assertIn(row_1, results) + self.assertIn(row_2, results) + self.assertIn(row_3, results) + self.assertNotIn(old_row_2, results) + self.assertNotIn(old_row_3, results) + + # Update row 1 and verify change. + self.connector.query.update(table_name, 'id = 4', 'id = 1') + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + old_row_1 = row_1 + row_1 = (4, 'test_name_1', 'test_desc_1') + self.assertEqual(len(results), 3) + self.assertIn(row_1, results) + self.assertIn(row_2, results) + self.assertIn(row_3, results) + self.assertNotIn(old_row_1, results) + self.assertNotIn(old_row_2, results) + self.assertNotIn(old_row_3, results) + + with self.subTest('Without WHERE clause'): + pass + # raise NotImplementedError() + + def test__delete__success(self): + """ + Test `DELETE` query when table does not exist. + """ + table_name = 'test_queries__delete__success' + + # Verify table exists. + try: + self.connector.query.execute('CREATE TABLE {0}{1};'.format(table_name, self._columns_query)) + except MySQLdb.OperationalError: + # Table already exists, as we want. + pass + + # Initialize state. + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + self.assertEqual(len(results), 0) + row_1 = (1, 'test_name_1', 'test_desc_1') + self.connector.query.execute('INSERT INTO {0} VALUES {1};'.format(table_name, row_1)) + row_2 = (2, 'test_name_2', 'test_desc_2') + self.connector.query.execute('INSERT INTO {0} VALUES {1};'.format(table_name, row_2)) + row_3 = (3, 'test_name_3', 'test_desc_3') + self.connector.query.execute('INSERT INTO {0} VALUES {1};'.format(table_name, row_3)) + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + self.assertEqual(len(results), 3) + self.assertIn(row_1, results) + self.assertIn(row_2, results) + self.assertIn(row_3, results) + + # Remove record 2. + self.connector.query.delete(table_name, 'id = 2') + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + self.assertEqual(len(results), 2) + self.assertIn(row_1, results) + self.assertNotIn(row_2, results) + self.assertIn(row_3, results) + + # Remove record 1. + self.connector.query.delete(table_name, 'id = 1') + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + self.assertEqual(len(results), 1) + self.assertNotIn(row_1, results) + self.assertNotIn(row_2, results) + self.assertIn(row_3, results) + + # Remove record 1. + self.connector.query.delete(table_name, 'id = 3') + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + self.assertEqual(len(results), 0)