Skip to content

Commit 39bdd6e

Browse files
committed
[DOP-22532] adding filters for Jobs
1 parent cc75570 commit 39bdd6e

File tree

8 files changed

+316
-2
lines changed

8 files changed

+316
-2
lines changed

data_rentgen/db/repositories/job.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,20 @@ async def paginate(
7676
page_size: int,
7777
job_ids: Collection[int],
7878
search_query: str | None,
79+
location_id: int | None,
80+
job_type: str | None,
7981
) -> PaginationDTO[Job]:
8082
where = []
8183
if job_ids:
8284
where.append(Job.id == any_(list(job_ids))) # type: ignore[arg-type]
8385

8486
query: Select | CompoundSelect
8587
order_by: list[ColumnElement | SQLColumnExpression]
88+
if job_type:
89+
where.append(Job.type == job_type) # type: ignore[arg-type]
90+
if location_id:
91+
where.append(Job.location_id == location_id) # type: ignore[arg-type]
92+
8693
if search_query:
8794
tsquery = make_tsquery(search_query)
8895

data_rentgen/db/repositories/job_type.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3+
from collections.abc import Sequence
34

45
from sqlalchemy import (
56
any_,
@@ -39,6 +40,11 @@ async def create(self, job_type_dto: JobTypeDTO) -> JobType:
3940
await self._lock("job_type", job_type_dto.type)
4041
return await self._get(job_type_dto) or await self._create(job_type_dto)
4142

43+
async def get_job_types(self) -> Sequence[str]:
44+
query = select(JobType.type).distinct(JobType.type)
45+
result = await self._session.scalars(query)
46+
return result.all()
47+
4248
async def _get(self, job_type_dto: JobTypeDTO) -> JobType | None:
4349
return await self._session.scalar(get_one_query, {"type": job_type_dto.type})
4450

data_rentgen/server/api/v1/router/job.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
JobDetailedResponseV1,
1313
JobLineageQueryV1,
1414
JobPaginateQueryV1,
15+
JobTypesResponseV1,
1516
LineageResponseV1,
1617
PageResponseV1,
1718
)
@@ -36,6 +37,8 @@ async def paginate_jobs(
3637
page_size=query_args.page_size,
3738
job_ids=query_args.job_id,
3839
search_query=query_args.search_query,
40+
location_id=query_args.location_id,
41+
job_type=query_args.job_type,
3942
)
4043
return PageResponseV1[JobDetailedResponseV1].from_pagination(pagination)
4144

@@ -57,3 +60,12 @@ async def get_jobs_lineage(
5760
)
5861

5962
return build_lineage_response(lineage)
63+
64+
65+
@router.get("/job_types", summary="Get distinct types of Jobs")
66+
async def get_job_types(
67+
job_service: Annotated[JobService, Depends()],
68+
current_user: Annotated[User, Depends(get_user())],
69+
) -> JobTypesResponseV1:
70+
job_types = await job_service.get_job_types()
71+
return JobTypesResponseV1(job_types=sorted(job_types))

data_rentgen/server/schemas/v1/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
JobDetailedResponseV1,
1313
JobPaginateQueryV1,
1414
JobResponseV1,
15+
JobTypesResponseV1,
1516
)
1617
from data_rentgen.server.schemas.v1.lineage import (
1718
ColumnLineageInteractionTypeV1,
@@ -87,6 +88,7 @@
8788
"JobLineageQueryV1",
8889
"JobPaginateQueryV1",
8990
"JobResponseV1",
91+
"JobTypesResponseV1",
9092
"LineageDirectionV1",
9193
"LineageEntityKindV1",
9294
"LineageEntityV1",

data_rentgen/server/schemas/v1/job.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ class JobDetailedResponseV1(BaseModel):
2626
model_config = ConfigDict(from_attributes=True)
2727

2828

29+
class JobTypesResponseV1(BaseModel):
30+
"""JobTypes"""
31+
32+
job_types: list[str] = Field(description="List of distinct job types")
33+
34+
model_config = ConfigDict(from_attributes=True)
35+
36+
2937
class JobPaginateQueryV1(PaginateQueryV1):
3038
"""Query params for Jobs paginate request."""
3139

@@ -35,5 +43,13 @@ class JobPaginateQueryV1(PaginateQueryV1):
3543
min_length=3,
3644
description="Search query",
3745
)
46+
job_type: str | None = Field(
47+
default=None,
48+
description="Filter for searching",
49+
)
50+
location_id: int | None = Field(
51+
default=None,
52+
description="",
53+
)
3854

