Skip to content

Commit 8dfdc63

Browse files
authored
[DOP-22532] Adding filters for Jobs (#319)
* [DOP-22532] Add filters for Jobs
* [DOP-22532] Change job type filter to a list
* [DOP-22532] Add changelog and rebase
* [DOP-22532] Add --lock option for uv pre-commit hook
* [DOP-22532] Rename /job_type -> /types
* [DOP-22532] Add offline option for pre-commit
* [DOP-22532] Turn off uv-lock in pre-commit.ci
1 parent 9ba07f0 commit 8dfdc63

File tree

10 files changed

+269
-7
lines changed

10 files changed

+269
-7
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,4 @@ ci:
113113
- mypy # checked with Github Actions
114114
- docker-compose-check # cannot run on pre-commit.ci
115115
- chmod # failing in pre-commit.ci
116+
- uv-lock # failing in pre-commit.ci in cause of https://github.com/astral-sh/uv/issues/10167

data_rentgen/db/repositories/job.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,20 @@ async def paginate(
7676
page_size: int,
7777
job_ids: Collection[int],
7878
search_query: str | None,
79+
location_id: int | None,
80+
job_type: Collection[str],
7981
) -> PaginationDTO[Job]:
8082
where = []
8183
if job_ids:
8284
where.append(Job.id == any_(list(job_ids))) # type: ignore[arg-type]
8385

8486
query: Select | CompoundSelect
8587
order_by: list[ColumnElement | SQLColumnExpression]
88+
if job_type:
89+
where.append(Job.type == any_(list(job_type))) # type: ignore[arg-type]
90+
if location_id:
91+
where.append(Job.location_id == location_id) # type: ignore[arg-type]
92+
8693
if search_query:
8794
tsquery = make_tsquery(search_query)
8895

data_rentgen/db/repositories/job_type.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3+
from collections.abc import Sequence
34

45
from sqlalchemy import (
56
any_,
@@ -39,6 +40,11 @@ async def create(self, job_type_dto: JobTypeDTO) -> JobType:
3940
await self._lock("job_type", job_type_dto.type)
4041
return await self._get(job_type_dto) or await self._create(job_type_dto)
4142

43+
async def get_job_types(self) -> Sequence[str]:
44+
query = select(JobType.type).distinct(JobType.type)
45+
result = await self._session.scalars(query)
46+
return result.all()
47+
4248
async def _get(self, job_type_dto: JobTypeDTO) -> JobType | None:
4349
return await self._session.scalar(get_one_query, {"type": job_type_dto.type})
4450

data_rentgen/server/api/v1/router/job.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
JobDetailedResponseV1,
1313
JobLineageQueryV1,
1414
JobPaginateQueryV1,
15+
JobTypesResponseV1,
1516
LineageResponseV1,
1617
PageResponseV1,
1718
)
@@ -21,7 +22,13 @@
2122
router = APIRouter(
2223
prefix="/jobs",
2324
tags=["Jobs"],
24-
responses=get_error_responses(include={NotAuthorizedSchema, NotAuthorizedRedirectSchema, InvalidRequestSchema}),
25+
responses=get_error_responses(
26+
include={
27+
NotAuthorizedSchema,
28+
NotAuthorizedRedirectSchema,
29+
InvalidRequestSchema,
30+
},
31+
),
2532
)
2633

2734

@@ -36,6 +43,8 @@ async def paginate_jobs(
3643
page_size=query_args.page_size,
3744
job_ids=query_args.job_id,
3845
search_query=query_args.search_query,
46+
location_id=query_args.location_id,
47+
job_type=query_args.job_type,
3948
)
4049
return PageResponseV1[JobDetailedResponseV1].from_pagination(pagination)
4150

@@ -57,3 +66,12 @@ async def get_jobs_lineage(
5766
)
5867

5968
return build_lineage_response(lineage)
69+
70+
71+
@router.get("/types", summary="Get distinct types of Jobs")
72+
async def get_job_types(
73+
job_service: Annotated[JobService, Depends()],
74+
current_user: Annotated[User, Depends(get_user())],
75+
) -> JobTypesResponseV1:
76+
job_types = await job_service.get_job_types()
77+
return JobTypesResponseV1(job_types=sorted(job_types))

data_rentgen/server/schemas/v1/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
JobDetailedResponseV1,
1313
JobPaginateQueryV1,
1414
JobResponseV1,
15+
JobTypesResponseV1,
1516
)
1617
from data_rentgen.server.schemas.v1.lineage import (
1718
ColumnLineageInteractionTypeV1,
@@ -87,6 +88,7 @@
8788
"JobLineageQueryV1",
8889
"JobPaginateQueryV1",
8990
"JobResponseV1",
91+
"JobTypesResponseV1",
9092
"LineageDirectionV1",
9193
"LineageEntityKindV1",
9294
"LineageEntityV1",

data_rentgen/server/schemas/v1/job.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ class JobDetailedResponseV1(BaseModel):
2626
model_config = ConfigDict(from_attributes=True)
2727

2828

29+
class JobTypesResponseV1(BaseModel):
30+
"""JobTypes"""
31+
32+
job_types: list[str] = Field(description="List of distinct job types")
33+
34+
model_config = ConfigDict(from_attributes=True)
35+
36+
2937
class JobPaginateQueryV1(PaginateQueryV1):
3038
"""Query params for Jobs paginate request."""
3139

@@ -35,5 +43,13 @@ class JobPaginateQueryV1(PaginateQueryV1):
3543
min_length=3,
3644
description="Search query",
3745
)
46+
job_type: list[str] = Field(
47+
default_factory=list,
48+
description="Specify job types",
49+
)
50+
location_id: int | None = Field(
51+
default=None,
52+
description="The location id which jobs belong",
53+
)
3854

