Skip to content

Commit a25bcae

Browse files
committed
[DOP-22363] Small LineageService updates
1 parent 8fc357e commit a25bcae

File tree

4 files changed

+23
-22
lines changed

4 files changed

+23
-22
lines changed

data_rentgen/db/repositories/column_lineage.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,17 @@ async def list_by_job_ids(
7474
job_ids: Sequence[int],
7575
since: datetime,
7676
until: datetime | None,
77-
source_ids: Sequence[int],
78-
target_ids: Sequence[int],
77+
source_dataset_ids: Sequence[int],
78+
target_dataset_ids: Sequence[int],
7979
):
8080
if not job_ids:
8181
return []
8282

8383
where = [
8484
ColumnLineage.created_at >= since,
8585
ColumnLineage.job_id == any_(job_ids), # type: ignore[arg-type]
86-
ColumnLineage.source_dataset_id == any_(source_ids), # type: ignore[arg-type]
87-
ColumnLineage.target_dataset_id == any_(target_ids), # type: ignore[arg-type]
86+
ColumnLineage.source_dataset_id == any_(source_dataset_ids), # type: ignore[arg-type]
87+
ColumnLineage.target_dataset_id == any_(target_dataset_ids), # type: ignore[arg-type]
8888
]
8989
if until:
9090
where.append(ColumnLineage.created_at <= until)
@@ -96,17 +96,17 @@ async def list_by_run_ids(
9696
run_ids: Sequence[UUID],
9797
since: datetime,
9898
until: datetime | None,
99-
source_ids: Sequence[int],
100-
target_ids: Sequence[int],
99+
source_dataset_ids: Sequence[int],
100+
target_dataset_ids: Sequence[int],
101101
):
102102
if not run_ids:
103103
return []
104104

105105
where = [
106106
ColumnLineage.created_at >= since,
107107
ColumnLineage.run_id == any_(run_ids), # type: ignore[arg-type]
108-
ColumnLineage.source_dataset_id == any_(source_ids), # type: ignore[arg-type]
109-
ColumnLineage.target_dataset_id == any_(target_ids), # type: ignore[arg-type]
108+
ColumnLineage.source_dataset_id == any_(source_dataset_ids), # type: ignore[arg-type]
109+
ColumnLineage.target_dataset_id == any_(target_dataset_ids), # type: ignore[arg-type]
110110
]
111111
if until:
112112
where.append(ColumnLineage.created_at <= until)
@@ -115,8 +115,8 @@ async def list_by_run_ids(
115115
async def list_by_operation_ids(
116116
self,
117117
operation_ids: Sequence[UUID],
118-
source_ids: Sequence[int],
119-
target_ids: Sequence[int],
118+
source_dataset_ids: Sequence[int],
119+
target_dataset_ids: Sequence[int],
120120
):
121121
if not operation_ids:
122122
return []
@@ -130,8 +130,8 @@ async def list_by_operation_ids(
130130
ColumnLineage.created_at >= min_created_at,
131131
ColumnLineage.created_at <= max_created_at,
132132
ColumnLineage.operation_id == any_(operation_ids), # type: ignore[arg-type]
133-
ColumnLineage.source_dataset_id == any_(source_ids), # type: ignore[arg-type]
134-
ColumnLineage.target_dataset_id == any_(target_ids), # type: ignore[arg-type]
133+
ColumnLineage.source_dataset_id == any_(source_dataset_ids), # type: ignore[arg-type]
134+
ColumnLineage.target_dataset_id == any_(target_dataset_ids), # type: ignore[arg-type]
135135
]
136136
return await self._get_column_lineage_with_column_relations(where)
137137

data_rentgen/server/services/lineage.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,29 +1100,32 @@ async def _get_column_lineage(
11001100
if not current_result.inputs or not current_result.outputs:
11011101
return result
11021102

1103+
source_dataset_ids = [input_.dataset_id for input_ in current_result.inputs.values()]
1104+
target_dataset_ids = [output.dataset_id for output in current_result.outputs.values()]
1105+
11031106
match granularity:
11041107
case "OPERATION":
11051108
column_lineage_result = await self._uow.column_lineage.list_by_operation_ids(
11061109
operation_ids=sorted(current_result.operations.keys()),
11071110
# return column lineage only for datasets included into response
1108-
source_ids=[input_.dataset_id for input_ in current_result.inputs.values()],
1109-
target_ids=[output.dataset_id for output in current_result.outputs.values()],
1111+
source_dataset_ids=source_dataset_ids,
1112+
target_dataset_ids=target_dataset_ids,
11101113
)
11111114
case "RUN":
11121115
column_lineage_result = await self._uow.column_lineage.list_by_run_ids(
11131116
run_ids=sorted(current_result.runs.keys()),
11141117
since=since,
11151118
until=until,
1116-
source_ids=[input_.dataset_id for input_ in current_result.inputs.values()],
1117-
target_ids=[output.dataset_id for output in current_result.outputs.values()],
1119+
source_dataset_ids=source_dataset_ids,
1120+
target_dataset_ids=target_dataset_ids,
11181121
)
11191122
case "JOB":
11201123
column_lineage_result = await self._uow.column_lineage.list_by_job_ids(
11211124
job_ids=sorted(current_result.jobs.keys()),
11221125
since=since,
11231126
until=until,
1124-
source_ids=[input_.dataset_id for input_ in current_result.inputs.values()],
1125-
target_ids=[output.dataset_id for output in current_result.outputs.values()],
1127+
source_dataset_ids=source_dataset_ids,
1128+
target_dataset_ids=target_dataset_ids,
11261129
)
11271130
case _:
11281131
msg = f"Unknown granularity for column lineage: {granularity}"

tests/test_server/fixtures/factories/dataset.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,8 @@ async def datasets_with_symlinks(
156156
await clean_db(async_session)
157157

158158

159-
@pytest_asyncio.fixture(params=[{}])
159+
@pytest_asyncio.fixture
160160
async def datasets_search(
161-
request: pytest.FixtureRequest,
162161
async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]],
163162
) -> AsyncGenerator[tuple[dict[str, Dataset], dict[str, Dataset], dict[str, Dataset]], None]:
164163
"""

tests/test_server/fixtures/factories/job.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,8 @@ async def jobs(
9797
await clean_db(async_session)
9898

9999

100-
@pytest_asyncio.fixture(params=[{}])
100+
@pytest_asyncio.fixture
101101
async def jobs_search(
102-
request: pytest.FixtureRequest,
103102
async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]],
104103
) -> AsyncGenerator[tuple[dict[str, Job], dict[str, Job], dict[str, Job]], None]:
105104
"""

0 commit comments

Comments
 (0)