Skip to content

Commit ca661d8

Browse files
committed
[DOP-23708] Cache datasets for column lineage extraction
1 parent 9e569df commit ca661d8

File tree

8 files changed

+130
-80
lines changed

8 files changed

+130
-80
lines changed

data_rentgen/consumer/extractors/__init__.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
extract_dataset_and_symlinks,
1010
)
1111
from data_rentgen.consumer.extractors.input import extract_input
12-
from data_rentgen.consumer.extractors.job import extract_job
12+
from data_rentgen.consumer.extractors.job import extract_job, extract_parent_job
1313
from data_rentgen.consumer.extractors.operation import extract_operation
1414
from data_rentgen.consumer.extractors.output import extract_output
15-
from data_rentgen.consumer.extractors.run import extract_run, extract_run_minimal
15+
from data_rentgen.consumer.extractors.run import extract_parent_run, extract_run
1616
from data_rentgen.consumer.extractors.schema import extract_schema
1717

1818
__all__ = [
@@ -26,7 +26,8 @@
2626
"extract_job",
2727
"extract_operation",
2828
"extract_output",
29+
"extract_parent_job",
30+
"extract_parent_run",
2931
"extract_run",
30-
"extract_run_minimal",
3132
"extract_schema",
3233
]

data_rentgen/consumer/extractors/batch.py

Lines changed: 58 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -156,123 +156,133 @@ def add_schema(self, schema: SchemaDTO):
156156
def add_user(self, user: UserDTO):
157157
self._add(self._users, user)
158158

159-
def _get_location(self, location_key: tuple) -> LocationDTO:
159+
def get_location(self, location_key: tuple) -> LocationDTO:
160160
return self._locations[location_key]
161161

162-
def _get_schema(self, schema_key: tuple) -> SchemaDTO:
162+
def get_schema(self, schema_key: tuple) -> SchemaDTO:
163163
return self._schemas[schema_key]
164164

165-
def _get_user(self, user_key: tuple) -> UserDTO:
165+
def get_user(self, user_key: tuple) -> UserDTO:
166166
return self._users[user_key]
167167

168-
def _get_dataset(self, dataset_key: tuple) -> DatasetDTO:
168+
def get_dataset(self, dataset_key: tuple) -> DatasetDTO:
169169
dataset = self._datasets[dataset_key]
170-
dataset.location = self._get_location(dataset.location.unique_key)
170+
dataset.location = self.get_location(dataset.location.unique_key)
171171
return dataset
172172

173-
def _get_dataset_symlink(self, dataset_symlink_key: tuple) -> DatasetSymlinkDTO:
173+
def get_dataset_symlink(self, dataset_symlink_key: tuple) -> DatasetSymlinkDTO:
174174
dataset_symlink = self._dataset_symlinks[dataset_symlink_key]
175-
dataset_symlink.from_dataset = self._get_dataset(dataset_symlink.from_dataset.unique_key)
176-
dataset_symlink.to_dataset = self._get_dataset(dataset_symlink.to_dataset.unique_key)
175+
dataset_symlink.from_dataset = self.get_dataset(dataset_symlink.from_dataset.unique_key)
176+
dataset_symlink.to_dataset = self.get_dataset(dataset_symlink.to_dataset.unique_key)
177177
return dataset_symlink
178178

179-
def _get_job(self, job_key: tuple) -> JobDTO:
179+
def get_job(self, job_key: tuple) -> JobDTO:
180180
job = self._jobs[job_key]
181-
job.location = self._get_location(job.location.unique_key)
181+
job.location = self.get_location(job.location.unique_key)
182182
return job
183183

184-
def _get_run(self, run_key: tuple) -> RunDTO:
184+
def get_run(self, run_key: tuple) -> RunDTO:
185185
run = self._runs[run_key]
186-
run.job = self._get_job(run.job.unique_key)
186+
run.job = self.get_job(run.job.unique_key)
187187
if run.parent_run:
188-
run.parent_run = self._get_run(run.parent_run.unique_key)
188+
run.parent_run = self.get_run(run.parent_run.unique_key)
189189
if run.user:
190-
run.user = self._get_user(run.user.unique_key)
190+
run.user = self.get_user(run.user.unique_key)
191191
return run
192192

