@sgomezvillamor
Contributor

@sgomezvillamor sgomezvillamor commented Oct 28, 2025

Summary

Fixes a critical issue where BigQuery SQL lineage parsing failed to generate lineage relationships: the lineage extractor had no graph client, so schema resolution failed during SQL parsing.

Root Cause: The BigQuery lineage extractor was not receiving the DataHub graph client, preventing it from resolving table schemas that exist in DataHub but are not in the current ingestion cache. This resulted in "Resolved 0 of X table schemas" log traces and missing upstream lineage for datasets.
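For reviewers, here is a minimal sketch of the failure mode described in the root cause. It is not the DataHub implementation: `SketchSchemaResolver`, `InMemoryGraph`, `get_schema`, and `count_resolved` are hypothetical stand-ins, and the URNs are made up. It only illustrates why a resolver with an empty ingestion cache and no graph client resolves 0 of X tables.

```python
from typing import Dict, List, Optional, Protocol

Schema = Dict[str, str]  # column name -> column type


class GraphLike(Protocol):
    """Hypothetical stand-in for the DataHub graph client."""

    def get_schema(self, urn: str) -> Optional[Schema]: ...


class InMemoryGraph:
    """Fake graph backed by a dict, used only for this illustration."""

    def __init__(self, schemas: Dict[str, Schema]) -> None:
        self._schemas = schemas

    def get_schema(self, urn: str) -> Optional[Schema]:
        return self._schemas.get(urn)


class SketchSchemaResolver:
    """Looks in the local ingestion cache first, then falls back to the graph."""

    def __init__(self, graph: Optional[GraphLike] = None) -> None:
        self.graph = graph
        self._cache: Dict[str, Schema] = {}

    def resolve(self, urn: str) -> Optional[Schema]:
        if urn in self._cache:
            return self._cache[urn]
        if self.graph is None:
            # No fallback available: the table stays unresolved.
            return None
        schema = self.graph.get_schema(urn)
        if schema is not None:
            self._cache[urn] = schema
        return schema


def count_resolved(resolver: SketchSchemaResolver, urns: List[str]) -> int:
    """Number of tables whose schema could be resolved."""
    return sum(1 for urn in urns if resolver.resolve(urn) is not None)


# Made-up URNs for tables that exist in the graph but not in the cache.
urns = ["urn:example:table_a", "urn:example:table_b"]
graph = InMemoryGraph({urn: {"id": "INT64"} for urn in urns})

without_graph = count_resolved(SketchSchemaResolver(), urns)
with_graph = count_resolved(SketchSchemaResolver(graph), urns)
print(f"Resolved {without_graph} of {len(urns)} table schemas (no graph)")
print(f"Resolved {with_graph} of {len(urns)} table schemas (with graph)")
```

With the graph fallback in place, both tables resolve. Without it, neither does, which matches the symptom reported above.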

Changes Made:

  1. Fixed BigQuery lineage extractor to properly accept and pass the graph client to SqlParsingAggregator
  2. Added comprehensive test to verify graph client propagation through the SQL parsing pipeline
  3. Added warning logging when SchemaResolver is initialized without a graph client
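The three changes above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in this PR: the `Sketch*` classes, their constructor signatures, and the `sketch.schema_resolver` logger name are all hypothetical. The real `SqlParsingAggregator` and `SchemaResolver` have their own interfaces.

```python
import logging
from typing import Any, Optional

# Hypothetical logger name, chosen for this sketch only.
logger = logging.getLogger("sketch.schema_resolver")


class SketchResolver:
    """Stand-in resolver that warns when it has no graph client."""

    def __init__(self, platform: str, graph: Optional[Any] = None) -> None:
        self.platform = platform
        self.graph = graph
        if graph is None:
            logger.warning(
                "SchemaResolver initialized without DataHub graph client "
                "for platform %r",
                platform,
            )


class SketchAggregator:
    """Stand-in aggregator that forwards the graph to its resolver."""

    def __init__(self, platform: str, graph: Optional[Any] = None) -> None:
        self.schema_resolver = SketchResolver(platform, graph)


class SketchLineageExtractor:
    """Stand-in extractor: accepts the graph and passes it down the pipeline."""

    def __init__(self, platform: str, graph: Optional[Any] = None) -> None:
        self.aggregator = SketchAggregator(platform=platform, graph=graph)


# Propagation check in the spirit of the added test: the same graph object
# handed to the extractor should reach the resolver.
fake_graph = object()
extractor = SketchLineageExtractor("bigquery", graph=fake_graph)
assert extractor.aggregator.schema_resolver.graph is fake_graph
```

One consequence of warning in the constructor is that every source building a resolver without a graph now emits an extra WARNING record, which matters for tests that count log records.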

@github-actions github-actions bot added the "ingestion" label (PR or Issue related to the ingestion of metadata) Oct 28, 2025
@codecov

codecov bot commented Oct 28, 2025

❌ 1 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
|---|---|---|---|
| 4970 | 1 | 4969 | 38 |
View the top 1 failed test(s) by shortest run time
tests.unit.test_athena_source::test_sanitize_identifier_error_handling_in_generate_partition_profiler_query
Stack Traces | 0.01s run time
caplog = <_pytest.logging.LogCaptureFixture object at 0x7f765389aac0>

    def test_sanitize_identifier_error_handling_in_generate_partition_profiler_query(
        caplog,
    ):
        """Test that ValueError from _sanitize_identifier is handled gracefully in generate_partition_profiler_query."""
        import logging
    
        config = AthenaConfig.parse_obj(
            {
                "aws_region": "us-west-1",
                "query_result_location": "s3://query-result-location/",
                "work_group": "test-workgroup",
                "profiling": {"enabled": True, "partition_profiling_enabled": True},
            }
        )
    
        ctx = PipelineContext(run_id="test")
        source = AthenaSource(config=config, ctx=ctx)
    
        # Add a mock partition to the cache with malicious partition key
        source.table_partition_cache["valid_schema"] = {
            "valid_table": Partitionitem(
                partitions=["year"],
                max_partition={"malicious'; DROP TABLE users; --": "2023"},
            )
        }
    
        # Capture log messages at WARNING level
        with caplog.at_level(logging.WARNING):
            # This should handle the ValueError gracefully and return None, None
            # instead of crashing the entire ingestion process
            result = source.generate_partition_profiler_query(
                "valid_schema", "valid_table", None
            )
    
        # Verify the method returns None, None when sanitization fails
        assert result == (None, None), "Should return None, None when sanitization fails"
    
        # Verify that a warning log message was generated
>       assert len(caplog.records) == 1
E       assert 2 == 1
E        +  where 2 = len([<LogRecord: datahub.sql_parsing.schema_resolver, 30, .../datahub/sql_parsing/schema_resolver.py, 80, "SchemaResolver initialized without DataHub graph client for platform 'athena'. This may result in missing lineage when SQL parsing cannot resolve table schemas that exist in DataHub but are not in the current ingestion cache.">, <LogRecord: datahub.ingestion.source.sql.athena, 30, .../source/sql/athena.py, 764, "Failed to generate partition profiler query for valid_schema.valid_table due to unsafe identifiers: Identifier 'malicious'; DROP TABLE users; --' contains unsafe characters. Only alphanumeric characters, underscores, and periods are allowed.. Partition profiling disabled for this table.">])
E        +    where [<LogRecord: datahub.sql_parsing.schema_resolver, 30, .../datahub/sql_parsing/schema_resolver.py, 80, "SchemaResolver initialized without DataHub graph client for platform 'athena'. This may result in missing lineage when SQL parsing cannot resolve table schemas that exist in DataHub but are not in the current ingestion cache.">, <LogRecord: datahub.ingestion.source.sql.athena, 30, .../source/sql/athena.py, 764, "Failed to generate partition profiler query for valid_schema.valid_table due to unsafe identifiers: Identifier 'malicious'; DROP TABLE users; --' contains unsafe characters. Only alphanumeric characters, underscores, and periods are allowed.. Partition profiling disabled for this table.">] = <_pytest.logging.LogCaptureFixture object at 0x7f765389aac0>.records

tests/unit/test_athena_source.py:1138: AssertionError
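The trace shows two WARNING records: the new one from `datahub.sql_parsing.schema_resolver` and the expected one from `datahub.ingestion.source.sql.athena`. One possible way to make this kind of assertion robust to unrelated warnings is to count only records from the logger under test. The helper below is a hypothetical sketch (`records_from` is not an existing DataHub or pytest function), and the records are built by hand here with shortened messages rather than captured through `caplog`.

```python
import logging
from typing import Iterable, List


def records_from(
    records: Iterable[logging.LogRecord], logger_name: str
) -> List[logging.LogRecord]:
    """Keep only the records emitted by the named logger."""
    return [r for r in records if r.name == logger_name]


# Hand-built records mirroring the two logger names in the trace above.
sample_records = [
    logging.makeLogRecord(
        {
            "name": "datahub.sql_parsing.schema_resolver",
            "levelno": logging.WARNING,
            "msg": "SchemaResolver initialized without DataHub graph client",
        }
    ),
    logging.makeLogRecord(
        {
            "name": "datahub.ingestion.source.sql.athena",
            "levelno": logging.WARNING,
            "msg": "Failed to generate partition profiler query",
        }
    ),
]

athena_records = records_from(
    sample_records, "datahub.ingestion.source.sql.athena"
)
assert len(athena_records) == 1
```

In a pytest test the same helper could be applied to `caplog.records`. Whether to filter by logger or to avoid the new warning for this source is a design choice for the PR author.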


…ing lineage from unresolved schemas

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
@sgomezvillamor sgomezvillamor changed the title from "adding some debug traces" to "fix(bigquery): pass graph client to lineage extractor to prevent missing lineage from unresolved schemas" Oct 29, 2025
