Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ repos:
rev: 0.8.19
hooks:
- id: uv-lock
args: [--prerelease=allow]
# https://github.com/astral-sh/uv/issues/10176
args: [--prerelease=allow, --locked]

- repo: local
hooks:
Expand Down
7 changes: 7 additions & 0 deletions data_rentgen/db/repositories/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,20 @@ async def paginate(
page_size: int,
job_ids: Collection[int],
search_query: str | None,
location_id: int | None,
job_type: Collection[str],
) -> PaginationDTO[Job]:
where = []
if job_ids:
where.append(Job.id == any_(list(job_ids))) # type: ignore[arg-type]

query: Select | CompoundSelect
order_by: list[ColumnElement | SQLColumnExpression]
if job_type:
where.append(Job.type == any_(list(job_type))) # type: ignore[arg-type]
if location_id:
where.append(Job.location_id == location_id) # type: ignore[arg-type]

if search_query:
tsquery = make_tsquery(search_query)

Expand Down
6 changes: 6 additions & 0 deletions data_rentgen/db/repositories/job_type.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from collections.abc import Sequence

from sqlalchemy import (
any_,
Expand Down Expand Up @@ -39,6 +40,11 @@ async def create(self, job_type_dto: JobTypeDTO) -> JobType:
await self._lock("job_type", job_type_dto.type)
return await self._get(job_type_dto) or await self._create(job_type_dto)

async def get_job_types(self) -> Sequence[str]:
    """Return the distinct job type names stored in the ``job_type`` table.

    Returns:
        Sequence of unique ``JobType.type`` strings, in no guaranteed order
        (callers sort the result themselves).
    """
    # Use plain SQL DISTINCT rather than ``.distinct(JobType.type)``:
    # passing an expression emits PostgreSQL-specific ``DISTINCT ON`` (and is
    # deprecated in SQLAlchemy 2.0); it is unnecessary here because only a
    # single column is selected.
    query = select(JobType.type).distinct()
    result = await self._session.scalars(query)
    return result.all()

async def _get(self, job_type_dto: JobTypeDTO) -> JobType | None:
    """Look up the existing JobType row matching the DTO's type, or None."""
    params = {"type": job_type_dto.type}
    return await self._session.scalar(get_one_query, params)

Expand Down
16 changes: 15 additions & 1 deletion data_rentgen/server/api/v1/router/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
JobDetailedResponseV1,
JobLineageQueryV1,
JobPaginateQueryV1,
JobTypesResponseV1,
LineageResponseV1,
PageResponseV1,
)
Expand All @@ -21,7 +22,9 @@
router = APIRouter(
prefix="/jobs",
tags=["Jobs"],
responses=get_error_responses(include={NotAuthorizedSchema, NotAuthorizedRedirectSchema, InvalidRequestSchema}),
responses=get_error_responses(
include={NotAuthorizedSchema, NotAuthorizedRedirectSchema, InvalidRequestSchema},
),
)


Expand All @@ -36,6 +39,8 @@ async def paginate_jobs(
page_size=query_args.page_size,
job_ids=query_args.job_id,
search_query=query_args.search_query,
location_id=query_args.location_id,
job_type=query_args.job_type,
)
return PageResponseV1[JobDetailedResponseV1].from_pagination(pagination)

Expand All @@ -57,3 +62,12 @@ async def get_jobs_lineage(
)

return build_lineage_response(lineage)


@router.get("/job-types", summary="Get distinct types of Jobs")
async def get_job_types(
    job_service: Annotated[JobService, Depends()],
    current_user: Annotated[User, Depends(get_user())],
) -> JobTypesResponseV1:
    """Return every distinct job type known to the server, sorted alphabetically."""
    # ``current_user`` is not used directly; the dependency exists solely to
    # enforce authentication on this endpoint.
    distinct_types = sorted(await job_service.get_job_types())
    return JobTypesResponseV1(job_types=distinct_types)
2 changes: 2 additions & 0 deletions data_rentgen/server/schemas/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
JobDetailedResponseV1,
JobPaginateQueryV1,
JobResponseV1,
JobTypesResponseV1,
)
from data_rentgen.server.schemas.v1.lineage import (
ColumnLineageInteractionTypeV1,
Expand Down Expand Up @@ -87,6 +88,7 @@
"JobLineageQueryV1",
"JobPaginateQueryV1",
"JobResponseV1",
"JobTypesResponseV1",
"LineageDirectionV1",
"LineageEntityKindV1",
"LineageEntityV1",
Expand Down
16 changes: 16 additions & 0 deletions data_rentgen/server/schemas/v1/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ class JobDetailedResponseV1(BaseModel):
model_config = ConfigDict(from_attributes=True)


class JobTypesResponseV1(BaseModel):
    """Response schema listing the distinct job types known to the server."""

    # Distinct job type names (e.g. SPARK_APPLICATION, AIRFLOW_DAG);
    # the endpoint returns them sorted alphabetically.
    job_types: list[str] = Field(description="List of distinct job types")

    model_config = ConfigDict(from_attributes=True)


class JobPaginateQueryV1(PaginateQueryV1):
"""Query params for Jobs paginate request."""

Expand All @@ -35,5 +43,13 @@ class JobPaginateQueryV1(PaginateQueryV1):
min_length=3,
description="Search query",
)
job_type: list[str] = Field(
default_factory=list,
description="Specify job types",
)
location_id: int | None = Field(
default=None,
description="The location id which jobs belong",
)