193-
def _get_operation(self, operation_key: tuple) -> OperationDTO:
193+
def get_operation(self, operation_key: tuple) -> OperationDTO:
194194
operation = self._operations[operation_key]
195-
operation.run = self._get_run(operation.run.unique_key)
195+
operation.run = self.get_run(operation.run.unique_key)
196196
return operation
197197

198-
def _get_input(self, input_key: tuple) -> InputDTO:
198+
def get_input(self, input_key: tuple) -> InputDTO:
199199
input_ = self._inputs[input_key]
200-
input_.operation = self._get_operation(input_.operation.unique_key)
201-
input_.dataset = self._get_dataset(input_.dataset.unique_key)
200+
input_.operation = self.get_operation(input_.operation.unique_key)
201+
input_.dataset = self.get_dataset(input_.dataset.unique_key)
202202
if input_.schema:
203-
input_.schema = self._get_schema(input_.schema.unique_key)
203+
input_.schema = self.get_schema(input_.schema.unique_key)
204204
return input_
205205

206-
def _get_output(self, output_key: tuple) -> OutputDTO:
206+
def get_output(self, output_key: tuple) -> OutputDTO:
207207
output = self._outputs[output_key]
208-
output.operation = self._get_operation(output.operation.unique_key)
209-
output.dataset = self._get_dataset(output.dataset.unique_key)
208+
output.operation = self.get_operation(output.operation.unique_key)
209+
output.dataset = self.get_dataset(output.dataset.unique_key)
210210
if output.schema:
211-
output.schema = self._get_schema(output.schema.unique_key)
211+
output.schema = self.get_schema(output.schema.unique_key)
212212
return output
213213

214-
def _get_column_lineage(self, output_key: tuple) -> ColumnLineageDTO:
214+
def get_column_lineage(self, output_key: tuple) -> ColumnLineageDTO:
215215
lineage = self._column_lineage[output_key]
216-
lineage.operation = self._get_operation(lineage.operation.unique_key)
217-
lineage.source_dataset = self._get_dataset(lineage.source_dataset.unique_key)
218-
lineage.target_dataset = self._get_dataset(lineage.target_dataset.unique_key)
216+
lineage.operation = self.get_operation(lineage.operation.unique_key)
217+
lineage.source_dataset = self.get_dataset(lineage.source_dataset.unique_key)
218+
lineage.target_dataset = self.get_dataset(lineage.target_dataset.unique_key)
219219
return lineage
220220

221221
def locations(self) -> list[LocationDTO]:
222-
return list(map(self._get_location, self._locations))
222+
return list(map(self.get_location, self._locations))
223223

224224
def datasets(self) -> list[DatasetDTO]:
225-
return list(map(self._get_dataset, self._datasets))
225+
return list(map(self.get_dataset, self._datasets))
226226

227227
def dataset_symlinks(self) -> list[DatasetSymlinkDTO]:
228-
return list(map(self._get_dataset_symlink, self._dataset_symlinks))
228+
return list(map(self.get_dataset_symlink, self._dataset_symlinks))
229229

230230
def jobs(self) -> list[JobDTO]:
231-
return list(map(self._get_job, self._jobs))
231+
return list(map(self.get_job, self._jobs))
232232

233233
def runs(self) -> list[RunDTO]:
234-
return list(map(self._get_run, self._runs))
234+
return list(map(self.get_run, self._runs))
235235

236236
def operations(self) -> list[OperationDTO]:
237-
return list(map(self._get_operation, self._operations))
237+
return list(map(self.get_operation, self._operations))
238238

239239
def inputs(self) -> list[InputDTO]:
240-
return list(map(self._get_input, self._inputs))
240+
return list(map(self.get_input, self._inputs))
241241

242242
def outputs(self) -> list[OutputDTO]:
243-
return list(map(self._get_output, self._outputs))
243+
return list(map(self.get_output, self._outputs))
244244

