@@ -164,6 +164,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
             schema_resolver=self.sql_parser_schema_resolver,
             identifiers=self.identifiers,
             redundant_run_skip_handler=redundant_lineage_run_skip_handler,
+            graph=self.ctx.graph,
         )
 
         redundant_usage_run_skip_handler: Optional[RedundantUsageRunSkipHandler] = None
@@ -331,6 +332,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                 filters=self.filters,
                 identifiers=self.identifiers,
                 redundant_run_skip_handler=redundant_queries_run_skip_handler,
+                graph=self.ctx.graph,
                 schema_resolver=self.sql_parser_schema_resolver,
                 discovered_tables=self.bq_schema_extractor.table_refs,
             ) as queries_extractor,
@@ -221,6 +221,7 @@ def __init__(
         schema_resolver: SchemaResolver,
         identifiers: BigQueryIdentifierBuilder,
         redundant_run_skip_handler: Optional[RedundantLineageRunSkipHandler] = None,
+        graph: Optional["DataHubGraph"] = None,
     ):
         self.config = config
         self.report = report
@@ -246,6 +247,7 @@ def __init__(
             platform_instance=self.identifiers.identifier_config.platform_instance,
             env=self.identifiers.identifier_config.env,
             schema_resolver=self.schema_resolver,
+            graph=graph,
             eager_graph_load=False,
             generate_lineage=True,
             generate_queries=True,
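Taken together, the hunks above thread the pipeline's graph client from the BigQuery source's constructor, through the lineage extractor, into the SqlParsingAggregator, whose schema resolver ultimately uses it. Below is a minimal, self-contained sketch of that forwarding pattern; Resolver, Aggregator, and Extractor are illustrative stand-ins rather than the real DataHub classes, and a bare object stands in for a DataHubGraph client.

from typing import Optional


class Resolver:
    """Stand-in for a schema resolver that can fall back to a remote graph."""

    def __init__(self, graph: Optional[object] = None) -> None:
        self.graph = graph


class Aggregator:
    """Stand-in for SqlParsingAggregator: owns the resolver used during SQL parsing."""

    def __init__(self, graph: Optional[object] = None) -> None:
        self._schema_resolver = Resolver(graph=graph)


class Extractor:
    """Stand-in for the lineage extractor: forwards the graph client downstream."""

    def __init__(self, graph: Optional[object] = None) -> None:
        self.aggregator = Aggregator(graph=graph)


# Without the forwarding added in this change, the resolver ends up with graph=None
# even when the pipeline context holds a live client.
pipeline_graph = object()
extractor = Extractor(graph=pipeline_graph)
assert extractor.aggregator._schema_resolver.graph is pipeline_graph

The point of the change is exactly this last assertion: the resolver can only fall back to the server when it actually receives the client.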
metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py (29 changes: 28 additions & 1 deletion)
@@ -1,4 +1,5 @@
 import contextlib
+import logging
 import pathlib
 from dataclasses import dataclass
 from typing import Dict, List, Optional, Protocol, Set, Tuple
@@ -19,6 +20,8 @@
 from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedDict
 from datahub.utilities.urns.field_paths import get_simple_field_path_from_v2_field_path
 
+logger = logging.getLogger(__name__)
+
 # A lightweight table schema: column -> type mapping.
 SchemaInfo = Dict[str, str]

@@ -72,6 +75,14 @@ def __init__(
         self.graph = graph
         self.report = report
 
+        # Log warning if no graph client is provided for schema resolution
+        if graph is None:
+            logger.warning(
+                f"SchemaResolver initialized without DataHub graph client for platform '{platform}'. "
+                "This may result in missing lineage when SQL parsing cannot resolve table schemas "
+                "that exist in DataHub but are not in the current ingestion cache."
+            )
+
         # Init cache, potentially restoring from a previous run.
         shared_conn = None
         if _cache_filename:
@@ -168,6 +179,13 @@ def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]:
                 self._track_cache_hit()
                 return urn_mixed, schema_info
 
+        # Log resolution failure details
+        logger.debug(
+            f"Schema resolution failed for table {table}. Tried URNs: "
+            f"primary={urn}, lower={urn_lower}, mixed={urn_mixed}. "
+            f"Cache size: {len(self._schema_cache)}, "
+            f"Graph client available: {self.graph is not None}"
+        )
         # Track cache miss for the final attempt
         self._track_cache_miss()

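The debug message added above refers to resolve_table's existing behavior of probing several casings of the same dataset URN (as given, lowercased, mixed) before reporting a miss. A rough, self-contained illustration of that probe order against a plain in-memory cache follows; the URN construction and the fixed PROD environment are simplifications, and only the first two casings are shown for brevity.

from typing import Dict, Optional, Tuple

# Lightweight column -> type mapping, as in schema_resolver.py.
SchemaInfo = Dict[str, str]


def _bq_urn(table_name: str) -> str:
    # Simplified dataset URN; the real builder also handles platform instances.
    return f"urn:li:dataset:(urn:li:dataPlatform:bigquery,{table_name},PROD)"


def resolve_with_casings(
    table_name: str, cache: Dict[str, SchemaInfo]
) -> Tuple[str, Optional[SchemaInfo]]:
    """Try the name as given, then lowercased, and report the first hit."""
    primary = _bq_urn(table_name)
    candidates = [primary, _bq_urn(table_name.lower())]
    for urn in candidates:
        schema = cache.get(urn)
        if schema is not None:
            return urn, schema
    # Mirror the real method's contract: always return a URN, even on a miss.
    return primary, None


# Example: a cache keyed by the lowercased URN still resolves a mixed-case name.
cache = {_bq_urn("my_project.my_dataset.orders"): {"id": "INT64"}}
print(resolve_with_casings("My_Project.My_Dataset.Orders", cache))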
@@ -194,16 +212,25 @@ def _track_cache_miss(self) -> None:
 
     def _resolve_schema_info(self, urn: str) -> Optional[SchemaInfo]:
         if urn in self._schema_cache:
-            return self._schema_cache[urn]
+            cached_value = self._schema_cache[urn]
+            if cached_value is None:
+                logger.debug(f"URN {urn} found in cache but value is None")
+            return cached_value
 
         # TODO: For bigquery partitioned tables, add the pseudo-column _PARTITIONTIME
         # or _PARTITIONDATE where appropriate.
 
         if self.graph:
+            logger.debug(f"Attempting to fetch schema for {urn} from DataHub graph")
             schema_info = self._fetch_schema_info(self.graph, urn)
             if schema_info:
+                logger.debug(f"Successfully fetched schema for {urn} from graph")
                 self._save_to_cache(urn, schema_info)
                 return schema_info
+            else:
+                logger.debug(f"No schema found for {urn} in DataHub graph")
+        else:
+            logger.debug(f"No graph client available to fetch schema for {urn}")
 
         self._save_to_cache(urn, None)
         return None
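The _resolve_schema_info hunk keeps the existing lookup order (local cache first, then the DataHub graph when a client is available, then a cached negative result) and only adds debug logging on each branch. A compact sketch of that fallback, with a hypothetical fetch_from_graph callable standing in for the real graph lookup:

import logging
from typing import Callable, Dict, Optional

logger = logging.getLogger(__name__)

# Lightweight column -> type mapping, as in schema_resolver.py.
SchemaInfo = Dict[str, str]


def resolve_schema(
    urn: str,
    cache: Dict[str, Optional[SchemaInfo]],
    fetch_from_graph: Optional[Callable[[str], Optional[SchemaInfo]]] = None,
) -> Optional[SchemaInfo]:
    """Cache -> graph -> negative-cache fallback, mirroring the order above."""
    if urn in cache:
        cached = cache[urn]
        if cached is None:
            logger.debug("URN %s found in cache but value is None", urn)
        return cached

    if fetch_from_graph is not None:
        schema = fetch_from_graph(urn)
        if schema:
            cache[urn] = schema
            return schema
        logger.debug("No schema found for %s in the graph", urn)
    else:
        logger.debug("No graph client available to fetch schema for %s", urn)

    # Cache the miss so repeated lookups for the same URN stay local.
    cache[urn] = None
    return None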
metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py (22 changes: 22 additions & 0 deletions)
@@ -1442,6 +1442,11 @@ def _sqlglot_lineage_inner(
         table_name_urn_mapping[qualified_table] = urn
         if schema_info:
             table_name_schema_mapping[qualified_table] = schema_info
+        else:
+            sql_str = str(sql) if not isinstance(sql, str) else sql
+            logger.debug(
+                f"Failed to resolve schema for table {qualified_table} -> URN: {urn} in statement: {sql_str[:200]}..."
+            )
 
         # Also include the original, non-qualified table name in the urn mapping.
         table_name_urn_mapping[table] = urn
@@ -1615,6 +1620,12 @@ def _sqlglot_lineage_nocache(
             override_dialect=override_dialect,
         )
     except Exception as e:
+        sql_str = str(sql) if not isinstance(sql, str) else sql
+        logger.error(
+            f"SQL parsing failed with table_error: {type(e).__name__}: {e}. "
+            f"SQL statement: {sql_str[:500]}...",
+            exc_info=True,
+        )
         return SqlParsingResult.make_from_error(e)
     except BaseException as e:
         # Check if this is a PanicException from SQLGlot's Rust tokenizer
@@ -1630,10 +1641,16 @@ def _sqlglot_lineage_nocache(
             # KeyboardInterrupt) rather than Exception, so it bypasses normal exception handling.
             # Avoid catching BaseException, as it includes KeyboardInterrupt
             # and would prevent Ctrl+C from working.
+            sql_str = str(sql) if not isinstance(sql, str) else sql
             wrapped_exception = Exception(
                 f"pyo3_runtime.PanicException during SQL parsing: {e}"
             )
             wrapped_exception.__cause__ = e
+            logger.error(
+                f"SQL parsing failed with table_error: PanicException: {e}. "
+                f"SQL statement: {sql_str[:500]}...",
+                exc_info=True,
+            )
             return SqlParsingResult.make_from_error(wrapped_exception)
         else:
             # Re-raise other BaseException types (SystemExit, KeyboardInterrupt, etc.)
@@ -1733,6 +1750,11 @@ def create_lineage_sql_parsed_result(
             override_dialect=override_dialect,
         )
     except Exception as e:
+        logger.error(
+            f"SQL lineage parsing failed with table_error: {type(e).__name__}: {e}. "
+            f"Query: {str(query)[:500]}...",
+            exc_info=True,
+        )
         return SqlParsingResult.make_from_error(e)
     finally:
         if needs_close:
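The second error-handling hunk above exists because pyo3_runtime.PanicException derives from BaseException rather than Exception, so it slips past the ordinary handler. The sketch below shows the general wrap-and-chain pattern under that assumption; parse is a hypothetical stand-in for the real parser, and the panic type is matched by name since the native exception class is only importable when the extension is present.

import logging
from typing import Optional

logger = logging.getLogger(__name__)


def parse(sql: str) -> str:
    """Hypothetical stand-in for a parser that may panic inside native code."""
    return sql.strip()


def parse_or_error(sql: str) -> Optional[str]:
    try:
        return parse(sql)
    except Exception as e:
        # Ordinary parse failures: log the truncated statement and give up.
        logger.error("SQL parsing failed: %s. Statement: %.500s", e, sql, exc_info=True)
        return None
    except BaseException as e:
        # Panics from the Rust tokenizer bypass the handler above. Wrap them into a
        # plain Exception, but let SystemExit, KeyboardInterrupt, and friends through.
        if type(e).__name__ != "PanicException":
            raise
        wrapped = Exception(f"pyo3_runtime.PanicException during SQL parsing: {e}")
        wrapped.__cause__ = e  # preserve the original exception for the traceback chain
        logger.error("SQL parsing panicked: %s. Statement: %.500s", e, sql, exc_info=True)
        return None

Returning a sentinel (None here, an error result in the diff) instead of re-raising keeps a single bad statement from aborting the whole ingestion run.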
metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py (33 changes: 33 additions & 0 deletions)
@@ -356,3 +356,36 @@ def test_lineage_for_external_table_path_not_matching_specs(
     )
 
     assert upstream_lineage is None
+
+
+def test_lineage_extractor_graph_client_passed_to_aggregator(
+    mock_datahub_graph_instance,
+):
+    """Test that the graph client is properly passed to SqlParsingAggregator for schema resolution."""
+    pipeline_context = PipelineContext(run_id="test-graph-client")
+    pipeline_context.graph = mock_datahub_graph_instance
+
+    config = BigQueryV2Config.parse_obj(
+        {
+            "project_id": "my_project",
+            "include_table_lineage": True,
+            "lineage_use_sql_parser": True,
+        }
+    )
+
+    report = BigQueryV2Report()
+    identifiers = BigQueryIdentifierBuilder(config, report)
+
+    extractor = BigqueryLineageExtractor(
+        config,
+        report,
+        schema_resolver=SchemaResolver(
+            platform="bigquery", env="PROD", graph=pipeline_context.graph
+        ),
+        identifiers=identifiers,
+        graph=pipeline_context.graph,
+    )
+
+    # Verify that the aggregator's schema resolver has the graph client
+    assert extractor.aggregator is not None
+    assert extractor.aggregator._schema_resolver.graph is pipeline_context.graph
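As a usage note rather than part of the diff: assuming the metadata-ingestion test environment is installed, the new test can presumably be run on its own from the repository root with pytest's -k selector, for example via pytest.main:

import pytest

# Path and test name are taken from the diff above; adjust the path if running
# from inside the metadata-ingestion directory instead of the repository root.
pytest.main(
    [
        "metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py",
        "-k",
        "graph_client_passed_to_aggregator",
    ]
)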