Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,4 @@ ci:
- mypy # checked with Github Actions
- docker-compose-check # cannot run on pre-commit.ci
- chmod # failing in pre-commit.ci
- uv-lock # failing in pre-commit.ci because of https://github.com/astral-sh/uv/issues/10167
7 changes: 7 additions & 0 deletions data_rentgen/db/repositories/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,20 @@ async def paginate(
page_size: int,
job_ids: Collection[int],
search_query: str | None,
location_id: int | None,
job_type: Collection[str],
) -> PaginationDTO[Job]:
where = []
if job_ids:
where.append(Job.id == any_(list(job_ids))) # type: ignore[arg-type]

query: Select | CompoundSelect
order_by: list[ColumnElement | SQLColumnExpression]
if job_type:
where.append(Job.type == any_(list(job_type))) # type: ignore[arg-type]
if location_id:
where.append(Job.location_id == location_id) # type: ignore[arg-type]

if search_query:
tsquery = make_tsquery(search_query)

Expand Down
6 changes: 6 additions & 0 deletions data_rentgen/db/repositories/job_type.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from collections.abc import Sequence

from sqlalchemy import (
any_,
Expand Down Expand Up @@ -39,6 +40,11 @@ async def create(self, job_type_dto: JobTypeDTO) -> JobType:
await self._lock("job_type", job_type_dto.type)
return await self._get(job_type_dto) or await self._create(job_type_dto)

async def get_job_types(self) -> Sequence[str]:
    """Return all distinct job type names stored in the job_type table.

    Returns:
        Unordered sequence of unique type strings (callers sort as needed).
    """
    # Plain DISTINCT suffices: only one column is selected, so the
    # PostgreSQL-specific DISTINCT ON form (`.distinct(JobType.type)`)
    # adds nothing and hurts dialect portability.
    query = select(JobType.type).distinct()
    result = await self._session.scalars(query)
    return result.all()

async def _get(self, job_type_dto: JobTypeDTO) -> JobType | None:
return await self._session.scalar(get_one_query, {"type": job_type_dto.type})

Expand Down
20 changes: 19 additions & 1 deletion data_rentgen/server/api/v1/router/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
JobDetailedResponseV1,
JobLineageQueryV1,
JobPaginateQueryV1,
JobTypesResponseV1,
LineageResponseV1,
PageResponseV1,
)
Expand All @@ -21,7 +22,13 @@
router = APIRouter(
prefix="/jobs",
tags=["Jobs"],
responses=get_error_responses(include={NotAuthorizedSchema, NotAuthorizedRedirectSchema, InvalidRequestSchema}),
responses=get_error_responses(
include={
NotAuthorizedSchema,
NotAuthorizedRedirectSchema,
InvalidRequestSchema,
},
),
)


Expand All @@ -36,6 +43,8 @@ async def paginate_jobs(
page_size=query_args.page_size,
job_ids=query_args.job_id,
search_query=query_args.search_query,
location_id=query_args.location_id,
job_type=query_args.job_type,
)
return PageResponseV1[JobDetailedResponseV1].from_pagination(pagination)

Expand All @@ -57,3 +66,12 @@ async def get_jobs_lineage(
)

return build_lineage_response(lineage)


@router.get("/types", summary="Get distinct types of Jobs")
async def get_job_types(
    job_service: Annotated[JobService, Depends()],
    current_user: Annotated[User, Depends(get_user())],
) -> JobTypesResponseV1:
    """Return every distinct job type known to the service, sorted alphabetically."""
    # `current_user` is resolved only to enforce authentication; it is not read here.
    distinct_types = await job_service.get_job_types()
    return JobTypesResponseV1(job_types=sorted(distinct_types))
2 changes: 2 additions & 0 deletions data_rentgen/server/schemas/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
JobDetailedResponseV1,
JobPaginateQueryV1,
JobResponseV1,
JobTypesResponseV1,
)
from data_rentgen.server.schemas.v1.lineage import (
ColumnLineageInteractionTypeV1,
Expand Down Expand Up @@ -87,6 +88,7 @@
"JobLineageQueryV1",
"JobPaginateQueryV1",
"JobResponseV1",
"JobTypesResponseV1",
"LineageDirectionV1",
"LineageEntityKindV1",
"LineageEntityV1",
Expand Down
16 changes: 16 additions & 0 deletions data_rentgen/server/schemas/v1/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ class JobDetailedResponseV1(BaseModel):
model_config = ConfigDict(from_attributes=True)


class JobTypesResponseV1(BaseModel):
    """Response schema listing the distinct job types known to the server."""

    # Unique job type names; the /v1/jobs/types endpoint returns them sorted.
    job_types: list[str] = Field(description="List of distinct job types")

    model_config = ConfigDict(from_attributes=True)


class JobPaginateQueryV1(PaginateQueryV1):
"""Query params for Jobs paginate request."""

Expand All @@ -35,5 +43,13 @@ class JobPaginateQueryV1(PaginateQueryV1):
min_length=3,
description="Search query",
)
job_type: list[str] = Field(
default_factory=list,
description="Specify job types",
)
location_id: int | None = Field(
default=None,
description="The location id which jobs belong",
)

