Skip to content

Commit 194a730

Browse files
committed
[DOP-28871] Fix handling Spark unknown job name
1 parent 9b8b2cf commit 194a730

File tree

6 files changed: 31 additions and 15 deletions.

data_rentgen/dto/job.py

Lines changed: 10 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
 
 from __future__ import annotations
 
+from copy import copy
 from dataclasses import dataclass, field
 
 from data_rentgen.dto.job_type import JobTypeDTO
@@ -22,14 +23,17 @@ def unique_key(self) -> tuple:
 
     def merge(self, new: JobDTO) -> JobDTO:
         self.id = new.id or self.id
-        self.location.merge(new.location)
-
-        if self.name == "unknown" and new.name != "unknown":
-            # Workaround for https://github.com/OpenLineage/OpenLineage/issues/3846
-            self.name = new.name
+        self.location = self.location.merge(new.location)
 
         if new.type and self.type:
-            self.type.merge(new.type)
+            self.type = self.type.merge(new.type)
         else:
             self.type = new.type or self.type
+
+        if self.name == "unknown" and new.name != "unknown":
+            # Workaround for https://github.com/OpenLineage/OpenLineage/issues/3846
+            result = copy(self)
+            result.name = new.name
+            return result
+
         return self

data_rentgen/dto/operation.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -58,9 +58,9 @@ def unique_key(self) -> tuple:
         return (self.id,)
 
     def merge(self, new: OperationDTO) -> OperationDTO:
-        self.run.merge(new.run)
+        self.run = self.run.merge(new.run)
         if self.sql_query and new.sql_query:
-            self.sql_query.merge(new.sql_query)
+            self.sql_query = self.sql_query.merge(new.sql_query)
         else:
             self.sql_query = new.sql_query or self.sql_query
 

data_rentgen/dto/output.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -63,11 +63,11 @@ def generate_id(self) -> UUID:
         return generate_incremental_uuid(self.created_at, ".".join(id_components))
 
     def merge(self, new: OutputDTO) -> OutputDTO:
-        self.operation.merge(new.operation)
-        self.dataset.merge(new.dataset)
+        self.operation = self.operation.merge(new.operation)
+        self.dataset = self.dataset.merge(new.dataset)
 
         if self.schema and new.schema:
-            self.schema.merge(new.schema)
+            self.schema = self.schema.merge(new.schema)
         else:
             self.schema = new.schema or self.schema
 

data_rentgen/dto/run.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -53,14 +53,14 @@ def unique_key(self) -> tuple:
         return (self.id,)
 
     def merge(self, new: RunDTO) -> RunDTO:
-        self.job.merge(new.job)
+        self.job = self.job.merge(new.job)
         if new.parent_run and self.parent_run:
-            self.parent_run.merge(new.parent_run)
+            self.parent_run = self.parent_run.merge(new.parent_run)
         else:
             self.parent_run = new.parent_run or self.parent_run
 
         if new.user and self.user:
-            self.user.merge(new.user)
+            self.user = self.user.merge(new.user)
         else:
             self.user = new.user or self.user
 

tests/test_consumer/test_extractors/fixtures/spark_dto.py

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,17 @@ def extracted_spark_app_job(
     )
 
 
+@pytest.fixture
+def extracted_spark_unknown_job(
+    extracted_spark_location: LocationDTO,
+) -> JobDTO:
+    return JobDTO(
+        name="unknown",
+        location=extracted_spark_location,
+        type=JobTypeDTO(type="SPARK_APPLICATION"),
+    )
+
+
 @pytest.fixture
 def extracted_spark_app_run(
     extracted_spark_app_job: JobDTO,

tests/test_consumer/test_extractors/test_extractors_batch_spark.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -100,6 +100,7 @@ def test_extractors_extract_batch_spark_openlineage_emitted_unknown_name(
     spark_operation_run_event_stop: OpenLineageRunEvent,
     extracted_spark_location: LocationDTO,
     extracted_spark_app_job: JobDTO,
+    extracted_spark_unknown_job: JobDTO,
     extracted_user: UserDTO,
     extracted_spark_app_run: RunDTO,
     extracted_spark_operation: OperationDTO,
@@ -119,7 +120,7 @@ def test_extractors_extract_batch_spark_openlineage_emitted_unknown_name(
         extracted_spark_location,
     ]
 
-    assert extracted.jobs() == [extracted_spark_app_job]
+    assert extracted.jobs() == [extracted_spark_app_job, extracted_spark_unknown_job]
     assert extracted.users() == [extracted_user]
     assert extracted.runs() == [extracted_spark_app_run]
     assert extracted.operations() == [extracted_spark_operation]

0 commit comments

Comments
 (0)