245245
def column_lineage(self) -> list[ColumnLineageDTO]:
246-
return list(map(self._get_column_lineage, self._column_lineage))
246+
return list(map(self.get_column_lineage, self._column_lineage))
247247

248248
def schemas(self) -> list[SchemaDTO]:
249-
return list(map(self._get_schema, self._schemas))
249+
return list(map(self.get_schema, self._schemas))
250250

251251
def users(self) -> list[UserDTO]:
252-
return list(map(self._get_user, self._users))
252+
return list(map(self.get_user, self._users))
253253

254254

255255
def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
256256
result = BatchExtractionResult()
257+
dataset_cache: dict[tuple[str, str], DatasetDTO] = {}
257258

258259
for event in events:
259260
if event.job.facets.jobType and event.job.facets.jobType.jobType == OpenLineageJobType.JOB:
260261
operation = extract_operation(event)
261262
result.add_operation(operation)
263+
262264
for input_dataset in event.inputs:
263-
input_, symlinks = extract_input(operation, input_dataset)
264-
result.add_input(input_)
265-
for symlink in symlinks:
266-
result.add_dataset_symlink(symlink)
265+
input_dto, symlink_dtos = extract_input(operation, input_dataset)
266+
267+
result.add_input(input_dto)
268+
dataset_dto_cache_key = (input_dataset.namespace, input_dataset.name)
269+
dataset_cache[dataset_dto_cache_key] = result.get_dataset(input_dto.dataset.unique_key)
270+
271+
for symlink_dto in symlink_dtos:
272+
result.add_dataset_symlink(symlink_dto)
267273

268274
for output_dataset in event.outputs:
269-
output, symlinks = extract_output(operation, output_dataset)
270-
result.add_output(output)
271-
for symlink in symlinks:
272-
result.add_dataset_symlink(symlink)
275+
output_dto, symlink_dtos = extract_output(operation, output_dataset)
276+
277+
result.add_output(output_dto)
278+
dataset_dto_cache_key = (output_dataset.namespace, output_dataset.name)
279+
dataset_cache[dataset_dto_cache_key] = result.get_dataset(output_dto.dataset.unique_key)
280+
281+
for symlink_dto in symlink_dtos:
282+
result.add_dataset_symlink(symlink_dto)
273283

274284
for dataset in event.inputs + event.outputs:
275-
column_lineage = extract_column_lineage(operation, dataset)
285+
column_lineage = extract_column_lineage(operation, dataset, dataset_cache)
276286
for item in column_lineage:
277287
result.add_column_lineage(item)
278288

data_rentgen/consumer/extractors/column_lineage.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,18 @@
44
import logging
55
from collections import defaultdict
66

7-
from data_rentgen.consumer.extractors.dataset import extract_dataset
7+
from data_rentgen.consumer.extractors.dataset import extract_dataset_ref
88
from data_rentgen.consumer.openlineage.dataset import OpenLineageDataset
99
from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
10+
OpenLineageColumnLineageDatasetFacetFieldRef,
1011
OpenLineageColumnLineageDatasetFacetFieldTransformation,
1112
)
1213
from data_rentgen.dto import (
1314
ColumnLineageDTO,
1415
DatasetColumnRelationDTO,
1516
DatasetColumnRelationTypeDTO,
1617
)
18+
from data_rentgen.dto.dataset import DatasetDTO
1719
from data_rentgen.dto.operation import OperationDTO
1820