model_config = ConfigDict(extra="forbid")
9 changes: 8 additions & 1 deletion data_rentgen/server/services/job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from collections.abc import Collection
from collections.abc import Collection, Sequence
from dataclasses import dataclass
from typing import Annotated

Expand Down Expand Up @@ -31,12 +31,16 @@ async def paginate(
page_size: int,
job_ids: Collection[int],
search_query: str | None,
location_id: int | None,
job_type: Collection[str],
) -> JobServicePaginatedResult:
pagination = await self._uow.job.paginate(
page=page,
page_size=page_size,
job_ids=job_ids,
search_query=search_query,
location_id=location_id,
job_type=job_type,
)

return JobServicePaginatedResult(
Expand All @@ -45,3 +49,6 @@ async def paginate(
total_count=pagination.total_count,
items=[JobServiceResult(id=job.id, data=job) for job in pagination.items],
)

async def get_job_types(self) -> Sequence[str]:
    """Return the distinct job type names, delegating to the job_type repository."""
    repository = self._uow.job_type
    return await repository.get_job_types()
5 changes: 5 additions & 0 deletions docs/changelog/next_release/319.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Add new filters for the ``/v1/jobs`` endpoint:
- ``location_id: int`` — filter jobs by a specific location id
- ``job_type: list[str]`` — filter jobs by type

Add a new endpoint ``GET /v1/jobs/types`` that returns the distinct job types known to DataRentgen.
63 changes: 59 additions & 4 deletions tests/test_server/fixtures/factories/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ async def jobs_search(
await create_location(
async_session,
location_kwargs=kwargs,
address_kwargs={"urls": [random_string(32), random_string(32)]}, # Each location has 2 addresses
# Each location has 2 addresses
address_kwargs={
"urls": [random_string(32), random_string(32)],
},
)
for kwargs in location_kwargs
]
Expand All @@ -170,7 +173,9 @@ async def jobs_search(
await create_location(
async_session,
location_kwargs={"type": "random"},
address_kwargs={"urls": [random_string(32), random_string(32)]},
address_kwargs={
"urls": [random_string(32), random_string(32)],
},
)
for _ in range(3)
]
Expand Down Expand Up @@ -207,11 +212,61 @@ async def jobs_search(

jobs_by_name = {job.name: job for job in jobs_with_names}
jobs_by_location = dict(
zip([location.name for location in locations_with_names], jobs_with_location_names, strict=False),
zip(
[location.name for location in locations_with_names],
jobs_with_location_names,
strict=False,
),
)
jobs_by_address = dict(
zip(
addresses_url,
[job for job in jobs_with_address_urls for _ in range(2)],
strict=False,
),
)
jobs_by_address = dict(zip(addresses_url, [job for job in jobs_with_address_urls for _ in range(2)], strict=False))

yield jobs_by_name, jobs_by_location, jobs_by_address

async with async_session_maker() as async_session:
await clean_db(async_session)


@pytest_asyncio.fixture
async def jobs_with_locations_and_types(
    async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]],
) -> AsyncGenerator[tuple[Job, Job, Job], None]:
    """Create three jobs spread across two locations and three job types.

    Yields the tuple ``(cluster_job, dag_job, task_job)``:

    * ``cluster_job`` — type ``SPARK_APPLICATION`` on a ``yarn`` location;
    * ``dag_job`` — type ``AIRFLOW_DAG`` on an ``http`` location;
    * ``task_job`` — type ``AIRFLOW_TASK`` on the same ``http`` location.

    The database is cleaned after the dependent test finishes.
    """
    # NOTE(review): original annotation was ``AsyncGenerator[tuple[Job]]`` —
    # wrong tuple arity (three jobs are yielded), and the single-parameter
    # AsyncGenerator form requires Python 3.13+; fixed to the explicit form.
    async with async_session_maker() as async_session:
        cluster_location = await create_location(
            async_session,
            location_kwargs={"name": "my-cluster", "type": "yarn"},
        )
        airflow_location = await create_location(
            async_session,
            location_kwargs={"name": "airflow-host", "type": "http"},
        )
        cluster_type = await create_job_type(async_session, {"type": "SPARK_APPLICATION"})
        airflow_dag_type = await create_job_type(async_session, {"type": "AIRFLOW_DAG"})
        airflow_task_type = await create_job_type(async_session, {"type": "AIRFLOW_TASK"})
        cluster_job = await create_job(
            async_session,
            location_id=cluster_location.id,
            job_type_id=cluster_type.id,
            job_kwargs={"name": "my-job_cluster"},
        )
        dag_job = await create_job(
            async_session,
            location_id=airflow_location.id,
            job_type_id=airflow_dag_type.id,
            job_kwargs={"name": "my-job_dag"},
        )
        task_job = await create_job(
            async_session,
            location_id=airflow_location.id,
            job_type_id=airflow_task_type.id,
            job_kwargs={"name": "my-job_task"},
        )

        # Detach objects so tests can use them after this session closes.
        async_session.expunge_all()

    yield (cluster_job, dag_job, task_job)

    async with async_session_maker() as async_session:
        await clean_db(async_session)
Loading
Loading