Skip to content

Commit db357ea

Browse files
committed
[DOP-33014] Remove job.tag_values not send by integration
1 parent 6cc9669 commit db357ea

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

data_rentgen/db/repositories/dataset.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ async def update(self, existing: Dataset, new: DatasetDTO) -> Dataset:
144144
for tag_value_dto in new.tag_values
145145
],
146146
)
147+
# unlike job tags, OL integrations may provide only part of dataset tags.
148+
# so we only insert them, but never delete old ones
147149
return existing
148150

149151
async def paginate(

data_rentgen/db/repositories/job.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
asc,
1616
bindparam,
1717
cast,
18+
delete,
1819
desc,
1920
distinct,
2021
func,
@@ -83,6 +84,10 @@
8384
)
8485
.on_conflict_do_nothing(index_elements=["job_id", "tag_value_id"])
8586
)
87+
delete_tag_value_query = delete(JobTagValue).where(
88+
JobTagValue.c.job_id == bindparam("job_id"),
89+
~(JobTagValue.c.tag_value_id == any_(bindparam("tag_value_ids"))),
90+
)
8691

8792

8893
class JobRepository(Repository[Job]):
@@ -232,7 +237,8 @@ async def update(self, existing: Job, new: JobDTO) -> Job:
232237
)
233238

234239
if not new.tag_values:
235-
# in cases when jobs have no tag values we can avoid INSERT statements
240+
# in case when jobs have no tag values we can avoid INSERT statements.
241+
# also parent jobs may have no tag values, so we skip updating them.
236242
return existing
237243

238244
# Lock to prevent inserting the same rows from multiple workers
@@ -247,6 +253,14 @@ async def update(self, existing: Job, new: JobDTO) -> Job:
247253
for tag_value_dto in new.tag_values
248254
],
249255
)
256+
257+
# To avoid accumulating too many tag values,
258+
# e.g. upgrading version of Airflow/Spark/OL/etc will keep both old and new version tags,
259+
# we keep only tags for the most recent job run.
260+
await self._session.execute(
261+
delete_tag_value_query,
262+
{"job_id": existing.id, "tag_value_ids": [tag_value_dto.id for tag_value_dto in new.tag_values]},
263+
)
250264
return existing
251265

252266
async def list_by_ids(self, job_ids: Collection[int]) -> list[Job]:

0 commit comments

Comments
 (0)