- 
                Notifications
    You must be signed in to change notification settings 
- Fork 3.3k
fix(ingest): Handle empty column names from Snowflake access history #15106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 2 commits
ecd54eb
              2b24093
              eff19c8
              00e5bfd
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -617,6 +617,7 @@ def _parse_audit_log_row( | |
| upstreams = [] | ||
| column_usage = {} | ||
|  | ||
| has_empty_column = False | ||
| for obj in direct_objects_accessed: | ||
| dataset = self.identifiers.gen_dataset_urn( | ||
| self.identifiers.get_dataset_identifier_from_qualified_name( | ||
|  | @@ -626,13 +627,32 @@ def _parse_audit_log_row( | |
|  | ||
| columns = set() | ||
| for modified_column in obj["columns"]: | ||
| columns.add( | ||
| self.identifiers.snowflake_identifier(modified_column["columnName"]) | ||
| ) | ||
| column_name = modified_column["columnName"] | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to make it very much visible that we decided to parse the query, for which we would otherwise use info coming directly from audit log, this is for 2 reasons: 
 So to meet above conditions we need to: 
 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for your suggestion. It makes sense to me. I've extended the SnowflakeQueriesExtractorReport to track queries with empty column names. Specifically, I've added a counter (num_queries_with_empty_column_name) and a LossyList (queries_with_empty_column_name) to store the IDs of affected queries. When an empty column name is detected, the source now logs an informational message including the query ID and a note about the performance impact, and updates the new report fields before falling back to SQL parsing. | ||
| if not column_name or not column_name.strip(): | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for addressing also the case of non-empty, but containing only white-spaces column names! | ||
| has_empty_column = True | ||
|          | ||
| break | ||
| columns.add(self.identifiers.snowflake_identifier(column_name)) | ||
|  | ||
| if has_empty_column: | ||
| break | ||
|  | ||
| upstreams.append(dataset) | ||
| column_usage[dataset] = columns | ||
|  | ||
| if has_empty_column: | ||
| return ObservedQuery( | ||
| query=query_text, | ||
| session_id=res["session_id"], | ||
| timestamp=timestamp, | ||
| user=user, | ||
| default_db=res["default_db"], | ||
| default_schema=res["default_schema"], | ||
| query_hash=get_query_fingerprint( | ||
| query_text, self.identifiers.platform, fast=True | ||
| ), | ||
| extra_info=extra_info, | ||
| ) | ||
|  | ||
| downstream = None | ||
| column_lineage = None | ||
| for obj in objects_modified: | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -168,13 +168,22 @@ def get_subjects( | |
| query_subject_urns.add(upstream) | ||
| if include_fields: | ||
| for column in sorted(self.column_usage.get(upstream, [])): | ||
| # Skip empty column names to avoid creating invalid URNs | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need to print a message here, either  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning logs added. Thank you for the suggestion. | ||
| if not column or not column.strip(): | ||
| continue | ||
| query_subject_urns.add( | ||
| builder.make_schema_field_urn(upstream, column) | ||
| ) | ||
| if downstream_urn: | ||
| query_subject_urns.add(downstream_urn) | ||
| if include_fields: | ||
| for column_lineage in self.column_lineage: | ||
| # Skip empty downstream columns to avoid creating invalid URNs | ||
| if ( | ||
| not column_lineage.downstream.column | ||
| or not column_lineage.downstream.column.strip() | ||
| ): | ||
| continue | ||
| query_subject_urns.add( | ||
| builder.make_schema_field_urn( | ||
| downstream_urn, column_lineage.downstream.column | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -605,6 +605,184 @@ def test_report_counts_with_disabled_features(self): | |
| assert extractor.report.sql_aggregator.num_preparsed_queries == 0 | ||
|  | ||
|  | ||
| class TestSnowflakeQueryParser: | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test is awesome! Maybe the comments should be more clear that we are testing the case where Snowflake sends us somehow corrupted results. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for the comments. Revised. | ||
| """Tests for the SnowflakeQueriesExtractor._parse_query method.""" | ||
|  | ||
| def test_parse_query_with_empty_column_name_returns_observed_query(self): | ||
| """Test that queries with empty column names in direct_objects_accessed return ObservedQuery.""" | ||
| from datetime import datetime, timezone | ||
|  | ||
| from datahub.ingestion.source.snowflake.snowflake_utils import ( | ||
| SnowflakeIdentifierBuilder, | ||
| ) | ||
| from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery | ||
|  | ||
| mock_connection = Mock() | ||
| config = SnowflakeQueriesExtractorConfig( | ||
| window=BaseTimeWindowConfig( | ||
| start_time=datetime(2021, 1, 1, tzinfo=timezone.utc), | ||
| end_time=datetime(2021, 1, 2, tzinfo=timezone.utc), | ||
| ), | ||
| ) | ||
| mock_report = Mock() | ||
| mock_filters = Mock() | ||
| mock_identifiers = Mock(spec=SnowflakeIdentifierBuilder) | ||
| mock_identifiers.platform = "snowflake" | ||
| mock_identifiers.identifier_config = SnowflakeIdentifierConfig() | ||
| mock_identifiers.gen_dataset_urn = Mock( | ||
| return_value="urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.test_table,PROD)" | ||
| ) | ||
| mock_identifiers.get_dataset_identifier_from_qualified_name = Mock( | ||
| return_value="test_db.test_schema.test_table" | ||
| ) | ||
| mock_identifiers.snowflake_identifier = Mock(side_effect=lambda x: x) | ||
| mock_identifiers.get_user_identifier = Mock(return_value="test_user") | ||
|  | ||
| extractor = SnowflakeQueriesExtractor( | ||
| connection=mock_connection, | ||
| config=config, | ||
| structured_report=mock_report, | ||
| filters=mock_filters, | ||
| identifiers=mock_identifiers, | ||
| ) | ||
|  | ||
| # Simulate a Snowflake access history row with empty column name | ||
| import json | ||
|  | ||
| row = { | ||
| "QUERY_ID": "test_query_123", | ||
| "ROOT_QUERY_ID": None, | ||
| "QUERY_TEXT": "SELECT * FROM test_table WHERE id = 1", | ||
| "QUERY_TYPE": "SELECT", | ||
| "SESSION_ID": "session_123", | ||
| "USER_NAME": "test_user", | ||
| "ROLE_NAME": "test_role", | ||
| "QUERY_START_TIME": datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "END_TIME": datetime(2021, 1, 1, 12, 0, 1, tzinfo=timezone.utc), | ||
| "QUERY_DURATION": 1000, | ||
| "ROWS_INSERTED": 0, | ||
| "ROWS_UPDATED": 0, | ||
| "ROWS_DELETED": 0, | ||
| "DEFAULT_DB": "test_db", | ||
| "DEFAULT_SCHEMA": "test_schema", | ||
| "QUERY_COUNT": 1, | ||
| "QUERY_SECONDARY_FINGERPRINT": "fingerprint_123", | ||
| "DIRECT_OBJECTS_ACCESSED": json.dumps( | ||
| [ | ||
| { | ||
| "objectName": "test_db.test_schema.test_table", | ||
| "objectDomain": "Table", | ||
| "columns": [ | ||
| {"columnName": "id"}, | ||
| {"columnName": ""}, # Empty column name | ||
| {"columnName": "name"}, | ||
| ], | ||
| } | ||
| ] | ||
| ), | ||
| "OBJECTS_MODIFIED": json.dumps([]), | ||
| "OBJECT_MODIFIED_BY_DDL": None, | ||
| } | ||
|  | ||
| users: dict = {} | ||
|  | ||
| result = extractor._parse_audit_log_row(row, users) | ||
|  | ||
| # Assert that an ObservedQuery is returned when there's an empty column | ||
| assert isinstance(result, ObservedQuery), ( | ||
| f"Expected ObservedQuery but got {type(result)}" | ||
| ) | ||
| assert result.query == "SELECT * FROM test_table WHERE id = 1" | ||
| assert result.session_id == "session_123" | ||
| assert result.default_db == "test_db" | ||
| assert result.default_schema == "test_schema" | ||
|  | ||
| def test_parse_query_with_valid_columns_returns_preparsed_query(self): | ||
| """Test that queries with all valid column names return PreparsedQuery.""" | ||
| from datetime import datetime, timezone | ||
|  | ||
| from datahub.ingestion.source.snowflake.snowflake_utils import ( | ||
| SnowflakeIdentifierBuilder, | ||
| ) | ||
| from datahub.sql_parsing.sql_parsing_aggregator import PreparsedQuery | ||
|  | ||
| mock_connection = Mock() | ||
| config = SnowflakeQueriesExtractorConfig( | ||
| window=BaseTimeWindowConfig( | ||
| start_time=datetime(2021, 1, 1, tzinfo=timezone.utc), | ||
| end_time=datetime(2021, 1, 2, tzinfo=timezone.utc), | ||
| ), | ||
| ) | ||
| mock_report = Mock() | ||
| mock_filters = Mock() | ||
| mock_identifiers = Mock(spec=SnowflakeIdentifierBuilder) | ||
| mock_identifiers.platform = "snowflake" | ||
| mock_identifiers.identifier_config = SnowflakeIdentifierConfig() | ||
| mock_identifiers.gen_dataset_urn = Mock( | ||
| return_value="urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.test_table,PROD)" | ||
| ) | ||
| mock_identifiers.get_dataset_identifier_from_qualified_name = Mock( | ||
| return_value="test_db.test_schema.test_table" | ||
| ) | ||
| mock_identifiers.snowflake_identifier = Mock(side_effect=lambda x: x) | ||
| mock_identifiers.get_user_identifier = Mock(return_value="test_user") | ||
|  | ||
| extractor = SnowflakeQueriesExtractor( | ||
| connection=mock_connection, | ||
| config=config, | ||
| structured_report=mock_report, | ||
| filters=mock_filters, | ||
| identifiers=mock_identifiers, | ||
| ) | ||
|  | ||
| # Simulate a Snowflake access history row with valid column names | ||
| import json | ||
|  | ||
| row = { | ||
| "QUERY_ID": "test_query_456", | ||
| "ROOT_QUERY_ID": None, | ||
| "QUERY_TEXT": "SELECT id, name FROM test_table", | ||
| "QUERY_TYPE": "SELECT", | ||
| "SESSION_ID": "session_456", | ||
| "USER_NAME": "test_user", | ||
| "ROLE_NAME": "test_role", | ||
| "QUERY_START_TIME": datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "END_TIME": datetime(2021, 1, 1, 12, 0, 1, tzinfo=timezone.utc), | ||
| "QUERY_DURATION": 1000, | ||
| "ROWS_INSERTED": 0, | ||
| "ROWS_UPDATED": 0, | ||
| "ROWS_DELETED": 0, | ||
| "DEFAULT_DB": "test_db", | ||
| "DEFAULT_SCHEMA": "test_schema", | ||
| "QUERY_COUNT": 1, | ||
| "QUERY_SECONDARY_FINGERPRINT": "fingerprint_456", | ||
| "DIRECT_OBJECTS_ACCESSED": json.dumps( | ||
| [ | ||
| { | ||
| "objectName": "test_db.test_schema.test_table", | ||
| "objectDomain": "Table", | ||
| "columns": [ | ||
| {"columnName": "id"}, | ||
| {"columnName": "name"}, | ||
| ], | ||
| } | ||
| ] | ||
| ), | ||
| "OBJECTS_MODIFIED": json.dumps([]), | ||
| "OBJECT_MODIFIED_BY_DDL": None, | ||
| } | ||
|  | ||
| users: dict = {} | ||
|  | ||
| result = extractor._parse_audit_log_row(row, users) | ||
|  | ||
| # Assert that a PreparsedQuery is returned when all columns are valid | ||
| assert isinstance(result, PreparsedQuery), ( | ||
| f"Expected PreparsedQuery but got {type(result)}" | ||
| ) | ||
| assert result.query_text == "SELECT id, name FROM test_table" | ||
|  | ||
|  | ||
| class TestSnowflakeQueriesExtractorStatefulTimeWindowIngestion: | ||
| """Tests for stateful time window ingestion support in queries v2.""" | ||
|  | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| [ | ||
| { | ||
| "entityType": "query", | ||
| "entityUrn": "urn:li:query:test-delete-query", | ||
| "changeType": "UPSERT", | ||
| "aspectName": "queryProperties", | ||
| "aspect": { | ||
| "json": { | ||
| "customProperties": {}, | ||
| "statement": { | ||
| "value": "DELETE FROM PRODUCTION.DCA_CORE.snowplow_user_engagement_mart AS DBT_INTERNAL_DEST\nWHERE\n (\n unique_key_input\n ) IN (\n SELECT DISTINCT\n unique_key_input\n FROM PRODUCTION.DCA_CORE.snowplow_user_engagement_mart__dbt_tmp AS DBT_INTERNAL_SOURCE\n )", | ||
| "language": "SQL" | ||
| }, | ||
| "source": "SYSTEM", | ||
| "created": { | ||
| "time": 20000, | ||
| "actor": "urn:li:corpuser:_ingestion" | ||
| }, | ||
| "lastModified": { | ||
| "time": 20000, | ||
| "actor": "urn:li:corpuser:_ingestion" | ||
| } | ||
| } | ||
| } | ||
| }, | ||
| { | ||
| "entityType": "query", | ||
| "entityUrn": "urn:li:query:test-delete-query", | ||
| "changeType": "UPSERT", | ||
| "aspectName": "querySubjects", | ||
| "aspect": { | ||
| "json": { | ||
| "subjects": [ | ||
| { | ||
| "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dca_core.snowplow_user_engagement_mart__dbt_tmp,PROD)" | ||
| }, | ||
| { | ||
| "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dca_core.snowplow_user_engagement_mart__dbt_tmp,PROD),unique_key_input)" | ||
| }, | ||
| { | ||
| "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dca_core.snowplow_user_engagement_mart,PROD)" | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| }, | ||
| { | ||
| "entityType": "query", | ||
| "entityUrn": "urn:li:query:test-delete-query", | ||
| "changeType": "UPSERT", | ||
| "aspectName": "dataPlatformInstance", | ||
| "aspect": { | ||
| "json": { | ||
| "platform": "urn:li:dataPlatform:snowflake" | ||
| } | ||
| } | ||
| } | ||
| ] | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| [ | ||
| { | ||
| "entityType": "query", | ||
| "entityUrn": "urn:li:query:test-select-gsheets-view", | ||
| "changeType": "UPSERT", | ||
| "aspectName": "queryProperties", | ||
| "aspect": { | ||
| "json": { | ||
| "customProperties": {}, | ||
| "statement": { | ||
| "value": "SELECT\n *\nFROM production.dsd_digital_private.gsheets_legacy_views\nWHERE\n id = 123", | ||
| "language": "SQL" | ||
| }, | ||
| "source": "SYSTEM", | ||
| "created": { | ||
| "time": 20000, | ||
| "actor": "urn:li:corpuser:_ingestion" | ||
| }, | ||
| "lastModified": { | ||
| "time": 20000, | ||
| "actor": "urn:li:corpuser:_ingestion" | ||
| } | ||
| } | ||
| } | ||
| }, | ||
| { | ||
| "entityType": "query", | ||
| "entityUrn": "urn:li:query:test-select-gsheets-view", | ||
| "changeType": "UPSERT", | ||
| "aspectName": "querySubjects", | ||
| "aspect": { | ||
| "json": { | ||
| "subjects": [ | ||
| { | ||
| "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dsd_digital_private.gsheets_legacy_views,PROD)" | ||
| }, | ||
| { | ||
| "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dsd_digital_private.gsheets_legacy_views,PROD),id)" | ||
| }, | ||
| { | ||
| "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dsd_digital_private.gsheets_legacy_views,PROD),name)" | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| }, | ||
| { | ||
| "entityType": "query", | ||
| "entityUrn": "urn:li:query:test-select-gsheets-view", | ||
| "changeType": "UPSERT", | ||
| "aspectName": "dataPlatformInstance", | ||
| "aspect": { | ||
| "json": { | ||
| "platform": "urn:li:dataPlatform:snowflake" | ||
| } | ||
| } | ||
| } | ||
| ] | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to introduce a new flag? Why don't we return
`ObservedQuery` directly from the loop, instead of having `break`-related logic?

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good callout. Right, we don't need to wait for all iteration. Revised.