diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py index a9e13e063b7f41..76bcac49395db7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py @@ -78,6 +78,7 @@ ConnectionWrapper, FileBackedList, ) +from datahub.utilities.lossy_collections import LossyList from datahub.utilities.perf_timer import PerfTimer logger = logging.getLogger(__name__) @@ -169,6 +170,10 @@ class SnowflakeQueriesExtractorReport(Report): num_stream_queries_observed: int = 0 num_create_temp_view_queries_observed: int = 0 num_users: int = 0 + num_queries_with_empty_column_name: int = 0 + queries_with_empty_column_name: LossyList[str] = dataclasses.field( + default_factory=LossyList + ) @dataclass @@ -626,9 +631,28 @@ def _parse_audit_log_row( columns = set() for modified_column in obj["columns"]: - columns.add( - self.identifiers.snowflake_identifier(modified_column["columnName"]) - ) + column_name = modified_column["columnName"] + # An empty column name in the audit log would cause an error when creating column URNs. + # To avoid this and still extract lineage, the raw query text is parsed as a fallback. + if not column_name or not column_name.strip(): + query_id = res["query_id"] + self.report.num_queries_with_empty_column_name += 1 + self.report.queries_with_empty_column_name.append(query_id) + logger.info(f"Query {query_id} has empty column name in audit log.") + + return ObservedQuery( + query=query_text, + session_id=res["session_id"], + timestamp=timestamp, + user=user, + default_db=res["default_db"], + default_schema=res["default_schema"], + query_hash=get_query_fingerprint( + query_text, self.identifiers.platform, fast=True + ), + extra_info=extra_info, + ) + columns.add(self.identifiers.snowflake_identifier(column_name)) upstreams.append(dataset) column_usage[dataset] = columns diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index a093ce4596a119..deb4e4127845f4 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -168,6 +168,12 @@ def get_subjects( query_subject_urns.add(upstream) if include_fields: for column in sorted(self.column_usage.get(upstream, [])): + # Skip empty column names to avoid creating invalid URNs + if not column or not column.strip(): + logger.warning( + f"Skipping empty upstream column name for query {self.query_id} on upstream {upstream}" + ) + continue query_subject_urns.add( builder.make_schema_field_urn(upstream, column) ) @@ -175,6 +181,15 @@ def get_subjects( query_subject_urns.add(downstream_urn) if include_fields: for column_lineage in self.column_lineage: + # Skip empty downstream columns to avoid creating invalid URNs + if ( + not column_lineage.downstream.column + or not column_lineage.downstream.column.strip() + ): + logger.warning( + f"Skipping empty downstream column name for query {self.query_id} on downstream {downstream_urn}" + ) + continue query_subject_urns.add( builder.make_schema_field_urn( downstream_urn, column_lineage.downstream.column diff --git a/metadata-ingestion/tests/unit/snowflake/test_snowflake_queries.py b/metadata-ingestion/tests/unit/snowflake/test_snowflake_queries.py index 010de18f1dacfa..76fe68dc8ba03b 
100644 --- a/metadata-ingestion/tests/unit/snowflake/test_snowflake_queries.py +++ b/metadata-ingestion/tests/unit/snowflake/test_snowflake_queries.py @@ -1,4 +1,5 @@ -import datetime +import json +from datetime import datetime, timezone from typing import Optional from unittest.mock import Mock, patch @@ -21,9 +22,13 @@ SnowflakeQueriesExtractorConfig, ) from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery +from datahub.ingestion.source.snowflake.snowflake_utils import ( + SnowflakeIdentifierBuilder, +) from datahub.ingestion.source.state.redundant_run_skip_handler import ( RedundantQueriesRunSkipHandler, ) +from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery, PreparsedQuery class TestBuildAccessHistoryDatabaseFilterCondition: @@ -158,8 +163,8 @@ def test_build_access_history_database_filter_condition( """Test the _build_access_history_database_filter_condition method with various inputs.""" # Create a QueryLogQueryBuilder instance to test the method builder = QueryLogQueryBuilder( - start_time=datetime.datetime(year=2021, month=1, day=1), - end_time=datetime.datetime(year=2021, month=1, day=2), + start_time=datetime(year=2021, month=1, day=1), + end_time=datetime(year=2021, month=1, day=2), bucket_duration=BucketDuration.HOUR, dedup_strategy=QueryDedupStrategyType.STANDARD, database_pattern=database_pattern, @@ -176,8 +181,8 @@ class TestQueryLogQueryBuilder: def test_non_implemented_strategy(self): with pytest.raises(NotImplementedError): QueryLogQueryBuilder( - start_time=datetime.datetime(year=2021, month=1, day=1), - end_time=datetime.datetime(year=2021, month=1, day=1), + start_time=datetime(year=2021, month=1, day=1), + end_time=datetime(year=2021, month=1, day=1), bucket_duration=BucketDuration.HOUR, deny_usernames=None, dedup_strategy="DUMMY", # type: ignore[arg-type] @@ -186,8 +191,8 @@ def test_non_implemented_strategy(self): def test_fetch_query_for_all_strategies(self): for strategy in QueryDedupStrategyType: query = QueryLogQueryBuilder( - start_time=datetime.datetime(year=2021, month=1, day=1), - end_time=datetime.datetime(year=2021, month=1, day=1), + start_time=datetime(year=2021, month=1, day=1), + end_time=datetime(year=2021, month=1, day=1), bucket_duration=BucketDuration.HOUR, dedup_strategy=strategy, ).build_enriched_query_log_query() @@ -199,8 +204,8 @@ def test_query_with_database_pattern_filtering(self): database_pattern = AllowDenyPattern(allow=["PROD_.*"], deny=[".*_TEMP"]) query = QueryLogQueryBuilder( - start_time=datetime.datetime(year=2021, month=1, day=1), - end_time=datetime.datetime(year=2021, month=1, day=2), + start_time=datetime(year=2021, month=1, day=1), + end_time=datetime(year=2021, month=1, day=2), bucket_duration=BucketDuration.HOUR, deny_usernames=None, dedup_strategy=QueryDedupStrategyType.STANDARD, @@ -215,8 +220,8 @@ def test_query_with_additional_database_names(self): additional_database_names = ["SPECIAL_DB", "ANALYTICS_DB"] query = QueryLogQueryBuilder( - start_time=datetime.datetime(year=2021, month=1, day=1), - end_time=datetime.datetime(year=2021, month=1, day=2), + start_time=datetime(year=2021, month=1, day=1), + end_time=datetime(year=2021, month=1, day=2), bucket_duration=BucketDuration.HOUR, dedup_strategy=QueryDedupStrategyType.NONE, additional_database_names=additional_database_names, @@ -231,8 +236,8 @@ def test_query_with_combined_database_filtering(self): additional_database_names = ["SPECIAL_DB"] query = QueryLogQueryBuilder( - start_time=datetime.datetime(year=2021, month=1, day=1), 
- end_time=datetime.datetime(year=2021, month=1, day=2), + start_time=datetime(year=2021, month=1, day=1), + end_time=datetime(year=2021, month=1, day=2), bucket_duration=BucketDuration.HOUR, deny_usernames=None, dedup_strategy=QueryDedupStrategyType.STANDARD, @@ -350,8 +355,8 @@ def test_build_user_filter(self, deny_usernames, allow_usernames, expected): """Test the _build_user_filter method with various combinations of deny and allow patterns.""" # Create a QueryLogQueryBuilder instance to test the method builder = QueryLogQueryBuilder( - start_time=datetime.datetime(year=2021, month=1, day=1), - end_time=datetime.datetime(year=2021, month=1, day=2), + start_time=datetime(year=2021, month=1, day=1), + end_time=datetime(year=2021, month=1, day=2), bucket_duration=BucketDuration.HOUR, deny_usernames=deny_usernames, allow_usernames=allow_usernames, @@ -423,8 +428,8 @@ def _create_mock_extractor( config = SnowflakeQueriesExtractorConfig( window=BaseTimeWindowConfig( - start_time=datetime.datetime(2021, 1, 1, tzinfo=datetime.timezone.utc), - end_time=datetime.datetime(2021, 1, 2, tzinfo=datetime.timezone.utc), + start_time=datetime(2021, 1, 1, tzinfo=timezone.utc), + end_time=datetime(2021, 1, 2, tzinfo=timezone.utc), ), include_lineage=include_lineage, include_queries=include_queries, @@ -605,6 +610,169 @@ def test_report_counts_with_disabled_features(self): assert extractor.report.sql_aggregator.num_preparsed_queries == 0 +class TestSnowflakeQueryParser: + """ + Tests for SnowflakeQueriesExtractor._parse_audit_log_row, + focusing on handling of corrupted audit log results from Snowflake. + """ + + def test_parse_query_with_empty_column_name_returns_observed_query(self): + """Test that a corrupted audit log entry with an empty column name falls back to ObservedQuery.""" + mock_connection = Mock() + config = SnowflakeQueriesExtractorConfig( + window=BaseTimeWindowConfig( + start_time=datetime(2021, 1, 1, tzinfo=timezone.utc), + end_time=datetime(2021, 1, 2, tzinfo=timezone.utc), + ), + ) + mock_report = Mock() + mock_filters = Mock() + mock_identifiers = Mock(spec=SnowflakeIdentifierBuilder) + mock_identifiers.platform = "snowflake" + mock_identifiers.identifier_config = SnowflakeIdentifierConfig() + mock_identifiers.gen_dataset_urn = Mock( + return_value="urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.test_table,PROD)" + ) + mock_identifiers.get_dataset_identifier_from_qualified_name = Mock( + return_value="test_db.test_schema.test_table" + ) + mock_identifiers.snowflake_identifier = Mock(side_effect=lambda x: x) + mock_identifiers.get_user_identifier = Mock(return_value="test_user") + + extractor = SnowflakeQueriesExtractor( + connection=mock_connection, + config=config, + structured_report=mock_report, + filters=mock_filters, + identifiers=mock_identifiers, + ) + + # Simulate a Snowflake access history row with empty column name + row = { + "QUERY_ID": "test_query_123", + "ROOT_QUERY_ID": None, + "QUERY_TEXT": "SELECT * FROM test_table WHERE id = 1", + "QUERY_TYPE": "SELECT", + "SESSION_ID": "session_123", + "USER_NAME": "test_user", + "ROLE_NAME": "test_role", + "QUERY_START_TIME": datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + "END_TIME": datetime(2021, 1, 1, 12, 0, 1, tzinfo=timezone.utc), + "QUERY_DURATION": 1000, + "ROWS_INSERTED": 0, + "ROWS_UPDATED": 0, + "ROWS_DELETED": 0, + "DEFAULT_DB": "test_db", + "DEFAULT_SCHEMA": "test_schema", + "QUERY_COUNT": 1, + "QUERY_SECONDARY_FINGERPRINT": "fingerprint_123", + "DIRECT_OBJECTS_ACCESSED": json.dumps( + [ + { + 
"objectName": "test_db.test_schema.test_table", + "objectDomain": "Table", + "columns": [ + {"columnName": "id"}, + {"columnName": ""}, # Empty column name + {"columnName": "name"}, + ], + } + ] + ), + "OBJECTS_MODIFIED": json.dumps([]), + "OBJECT_MODIFIED_BY_DDL": None, + } + + users: dict = {} + + result = extractor._parse_audit_log_row(row, users) + + # Assert that an ObservedQuery is returned when there's an empty column + assert isinstance(result, ObservedQuery), ( + f"Expected ObservedQuery but got {type(result)}" + ) + assert result.query == "SELECT * FROM test_table WHERE id = 1" + assert result.session_id == "session_123" + assert result.default_db == "test_db" + assert result.default_schema == "test_schema" + + def test_parse_query_with_valid_columns_returns_preparsed_query(self): + """Test that queries with all valid column names return PreparsedQuery.""" + mock_connection = Mock() + config = SnowflakeQueriesExtractorConfig( + window=BaseTimeWindowConfig( + start_time=datetime(2021, 1, 1, tzinfo=timezone.utc), + end_time=datetime(2021, 1, 2, tzinfo=timezone.utc), + ), + ) + mock_report = Mock() + mock_filters = Mock() + mock_identifiers = Mock(spec=SnowflakeIdentifierBuilder) + mock_identifiers.platform = "snowflake" + mock_identifiers.identifier_config = SnowflakeIdentifierConfig() + mock_identifiers.gen_dataset_urn = Mock( + return_value="urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.test_table,PROD)" + ) + mock_identifiers.get_dataset_identifier_from_qualified_name = Mock( + return_value="test_db.test_schema.test_table" + ) + mock_identifiers.snowflake_identifier = Mock(side_effect=lambda x: x) + mock_identifiers.get_user_identifier = Mock(return_value="test_user") + + extractor = SnowflakeQueriesExtractor( + connection=mock_connection, + config=config, + structured_report=mock_report, + filters=mock_filters, + identifiers=mock_identifiers, + ) + + # Simulate a Snowflake access history row with valid column names + row = { + "QUERY_ID": "test_query_456", + "ROOT_QUERY_ID": None, + "QUERY_TEXT": "SELECT id, name FROM test_table", + "QUERY_TYPE": "SELECT", + "SESSION_ID": "session_456", + "USER_NAME": "test_user", + "ROLE_NAME": "test_role", + "QUERY_START_TIME": datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + "END_TIME": datetime(2021, 1, 1, 12, 0, 1, tzinfo=timezone.utc), + "QUERY_DURATION": 1000, + "ROWS_INSERTED": 0, + "ROWS_UPDATED": 0, + "ROWS_DELETED": 0, + "DEFAULT_DB": "test_db", + "DEFAULT_SCHEMA": "test_schema", + "QUERY_COUNT": 1, + "QUERY_SECONDARY_FINGERPRINT": "fingerprint_456", + "DIRECT_OBJECTS_ACCESSED": json.dumps( + [ + { + "objectName": "test_db.test_schema.test_table", + "objectDomain": "Table", + "columns": [ + {"columnName": "id"}, + {"columnName": "name"}, + ], + } + ] + ), + "OBJECTS_MODIFIED": json.dumps([]), + "OBJECT_MODIFIED_BY_DDL": None, + } + + users: dict = {} + + result = extractor._parse_audit_log_row(row, users) + + # Assert that a PreparsedQuery is returned when all columns are valid + assert isinstance(result, PreparsedQuery), ( + f"Expected PreparsedQuery but got {type(result)}" + ) + assert result.query_text == "SELECT id, name FROM test_table" + + class TestSnowflakeQueriesExtractorStatefulTimeWindowIngestion: """Tests for stateful time window ingestion support in queries v2.""" @@ -620,8 +788,8 @@ def _create_mock_extractor( config = SnowflakeQueriesExtractorConfig( window=BaseTimeWindowConfig( - start_time=datetime.datetime(2021, 1, 1, tzinfo=datetime.timezone.utc), - end_time=datetime.datetime(2021, 1, 2, 
tzinfo=datetime.timezone.utc), + start_time=datetime(2021, 1, 1, tzinfo=timezone.utc), + end_time=datetime(2021, 1, 2, tzinfo=timezone.utc), bucket_duration=bucket_duration, ), include_usage_statistics=include_usage_statistics, @@ -646,12 +814,8 @@ def _create_mock_extractor( def test_time_window_adjusted_with_handler(self): """Test that time window is adjusted when handler is provided.""" - adjusted_start_time = datetime.datetime( - 2021, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc - ) - adjusted_end_time = datetime.datetime( - 2021, 1, 2, 12, 0, 0, tzinfo=datetime.timezone.utc - ) + adjusted_start_time = datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + adjusted_end_time = datetime(2021, 1, 2, 12, 0, 0, tzinfo=timezone.utc) mock_handler = Mock(spec=RedundantQueriesRunSkipHandler) mock_handler.suggest_run_time_window.return_value = ( @@ -669,10 +833,8 @@ def test_time_window_adjusted_with_handler(self): def test_time_window_not_adjusted_without_handler(self): """Test that time window is not adjusted when no handler is provided.""" - original_start_time = datetime.datetime( - 2021, 1, 1, tzinfo=datetime.timezone.utc - ) - original_end_time = datetime.datetime(2021, 1, 2, tzinfo=datetime.timezone.utc) + original_start_time = datetime(2021, 1, 1, tzinfo=timezone.utc) + original_end_time = datetime(2021, 1, 2, tzinfo=timezone.utc) extractor = self._create_mock_extractor( redundant_run_skip_handler=None, @@ -684,13 +846,11 @@ def test_time_window_not_adjusted_without_handler(self): def test_bucket_alignment_with_usage_statistics(self): """Test that start_time is aligned to bucket boundaries when usage statistics are enabled.""" # Start time at 14:30 should be aligned to beginning of day (00:00) - start_time_with_offset = datetime.datetime( - 2021, 1, 1, 14, 30, 0, tzinfo=datetime.timezone.utc - ) + start_time_with_offset = datetime(2021, 1, 1, 14, 30, 0, tzinfo=timezone.utc) mock_handler = Mock(spec=RedundantQueriesRunSkipHandler) mock_handler.suggest_run_time_window.return_value = ( start_time_with_offset, - datetime.datetime(2021, 1, 2, tzinfo=datetime.timezone.utc), + datetime(2021, 1, 2, tzinfo=timezone.utc), ) extractor = self._create_mock_extractor( @@ -699,24 +859,18 @@ def test_bucket_alignment_with_usage_statistics(self): ) # Start time should be aligned to beginning of day - expected_aligned_start = datetime.datetime( - 2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc - ) + expected_aligned_start = datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc) assert extractor.start_time == expected_aligned_start # End time should remain unchanged - assert extractor.end_time == datetime.datetime( - 2021, 1, 2, tzinfo=datetime.timezone.utc - ) + assert extractor.end_time == datetime(2021, 1, 2, tzinfo=timezone.utc) def test_no_bucket_alignment_without_usage_statistics(self): """Test that start_time is NOT aligned when usage statistics are disabled.""" - start_time_with_offset = datetime.datetime( - 2021, 1, 1, 14, 30, 0, tzinfo=datetime.timezone.utc - ) + start_time_with_offset = datetime(2021, 1, 1, 14, 30, 0, tzinfo=timezone.utc) mock_handler = Mock(spec=RedundantQueriesRunSkipHandler) mock_handler.suggest_run_time_window.return_value = ( start_time_with_offset, - datetime.datetime(2021, 1, 2, tzinfo=datetime.timezone.utc), + datetime(2021, 1, 2, tzinfo=timezone.utc), ) extractor = self._create_mock_extractor( @@ -726,19 +880,15 @@ def test_no_bucket_alignment_without_usage_statistics(self): # Start time should NOT be aligned assert extractor.start_time == start_time_with_offset - assert 
extractor.end_time == datetime.datetime( - 2021, 1, 2, tzinfo=datetime.timezone.utc - ) + assert extractor.end_time == datetime(2021, 1, 2, tzinfo=timezone.utc) def test_bucket_alignment_hourly_with_usage_statistics(self): """Test that start_time is aligned to hour boundaries when hourly buckets are configured.""" - start_time_with_offset = datetime.datetime( - 2021, 1, 1, 14, 30, 45, tzinfo=datetime.timezone.utc - ) + start_time_with_offset = datetime(2021, 1, 1, 14, 30, 45, tzinfo=timezone.utc) mock_handler = Mock(spec=RedundantQueriesRunSkipHandler) mock_handler.suggest_run_time_window.return_value = ( start_time_with_offset, - datetime.datetime(2021, 1, 2, tzinfo=datetime.timezone.utc), + datetime(2021, 1, 2, tzinfo=timezone.utc), ) extractor = self._create_mock_extractor( @@ -747,20 +897,16 @@ def test_bucket_alignment_hourly_with_usage_statistics(self): bucket_duration=BucketDuration.HOUR, ) - expected_aligned_start = datetime.datetime( - 2021, 1, 1, 14, 0, 0, tzinfo=datetime.timezone.utc - ) + expected_aligned_start = datetime(2021, 1, 1, 14, 0, 0, tzinfo=timezone.utc) assert extractor.start_time == expected_aligned_start - assert extractor.end_time == datetime.datetime( - 2021, 1, 2, tzinfo=datetime.timezone.utc - ) + assert extractor.end_time == datetime(2021, 1, 2, tzinfo=timezone.utc) def test_state_updated_after_successful_extraction(self): """Test that state is updated after successful extraction when handler is provided.""" mock_handler = Mock(spec=RedundantQueriesRunSkipHandler) mock_handler.suggest_run_time_window.return_value = ( - datetime.datetime(2021, 1, 1, tzinfo=datetime.timezone.utc), - datetime.datetime(2021, 1, 2, tzinfo=datetime.timezone.utc), + datetime(2021, 1, 1, tzinfo=timezone.utc), + datetime(2021, 1, 2, tzinfo=timezone.utc), ) extractor = self._create_mock_extractor( @@ -775,8 +921,8 @@ def test_state_updated_after_successful_extraction(self): list(extractor.get_workunits_internal()) mock_handler.update_state.assert_called_once_with( - datetime.datetime(2021, 1, 1, tzinfo=datetime.timezone.utc), - datetime.datetime(2021, 1, 2, tzinfo=datetime.timezone.utc), + datetime(2021, 1, 1, tzinfo=timezone.utc), + datetime(2021, 1, 2, tzinfo=timezone.utc), BucketDuration.DAY, ) @@ -797,8 +943,8 @@ def test_queries_extraction_always_runs_with_handler(self): """Test that queries extraction always runs even with a skip handler.""" mock_handler = Mock(spec=RedundantQueriesRunSkipHandler) mock_handler.suggest_run_time_window.return_value = ( - datetime.datetime(2021, 1, 1, tzinfo=datetime.timezone.utc), - datetime.datetime(2021, 1, 2, tzinfo=datetime.timezone.utc), + datetime(2021, 1, 1, tzinfo=timezone.utc), + datetime(2021, 1, 2, tzinfo=timezone.utc), ) extractor = self._create_mock_extractor( diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_query_subjects_golden.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_query_subjects_golden.json new file mode 100644 index 00000000000000..78f01d2561030c --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_query_subjects_golden.json @@ -0,0 +1,58 @@ +[ +{ + "entityType": "query", + "entityUrn": "urn:li:query:test-delete-query", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "customProperties": {}, + "statement": { + "value": "DELETE FROM PRODUCTION.DCA_CORE.snowplow_user_engagement_mart AS DBT_INTERNAL_DEST\nWHERE\n (\n unique_key_input\n ) IN (\n SELECT 
DISTINCT\n unique_key_input\n FROM PRODUCTION.DCA_CORE.snowplow_user_engagement_mart__dbt_tmp AS DBT_INTERNAL_SOURCE\n )", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 20000, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 20000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:test-delete-query", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dca_core.snowplow_user_engagement_mart__dbt_tmp,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dca_core.snowplow_user_engagement_mart__dbt_tmp,PROD),unique_key_input)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dca_core.snowplow_user_engagement_mart,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:test-delete-query", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +} +] diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_query_subjects_only_column_usage_golden.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_query_subjects_only_column_usage_golden.json new file mode 100644 index 00000000000000..ca904a32b0725e --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_query_subjects_only_column_usage_golden.json @@ -0,0 +1,58 @@ +[ +{ + "entityType": "query", + "entityUrn": "urn:li:query:test-select-gsheets-view", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "customProperties": {}, + "statement": { + "value": "SELECT\n *\nFROM production.dsd_digital_private.gsheets_legacy_views\nWHERE\n id = 123", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 20000, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 20000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:test-select-gsheets-view", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dsd_digital_private.gsheets_legacy_views,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dsd_digital_private.gsheets_legacy_views,PROD),id)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dsd_digital_private.gsheets_legacy_views,PROD),name)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:test-select-gsheets-view", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +} +] diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py index 98ed52c6c30903..c0575dbc72f767 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py @@ -1297,3 +1297,141 @@ def test_partial_empty_downstream_column_in_snowflake_lineage( RESOURCE_DIR / 
"test_partial_empty_downstream_column_in_snowflake_lineage_golden.json", ) + + +@freeze_time(FROZEN_TIME) +def test_empty_column_in_query_subjects( + pytestconfig: pytest.Config, tmp_path: pathlib.Path +) -> None: + """Test that QuerySubjects with empty column names doesn't create invalid URNs. + + This simulates a scenario where Snowflake's access_history may contain empty + column names, which should not result in invalid schemaField URNs. + """ + aggregator = SqlParsingAggregator( + platform="snowflake", + generate_lineage=True, + generate_usage_statistics=False, + generate_operations=False, + generate_queries=True, + generate_query_subject_fields=True, + ) + + downstream_urn = DatasetUrn( + "snowflake", "production.dca_core.snowplow_user_engagement_mart" + ).urn() + upstream_urn = DatasetUrn( + "snowflake", "production.dca_core.snowplow_user_engagement_mart__dbt_tmp" + ).urn() + + # Simulate a query where Snowflake's access_history contains empty column names. + preparsed_query = PreparsedQuery( + query_id="test-delete-query", + query_text=( + "delete from PRODUCTION.DCA_CORE.snowplow_user_engagement_mart " + "as DBT_INTERNAL_DEST where (unique_key_input) in (" + "select distinct unique_key_input from " + "PRODUCTION.DCA_CORE.snowplow_user_engagement_mart__dbt_tmp " + "as DBT_INTERNAL_SOURCE)" + ), + upstreams=[upstream_urn], + downstream=downstream_urn, + column_lineage=[ + # This simulates a case where an empty column name might be present in the audit log. + ColumnLineageInfo( + downstream=DownstreamColumnRef(table=downstream_urn, column=""), + upstreams=[ColumnRef(table=upstream_urn, column="unique_key_input")], + ), + ], + column_usage={ + upstream_urn: {"unique_key_input", ""}, # Empty column from Snowflake + }, + query_type=QueryType.DELETE, + timestamp=_ts(20), + ) + + aggregator.add_preparsed_query(preparsed_query) + + mcpws = [mcp for mcp in aggregator.gen_metadata()] + query_mcpws = [ + mcpw + for mcpw in mcpws + if mcpw.entityUrn and mcpw.entityUrn.startswith("urn:li:query:") + ] + + out_path = tmp_path / "mcpw.json" + write_metadata_file(out_path, query_mcpws) + + mce_helpers.check_golden_file( + pytestconfig, + out_path, + RESOURCE_DIR / "test_empty_column_in_query_subjects_golden.json", + ) + + +@freeze_time(FROZEN_TIME) +def test_empty_column_in_query_subjects_only_column_usage( + pytestconfig: pytest.Config, tmp_path: pathlib.Path +) -> None: + """Test that QuerySubjects with empty columns ONLY in column_usage doesn't create invalid URNs. 
+ + This simulates the exact customer scenario where: + - Snowflake returns empty columns in direct_objects_accessed (column_usage) + - But NO empty columns in objects_modified (column_lineage is empty or valid) + + This is the scenario that would send invalid URNs to GMS rather than crash in Python, + matching the customer's error: "Provided urn urn:li:schemaField:(...,) is invalid" + """ + aggregator = SqlParsingAggregator( + platform="snowflake", + generate_lineage=True, + generate_usage_statistics=False, + generate_operations=False, + generate_queries=True, + generate_query_subject_fields=True, + generate_query_usage_statistics=True, + usage_config=BaseUsageConfig( + bucket_duration=BucketDuration.DAY, + start_time=parse_user_datetime("2024-02-06T00:00:00Z"), + end_time=parse_user_datetime("2024-02-07T00:00:00Z"), + ), + ) + + # Simulate table name from user: production.dsd_digital_private.gsheets_legacy_views + upstream_urn = DatasetUrn( + "snowflake", "production.dsd_digital_private.gsheets_legacy_views" + ).urn() + + # Simulate a SELECT query (no downstream) where the audit log contains empty column names. + preparsed_query = PreparsedQuery( + query_id="test-select-gsheets-view", + query_text="SELECT * FROM production.dsd_digital_private.gsheets_legacy_views WHERE id = 123", + upstreams=[upstream_urn], + downstream=None, # SELECT query has no downstream + column_lineage=[], # No column lineage because no downstream + column_usage={ + # Simulate a case where an empty column name is present in the audit log. + upstream_urn: {"id", "name", ""}, # Empty column from Snowflake! + }, + query_type=QueryType.SELECT, + timestamp=_ts(20), + ) + + aggregator.add_preparsed_query(preparsed_query) + + mcpws = [mcp for mcp in aggregator.gen_metadata()] + query_mcpws = [ + mcpw + for mcpw in mcpws + if mcpw.entityUrn and mcpw.entityUrn.startswith("urn:li:query:") + ] + + out_path = tmp_path / "mcpw.json" + write_metadata_file(out_path, query_mcpws) + + mce_helpers.check_golden_file( + pytestconfig, + out_path, + RESOURCE_DIR + / "test_empty_column_in_query_subjects_only_column_usage_golden.json", + )
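
The guard added to get_subjects and the fallback added to _parse_audit_log_row are two applications of the same defensive idea: a missing or whitespace-only column name coming out of Snowflake's access_history is treated as unreliable metadata, so the field is either skipped (query-subject generation) or the whole row is downgraded to an ObservedQuery so that lineage is re-derived from the raw SQL text instead. The sketch below is a minimal illustration of that pattern, not the patched implementation; the helper names (has_blank_column, safe_column_field_urns) are invented for this example, and it assumes only the datahub.emitter.mce_builder.make_schema_field_urn helper that the aggregator already uses.

from typing import Iterable, List, Optional

import datahub.emitter.mce_builder as builder


def has_blank_column(column_names: Iterable[Optional[str]]) -> bool:
    # True if any audit-log column name is missing or whitespace-only --
    # the case that previously produced URNs like
    # urn:li:schemaField:(<dataset>,) which GMS rejects.
    return any(name is None or not name.strip() for name in column_names)


def safe_column_field_urns(dataset_urn: str, column_names: Iterable[str]) -> List[str]:
    # Build schemaField URNs while dropping blank column names,
    # mirroring the skip logic the patch adds to get_subjects().
    return [
        builder.make_schema_field_urn(dataset_urn, name)
        for name in sorted(column_names)
        if name and name.strip()
    ]


if __name__ == "__main__":
    dataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.table,PROD)"
    # True -> the extractor would fall back to an ObservedQuery and let the SQL parser run.
    print(has_blank_column({"id", "", "name"}))
    # URNs for "id" and "name" only; the blank entry is dropped instead of becoming an invalid URN.
    print(safe_column_field_urns(dataset, {"id", "", "name"}))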