3955
model_config = ConfigDict(extra="forbid")

data_rentgen/server/services/job.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3-
from collections.abc import Collection
3+
from collections.abc import Collection, Sequence
44
from dataclasses import dataclass
55
from typing import Annotated
66

@@ -31,12 +31,16 @@ async def paginate(
3131
page_size: int,
3232
job_ids: Collection[int],
3333
search_query: str | None,
34+
location_id: int | None,
35+
job_type: str | None,
3436
) -> JobServicePaginatedResult:
3537
pagination = await self._uow.job.paginate(
3638
page=page,
3739
page_size=page_size,
3840
job_ids=job_ids,
3941
search_query=search_query,
42+
location_id=location_id,
43+
job_type=job_type,
4044
)
4145

4246
return JobServicePaginatedResult(
@@ -45,3 +49,6 @@ async def paginate(
4549
total_count=pagination.total_count,
4650
items=[JobServiceResult(id=job.id, data=job) for job in pagination.items],
4751
)
52+
53+
async def get_job_types(self) -> Sequence[str]:
54+
return await self._uow.job_type.get_job_types()

tests/test_server/fixtures/factories/job.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,3 +215,70 @@ async def jobs_search(
215215

216216
async with async_session_maker() as async_session:
217217
await clean_db(async_session)
218+
219+
220+
@pytest_asyncio.fixture
221+
async def jobs_search_with_different_types(
222+
async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]],
223+
) -> AsyncGenerator[tuple[Job]]:
224+
async with async_session_maker() as async_session:
225+
location = await create_location(async_session)
226+
airflow_dag_type = await create_job_type(async_session, {"type": "AIRFLOW_DAG"})
227+
airflow_task_type = await create_job_type(async_session, {"type": "AIRFLOW_TASK"})
228+
229+
job_with_dag_type = await create_job(
230+
async_session,
231+
location_id=location.id,
232+
job_type_id=airflow_dag_type.id,
233+
job_kwargs={"name": "airflow-dag"},
234+
)
235+
job_with_task_type = await create_job(
236+
async_session,
237+
location_id=location.id,
238+
job_type_id=airflow_task_type.id,
239+
job_kwargs={"name": "airflow-task"},
240+
)
241+
async_session.expunge_all()
242+
243+
yield (job_with_dag_type, job_with_task_type)
244+
245+
async with async_session_maker() as async_session:
246+
await clean_db(async_session)
247+
248+
249+
@pytest_asyncio.fixture
250+
async def jobs_search_with_different_locations(
251+
async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]],
252+
) -> AsyncGenerator[tuple[Job]]:
253+
async with async_session_maker() as async_session:
254+
cluster_location = await create_location(async_session, location_kwargs={"name": "my-cluster", "type": "yarn"})
255+
airflow_location = await create_location(
256+
async_session,
257+
location_kwargs={"name": "airflow-host", "type": "http"},
258+
)
259+
job_type = await create_job_type(async_session)
260+
cluster_job = await create_job(
261+
async_session,
262+
location_id=cluster_location.id,
263+
job_type_id=job_type.id,
264+
job_kwargs={"name": "my-job_cluster"},
265+
)
266+
dag_job = await create_job(
267+
async_session,
268+
location_id=airflow_location.id,
269+
job_type_id=job_type.id,
270+
job_kwargs={"name": "my-job_dag"},
271+
)
272+
task_job = await create_job(
273+
async_session,
274+
location_id=airflow_location.id,
275+
job_type_id=job_type.id,
276+
job_kwargs={"name": "my-job_task"},
277+
)
278+
279+
async_session.expunge_all()
280+
281+
yield (cluster_job, dag_job, task_job)
282+
283+
async with async_session_maker() as async_session:
284+
await clean_db(async_session)

0 commit comments

Comments
 (0)