From 5205d5d225df0360fd410cf98e81ba04362f1e54 Mon Sep 17 00:00:00 2001 From: Brandon Rodriguez <brodriguez8774@gmail.com> Date: Sun, 20 Nov 2022 11:43:46 -0500 Subject: [PATCH] Add partial rework of validation logic Trying to figure out how it should be long term. --- py_dbcn/connectors/core/validate.py | 215 ++++++++-- py_dbcn/connectors/mysql/validate.py | 20 + py_dbcn/connectors/postgresql/validate.py | 9 + tests/connectors/core/test_validate.py | 462 ++++++++++++++++++---- 4 files changed, 593 insertions(+), 113 deletions(-) diff --git a/py_dbcn/connectors/core/validate.py b/py_dbcn/connectors/core/validate.py index dc8fdfa..51a363a 100644 --- a/py_dbcn/connectors/core/validate.py +++ b/py_dbcn/connectors/core/validate.py @@ -49,8 +49,9 @@ class BaseValidate: self._quote_order_by_format = None self._quote_str_literal_format = None self._reserved_function_names = None + self._reserved_keywords = None - # region Name Validation + # region Validation Functions def _identifier(self, identifier): """Generalized validation for "identifier naming conventions". @@ -90,6 +91,15 @@ class BaseValidate: pattern = re.compile('^([0-9a-zA-Z$_])+$') if not re.match(pattern, identifier): return (False, """does not match acceptable characters.\n Identifier is: {0}""".format(identifier)) + + # Check against known keyword values. Cannot use keywords without quotes. + if identifier.upper() in self._reserved_keywords: + return ( + False, + """matches a known keyword. Must be quoted to use this value. Identifier is: {0}""".format( + identifier, + ), + ) else: # Check against "quoted patterns". pattern = re.compile(u'^([\u0001-\u007F])+$', flags=re.UNICODE) @@ -247,21 +257,107 @@ class BaseValidate: # Passed checks. return True - # endregion Name Validation + def validate_select_clause(self, identifier): + """""" + + def validate_columns_clause(self, identifier): + """""" + # Ensure we have our reserved lists defined for this database type. + if not self._reserved_function_names: + raise ValueError('Reserved function list is not defined.') + if not self._reserved_keywords: + raise ValueError('Reserved keyword list is not defined.') + + # Ensure provided identifier is not null. + if identifier is None: + raise TypeError('Invalid table column. Is None.') + identifier = str(identifier).strip() - # region Clause Validation + # Check if value is quoted. + is_quoted = self._is_quoted(identifier) + + # Validate using "general identifier" logic. + results = self._identifier(identifier) + + if results[0] is False: + if is_quoted: + raise ValueError(u'Invalid table column of {0}. Column {1}'.format(str(identifier), results[1])) + else: + raise ValueError(u'Invalid table column of "{0}". Column {1}'.format(str(identifier), results[1])) + + # Passed checks. + return True + + def validate_where_clause(self, identifier): + """""" + + def validate_values_clause(self, identifier): + """""" + + def validate_order_by_clause(self, identifier): + """""" + # Ensure we have our reserved lists defined for this database type. + if not self._reserved_function_names: + raise ValueError('Reserved function list is not defined.') + if not self._reserved_keywords: + raise ValueError('Reserved keyword list is not defined.') + + # Ensure provided identifier is not null. + if identifier is None: + raise TypeError('Invalid table column. Is None.') + identifier = str(identifier).strip() + + if identifier.upper().endswith(' ASC'): + identifier = identifier[:-3].strip() + elif identifier.upper().endswith(' DESC'): + identifier = identifier[:-4].strip() + + # Check if value is quoted. + is_quoted = self._is_quoted(identifier) - def sanitize_select_identifier_clause(self, clause): + # Validate using "general identifier" logic. + results = self._identifier(identifier) + + if results[0] is False: + if is_quoted: + raise ValueError(u'Invalid table column of {0}. Column {1}'.format(str(identifier), results[1])) + else: + raise ValueError(u'Invalid table column of "{0}". Column {1}'.format(str(identifier), results[1])) + + # Passed checks. + return True + + def validate_limit_by_clause(self, identifier): + """""" + + # endregion Validation Functions + + # region Sanitization Functions + + def sanitize_select_identifier_clause(self, clause, as_str=True): """ Validates that provided clause follows acceptable format. :param clause: SELECT clause to validate. + :param as_str: Bool indicating if return value should be formatted as a str. Otherwise is list. :return: Properly formatted clause if possible, otherwise error. """ if not self._reserved_function_names: raise ValueError('Reserved keyword list is not defined.') - # Validate. - return self._inner_sanitize_columns(clause, allow_wildcard=True) + # Sanitize overall clause. + clause = self._inner_sanitize_columns(clause, allow_wildcard=True) + + # Check that each inner clause item is valid. + for item in clause: + self.validate_select_clause(item) + + # All items in clause were valid. Return validated and sanitized SELECT clause. + if as_str: + # Re-concatenate into single expected str format. + return ', '.join(clause) + else: + # Return as list. + return clause def sanitize_where_clause(self, clause): """ @@ -291,17 +387,30 @@ class BaseValidate: return clause - def sanitize_columns_clause(self, clause): + def sanitize_columns_clause(self, clause, as_str=True): """ Validates that provided clause follows acceptable format. :param clause: COLUMNS clause to validate. + :param as_str: Bool indicating if return value should be formatted as a str. Otherwise is list. :return: Properly formatted clause if possible, otherwise error. """ if not self._reserved_function_names: raise ValueError('Reserved keyword list is not defined.') - # Validate. - return self._inner_sanitize_columns(clause, allow_wildcard=False) + # Sanitize overall clause. + clause = self._inner_sanitize_columns(clause, allow_wildcard=False) + + # Check that each inner clause item is valid. + for item in clause: + self.validate_columns_clause(item) + + # All items in clause were valid. Return validated and sanitized SELECT clause. + if as_str: + # Re-concatenate into single expected str format. + return ', '.join(clause) + else: + # Return as list. + return clause def sanitize_values_clause(self, clause): """ @@ -408,10 +517,11 @@ class BaseValidate: # # Return formatted clause. # return ' VALUES ({0})'.format(', '.join(clause)) - def sanitize_order_by_clause(self, clause): + def sanitize_order_by_clause(self, clause, as_str=True): """ Validates that provided clause follows acceptable format. :param clause: ORDER_BY clause to validate. + :param as_str: Bool indicating if return value should be formatted as a str. Otherwise is list. :return: Properly formatted clause if possible, otherwise error. """ if not self._reserved_function_names: @@ -436,8 +546,18 @@ class BaseValidate: if clause == '': return '' - # Return formatted clause. - return '\nORDER BY {0}'.format(clause) + # Check that each inner clause item is valid. + for item in clause: + self.validate_order_by_clause(item) + + # All items in clause were valid. Return validated and sanitized SELECT clause. + if as_str: + # Re-concatenate into single expected str format. + clause = ', '.join(clause) + return '\nORDER BY {0}'.format(clause) + else: + # Return as list. + return clause def sanitize_limit_clause(self, clause): """ @@ -476,15 +596,20 @@ class BaseValidate: # Return formatted clause. return clause - # endregion Clause Validation + # endregion Sanitization Functions # region Helper Functions def _is_quoted(self, value): """Checks if provided value is quoted. - Aka, these are three "quoted" values: "id", `first_name`, 'last_name' - These are three not "quoted" values: id, first_name, last_name + Aka, these are three "quoted" values: "id", `first_name`, 'last_name' + These are not "quoted" values: + id, first_name, last_name + "id' + 'id" + `id' + etc... """ is_quoted = False if isinstance(value, str): @@ -497,8 +622,15 @@ class BaseValidate: return is_quoted - def _inner_sanitize_columns(self, clause, allow_wildcard=False, order_by=False): - """""" + def _inner_sanitize_columns(self, clause, allow_wildcard=False, order_by=False, as_str=False): + """Common logic used by multiple functions to sanitize columns-like values. + + :param clause: Clause to sanitize. + :param allow_wildcard: Bool indicating if wildcard is allowed for this instance. + :param order_by: Bool indicating if this is an order_by instance. + :param as_str: Bool indicating if return value should be formatted as a str. Otherwise is list. + :return: Str or List of sanitized values. + """ if allow_wildcard: quote_format = self._quote_identifier_format elif order_by: @@ -524,6 +656,18 @@ class BaseValidate: # Handle for all other types. clause = str(clause).strip() + # Check for descriptor values. + if clause.upper().startswith('COLUMNS ') or clause.upper().startswith('COLUMNS('): + clause = clause[7:].strip() + if clause.upper().startswith('WHERE ') or clause.upper().startswith('WHERE('): + clause = clause[5:].strip() + if clause.upper().startswith('VALUES ') or clause.upper().startswith('VALUES('): + clause = clause[6:].strip() + if clause.upper().startswith('ORDER BY ') or clause.upper().startswith('ORDER BY('): + clause = clause[8:].strip() + if clause.upper().startswith('LIMIT ') or clause.upper().startswith('LIMIT('): + clause = clause[5:].strip() + # Check for outer parens. if ( len(clause) > 1 @@ -603,28 +747,20 @@ class BaseValidate: raise ValueError('SELECT clause provided * with other params. * is only valid alone.') # Validate individual identifier. - item_identifier = item order_by_descriptor = '' if item != '*': if order_by: # To check identifier, trim possible ASC/DESC values. - if item_identifier.lower().endswith(' asc'): + if item.lower().endswith(' asc'): # Handle for ASC syntax. - item_identifier = item_identifier[:-4].rstrip() - item = item_identifier + item = item[:-4].rstrip() order_by_descriptor = ' ASC' - if item_identifier.lower().endswith(' desc'): + if item.lower().endswith(' desc'): # Handle for DESC syntax. - item_identifier = item_identifier[:-5].rstrip() - item = item_identifier + item = item[:-5].rstrip() order_by_descriptor = ' DESC' - # Check for valid identifier. - results = self._identifier(item_identifier) - if results[0] is False: - raise ValueError('Invalid identifier. Identifier {0}'.format(results[1])) - - # If we made it this far, item is valid. Escape with backticks and readd. + # If we made it this far, item is valid. Escape with proper quote format and readd. is_quoted = self._is_quoted(item) if is_quoted: # Was already quoted, but may not be with expected format. Reformat to guaranteed use expected format. @@ -632,7 +768,12 @@ class BaseValidate: elif item == '*': pass else: - # Was not quoted. Add quotes. + # Was not quoted. + # First double check that we don't have mismatched quotes. + if len(item) > 1 and item[0] in ['\'', '"', '`'] and item[-1] in ['\'', '"', '`']: + raise ValueError('Found mismatching quotes for identifier {0}'.format(item)) + + # Add quotes. item = '{1}{0}{1}{2}{3}'.format(item, quote_format, cast_identifier, order_by_descriptor) # Re-add function values. @@ -641,10 +782,12 @@ class BaseValidate: # Append updated value to clause. new_clause.append(item) - # All items in clause were valid. Re-concatenate into single expected str format. - clause = ', '.join(new_clause) - - # Return validated and sanitized SELECT clause. - return clause + # All items in clause were valid. Return validated and sanitized SELECT clause. + if as_str: + # Re-concatenate into single expected str format. + return ', '.join(new_clause) + else: + # Return as list. + return new_clause # endregion Helper Functions diff --git a/py_dbcn/connectors/mysql/validate.py b/py_dbcn/connectors/mysql/validate.py index c4fb05e..0c06117 100644 --- a/py_dbcn/connectors/mysql/validate.py +++ b/py_dbcn/connectors/mysql/validate.py @@ -136,6 +136,26 @@ class MysqlValidate(BaseValidate): 'YEAR', ] + # Keywords that cannot be used as identifiers, such as column names, unless quoted. + # We don't define the comprehensive list here, but get many common ones. + # See https://dev.mysql.com/doc/refman/8.0/en/keywords.html + self._reserved_keywords = list(self._reserved_function_names) + self._reserved_keywords += [ + 'ADD', + 'ALL', + 'ALWAYS', + 'ANALYZE', + 'AND', + 'ANY', + 'AS', + 'ASC', + 'ASCI', + 'AUTO_INCREMENT', + 'AVG', + + 'DESC', + ] + # Initialize database string-quote types. # Aka, what the database says is "okay" to surround string values with. self._quote_column_format = QUOTE_COLUMN_FORMAT diff --git a/py_dbcn/connectors/postgresql/validate.py b/py_dbcn/connectors/postgresql/validate.py index ee6d5e4..a5aa884 100644 --- a/py_dbcn/connectors/postgresql/validate.py +++ b/py_dbcn/connectors/postgresql/validate.py @@ -99,6 +99,15 @@ class PostgresqlValidate(BaseValidate): 'VARIANCE', 'YEAR', ] + # Keywords that cannot be used as identifiers, such as column names, unless quoted. + # We don't define the comprehensive list here, but get many common ones. + # See https://www.postgresql.org/docs/current/sql-keywords-appendix.html + self._reserved_keywords = list(self._reserved_function_names) + self._reserved_keywords += [ + 'ASC', + 'AS', + 'DESC', + ] # Initialize database string-quote types. # Aka, what the database says is "okay" to surround string values with. diff --git a/tests/connectors/core/test_validate.py b/tests/connectors/core/test_validate.py index fe9ddc7..65b86b7 100644 --- a/tests/connectors/core/test_validate.py +++ b/tests/connectors/core/test_validate.py @@ -32,6 +32,18 @@ class CoreValidateTestMixin: cls._quote_select_identifier_format = None cls._quote_str_literal_format = None + # def sql_injection(self): + # with self.subTest('SQL Injection - Drop database'): + # with self.assertRaises(): + # self.connector. + # self.connector.validate.validate_select_clause('DROP DATABASE {0}'.format(self.test_db_name_start)) + # + # with self.subTest('SQL Injection - Drop table'): + # with self.assertRaises(): + # self.connector.validate.validate_select_clause('DROP DATABASE {0}'.format(self.test_db_name_start)) + + + def test__column_quote_format(self): raise NotImplementedError('Check for column quote formatting not implemented.') @@ -44,7 +56,7 @@ class CoreValidateTestMixin: def test__str_literal_quote_format(self): raise NotImplementedError('Check for str literal quote formatting not implemented.') - # region Name Validation + # region Validation Functions def test__identifier__success(self): """ @@ -561,46 +573,6 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError): self.connector.validate.table_columns({'id': 'INT;'}) - def test__table_column__success(self): - """ - Test "table column" validation, when it should succeed. - """ - with self.subTest('"Permitted characters in unquoted Identifiers"'): - # Ensure capital letters validate. - self.assertTrue(self.connector.validate.table_column('ABCDEFGHIJKLMNOPQRSTUVWXYZ')) - - # Ensure lowercase characters validate. - self.assertTrue(self.connector.validate.table_column('abcdefghijklmnopqrstuvwxyz')) - - # Ensure integer characters validate. - self.assertTrue(self.connector.validate.table_column('0123456789')) - - # Ensure dollar and underscore validate. - self.assertTrue(self.connector.validate.table_column('_$')) - - with self.subTest('At max length - unquoted'): - test_str = 'Aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' - self.assertText(len(test_str), 64) - self.assertTrue(self.connector.validate.table_column(test_str)) - - with self.subTest('At max length - quoted'): - test_str = '`Aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa`' - self.assertText(len(test_str), 66) - self.assertTrue(self.connector.validate.table_column(test_str)) - - with self.subTest( - '"Permitted characters in quoted identifiers include the full Unicode Basic Multilingual Plane (BMP), ' - 'except U+0000"' - ): - for index in range(127): - # Skip "unacceptable" values. - if (index + 1) in self.unallowed_unicode_index_list: - continue - - # Test value. - test_str = u'`' + chr(index + 1) + u'`' - self.assertTrue(self.connector.validate.table_column(test_str)) - def test__table_column__failure(self): """ Test "table column" validation, when it should fail. @@ -703,9 +675,241 @@ class CoreValidateTestMixin: self.assertIn('Invalid table column of ', str(err.exception)) self.assertIn('. Column does not match acceptable characters.', str(err.exception)) - # endregion Name Validation + def test__validate_select_clause__success(self): + """""" + + def test__validate_select_clause__failure(self): + """""" + + def test__validate_columns_clause__success(self): + """ + Test "table column" individual value validation, when it should succeed. + """ + with self.subTest('"Permitted characters in unquoted Identifiers"'): + # Ensure capital letters validate. + self.assertTrue(self.connector.validate.validate_columns_clause('ABCDEFGHIJKLMNOPQRSTUVWXYZ')) + + # Ensure lowercase characters validate. + self.assertTrue(self.connector.validate.validate_columns_clause('abcdefghijklmnopqrstuvwxyz')) + + # Ensure integer characters validate. + self.assertTrue(self.connector.validate.validate_columns_clause('0123456789')) + + # Ensure dollar and underscore validate. + self.assertTrue(self.connector.validate.validate_columns_clause('_$')) + + with self.subTest('At max length - unquoted'): + test_str = 'Aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' + self.assertText(len(test_str), 64) + self.assertTrue(self.connector.validate.validate_columns_clause(test_str)) - # region Clause Validation + with self.subTest('At max length - quoted'): + test_str = '`Aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa`' + self.assertText(len(test_str), 66) + self.assertTrue(self.connector.validate.validate_columns_clause(test_str)) + + with self.subTest( + '"Permitted characters in quoted identifiers include the full Unicode Basic Multilingual Plane (BMP), ' + 'except U+0000"' + ): + for index in range(127): + # Skip "unacceptable" values. + if (index + 1) in self.unallowed_unicode_index_list: + continue + + # Test value. + test_str = u'`' + chr(index + 1) + u'`' + self.assertTrue(self.connector.validate.validate_columns_clause(test_str)) + + with self.subTest('Basic common column values - As-is'): + self.assertTrue(self.connector.validate.validate_columns_clause('name')) + self.assertTrue(self.connector.validate.validate_columns_clause('description')) + self.assertTrue(self.connector.validate.validate_columns_clause('id')) + self.assertTrue(self.connector.validate.validate_columns_clause('code')) + self.assertTrue(self.connector.validate.validate_columns_clause('size')) + self.assertTrue(self.connector.validate.validate_columns_clause('type')) + self.assertTrue(self.connector.validate.validate_columns_clause('quantity')) + self.assertTrue(self.connector.validate.validate_columns_clause('qty')) + self.assertTrue(self.connector.validate.validate_columns_clause('status')) + self.assertTrue(self.connector.validate.validate_columns_clause('order')) + self.assertTrue(self.connector.validate.validate_columns_clause('order_id')) + self.assertTrue(self.connector.validate.validate_columns_clause('invoice')) + self.assertTrue(self.connector.validate.validate_columns_clause('invoice_id')) + self.assertTrue(self.connector.validate.validate_columns_clause('load')) + self.assertTrue(self.connector.validate.validate_columns_clause('load_id')) + self.assertTrue(self.connector.validate.validate_columns_clause('location')) + self.assertTrue(self.connector.validate.validate_columns_clause('location_id')) + self.assertTrue(self.connector.validate.validate_columns_clause('product')) + self.assertTrue(self.connector.validate.validate_columns_clause('product_id')) + self.assertTrue(self.connector.validate.validate_columns_clause('item')) + self.assertTrue(self.connector.validate.validate_columns_clause('item_id')) + self.assertTrue(self.connector.validate.validate_columns_clause('date_created')) + self.assertTrue(self.connector.validate.validate_columns_clause('date_modified')) + self.assertTrue(self.connector.validate.validate_columns_clause('last_edited')) + self.assertTrue(self.connector.validate.validate_columns_clause('last_active')) + self.assertTrue(self.connector.validate.validate_columns_clause('last_activity')) + self.assertTrue(self.connector.validate.validate_columns_clause('active')) + self.assertTrue(self.connector.validate.validate_columns_clause('is_active')) + + with self.subTest('Basic common column values - With Single Quotes'): + # Standard Values. + self.assertTrue(self.connector.validate.validate_columns_clause("'name'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'description'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'id'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'code'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'size'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'type'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'quantity'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'qty'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'status'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'order'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'order_id'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'invoice'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'invoice_id'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'load'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'load_id'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'location'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'location_id'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'product'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'product_id'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'item'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'item_id'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'date_created'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'date_modified'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'last_edited'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'last_active'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'last_activity'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'active'")) + self.assertTrue(self.connector.validate.validate_columns_clause("'is_active'")) + + # Keyword values that fail without quotes, but succeed here. + self.assertTrue(self.connector.validate.validate_columns_clause("'desc'")) + + with self.subTest('Basic common column values - With Double Quotes'): + # Standard values. + self.assertTrue(self.connector.validate.validate_columns_clause('"name"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"description"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"id"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"code"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"size"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"type"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"quantity"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"qty"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"status"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"order"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"order_id"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"invoice"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"invoice_id"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"load"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"load_id"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"location"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"location_id"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"product"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"product_id"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"item"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"item_id"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"date_created"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"date_modified"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"last_edited"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"last_active"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"last_activity"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"active"')) + self.assertTrue(self.connector.validate.validate_columns_clause('"is_active"')) + + # Keyword values that fail without quotes, but succeed here. + self.assertTrue(self.connector.validate.validate_columns_clause('"desc"')) + + with self.subTest('Basic common column values - With Backtick Quotes'): + # Standard values. + self.assertTrue(self.connector.validate.validate_columns_clause('`name`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`description`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`id`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`code`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`size`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`type`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`quantity`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`qty`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`status`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`order`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`order_id`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`invoice`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`invoice_id`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`load`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`load_id`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`location`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`location_id`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`product`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`product_id`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`item`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`item_id`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`date_created`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`date_modified`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`last_edited`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`last_active`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`last_activity`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`active`')) + self.assertTrue(self.connector.validate.validate_columns_clause('`is_active`')) + + # Keyword values that fail without quotes, but succeed here. + self.assertTrue(self.connector.validate.validate_columns_clause('`desc`')) + + with self.subTest('Special cases'): + # Contains space. + self.assertTrue(self.connector.validate.validate_columns_clause('"test value"')) + + # Contains inner apostraphe. + self.assertTrue(self.connector.validate.validate_columns_clause('"Customer\'s Price"')) + + # Contains inner quotes. + self.assertTrue(self.connector.validate.validate_columns_clause('"John "Spaceman" Johnny"')) + + def test__validate_columns_clause__failure(self): + """""" + # # Test none. + # self.assertFalse(self.connector.validate.validate_columns_clause(None)) + # + # # Test empty str. + # self.assertFalse(self.connector.validate.validate_columns_clause('')) + + with self.subTest('Common column keyword values that will fail without quotes'): + with self.assertRaises(ValueError) as err: + self.assertFalse(self.connector.validate.validate_columns_clause('desc')) + self.assertText( + ( + 'Invalid table column of "desc". Column matches a known keyword. ' + 'Must be quoted to use this value. Identifier is: desc' + ), + str(err.exception) + ) + + def test__validate_where_clause__success(self): + """""" + + def test__validate_where_clause__failure(self): + """""" + + def test__validate_values_clause__success(self): + """""" + + def test__validate_values_clause__failure(self): + """""" + + def test__validate_order_by_clause__success(self): + """""" + + def test__validate_order_by_clause__failure(self): + """""" + + def test__validate_limit_by_clause__success(self): + """""" + + def test__validate_limit_by_clause__failure(self): + """""" + + # endregion Validation Functions + + + # region Sanitization Functions def test__sanitize_select_identifier_clause__success(self): """ @@ -1184,7 +1388,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_select_identifier_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: "id\'', + 'Found mismatching quotes for identifier "id\'', str(err.exception), ) @@ -1193,7 +1397,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_select_identifier_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: \'id"', + 'Found mismatching quotes for identifier \'id"', str(err.exception), ) @@ -1202,7 +1406,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_select_identifier_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: `id\'', + 'Found mismatching quotes for identifier `id\'', str(err.exception), ) @@ -1211,7 +1415,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_select_identifier_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: \'id`', + 'Found mismatching quotes for identifier \'id`', str(err.exception), ) @@ -1220,7 +1424,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_select_identifier_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: "id`', + 'Found mismatching quotes for identifier "id`', str(err.exception), ) @@ -1229,7 +1433,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_select_identifier_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: `id"', + 'Found mismatching quotes for identifier `id"', str(err.exception), ) @@ -1268,6 +1472,13 @@ class CoreValidateTestMixin: result = self.connector.validate.sanitize_columns_clause(' id ') self.assertText(result, self._quote_columns_format.format('id')) + # Single val provided and COLUMNS. + result = self.connector.validate.sanitize_columns_clause('COLUMNS (id)') + self.assertText(result, self._quote_columns_format.format('id')) + # With extra whitespace. + result = self.connector.validate.sanitize_columns_clause(' COLUMNS ( id ) ') + self.assertText(result, self._quote_columns_format.format('id')) + # Two vals provided. result = self.connector.validate.sanitize_columns_clause('id, name') self.assertText( @@ -1716,7 +1927,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_columns_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: "id\'', + 'Found mismatching quotes for identifier "id\'', str(err.exception), ) @@ -1725,7 +1936,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_columns_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: \'id"', + 'Found mismatching quotes for identifier \'id"', str(err.exception), ) @@ -1734,7 +1945,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_columns_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: `id\'', + 'Found mismatching quotes for identifier `id\'', str(err.exception), ) @@ -1743,7 +1954,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_columns_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: \'id`', + 'Found mismatching quotes for identifier \'id`', str(err.exception), ) @@ -1752,7 +1963,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_columns_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: "id`', + 'Found mismatching quotes for identifier "id`', str(err.exception), ) @@ -1761,7 +1972,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_columns_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: `id"', + 'Found mismatching quotes for identifier `id"', str(err.exception), ) @@ -3008,7 +3219,7 @@ class CoreValidateTestMixin: # # identifier = """\"id'""" # # result = self.connector.validate.sanitize_values_clause(identifier) # # self.assertText( - # # 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: "id\'', + # # 'Found mismatching quotes for identifier "id\'', # # result, # # ) # # @@ -3016,7 +3227,7 @@ class CoreValidateTestMixin: # # identifier = """'id\"""" # # result = self.connector.validate.sanitize_values_clause(identifier) # # self.assertText( - # # 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: \'id"', + # # 'Found mismatching quotes for identifier \'id"', # # result, # # ) # # @@ -3024,7 +3235,7 @@ class CoreValidateTestMixin: # # identifier = "`id'" # # result = self.connector.validate.sanitize_values_clause(identifier) # # self.assertText( - # # 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: `id\'', + # # 'Found mismatching quotes for identifier `id\'', # # result, # # ) # # @@ -3032,7 +3243,7 @@ class CoreValidateTestMixin: # # identifier = "'id`" # # result = self.connector.validate.sanitize_values_clause(identifier) # # self.assertText( - # # 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: \'id`', + # # 'Found mismatching quotes for identifier \'id`', # # result, # # ) # # @@ -3040,7 +3251,7 @@ class CoreValidateTestMixin: # # identifier = '"id`' # # result = self.connector.validate.sanitize_values_clause(identifier) # # self.assertText( - # # 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: "id`', + # # 'Found mismatching quotes for identifier "id`', # # result, # # ) # # @@ -3048,7 +3259,7 @@ class CoreValidateTestMixin: # # identifier = '`id"' # # result = self.connector.validate.sanitize_values_clause(identifier) # # self.assertText( - # # 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: `id"', + # # 'Found mismatching quotes for identifier `id"', # # result, # # ) @@ -3615,7 +3826,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_order_by_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: "id\'', + 'Found mismatching quotes for identifier "id\'', str(err.exception), ) @@ -3624,7 +3835,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_order_by_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: \'id"', + 'Found mismatching quotes for identifier \'id"', str(err.exception), ) @@ -3633,7 +3844,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_order_by_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: `id\'', + 'Found mismatching quotes for identifier `id\'', str(err.exception), ) @@ -3642,7 +3853,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_order_by_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: \'id`', + 'Found mismatching quotes for identifier \'id`', str(err.exception), ) @@ -3651,7 +3862,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_order_by_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: "id`', + 'Found mismatching quotes for identifier "id`', str(err.exception), ) @@ -3660,7 +3871,7 @@ class CoreValidateTestMixin: with self.assertRaises(ValueError) as err: self.connector.validate.sanitize_order_by_clause(identifier) self.assertText( - 'Invalid identifier. Identifier does not match acceptable characters.\n Identifier is: `id"', + 'Found mismatching quotes for identifier `id"', str(err.exception), ) @@ -3754,7 +3965,8 @@ class CoreValidateTestMixin: self.connector.validate.sanitize_limit_clause('abc') self.assertText('The LIMIT clause expects a positive integer.', str(err.exception)) - # endregion Clause Validation + # endregion Sanitization Functions + # region Helper Functions @@ -3772,7 +3984,7 @@ class CoreValidateTestMixin: self.assertTrue(self.connector.validate._is_quoted("'True False'")) self.assertTrue(self.connector.validate._is_quoted('`True False`')) - # And commas. + # With commas. self.assertTrue(self.connector.validate._is_quoted('"True, False"')) self.assertTrue(self.connector.validate._is_quoted("'True, False'")) self.assertTrue(self.connector.validate._is_quoted('`True, False`')) @@ -3787,13 +3999,109 @@ class CoreValidateTestMixin: # Integer type. self.assertFalse(self.connector.validate._is_quoted(1)) - # Basic strings. - self.assertFalse(self.connector.validate._is_quoted('True')) - - # With spaces. - self.assertFalse(self.connector.validate._is_quoted('True False')) - - # And commas. - self.assertFalse(self.connector.validate._is_quoted('True, False')) + with self.subTest('Str defined by single quotes'): + # Basic strings. + self.assertFalse(self.connector.validate._is_quoted('True')) + + # With spaces. + self.assertFalse(self.connector.validate._is_quoted('True False')) + + # With commas. + self.assertFalse(self.connector.validate._is_quoted('True, False')) + + # With mismatching quote types. + self.assertFalse(self.connector.validate._is_quoted('\'True"')) + self.assertFalse(self.connector.validate._is_quoted('"True\'')) + self.assertFalse(self.connector.validate._is_quoted('`True"')) + self.assertFalse(self.connector.validate._is_quoted('"True`')) + self.assertFalse(self.connector.validate._is_quoted('`True\'')) + self.assertFalse(self.connector.validate._is_quoted('\'True`')) + + # With value mid-string. + self.assertFalse(self.connector.validate._is_quoted('we\'re')) + self.assertFalse(self.connector.validate._is_quoted('\'twas')) + self.assertFalse(self.connector.validate._is_quoted('Marcus\'')) + self.assertFalse(self.connector.validate._is_quoted('Marcus\' Market')) + self.assertFalse(self.connector.validate._is_quoted('Marcus \'Fresh\' Market')) + self.assertFalse(self.connector.validate._is_quoted('we"re')) + self.assertFalse(self.connector.validate._is_quoted('"twas')) + self.assertFalse(self.connector.validate._is_quoted('Marcus"')) + self.assertFalse(self.connector.validate._is_quoted('Marcus" Market')) + self.assertFalse(self.connector.validate._is_quoted('Marcus "Fresh" Market')) + self.assertFalse(self.connector.validate._is_quoted('we`re')) + self.assertFalse(self.connector.validate._is_quoted('`twas')) + self.assertFalse(self.connector.validate._is_quoted('Marcus`')) + self.assertFalse(self.connector.validate._is_quoted('Marcus` Market')) + self.assertFalse(self.connector.validate._is_quoted('Marcus `Fresh` Market')) + + with self.subTest('Str defined by double quotes'): + # Basic strings. + self.assertFalse(self.connector.validate._is_quoted("True")) + + # With spaces. + self.assertFalse(self.connector.validate._is_quoted("True False")) + + # With commas. + self.assertFalse(self.connector.validate._is_quoted("True, False")) + + # With mismatching quote types. + self.assertFalse(self.connector.validate._is_quoted("'True\"")) + self.assertFalse(self.connector.validate._is_quoted("\"True'")) + self.assertFalse(self.connector.validate._is_quoted("`True\"")) + self.assertFalse(self.connector.validate._is_quoted("\"True`")) + self.assertFalse(self.connector.validate._is_quoted("`True\'")) + self.assertFalse(self.connector.validate._is_quoted("'True`")) + + # With value mid-string. + self.assertFalse(self.connector.validate._is_quoted("we're")) + self.assertFalse(self.connector.validate._is_quoted("'twas")) + self.assertFalse(self.connector.validate._is_quoted("Marcus'")) + self.assertFalse(self.connector.validate._is_quoted("Marcus' Market")) + self.assertFalse(self.connector.validate._is_quoted("Marcus 'Fresh' Market")) + self.assertFalse(self.connector.validate._is_quoted("we\"re")) + self.assertFalse(self.connector.validate._is_quoted("\"twas")) + self.assertFalse(self.connector.validate._is_quoted("Marcus\"")) + self.assertFalse(self.connector.validate._is_quoted("Marcus\" Market")) + self.assertFalse(self.connector.validate._is_quoted("Marcus \"Fresh\" Market")) + self.assertFalse(self.connector.validate._is_quoted("we`re")) + self.assertFalse(self.connector.validate._is_quoted("`twas")) + self.assertFalse(self.connector.validate._is_quoted("Marcus`")) + self.assertFalse(self.connector.validate._is_quoted("Marcus` Market")) + self.assertFalse(self.connector.validate._is_quoted("Marcus `Fresh` Market")) + + with self.subTest('Str defined by triple quotes'): + # Basic strings. + self.assertFalse(self.connector.validate._is_quoted("""True""")) + + # With spaces. + self.assertFalse(self.connector.validate._is_quoted("""True False""")) + + # With commas. + self.assertFalse(self.connector.validate._is_quoted("""True, False""")) + + # With mismatching quote types. + self.assertFalse(self.connector.validate._is_quoted("""'True\"""")) + self.assertFalse(self.connector.validate._is_quoted("""\"True'""")) + self.assertFalse(self.connector.validate._is_quoted("""`True\"""")) + self.assertFalse(self.connector.validate._is_quoted("""\"True`""")) + self.assertFalse(self.connector.validate._is_quoted("""`True\'""")) + self.assertFalse(self.connector.validate._is_quoted("""'True`""")) + + # With value mid-string. + self.assertFalse(self.connector.validate._is_quoted("""we're""")) + self.assertFalse(self.connector.validate._is_quoted("""'twas""")) + self.assertFalse(self.connector.validate._is_quoted("""Marcus'""")) + self.assertFalse(self.connector.validate._is_quoted("""Marcus' Market""")) + self.assertFalse(self.connector.validate._is_quoted("""Marcus 'Fresh' Market""")) + self.assertFalse(self.connector.validate._is_quoted("""we"re""")) + self.assertFalse(self.connector.validate._is_quoted(""""twas""")) + self.assertFalse(self.connector.validate._is_quoted("""Marcus\"""")) + self.assertFalse(self.connector.validate._is_quoted("""Marcus" Market""")) + self.assertFalse(self.connector.validate._is_quoted("""Marcus "Fresh" Market""")) + self.assertFalse(self.connector.validate._is_quoted("""we`re""")) + self.assertFalse(self.connector.validate._is_quoted("""`twas""")) + self.assertFalse(self.connector.validate._is_quoted("""Marcus`""")) + self.assertFalse(self.connector.validate._is_quoted("""Marcus` Market""")) + self.assertFalse(self.connector.validate._is_quoted("""Marcus `Fresh` Market""")) # endregion Helper Functions -- GitLab