diff --git a/py_dbcn/connectors/core/query.py b/py_dbcn/connectors/core/query.py index 9e0fd66d2ca3ce0732a00608263de62ebbbfddb7..b6153e786214af208785cd73652756aad0c156f5 100644 --- a/py_dbcn/connectors/core/query.py +++ b/py_dbcn/connectors/core/query.py @@ -56,6 +56,27 @@ class BaseQuery: results = [] return results + def execute_many(self, query, values, display_query=True): + """""" + if display_query: + self._base.display.query(query) + + # Create connection and execute query. + cursor = self._base._connection.cursor() + cursor.executemany(query, values) + + # Get results. + results = self._fetch_results(cursor) + + # Close connection. + self._base._connection.commit() + cursor.close() + + # Return results. + if results is None: + results = [] + return results + def _fetch_results(self, cursor): """Helper function to fetch query results, based on database type.""" raise NotImplementedError('Please override the connection.query._fetch_results() function.') diff --git a/py_dbcn/connectors/core/records.py b/py_dbcn/connectors/core/records.py index 5df0e27da5ffc24ca19ba2e18cb0d92a078d2081..52b187a00d43e7f0975e8ccb3504560f014d94a4 100644 --- a/py_dbcn/connectors/core/records.py +++ b/py_dbcn/connectors/core/records.py @@ -127,6 +127,57 @@ class BaseRecords: return results + def insert_many(self, table_name, values_clause, columns_clause=None, display_query=True, display_results=True): + """""" + # Check that provided table name is valid format. + if not self._base.validate.table_name(table_name): + raise ValueError('Invalid table name of "{0}".'.format(table_name)) + + # Check that provided COLUMNS clause is valid format. + columns_clause = self._base.validate.sanitize_columns_clause(columns_clause) + + # Check that provided VALUES clause is valid format. + # Must be array format. + if not isinstance(values_clause, list) and not isinstance(values_clause, tuple): + raise ValueError('VALUES clause for INSERT_MANY queries must be in list/tuple format.') + values_clause = self._base.validate.sanitize_values_clause(values_clause) + + # Check for values that might need formatting. + # For example, if we find date/datetime objects, we automatically convert to a str value that won't error. + if isinstance(values_clause, list) or isinstance(values_clause, tuple): + updated_values_clause = () + + # Check each sub-item. + for item in values_clause: + + if isinstance(item, datetime.datetime): + # Is a datetime object. Convert to string. + item = item.strftime('%Y-%m-%d %H:%M:%S') + elif isinstance(item, datetime.date): + # Is a date object. Convert to string. + item = item.strftime('%Y-%m-%d') + + # Add item to updated clause. + updated_values_clause += (item,) + + # Replace original clause. + values_clause = updated_values_clause + else: + raise ValueError('In an execute_many, values clause must be a list or tuple.') + + values_context = ', '.join('%s' for i in range(len(values_clause[0]))) + + # Insert record. + query = """ + INSERT INTO {0}{1} + VALUES ({2}); + """.format(table_name, columns_clause, values_context) + results = self._base.query.execute_many(query, values_clause, display_query=display_query) + if display_results: + self._base.display.results('{0}'.format(results)) + + return results + def update(self, table_name, values_clause, where_clause, display_query=True, display_results=True): """Updates record in provided table. diff --git a/tests/connectors/core/test_records.py b/tests/connectors/core/test_records.py index eb3dcba10e0c15daad852be3d92a7b48c25bcdf0..c1de4bc6341750fe1e47cb9213806b9be8336be2 100644 --- a/tests/connectors/core/test_records.py +++ b/tests/connectors/core/test_records.py @@ -878,6 +878,123 @@ class CoreRecordsTestMixin: self.assertIn((1, test_datetime__2020.replace(microsecond=0), test_date__2020), results) self.assertIn((2, test_datetime__2021.replace(microsecond=0), test_date__2021), results) + def test__insert_many__success(self): + """ + Test execute_many `INSERT` query. + """ + table_name = 'test_queries__insert_many__success' + + # Verify table exists. + try: + self.connector.query.execute('CREATE TABLE {0}{1};'.format(table_name, self._columns_clause__basic)) + except self.connector.errors.table_already_exists: + # Table already exists, as we want. + pass + + # Verify starting state. + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + self.assertEqual(len(results), 0) + + # Generate row values. + row_1 = (1, 'test_name_1', 'test_desc_1') + row_2 = (2, 'test_name_2', 'test_desc_2') + row_3 = (3, 'test_name_3', 'test_desc_3') + row_4 = (4, 'test_name_4', 'test_desc_4') + row_5 = (5, 'test_name_5', 'test_desc_5') + row_6 = (6, 'test_name_6', 'test_desc_6') + row_7 = (7, 'test_name_7', 'test_desc_7') + row_8 = (8, 'test_name_8', 'test_desc_8') + row_9 = (9, 'test_name_9', 'test_desc_9') + row_10 = (10, 'test_name_10', 'test_desc_10') + + with self.subTest('Run with one insert'): + # Run test query. + rows = [ + row_1, + ] + self.connector.records.insert_many(table_name, rows) + + # Verify one record returned. + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + self.assertEqual(len(results), 1) + self.assertIn(row_1, results) + + # Reset table. + self.connector.tables.drop(table_name) + self.connector.tables.create(table_name, self._columns_clause__basic) + + with self.subTest('Run with two inserts'): + # Run test query. + rows = [ + row_1, + row_2, + ] + self.connector.records.insert_many(table_name, rows) + + # Verify one record returned. + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + self.assertEqual(len(results), 2) + self.assertIn(row_1, results) + self.assertIn(row_2, results) + + # Reset table. + self.connector.tables.drop(table_name) + self.connector.tables.create(table_name, self._columns_clause__basic) + + with self.subTest('Run with five inserts'): + # Run test query. + rows = [ + row_1, + row_2, + row_3, + row_4, + row_5, + ] + self.connector.records.insert_many(table_name, rows) + + # Verify five records returned. + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + self.assertEqual(len(results), 5) + self.assertIn(row_1, results) + self.assertIn(row_2, results) + self.assertIn(row_3, results) + self.assertIn(row_4, results) + self.assertIn(row_5, results) + + # Reset table. + self.connector.tables.drop(table_name) + self.connector.tables.create(table_name, self._columns_clause__basic) + + with self.subTest('Run with ten inserts'): + # Run test query. + rows = [ + row_1, + row_2, + row_3, + row_4, + row_5, + row_6, + row_7, + row_8, + row_9, + row_10, + ] + self.connector.records.insert_many(table_name, rows) + + # Verify ten records returned. + results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name)) + self.assertEqual(len(results), 10) + self.assertIn(row_1, results) + self.assertIn(row_2, results) + self.assertIn(row_3, results) + self.assertIn(row_4, results) + self.assertIn(row_5, results) + self.assertIn(row_6, results) + self.assertIn(row_7, results) + self.assertIn(row_8, results) + self.assertIn(row_9, results) + self.assertIn(row_10, results) + def test__update__basic__success(self): """ Test `UPDATE` query.