diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index d3b94d3808240f..9bfc78efffe08a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -164,6 +164,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
             schema_resolver=self.sql_parser_schema_resolver,
             identifiers=self.identifiers,
             redundant_run_skip_handler=redundant_lineage_run_skip_handler,
+            graph=self.ctx.graph,
         )
 
         redundant_usage_run_skip_handler: Optional[RedundantUsageRunSkipHandler] = None
@@ -331,6 +332,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                 filters=self.filters,
                 identifiers=self.identifiers,
                 redundant_run_skip_handler=redundant_queries_run_skip_handler,
+                graph=self.ctx.graph,
                 schema_resolver=self.sql_parser_schema_resolver,
                 discovered_tables=self.bq_schema_extractor.table_refs,
             ) as queries_extractor,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
index d6d79784b43297..329f351bf6729c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
@@ -221,6 +221,7 @@ def __init__(
         schema_resolver: SchemaResolver,
         identifiers: BigQueryIdentifierBuilder,
         redundant_run_skip_handler: Optional[RedundantLineageRunSkipHandler] = None,
+        graph: Optional["DataHubGraph"] = None,
     ):
         self.config = config
         self.report = report
@@ -246,6 +247,7 @@
             platform_instance=self.identifiers.identifier_config.platform_instance,
             env=self.identifiers.identifier_config.env,
             schema_resolver=self.schema_resolver,
+            graph=graph,
             eager_graph_load=False,
             generate_lineage=True,
             generate_queries=True,
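
Note: the two files above only thread the client through. The source hands its pipeline context's graph client to both the lineage extractor and the queries extractor, and the lineage extractor forwards it into SqlParsingAggregator. A minimal sketch of the forwarding pattern these hunks establish, using illustrative stand-in classes rather than the real signatures:

    from typing import Optional

    class Aggregator:
        """Stand-in for SqlParsingAggregator; only the graph wiring is shown."""

        def __init__(self, graph: Optional[object] = None) -> None:
            self.graph = graph  # later used as a fallback for schema lookups

    class LineageExtractor:
        """Stand-in for BigqueryLineageExtractor."""

        def __init__(self, graph: Optional[object] = None) -> None:
            # Forward the client unchanged; graph=None preserves the old behavior.
            self.aggregator = Aggregator(graph=graph)

Keeping the parameter Optional with a None default keeps the change backwards compatible: pipelines that run without a connected DataHub instance simply skip the graph fallback.
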
diff --git a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
index eca043ac579222..345c946b041240 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
@@ -1,4 +1,5 @@
 import contextlib
+import logging
 import pathlib
 from dataclasses import dataclass
 from typing import Dict, List, Optional, Protocol, Set, Tuple
@@ -19,6 +20,8 @@
 from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedDict
 from datahub.utilities.urns.field_paths import get_simple_field_path_from_v2_field_path
 
+logger = logging.getLogger(__name__)
+
 # A lightweight table schema: column -> type mapping.
 SchemaInfo = Dict[str, str]
 
@@ -72,6 +75,14 @@ def __init__(
         self.graph = graph
         self.report = report
 
+        # Log a warning if no graph client is provided for schema resolution.
+        if graph is None:
+            logger.warning(
+                f"SchemaResolver initialized without DataHub graph client for platform '{platform}'. "
+                "This may result in missing lineage when SQL parsing cannot resolve table schemas "
+                "that exist in DataHub but are not in the current ingestion cache."
+            )
+
         # Init cache, potentially restoring from a previous run.
         shared_conn = None
         if _cache_filename:
@@ -168,6 +179,13 @@ def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]:
             self._track_cache_hit()
             return urn_mixed, schema_info
 
+        # Log resolution failure details.
+        logger.debug(
+            f"Schema resolution failed for table {table}. Tried URNs: "
+            f"primary={urn}, lower={urn_lower}, mixed={urn_mixed}. "
+            f"Cache size: {len(self._schema_cache)}, "
+            f"Graph client available: {self.graph is not None}"
+        )
         # Track cache miss for the final attempt
         self._track_cache_miss()
 
@@ -194,16 +212,25 @@ def _track_cache_miss(self) -> None:
 
     def _resolve_schema_info(self, urn: str) -> Optional[SchemaInfo]:
         if urn in self._schema_cache:
-            return self._schema_cache[urn]
+            cached_value = self._schema_cache[urn]
+            if cached_value is None:
+                logger.debug(f"URN {urn} found in cache but value is None")
+            return cached_value
 
         # TODO: For bigquery partitioned tables, add the pseudo-column _PARTITIONTIME
         # or _PARTITIONDATE where appropriate.
 
         if self.graph:
+            logger.debug(f"Attempting to fetch schema for {urn} from DataHub graph")
             schema_info = self._fetch_schema_info(self.graph, urn)
             if schema_info:
+                logger.debug(f"Successfully fetched schema for {urn} from graph")
                 self._save_to_cache(urn, schema_info)
                 return schema_info
+            else:
+                logger.debug(f"No schema found for {urn} in DataHub graph")
+        else:
+            logger.debug(f"No graph client available to fetch schema for {urn}")
 
         self._save_to_cache(urn, None)
         return None
diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
index 59914e8802e825..de3b447c2025db 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
@@ -1442,6 +1442,11 @@ def _sqlglot_lineage_inner(
             table_name_urn_mapping[qualified_table] = urn
             if schema_info:
                 table_name_schema_mapping[qualified_table] = schema_info
+            else:
+                sql_str = str(sql) if not isinstance(sql, str) else sql
+                logger.debug(
+                    f"Failed to resolve schema for table {qualified_table} -> URN: {urn} in statement: {sql_str[:200]}..."
+                )
 
             # Also include the original, non-qualified table name in the urn mapping.
             table_name_urn_mapping[table] = urn
@@ -1615,6 +1620,12 @@ def _sqlglot_lineage_nocache(
             override_dialect=override_dialect,
         )
     except Exception as e:
+        sql_str = str(sql) if not isinstance(sql, str) else sql
+        logger.error(
+            f"SQL parsing failed with table_error: {type(e).__name__}: {e}. "
+            f"SQL statement: {sql_str[:500]}...",
+            exc_info=True,
+        )
         return SqlParsingResult.make_from_error(e)
     except BaseException as e:
         # Check if this is a PanicException from SQLGlot's Rust tokenizer
@@ -1630,10 +1641,16 @@
             # KeyboardInterrupt) rather than Exception, so it bypasses normal exception handling.
             # Avoid catching BaseException, as it includes KeyboardInterrupt
             # and would prevent Ctrl+C from working.
+            sql_str = str(sql) if not isinstance(sql, str) else sql
             wrapped_exception = Exception(
                 f"pyo3_runtime.PanicException during SQL parsing: {e}"
             )
             wrapped_exception.__cause__ = e
+            logger.error(
+                f"SQL parsing failed with table_error: PanicException: {e}. "
+                f"SQL statement: {sql_str[:500]}...",
+                exc_info=True,
+            )
             return SqlParsingResult.make_from_error(wrapped_exception)
         else:
             # Re-raise other BaseException types (SystemExit, KeyboardInterrupt, etc.)
@@ -1733,6 +1750,11 @@ def create_lineage_sql_parsed_result(
             override_dialect=override_dialect,
         )
     except Exception as e:
+        logger.error(
+            f"SQL lineage parsing failed with table_error: {type(e).__name__}: {e}. "
+            f"Query: {str(query)[:500]}...",
+            exc_info=True,
+        )
         return SqlParsingResult.make_from_error(e)
     finally:
         if needs_close:
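
Note: the logic that schema_resolver.py instruments above is a cache-first lookup with a graph fallback, where negative results are cached so that a URN which failed once is not fetched again. A self-contained sketch of that flow, with a fetch callable standing in for the real graph call (hypothetical names, not the module's API):

    from typing import Callable, Dict, Optional

    SchemaInfo = Dict[str, str]  # column -> type, as in schema_resolver.py

    def resolve_schema(
        urn: str,
        cache: Dict[str, Optional[SchemaInfo]],
        fetch: Optional[Callable[[str], Optional[SchemaInfo]]],
    ) -> Optional[SchemaInfo]:
        # A cached None is a remembered miss: the graph was already asked
        # about this URN and had nothing, so it is not queried again.
        if urn in cache:
            return cache[urn]
        # Fall back to the graph (represented by fetch) when one is wired in;
        # without it, tables absent from the local cache can only resolve to None.
        schema = fetch(urn) if fetch is not None else None
        cache[urn] = schema  # positive and negative results are cached alike
        return schema

This is why the new warning in __init__ matters: with graph=None the fallback branch never runs, and every table missing from the local cache silently becomes a lineage gap.
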
" + f"Query: {str(query)[:500]}...", + exc_info=True, + ) return SqlParsingResult.make_from_error(e) finally: if needs_close: diff --git a/metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py b/metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py index f494ed78211dcf..a883426040076f 100644 --- a/metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py +++ b/metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py @@ -356,3 +356,36 @@ def test_lineage_for_external_table_path_not_matching_specs( ) assert upstream_lineage is None + + +def test_lineage_extractor_graph_client_passed_to_aggregator( + mock_datahub_graph_instance, +): + """Test that the graph client is properly passed to SqlParsingAggregator for schema resolution.""" + pipeline_context = PipelineContext(run_id="test-graph-client") + pipeline_context.graph = mock_datahub_graph_instance + + config = BigQueryV2Config.parse_obj( + { + "project_id": "my_project", + "include_table_lineage": True, + "lineage_use_sql_parser": True, + } + ) + + report = BigQueryV2Report() + identifiers = BigQueryIdentifierBuilder(config, report) + + extractor = BigqueryLineageExtractor( + config, + report, + schema_resolver=SchemaResolver( + platform="bigquery", env="PROD", graph=pipeline_context.graph + ), + identifiers=identifiers, + graph=pipeline_context.graph, + ) + + # Verify that the aggregator's schema resolver has the graph client + assert extractor.aggregator is not None + assert extractor.aggregator._schema_resolver.graph is pipeline_context.graph