3955
model_config = ConfigDict(extra="forbid")

data_rentgen/server/services/job.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3-
from collections.abc import Collection
3+
from collections.abc import Collection, Sequence
44
from dataclasses import dataclass
55
from typing import Annotated
66

@@ -31,12 +31,16 @@ async def paginate(
3131
page_size: int,
3232
job_ids: Collection[int],
3333
search_query: str | None,
34+
location_id: int | None,
35+
job_type: Collection[str],
3436
) -> JobServicePaginatedResult:
3537
pagination = await self._uow.job.paginate(
3638
page=page,
3739
page_size=page_size,
3840
job_ids=job_ids,
3941
search_query=search_query,
42+
location_id=location_id,
43+
job_type=job_type,
4044
)
4145

4246
return JobServicePaginatedResult(
@@ -45,3 +49,6 @@ async def paginate(
4549
total_count=pagination.total_count,
4650
items=[JobServiceResult(id=job.id, data=job) for job in pagination.items],
4751
)
52+
53+
async def get_job_types(self) -> Sequence[str]:
54+
return await self._uow.job_type.get_job_types()
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Add new filters for the ``/v1/jobs`` endpoint:
2+
- ``location_id: int`` — filter jobs by the id of the location they belong to
3+
- ``job_type: list[str]`` — filter jobs by their type
4+
5+
Add a new endpoint, ``GET /v1/jobs/types``, which returns the distinct job types known to DataRentgen.

tests/test_server/fixtures/factories/job.py

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,10 @@ async def jobs_search(
155155
await create_location(
156156
async_session,
157157
location_kwargs=kwargs,
158-
address_kwargs={"urls": [random_string(32), random_string(32)]}, # Each location has 2 addresses
158+
# Each location has 2 addresses
159+
address_kwargs={
160+
"urls": [random_string(32), random_string(32)],
161+
},
159162
)
160163
for kwargs in location_kwargs
161164
]
@@ -170,7 +173,9 @@ async def jobs_search(
170173
await create_location(
171174
async_session,
172175
location_kwargs={"type": "random"},
173-
address_kwargs={"urls": [random_string(32), random_string(32)]},
176+
address_kwargs={
177+
"urls": [random_string(32), random_string(32)],
178+
},
174179
)
175180
for _ in range(3)
176181
]
@@ -207,11 +212,61 @@ async def jobs_search(
207212

208213
jobs_by_name = {job.name: job for job in jobs_with_names}
209214
jobs_by_location = dict(
210-
zip([location.name for location in locations_with_names], jobs_with_location_names, strict=False),
215+
zip(
216+
[location.name for location in locations_with_names],
217+
jobs_with_location_names,
218+
strict=False,
219+
),
220+
)
221+
jobs_by_address = dict(
222+
zip(
223+
addresses_url,
224+
[job for job in jobs_with_address_urls for _ in range(2)],
225+
strict=False,
226+
),
211227
)
212-
jobs_by_address = dict(zip(addresses_url, [job for job in jobs_with_address_urls for _ in range(2)], strict=False))
213228

214229
yield jobs_by_name, jobs_by_location, jobs_by_address
215230

216231
async with async_session_maker() as async_session:
217232
await clean_db(async_session)
233+
234+
235+
@pytest_asyncio.fixture
236+
async def jobs_with_locations_and_types(
237+
async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]],
238+
) -> AsyncGenerator[tuple[Job]]:
239+
async with async_session_maker() as async_session:
240+
cluster_location = await create_location(async_session, location_kwargs={"name": "my-cluster", "type": "yarn"})
241+
airflow_location = await create_location(
242+
async_session,
243+
location_kwargs={"name": "airflow-host", "type": "http"},
244+
)
245+
cluster_type = await create_job_type(async_session, {"type": "SPARK_APPLICATION"})
246+
airflow_dag_type = await create_job_type(async_session, {"type": "AIRFLOW_DAG"})
247+
airflow_task_type = await create_job_type(async_session, {"type": "AIRFLOW_TASK"})
248+
cluster_job = await create_job(
249+
async_session,
250+
location_id=cluster_location.id,
251+
job_type_id=cluster_type.id,
252+
job_kwargs={"name": "my-job_cluster"},
253+
)
254+
dag_job = await create_job(
255+
async_session,
256+
location_id=airflow_location.id,
257+
job_type_id=airflow_dag_type.id,
258+
job_kwargs={"name": "my-job_dag"},
259+
)
260+
task_job = await create_job(
261+
async_session,
262+
location_id=airflow_location.id,
263+
job_type_id=airflow_task_type.id,
264+
job_kwargs={"name": "my-job_task"},
265+
)
266+
267+
async_session.expunge_all()
268+
269+
yield (cluster_job, dag_job, task_job)
270+
271+
async with async_session_maker() as async_session:
272+
await clean_db(async_session)

0 commit comments

Comments
 (0)