Skip to content
Snippets Groups Projects
Commit 25d064c6 authored by Brandon Rodriguez's avatar Brandon Rodriguez
Browse files

Implement insert_many query

parent c6ba0a07
Branches
Tags
No related merge requests found
......@@ -56,6 +56,27 @@ class BaseQuery:
results = []
return results
def execute_many(self, query, values, display_query=True):
""""""
if display_query:
self._base.display.query(query)
# Create connection and execute query.
cursor = self._base._connection.cursor()
cursor.executemany(query, values)
# Get results.
results = self._fetch_results(cursor)
# Close connection.
self._base._connection.commit()
cursor.close()
# Return results.
if results is None:
results = []
return results
def _fetch_results(self, cursor):
"""Helper function to fetch query results, based on database type."""
raise NotImplementedError('Please override the connection.query._fetch_results() function.')
......@@ -127,6 +127,57 @@ class BaseRecords:
return results
def insert_many(self, table_name, values_clause, columns_clause=None, display_query=True, display_results=True):
""""""
# Check that provided table name is valid format.
if not self._base.validate.table_name(table_name):
raise ValueError('Invalid table name of "{0}".'.format(table_name))
# Check that provided COLUMNS clause is valid format.
columns_clause = self._base.validate.sanitize_columns_clause(columns_clause)
# Check that provided VALUES clause is valid format.
# Must be array format.
if not isinstance(values_clause, list) and not isinstance(values_clause, tuple):
raise ValueError('VALUES clause for INSERT_MANY queries must be in list/tuple format.')
values_clause = self._base.validate.sanitize_values_clause(values_clause)
# Check for values that might need formatting.
# For example, if we find date/datetime objects, we automatically convert to a str value that won't error.
if isinstance(values_clause, list) or isinstance(values_clause, tuple):
updated_values_clause = ()
# Check each sub-item.
for item in values_clause:
if isinstance(item, datetime.datetime):
# Is a datetime object. Convert to string.
item = item.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(item, datetime.date):
# Is a date object. Convert to string.
item = item.strftime('%Y-%m-%d')
# Add item to updated clause.
updated_values_clause += (item,)
# Replace original clause.
values_clause = updated_values_clause
else:
raise ValueError('In an execute_many, values clause must be a list or tuple.')
values_context = ', '.join('%s' for i in range(len(values_clause[0])))
# Insert record.
query = """
INSERT INTO {0}{1}
VALUES ({2});
""".format(table_name, columns_clause, values_context)
results = self._base.query.execute_many(query, values_clause, display_query=display_query)
if display_results:
self._base.display.results('{0}'.format(results))
return results
def update(self, table_name, values_clause, where_clause, display_query=True, display_results=True):
"""Updates record in provided table.
......
......@@ -878,6 +878,123 @@ class CoreRecordsTestMixin:
self.assertIn((1, test_datetime__2020.replace(microsecond=0), test_date__2020), results)
self.assertIn((2, test_datetime__2021.replace(microsecond=0), test_date__2021), results)
def test__insert_many__success(self):
"""
Test execute_many `INSERT` query.
"""
table_name = 'test_queries__insert_many__success'
# Verify table exists.
try:
self.connector.query.execute('CREATE TABLE {0}{1};'.format(table_name, self._columns_clause__basic))
except self.connector.errors.table_already_exists:
# Table already exists, as we want.
pass
# Verify starting state.
results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name))
self.assertEqual(len(results), 0)
# Generate row values.
row_1 = (1, 'test_name_1', 'test_desc_1')
row_2 = (2, 'test_name_2', 'test_desc_2')
row_3 = (3, 'test_name_3', 'test_desc_3')
row_4 = (4, 'test_name_4', 'test_desc_4')
row_5 = (5, 'test_name_5', 'test_desc_5')
row_6 = (6, 'test_name_6', 'test_desc_6')
row_7 = (7, 'test_name_7', 'test_desc_7')
row_8 = (8, 'test_name_8', 'test_desc_8')
row_9 = (9, 'test_name_9', 'test_desc_9')
row_10 = (10, 'test_name_10', 'test_desc_10')
with self.subTest('Run with one insert'):
# Run test query.
rows = [
row_1,
]
self.connector.records.insert_many(table_name, rows)
# Verify one record returned.
results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name))
self.assertEqual(len(results), 1)
self.assertIn(row_1, results)
# Reset table.
self.connector.tables.drop(table_name)
self.connector.tables.create(table_name, self._columns_clause__basic)
with self.subTest('Run with two inserts'):
# Run test query.
rows = [
row_1,
row_2,
]
self.connector.records.insert_many(table_name, rows)
# Verify one record returned.
results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name))
self.assertEqual(len(results), 2)
self.assertIn(row_1, results)
self.assertIn(row_2, results)
# Reset table.
self.connector.tables.drop(table_name)
self.connector.tables.create(table_name, self._columns_clause__basic)
with self.subTest('Run with five inserts'):
# Run test query.
rows = [
row_1,
row_2,
row_3,
row_4,
row_5,
]
self.connector.records.insert_many(table_name, rows)
# Verify five records returned.
results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name))
self.assertEqual(len(results), 5)
self.assertIn(row_1, results)
self.assertIn(row_2, results)
self.assertIn(row_3, results)
self.assertIn(row_4, results)
self.assertIn(row_5, results)
# Reset table.
self.connector.tables.drop(table_name)
self.connector.tables.create(table_name, self._columns_clause__basic)
with self.subTest('Run with ten inserts'):
# Run test query.
rows = [
row_1,
row_2,
row_3,
row_4,
row_5,
row_6,
row_7,
row_8,
row_9,
row_10,
]
self.connector.records.insert_many(table_name, rows)
# Verify ten records returned.
results = self.connector.query.execute('SELECT * FROM {0};'.format(table_name))
self.assertEqual(len(results), 10)
self.assertIn(row_1, results)
self.assertIn(row_2, results)
self.assertIn(row_3, results)
self.assertIn(row_4, results)
self.assertIn(row_5, results)
self.assertIn(row_6, results)
self.assertIn(row_7, results)
self.assertIn(row_8, results)
self.assertIn(row_9, results)
self.assertIn(row_10, results)
def test__update__basic__success(self):
"""
Test `UPDATE` query.
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment