Skip to content

Commit 47fc601

Browse files
treff7esclaude
andcommitted
feat(kafka-connect): add column-level lineage support for MongoDB source connector
MongoSourceConnector.extract_lineages() now calls _extract_fine_grained_lineage() to produce column-level lineage when use_schema_resolver and schema_resolver_finegrained_lineage are enabled. The base class method resolves MongoDB collection schemas from DataHub and creates 1:1 field mappings. Prerequisite: MongoDB collections must be ingested into DataHub before Kafka Connect ingestion for schema resolution to work. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 4ff800c commit 47fc601

File tree

2 files changed

+67
-0
lines changed

2 files changed

+67
-0
lines changed

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/source_connectors.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2121,11 +2121,19 @@ def extract_lineages(self) -> List[KafkaConnectLineage]:
21212121
if found:
21222122
table_name = get_dataset_name(found.group(1), found.group(2))
21232123

2124+
fine_grained = self._extract_fine_grained_lineage(
2125+
source_dataset=table_name,
2126+
source_platform=source_platform,
2127+
target_dataset=topic,
2128+
target_platform=KAFKA,
2129+
)
2130+
21242131
lineage = KafkaConnectLineage(
21252132
source_dataset=table_name,
21262133
source_platform=source_platform,
21272134
target_dataset=topic,
21282135
target_platform=KAFKA,
2136+
fine_grained_lineages=fine_grained,
21292137
)
21302138
lineages.append(lineage)
21312139
return lineages

metadata-ingestion/tests/unit/test_kafka_connect.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,65 @@ def test_mongo_source_lineage_topic_parsing(self) -> None:
604604
assert lineages[0].source_dataset == "my-new-database.users"
605605
assert lineages[1].source_dataset == "-leading-hyphen._leading-underscore"
606606

607+
def test_mongo_source_fine_grained_lineage(self) -> None:
608+
"""Test MongoDB connector produces column-level lineage when schema resolver is available."""
609+
config = KafkaConnectSourceConfig(
610+
connect_uri="http://test:8083",
611+
cluster_name="test",
612+
use_schema_resolver=True,
613+
schema_resolver_finegrained_lineage=True,
614+
)
615+
report = KafkaConnectSourceReport()
616+
617+
manifest = ConnectorManifest(
618+
name="mongo-source-connector",
619+
type="source",
620+
config={
621+
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
622+
"topic.prefix": "prod.mongo",
623+
},
624+
tasks=[],
625+
topic_names=["prod.mongo.mydb.users"],
626+
)
627+
628+
mock_resolver = Mock()
629+
mock_resolver.resolve_table.return_value = (
630+
"urn:li:dataset:(urn:li:dataPlatform:mongodb,mydb.users,PROD)",
631+
{"_id": "string", "name": "string", "email": "string"},
632+
)
633+
634+
connector = MongoSourceConnector(
635+
connector_manifest=manifest,
636+
config=config,
637+
report=report,
638+
schema_resolver=mock_resolver,
639+
)
640+
641+
lineages = connector.extract_lineages()
642+
643+
assert len(lineages) == 1
644+
assert lineages[0].source_dataset == "mydb.users"
645+
assert lineages[0].target_dataset == "prod.mongo.mydb.users"
646+
assert lineages[0].fine_grained_lineages is not None
647+
assert len(lineages[0].fine_grained_lineages) == 3
648+
649+
def test_mongo_source_no_fine_grained_without_schema_resolver(self) -> None:
650+
"""Test MongoDB connector skips column-level lineage when schema resolver is disabled."""
651+
connector_config: Dict[str, str] = {
652+
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
653+
"topic.prefix": "prod.mongo.avro",
654+
}
655+
656+
manifest = self.create_mock_manifest(connector_config)
657+
config = create_mock_kafka_connect_config()
658+
report = Mock(spec=KafkaConnectSourceReport)
659+
660+
connector = MongoSourceConnector(manifest, config, report)
661+
lineages = connector.extract_lineages()
662+
663+
assert len(lineages) == 2
664+
assert all(lin.fine_grained_lineages is None for lin in lineages)
665+
607666

608667
class TestConfluentCloudConnectors:
609668
"""Test Confluent Cloud connector compatibility with Platform connectors."""

0 commit comments

Comments
 (0)