Skip to content

Commit c819d90

Browse files
committed
[DOP-23708] Add legacy indirect lineage support
1 parent 912ab0c commit c819d90

File tree

6 files changed

+75
-32
lines changed

6 files changed

+75
-32
lines changed

data_rentgen/consumer/extractors/column_lineage.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
logger = logging.getLogger(__name__)
2222

23+
TRANSFORMATION_TYPE_DIRECT = "DIRECT"
24+
2325
TRANSFORMATION_SUBTYPE_MAP_MASKING = {
2426
"TRANSFORMATION": DatasetColumnRelationTypeDTO.TRANSFORMATION_MASKING,
2527
"AGGREGATION": DatasetColumnRelationTypeDTO.AGGREGATION_MASKING,
@@ -77,51 +79,65 @@ def extract_column_lineage(
7779
# Grouping column lineage by source+target dataset. This is a unique combination within an operation,
7880
# so we can use it to generate the same fingerprint for all dataset column relations
7981
datasets = {target_dataset_dto.unique_key: target_dataset_dto}
80-
dataset_column_relations = defaultdict(list)
82+
dataset_column_relations: dict[tuple, dict[tuple, DatasetColumnRelationDTO]] = defaultdict(dict)
8183

8284
# direct lineage (source_column -> target_column)
8385
for field, raw_column_lineage in target_dataset.facets.columnLineage.fields.items():
8486
for input_field in raw_column_lineage.inputFields:
8587
source_dataset_dto = resolve_dataset_ref(input_field, dataset_cache)
8688
datasets[source_dataset_dto.unique_key] = source_dataset_dto
8789

88-
column_lineage_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
89-
for transformation in input_field.transformations:
90-
# OL integration for Spark before v1.23 (or with columnLineage.datasetLineageEnabled=false, which is still default) # noqa: E501
91-
# produced INDIRECT lineage for each combination source_column x target_column,
92-
# which is amlost the cartesian join. It is VERY expensive to handle, just ignore.
93-
# See https://github.com/OpenLineage/OpenLineage/pull/3097
94-
if transformation.type == "INDIRECT":
95-
continue
90+
dataset_relation_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
91+
dataset_column_relation = dataset_column_relations[dataset_relation_key]
9692

93+
for transformation in input_field.transformations:
94+
# OL integration for Spark before v1.23
95+
# or with columnLineage.datasetLineageEnabled=false (which is still default)
96+
# produces INDIRECT lineage for each combination source_column x target_column,
97+
# which is almost a cartesian product.
98+
# There are a lot of duplicates here, so we avoid them by merging items immediately.
9799
column_relation = DatasetColumnRelationDTO(
98100
type=extract_dataset_column_relation_type(transformation),
99101
source_column=input_field.field,
100-
target_column=field,
102+
target_column=field if transformation.type == TRANSFORMATION_TYPE_DIRECT else None,
101103
)
102-
dataset_column_relations[column_lineage_key].append(column_relation)
104+
column_relation_key = column_relation.unique_key
105+
106+
existing_column_relation = dataset_column_relation.get(column_relation_key)
107+
if existing_column_relation:
108+
dataset_column_relation[column_relation_key] = existing_column_relation.merge(column_relation)
109+
else:
110+
dataset_column_relation[column_relation_key] = column_relation
103111

104112
# indirect lineage (source_column -> target_dataset),
105113
# added to OL since v1.23 and sent only when columnLineage.datasetLineageEnabled=true
106114
for input_field in target_dataset.facets.columnLineage.dataset:
107115
source_dataset_dto = resolve_dataset_ref(input_field, dataset_cache)
108116
datasets[source_dataset_dto.unique_key] = source_dataset_dto
109117

110-
column_lineage_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
118+
dataset_relation_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
119+
dataset_column_relation = dataset_column_relations[dataset_relation_key]
120+
111121
for transformation in input_field.transformations:
112122
column_relation = DatasetColumnRelationDTO(
113123
type=extract_dataset_column_relation_type(transformation),
114124
source_column=input_field.field,
115125
)
116-
dataset_column_relations[column_lineage_key].append(column_relation)
126+
column_relation_key = column_relation.unique_key
127+
128+
existing_column_relation = dataset_column_relation.get(column_relation_key)
129+
if existing_column_relation:
130+
dataset_column_relation[column_relation_key] = existing_column_relation.merge(column_relation)
131+
else:
132+
dataset_column_relation[column_relation_key] = column_relation
117133

118134
# merge results into DTO objects
119135
return [
120136
ColumnLineageDTO(
121137
operation=operation,
122138
source_dataset=datasets[source_dataset_dto_key],
123139
target_dataset=datasets[target_dataset_dto_key],
124-
dataset_column_relations=relations,
140+
dataset_column_relations=list(relations.values()),
125141
)
126142
for (source_dataset_dto_key, target_dataset_dto_key), relations in dataset_column_relations.items()
127143
if dataset_column_relations

data_rentgen/dto/column_lineage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def column_relations(self) -> list[DatasetColumnRelationDTO]:
4141

4242
@cached_property
4343
def fingerprint(self) -> UUID:
44-
id_components = [(*item.unique_key, item.type) for item in self.column_relations]
44+
id_components = sorted((*item.unique_key, item.type) for item in self.column_relations)
4545
str_components = [".".join(map(str, item)) for item in id_components]
4646
return generate_static_uuid(",".join(str_components))
4747

data_rentgen/dto/dataset_column_relation.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ def unique_key(self) -> tuple:
4747
)
4848

4949
def merge(self, new: DatasetColumnRelationDTO) -> DatasetColumnRelationDTO:
50+
if new.fingerprint is None and new.type.value & self.type.value:
51+
return self
52+
5053
return DatasetColumnRelationDTO(
5154
source_column=self.source_column,
5255
target_column=self.target_column,

data_rentgen/dto/location.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def unique_key(self) -> tuple:
1717
return (self.type, self.name)
1818

1919
def merge(self, new: LocationDTO) -> LocationDTO:
20-
if new.id is None and self.addresses == new.addresses:
20+
if new.id is None and new.addresses.issubset(self.addresses):
2121
# locations aren't changed that much, reuse them if possible
2222
return self
2323

tests/test_consumer/test_extractors/fixtures/column_lineage_facets.py

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -96,17 +96,6 @@ def output_event_with_one_to_two_direct_and_indirect_column_lineage() -> OpenLin
9696
),
9797
],
9898
),
99-
OpenLineageColumnLineageDatasetFacetFieldRef(
100-
namespace="hive://test-hadoop:9083",
101-
name="mydb.mytable",
102-
field="source_col_1",
103-
transformations=[
104-
OpenLineageColumnLineageDatasetFacetFieldTransformation(
105-
type="INDIRECT",
106-
subtype="JOIN",
107-
),
108-
],
109-
),
11099
OpenLineageColumnLineageDatasetFacetFieldRef(
111100
namespace="hive://test-hadoop:9083",
112101
name="mydb.mytable",
@@ -127,6 +116,11 @@ def output_event_with_one_to_two_direct_and_indirect_column_lineage() -> OpenLin
127116
name="mydb.mytable",
128117
field="source_col_2",
129118
transformations=[
119+
OpenLineageColumnLineageDatasetFacetFieldTransformation(
120+
type="INDIRECT",
121+
subtype="JOIN",
122+
masking=False,
123+
),
130124
OpenLineageColumnLineageDatasetFacetFieldTransformation(
131125
type="INDIRECT",
132126
subtype="SORT",
@@ -172,6 +166,17 @@ def output_event_with_direct_and_legacy_indirect_column_lineage() -> OpenLineage
172166
),
173167
],
174168
),
169+
OpenLineageColumnLineageDatasetFacetFieldRef(
170+
namespace="hive://test-hadoop:9083",
171+
name="mydb.mytable",
172+
field="source_col_4",
173+
transformations=[
174+
OpenLineageColumnLineageDatasetFacetFieldTransformation(
175+
type="INDIRECT",
176+
subtype="WINDOW",
177+
),
178+
],
179+
),
175180
],
176181
),
177182
"column_2": OpenLineageColumnLineageDatasetFacetField(
@@ -187,6 +192,17 @@ def output_event_with_direct_and_legacy_indirect_column_lineage() -> OpenLineage
187192
),
188193
],
189194
),
195+
OpenLineageColumnLineageDatasetFacetFieldRef(
196+
namespace="hive://test-hadoop:9083",
197+
name="mydb.mytable",
198+
field="source_col_2",
199+
transformations=[
200+
OpenLineageColumnLineageDatasetFacetFieldTransformation(
201+
type="INDIRECT",
202+
subtype="JOIN",
203+
),
204+
],
205+
),
190206
OpenLineageColumnLineageDatasetFacetFieldRef(
191207
namespace="hive://test-hadoop:9083",
192208
name="mydb.mytable",

tests/test_consumer/test_extractors/test_extractors_column_lineage.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,6 @@ def test_extractors_extract_legacy_indirect_column_lineage(
130130
extracted_hdfs_dataset,
131131
output_event_with_direct_and_legacy_indirect_column_lineage,
132132
):
133-
"""
134-
The output event contains Indirect column lineage in legacy format (inside 'fields', item).
135-
This test check's that this data is not included in column lineage.
136-
"""
137133
operation = extracted_spark_operation
138134

139135
column_lineage = extract_column_lineage(
@@ -152,12 +148,24 @@ def test_extractors_extract_legacy_indirect_column_lineage(
152148
target_column="column_1",
153149
fingerprint=None,
154150
),
151+
DatasetColumnRelationDTO(
152+
type=DatasetColumnRelationTypeDTO.JOIN,
153+
source_column="source_col_2",
154+
target_column=None,
155+
fingerprint=None,
156+
),
155157
DatasetColumnRelationDTO(
156158
type=DatasetColumnRelationTypeDTO.AGGREGATION,
157159
source_column="source_col_3",
158160
target_column="column_2",
159161
fingerprint=None,
160162
),
163+
DatasetColumnRelationDTO(
164+
type=DatasetColumnRelationTypeDTO.WINDOW,
165+
source_column="source_col_4",
166+
target_column=None,
167+
fingerprint=None,
168+
),
161169
],
162170
),
163171
]
@@ -194,7 +202,7 @@ def test_extractors_extract_indirect_column_lineage(
194202
fingerprint=None,
195203
),
196204
DatasetColumnRelationDTO(
197-
type=DatasetColumnRelationTypeDTO.SORT,
205+
type=DatasetColumnRelationTypeDTO.JOIN | DatasetColumnRelationTypeDTO.SORT,
198206
source_column="source_col_2",
199207
target_column=None,
200208
fingerprint=None,

0 commit comments

Comments
 (0)