diff --git a/tests/connectors/core/test_records.py b/tests/connectors/core/test_records.py index 2322f815b43475374c4fcce0d9f62615b39e9f3c..599fde84c5158ff19babacf4e01d1009ae0c9b9b 100644 --- a/tests/connectors/core/test_records.py +++ b/tests/connectors/core/test_records.py @@ -27,17 +27,9 @@ class CoreRecordsTestMixin: """ cls.test_db_name_start = cls.test_db_name_start.format(cls.db_type) - # Define database-specific query values. - cls._basic_table_columns = textwrap.dedent( - """ - ( - id INT(11) NOT NULL AUTO_INCREMENT, - PRIMARY KEY (id) - ) - """ - ).strip() + cls._columns_query__basic = None + cls._columns_query__datetime = None - cls.error_handler__table_does_not_exist = None cls.error_handler__table_already_exists = None def test_error_catch_types(self): @@ -112,8 +104,11 @@ class CoreRecordsTestMixin: self.assertIn(row_3, results) with self.subTest('SELECT query when table column uses "reserved keyword"'): - # "Group" is considered a MySQL keyword. As long as this doesn't raise an error, it worked. - self.connector.tables.add_column(table_name, '`group` VARCHAR(100)') + # "Group" is considered a SQL keyword. As long as this doesn't raise an error, it worked. + self.connector.tables.add_column( + table_name, + '{0}group{0} VARCHAR(100)'.format(self.connector.validate._quote_identifier_format), + ) results = self.connector.records.select(table_name) # Verify two records returned, now with an extra "group" field that shows null. @@ -140,7 +135,10 @@ class CoreRecordsTestMixin: with self.subTest('SELECT with WHERE when table has no records'): # Run test query. - results = self.connector.records.select(table_name, where_clause='description = "test aaa"') + results = self.connector.records.select( + table_name, + where_clause='description = {0}test aaa{0}'.format(self.connector.validate._quote_str_literal_format), + ) # Verify no records returned. self.assertEqual(len(results), 0) @@ -151,7 +149,10 @@ class CoreRecordsTestMixin: self.connector.query.execute('INSERT INTO {0} VALUES {1};'.format(table_name, row_1)) # Run test query. - results = self.connector.records.select(table_name, where_clause='description = "test aaa"') + results = self.connector.records.select( + table_name, + where_clause='description = {0}test aaa{0}'.format(self.connector.validate._quote_str_literal_format) + ) # Verify no records returned. self.assertEqual(len(results), 0) @@ -162,7 +163,10 @@ class CoreRecordsTestMixin: self.connector.query.execute('INSERT INTO {0} VALUES {1};'.format(table_name, row_2)) # Run test query. - results = self.connector.records.select(table_name, where_clause='description = "test aaa"') + results = self.connector.records.select( + table_name, + where_clause='description = {0}test aaa{0}'.format(self.connector.validate._quote_str_literal_format) + ) # Verify no records returned. self.assertEqual(len(results), 1) @@ -175,7 +179,10 @@ class CoreRecordsTestMixin: self.connector.query.execute('INSERT INTO {0} VALUES {1};'.format(table_name, row_3)) # Run test query. - results = self.connector.records.select(table_name, where_clause='description = "test aaa"') + results = self.connector.records.select( + table_name, + where_clause='description = {0}test aaa{0}'.format(self.connector.validate._quote_str_literal_format), + ) # Verify no records returned. self.assertEqual(len(results), 1) @@ -189,7 +196,10 @@ class CoreRecordsTestMixin: self.connector.query.execute('INSERT INTO {0} VALUES {1};'.format(table_name, row_4)) # Run test query. - results = self.connector.records.select(table_name, where_clause='description = "test aaa"') + results = self.connector.records.select( + table_name, + where_clause='description = {0}test aaa{0}'.format(self.connector.validate._quote_str_literal_format), + ) # Verify no records returned. self.assertEqual(len(results), 2) @@ -243,7 +253,7 @@ class CoreRecordsTestMixin: # Verify table exists. try: - self.connector.query.execute('CREATE TABLE {0}{1};'.format(table_name, self._columns_query__datetime)) + self.connector.query.execute('CREATE TABLE {0} {1};'.format(table_name, self._columns_query__datetime)) except self.error_handler__table_already_exists: # Table already exists, as we want. pass @@ -327,7 +337,11 @@ class CoreRecordsTestMixin: with self.subTest('With WHERE clause'): # Update row 2 and verify change. - self.connector.records.update(table_name, 'name = "updated name"', 'id = 2') + self.connector.records.update( + table_name, + 'name = {0}updated name{0}'.format(self.connector.validate._quote_str_literal_format), + 'id = 2', + ) results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) old_row_2 = row_2 row_2 = (2, 'updated name', 'test_desc_2') @@ -338,7 +352,11 @@ class CoreRecordsTestMixin: self.assertNotIn(old_row_2, results) # Update row 3 and verify change. - self.connector.records.update(table_name, 'description = "testing aaa"', 'description = "test_desc_3"') + self.connector.records.update( + table_name, + 'description = {0}testing aaa{0}'.format(self.connector.validate._quote_str_literal_format), + 'description = {0}test_desc_3{0}'.format(self.connector.validate._quote_str_literal_format), + ) results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) old_row_3 = row_3 row_3 = (3, 'test_name_3', 'testing aaa') @@ -367,7 +385,11 @@ class CoreRecordsTestMixin: # Note, to help with database integrity (and prevent accidentally updating all rows via accidental clause # omission), we still need to provide the where clause. It's a required arg. # But providing a blank clause means we're intentionally setting it to empty, and thus is allowed. - self.connector.records.update(table_name, 'name = "test name"', '') + self.connector.records.update( + table_name, + 'name = {0}test name{0}'.format(self.connector.validate._quote_str_literal_format), + '', + ) # Verify all rows updated. results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) @@ -459,12 +481,31 @@ class CoreRecordsTestMixin: self.connector.records.update( table_name, ( - 'test_datetime = "{0}", '.format(updated_test_datetime_str__2021) + - 'test_date = "{0}"'.format(updated_test_date_str__2021) + 'test_datetime = {1}{0}{1}, '.format( + updated_test_datetime_str__2021, + self.connector.validate._quote_str_literal_format, + ) + + 'test_date = {1}{0}{1}'.format( + updated_test_date_str__2021, + self.connector.validate._quote_str_literal_format, + ) ), 'id = 2', ) results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + + # Sanitize by removing microsecond. Makes it easier to test, while still getting enough accuracy. + sanitized_results = () + for row_index in range(len(results)): + sanitized_row = () + for col_index in range(len(results[row_index])): + try: + sanitized_row += (results[row_index][col_index].replace(microsecond=0),) + except: + sanitized_row += (results[row_index][col_index],) + sanitized_results += (sanitized_row,) + results = sanitized_results + row_2 = (2, updated_test_datetime__2021.replace(microsecond=0), updated_test_date__2021) old_row_2 = (2, test_datetime__2021.replace(microsecond=0), test_date__2021) self.assertEqual(len(results), 3) @@ -483,12 +524,31 @@ class CoreRecordsTestMixin: self.connector.records.update( table_name, ( - 'test_datetime = "{0}", '.format(updated_test_datetime__2022) + - 'test_date = "{0}"'.format(updated_test_date__2022) + 'test_datetime = {1}{0}{1}, '.format( + updated_test_datetime__2022, + self.connector.validate._quote_str_literal_format, + ) + + 'test_date = {1}{0}{1}'.format( + updated_test_date__2022, + self.connector.validate._quote_str_literal_format, + ) ), 'id = 3', ) results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + + # Sanitize by removing microsecond. Makes it easier to test, while still getting enough accuracy. + sanitized_results = () + for row_index in range(len(results)): + sanitized_row = () + for col_index in range(len(results[row_index])): + try: + sanitized_row += (results[row_index][col_index].replace(microsecond=0),) + except: + sanitized_row += (results[row_index][col_index],) + sanitized_results += (sanitized_row,) + results = sanitized_results + row_3 = (3, updated_test_datetime__2022.replace(microsecond=0), updated_test_date__2022) old_row_3 = (3, test_datetime__2022.replace(microsecond=0), test_date__2022) self.assertEqual(len(results), 3) diff --git a/tests/connectors/mysql/test_records.py b/tests/connectors/mysql/test_records.py index ab753402bf9694d53fe52000b51c89ed385cb7e9..bf849386f6d68bc1b3f1aa129d2ae39e0545747a 100644 --- a/tests/connectors/mysql/test_records.py +++ b/tests/connectors/mysql/test_records.py @@ -57,7 +57,6 @@ class TestMysqlRecords(TestMysqlDatabaseParent, CoreRecordsTestMixin): """ ).strip() - cls.error_handler__table_does_not_exist = cls.db_error_handler.OperationalError cls.error_handler__table_already_exists = cls.db_error_handler.OperationalError def test_error_catch_types(self): @@ -73,7 +72,7 @@ class TestMysqlRecords(TestMysqlDatabaseParent, CoreRecordsTestMixin): with self.subTest('Verify handling when database already exists'): # Make sure we're using a table name that is not already created. table_name = 'test_table' - self.connector.tables.create(table_name, self._basic_table_columns) + self.connector.tables.create(table_name, self._columns_query__basic) results = self.connector.tables.show() if table_name not in results: @@ -81,4 +80,4 @@ class TestMysqlRecords(TestMysqlDatabaseParent, CoreRecordsTestMixin): # Check that we use the correct handler. with self.assertRaises(self.error_handler__table_already_exists): - self.connector.query.execute('CREATE TABLE {0} {1};'.format(table_name, self._basic_table_columns)) + self.connector.query.execute('CREATE TABLE {0} {1};'.format(table_name, self._columns_query__basic)) diff --git a/tests/connectors/postgresql/test_records.py b/tests/connectors/postgresql/test_records.py index 661ebcff96a77639e85c2de9dff57892799754e1..d5ce428d06071bcd616d11d727e27a38efd21fcf 100644 --- a/tests/connectors/postgresql/test_records.py +++ b/tests/connectors/postgresql/test_records.py @@ -3,14 +3,14 @@ Tests for "records" logic of "PostgreSQL" DB Connector class. """ # System Imports. -import unittest +import textwrap # Internal Imports. -from config import mysql_config, sqlite_config -from py_dbcn.connectors import MysqlDbConnector, PostgresqlDbConnector, SqliteDbConnector +from .test_core import TestPostgresqlDatabaseParent +from tests.connectors.core.test_records import CoreRecordsTestMixin -class TestPostgresqlRecords(unittest.TestCase): +class TestPostgresqlRecords(TestPostgresqlDatabaseParent, CoreRecordsTestMixin): """ Tests "PostgreSQL" DB Connector class record logic. """ @@ -18,3 +18,64 @@ class TestPostgresqlRecords(unittest.TestCase): def setUpClass(cls): # Run parent setup logic. super().setUpClass() + + # Also call CoreTestMixin setup logic. + cls.set_up_class() + + # Define database name to use in tests. + cls.test_db_name = '{0}test_records'.format(cls.test_db_name_start) + + # Initialize database for tests. + cls.connector.database.create(cls.test_db_name) + cls.connector.database.use(cls.test_db_name) + + # Check that database has no tables. + results = cls.connector.tables.show() + if len(results) > 0: + for result in results: + cls.connector.tables.drop(result) + + # Define default table columns. + cls._columns_query__basic = textwrap.dedent( + """ + ( + id serial PRIMARY KEY, + name VARCHAR(100), + description VARCHAR(100) + ) + """ + ).strip() + cls._columns_query__datetime = textwrap.dedent( + """ + ( + id serial PRIMARY KEY, + test_datetime TIMESTAMP, + test_date DATE + ) + """ + ).strip() + + cls.error_handler__table_already_exists = cls.db_error_handler.errors.DuplicateTable + + def test_error_catch_types(self): + """Tests to ensure database ERROR types are properly caught. + + Ex: MySQL and PostgreSQL interfaces do not catch "Database does not exist" errors the same. + These tests make sure this error (and others) are properly caught, regardless of what database is + being called. + """ + # Call parent logic. + super().test_error_catch_types() + + with self.subTest('Verify handling when database already exists'): + # Make sure we're using a table name that is not already created. + table_name = 'test_table' + self.connector.tables.create(table_name, self._columns_query__basic) + + results = self.connector.tables.show() + if table_name not in results: + raise AssertionError('Table not yet present. Incorrect name provided.') + + # Check that we use the correct handler. + with self.assertRaises(self.error_handler__table_already_exists): + self.connector.query.execute('CREATE TABLE {0} {1};'.format(table_name, self._columns_query__basic))