1921
logger = logging.getLogger(__name__)
@@ -49,11 +51,29 @@ def extract_dataset_column_relation_type(
4951
return result or DatasetColumnRelationTypeDTO.UNKNOWN
5052

5153

52-
def extract_column_lineage(operation: OperationDTO, target_dataset: OpenLineageDataset) -> list[ColumnLineageDTO]:
53-
target_dataset_dto = extract_dataset(target_dataset)
54+
def resolve_dataset_ref(
55+
dataset_ref: OpenLineageDataset | OpenLineageColumnLineageDatasetFacetFieldRef,
56+
dataset_dto_cache: dict[tuple[str, str], DatasetDTO],
57+
):
58+
# Extracting a dataset for every column is expensive, so cache it as much as we can.
59+
dataset_cache_key = (dataset_ref.namespace, dataset_ref.name)
60+
if dataset_cache_key not in dataset_dto_cache:
61+
# https://github.com/OpenLineage/OpenLineage/issues/2938#issuecomment-2320377260
62+
dataset_dto_cache[dataset_cache_key] = extract_dataset_ref(dataset_ref)
63+
return dataset_dto_cache[dataset_cache_key]
64+
65+
66+
def extract_column_lineage(
67+
operation: OperationDTO,
68+
target_dataset: OpenLineageDataset,
69+
dataset_cache: dict[tuple[str, str], DatasetDTO] | None = None,
70+
) -> list[ColumnLineageDTO]:
5471
if not target_dataset.facets.columnLineage:
5572
return []
5673

74+
dataset_cache = dataset_cache or {}
75+
target_dataset_dto = resolve_dataset_ref(target_dataset, dataset_cache)
76+
5777
# Grouping column lineage by source+target dataset. This is a unique combination within an operation,
5878
# so we can use it to generate the same fingerprint for all dataset column relations
5979
datasets = {target_dataset_dto.unique_key: target_dataset_dto}
@@ -62,7 +82,7 @@ def extract_column_lineage(operation: OperationDTO, target_dataset: OpenLineageD
6282
# direct lineage (source_column -> target_column)
6383
for field, raw_column_lineage in target_dataset.facets.columnLineage.fields.items():
6484
for input_field in raw_column_lineage.inputFields:
65-
source_dataset_dto = extract_dataset(input_field)
85+
source_dataset_dto = resolve_dataset_ref(input_field, dataset_cache)
6686
datasets[source_dataset_dto.unique_key] = source_dataset_dto
6787

6888
column_lineage_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
@@ -84,7 +104,7 @@ def extract_column_lineage(operation: OperationDTO, target_dataset: OpenLineageD
84104
# indirect lineage (source_column -> target_dataset),
85105
# added to OL in v1.23 and sent only when columnLineage.datasetLineageEnabled=true
86106
for input_field in target_dataset.facets.columnLineage.dataset:
87-
source_dataset_dto = extract_dataset(input_field)
107+
source_dataset_dto = resolve_dataset_ref(input_field, dataset_cache)
88108
datasets[source_dataset_dto.unique_key] = source_dataset_dto
89109

90110
column_lineage_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)

data_rentgen/consumer/extractors/dataset.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,28 @@ def connect_dataset_with_symlinks(
5858
return sorted(result, key=lambda x: x.type)
5959

6060

61-
def extract_dataset(dataset: OpenLineageDatasetLike) -> DatasetDTO:
62-
name_with_partitions = PARTITION_PATH_PATTERN.match(dataset.name)
63-
name = name_with_partitions.group(1) if name_with_partitions else dataset.name
61+
def strip_partitions_from_path(name: str):
62+
# convert /some/long/path/with=partition/another=abc to /some/long/path
63+
if "=" not in name or "/" not in name:
64+
return name
65+
66+
name_with_partitions = PARTITION_PATH_PATTERN.match(name)
67+
return name_with_partitions.group(1) if name_with_partitions else name
68+
69+
70+
def extract_dataset_ref(dataset: OpenLineageDatasetLike) -> DatasetDTO:
6471
return DatasetDTO(
65-
name=name,
72+
name=strip_partitions_from_path(dataset.name),
6673
location=extract_dataset_location(dataset),
67-
format=extract_dataset_format(dataset),
6874
)
6975

7076

77+
def extract_dataset(dataset: OpenLineageDataset) -> DatasetDTO:
78+
dataset_dto = extract_dataset_ref(dataset)
79+
dataset_dto.format = extract_dataset_format(dataset)
80+
return dataset_dto
81+
82+
7183
def extract_dataset_and_symlinks(dataset: OpenLineageDataset) -> tuple[DatasetDTO, list[DatasetSymlinkDTO]]:
7284
dataset_dto = extract_dataset(dataset)
7385
if not dataset.facets.symlinks:
@@ -91,7 +103,7 @@ def extract_dataset_and_symlinks(dataset: OpenLineageDataset) -> tuple[DatasetDT
91103
"Only the first one will be used for replacement. Symlink name: %s",
92104
table_symlinks[0].name,
93105
)
94-
table_dataset_dto = extract_dataset(table_symlinks[0])
106+
table_dataset_dto = extract_dataset_ref(table_symlinks[0])
95107
return (
96108
table_dataset_dto,
97109
connect_dataset_with_symlinks(
@@ -103,7 +115,7 @@ def extract_dataset_and_symlinks(dataset: OpenLineageDataset) -> tuple[DatasetDT
103115

104116
symlinks = []
105117
for symlink_identifier in dataset.facets.symlinks.identifiers:
106-
symlink_dto = extract_dataset(symlink_identifier)
118+
symlink_dto = extract_dataset_ref(symlink_identifier)
107119
symlinks.extend(
108120
connect_dataset_with_symlinks(
109121
dataset_dto,
@@ -134,10 +146,7 @@ def extract_dataset_location(dataset: OpenLineageDatasetLike) -> LocationDTO:
134146
)
135147

136148

137-
def extract_dataset_format(dataset: OpenLineageDatasetLike) -> str | None:
138-
if isinstance(dataset, (OpenLineageSymlinkIdentifier, OpenLineageColumnLineageDatasetFacetFieldRef)):
139-
return None
140-
149+
def extract_dataset_format(dataset: OpenLineageDataset) -> str | None:
141150
match dataset.facets.storage:
142151
case OpenLineageStorageDatasetFacet(storageLayer="default", fileFormat=file_format):
143152
# See https://github.com/OpenLineage/OpenLineage/issues/2770

data_rentgen/consumer/extractors/job.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,14 @@
88
from data_rentgen.dto import JobDTO, JobTypeDTO, LocationDTO
99

1010

11-
def extract_job(job: OpenLineageJob | OpenLineageParentJob) -> JobDTO:
11+
def extract_parent_job(job: OpenLineageParentJob) -> JobDTO:
12+
return JobDTO(
13+
name=job.name,
14+
location=extract_job_location(job),
15+
)
16+
17+
18+
def extract_job(job: OpenLineageJob) -> JobDTO:
1219
return JobDTO(
1320
name=job.name,
1421
location=extract_job_location(job),
@@ -27,8 +34,8 @@ def extract_job_location(job: OpenLineageJob | OpenLineageParentJob) -> Location
2734
)
2835

2936

30-
def extract_job_type(job: OpenLineageJob | OpenLineageParentJob) -> JobTypeDTO | None:
31-
if isinstance(job, OpenLineageJob) and job.facets.jobType:
37+
def extract_job_type(job: OpenLineageJob) -> JobTypeDTO | None:
38+
if job.facets.jobType:
3239
job_type = job.facets.jobType.jobType
3340
integration_type = job.facets.jobType.integration
3441
return JobTypeDTO(f"{integration_type}_{job_type}")

data_rentgen/consumer/extractors/operation.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
33

4-
from data_rentgen.consumer.extractors.run import extract_run_minimal
4+
from data_rentgen.consumer.extractors.run import extract_parent_run
55
from data_rentgen.consumer.openlineage.run_event import (
66
OpenLineageRunEvent,
77
OpenLineageRunEventType,
@@ -11,7 +11,7 @@
1111

1212
def extract_operation(event: OpenLineageRunEvent) -> OperationDTO:
1313
# operation always has parent
14-
run = extract_run_minimal(event.run.facets.parent) # type: ignore[arg-type]
14+
run = extract_parent_run(event.run.facets.parent) # type: ignore[arg-type]
1515

1616
# in some cases, the operation name may contain a raw SELECT query with newlines
1717
operation_name = " ".join(line.strip() for line in event.job.name.splitlines()).strip()

0 commit comments

Comments
 (0)