model_config = ConfigDict(extra="forbid")
9 changes: 8 additions & 1 deletion data_rentgen/server/services/job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from collections.abc import Collection
from collections.abc import Collection, Sequence
from dataclasses import dataclass
from typing import Annotated

Expand Down Expand Up @@ -31,12 +31,16 @@ async def paginate(
page_size: int,
job_ids: Collection[int],
search_query: str | None,
location_id: int | None,
job_type: Collection[str],
) -> JobServicePaginatedResult:
pagination = await self._uow.job.paginate(
page=page,
page_size=page_size,
job_ids=job_ids,
search_query=search_query,
location_id=location_id,
job_type=job_type,
)

return JobServicePaginatedResult(
Expand All @@ -45,3 +49,6 @@ async def paginate(
total_count=pagination.total_count,
items=[JobServiceResult(id=job.id, data=job) for job in pagination.items],
)

async def get_job_types(self) -> Sequence[str]:
    """Fetch the distinct job type names via the job_type repository."""
    repository = self._uow.job_type
    return await repository.get_job_types()
5 changes: 5 additions & 0 deletions docs/changelog/next_release/319.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Add new filters for ``/v1/jobs`` endpoint.
- ``location_id: int`` — return only jobs belonging to the given location
- ``job_type: list[str]`` — return only jobs of the given types

Add new endpoint ``GET /v1/jobs/job-types`` which returns the distinct job types known to DataRentgen.
63 changes: 59 additions & 4 deletions tests/test_server/fixtures/factories/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ async def jobs_search(
await create_location(
async_session,
location_kwargs=kwargs,
address_kwargs={"urls": [random_string(32), random_string(32)]}, # Each location has 2 addresses
# Each location has 2 addresses
address_kwargs={
"urls": [random_string(32), random_string(32)],
},
)
for kwargs in location_kwargs
]
Expand All @@ -170,7 +173,9 @@ async def jobs_search(
await create_location(
async_session,
location_kwargs={"type": "random"},
address_kwargs={"urls": [random_string(32), random_string(32)]},
address_kwargs={
"urls": [random_string(32), random_string(32)],
},
)
for _ in range(3)
]
Expand Down Expand Up @@ -207,11 +212,61 @@ async def jobs_search(

jobs_by_name = {job.name: job for job in jobs_with_names}
jobs_by_location = dict(
zip([location.name for location in locations_with_names], jobs_with_location_names, strict=False),
zip(
[location.name for location in locations_with_names],
jobs_with_location_names,
strict=False,
),
)
jobs_by_address = dict(
zip(
addresses_url,
[job for job in jobs_with_address_urls for _ in range(2)],
strict=False,
),
)
jobs_by_address = dict(zip(addresses_url, [job for job in jobs_with_address_urls for _ in range(2)], strict=False))

yield jobs_by_name, jobs_by_location, jobs_by_address

async with async_session_maker() as async_session:
await clean_db(async_session)


@pytest_asyncio.fixture
async def jobs_with_locations_and_types(
    async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]],
) -> AsyncGenerator[tuple[Job, Job, Job]]:
    """Create three jobs spread over two locations and three job types.

    Yields:
        ``(cluster_job, dag_job, task_job)`` — a SPARK_APPLICATION job on a
        yarn location, plus an AIRFLOW_DAG and an AIRFLOW_TASK job sharing one
        http location. The database is wiped again on teardown.

    Note: the original annotation ``tuple[Job]`` declared a 1-tuple; the
    fixture actually yields a 3-tuple, so the annotation is corrected here.
    """
    async with async_session_maker() as async_session:
        cluster_location = await create_location(
            async_session,
            location_kwargs={"name": "my-cluster", "type": "yarn"},
        )
        airflow_location = await create_location(
            async_session,
            location_kwargs={"name": "airflow-host", "type": "http"},
        )
        cluster_type = await create_job_type(async_session, {"type": "SPARK_APPLICATION"})
        airflow_dag_type = await create_job_type(async_session, {"type": "AIRFLOW_DAG"})
        airflow_task_type = await create_job_type(async_session, {"type": "AIRFLOW_TASK"})
        cluster_job = await create_job(
            async_session,
            location_id=cluster_location.id,
            job_type_id=cluster_type.id,
            job_kwargs={"name": "my-job_cluster"},
        )
        dag_job = await create_job(
            async_session,
            location_id=airflow_location.id,
            job_type_id=airflow_dag_type.id,
            job_kwargs={"name": "my-job_dag"},
        )
        task_job = await create_job(
            async_session,
            location_id=airflow_location.id,
            job_type_id=airflow_task_type.id,
            job_kwargs={"name": "my-job_task"},
        )

        # Detach all instances so tests cannot accidentally trigger lazy loads
        # against a session that is about to close.
        async_session.expunge_all()

    yield (cluster_job, dag_job, task_job)

    async with async_session_maker() as async_session:
        await clean_db(async_session)
Loading
Loading