21 changes: 13 additions & 8 deletions data_rentgen/db/repositories/dataset.py
@@ -99,14 +99,18 @@ async def paginate(
dataset_ids: Collection[int],
tag_value_ids: Collection[int],
location_id: int | None,
location_type: Collection[str],
search_query: str | None,
) -> PaginationDTO[Dataset]:
where = []
location_join_clause = Location.id == Dataset.location_id
if dataset_ids:
where.append(Dataset.id == any_(list(dataset_ids))) # type: ignore[arg-type]

if location_id:
where.append(Dataset.location_id == location_id)
if location_type:
location_type_lower = [location_type.lower() for location_type in location_type]
where.append(Location.type == any_(location_type_lower)) # type: ignore[arg-type]

if tag_value_ids:
tv_ids = list(tag_value_ids)
@@ -125,19 +129,20 @@ async def paginate(
if search_query:
tsquery = make_tsquery(search_query)

dataset_stmt = select(Dataset, ts_rank(Dataset.search_vector, tsquery).label("search_rank")).where(
ts_match(Dataset.search_vector, tsquery),
*where,
dataset_stmt = (
select(Dataset, ts_rank(Dataset.search_vector, tsquery).label("search_rank"))
.join(Location, location_join_clause)
.where(ts_match(Dataset.search_vector, tsquery), *where)
)
location_stmt = (
select(Dataset, ts_rank(Location.search_vector, tsquery).label("search_rank"))
.join(Dataset, Location.id == Dataset.location_id)
.join(Location, location_join_clause)
.where(ts_match(Location.search_vector, tsquery), *where)
)
address_stmt = (
select(Dataset, func.max(ts_rank(Address.search_vector, tsquery)).label("search_rank"))
.join(Location, Address.location_id == Location.id)
.join(Dataset, Location.id == Dataset.location_id)
.join(Location, location_join_clause)
.join(Address, Address.location_id == Dataset.location_id)
.where(ts_match(Address.search_vector, tsquery), *where)
.group_by(Dataset.id, Location.id, Address.id)
)
@@ -152,7 +157,7 @@ async def paginate(
).group_by(*dataset_columns)
order_by = [desc("search_rank"), asc("name")]
else:
query = select(Dataset).where(*where)
query = select(Dataset).join(Location, location_join_clause).where(*where)
order_by = [Dataset.name]

options = [
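Both repository changes apply the new filter the same way: join Location through location_join_clause and compare the lowercased location_type values with any_. A minimal sketch of that pattern, assuming SQLAlchemy 2.x and the project's ORM models (the import path and helper name below are illustrative guesses, not part of the diff):

    from sqlalchemy import any_, select

    # Assumed import path for the ORM models used throughout the diff.
    from data_rentgen.db.models import Dataset, Location

    def select_datasets_by_location_type(location_types: list[str]):
        # Location.type values are stored lowercase, so normalize the input first.
        lowered = [value.lower() for value in location_types]
        return (
            select(Dataset)
            .join(Location, Location.id == Dataset.location_id)
            # Renders as "location.type = ANY (ARRAY[...])" on PostgreSQL,
            # mirroring the any_() usage in the repositories above.
            .where(Location.type == any_(lowered))  # type: ignore[arg-type]
        )

    stmt = select_datasets_by_location_type(["YARN", "hdfs"])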
25 changes: 15 additions & 10 deletions data_rentgen/db/repositories/job.py
@@ -77,35 +77,40 @@ async def paginate(
job_ids: Collection[int],
search_query: str | None,
location_id: int | None,
location_type: Collection[str],
job_type: Collection[str],
) -> PaginationDTO[Job]:
where = []
location_join_clause = Location.id == Job.location_id
if job_ids:
where.append(Job.id == any_(list(job_ids))) # type: ignore[arg-type]

query: Select | CompoundSelect
order_by: list[ColumnElement | SQLColumnExpression]
if job_type:
where.append(Job.type == any_(list(job_type))) # type: ignore[arg-type]
if location_id:
where.append(Job.location_id == location_id) # type: ignore[arg-type]
if location_type:
location_type_lower = [location_type.lower() for location_type in location_type]
where.append(Location.type == any_(location_type_lower)) # type: ignore[arg-type]

query: Select | CompoundSelect
order_by: list[ColumnElement | SQLColumnExpression]
if search_query:
tsquery = make_tsquery(search_query)

job_stmt = select(Job, ts_rank(Job.search_vector, tsquery).label("search_rank")).where(
ts_match(Job.search_vector, tsquery),
*where,
job_stmt = (
select(Job, ts_rank(Job.search_vector, tsquery).label("search_rank"))
.join(Location, location_join_clause)
.where(ts_match(Job.search_vector, tsquery), *where)
)
location_stmt = (
select(Job, ts_rank(Location.search_vector, tsquery).label("search_rank"))
.join(Job, Location.id == Job.location_id)
.join(Location, location_join_clause)
.where(ts_match(Location.search_vector, tsquery), *where)
)
address_stmt = (
select(Job, func.max(ts_rank(Address.search_vector, tsquery).label("search_rank")))
.join(Location, Address.location_id == Location.id)
.join(Job, Location.id == Job.location_id)
.join(Location, location_join_clause)
.join(Address, Address.location_id == Job.location_id)
.where(ts_match(Address.search_vector, tsquery), *where)
.group_by(Job.id, Location.id, Address.id)
)
@@ -120,7 +125,7 @@ async def paginate(
).group_by(*job_columns)
order_by = [desc("search_rank"), asc("name")]
else:
query = select(Job).where(*where)
query = select(Job).join(Location, location_join_clause).where(*where)
order_by = [Job.name]

options = [selectinload(Job.location).selectinload(Location.addresses)]
5 changes: 3 additions & 2 deletions data_rentgen/db/repositories/job_type.py
@@ -20,6 +20,8 @@
JobType.type == bindparam("type"),
)

get_distinct_query = select(JobType.type).distinct(JobType.type).order_by(JobType.type)


class JobTypeRepository(Repository[JobType]):
async def fetch_bulk(self, job_types_dto: list[JobTypeDTO]) -> list[tuple[JobTypeDTO, JobType | None]]:
@@ -41,8 +43,7 @@ async def create(self, job_type_dto: JobTypeDTO) -> JobType:
return await self._get(job_type_dto) or await self._create(job_type_dto)

async def get_job_types(self) -> Sequence[str]:
query = select(JobType.type).distinct(JobType.type)
result = await self._session.scalars(query)
result = await self._session.scalars(get_distinct_query)
return result.all()

async def _get(self, job_type_dto: JobTypeDTO) -> JobType | None:
12 changes: 8 additions & 4 deletions data_rentgen/db/repositories/location.py
@@ -42,6 +42,7 @@
)
.options(selectinload(Location.addresses))
)
get_distinct_query = select(Location.type).distinct(Location.type).order_by(Location.type)


class LocationRepository(Repository[Location]):
@@ -50,16 +51,15 @@ async def paginate(
page: int,
page_size: int,
location_ids: Collection[int],
location_type: str | None,
location_type: Collection[str],
search_query: str | None,
) -> PaginationDTO[Location]:
where = []

if location_ids:
where.append(Location.id == any_(list(location_ids))) # type: ignore[arg-type]

if location_type:
where.append(Location.type == location_type)
location_type_lower = [location_type.lower() for location_type in location_type]
where.append(Location.type == any_(location_type_lower)) # type: ignore[arg-type]

query: Select | CompoundSelect
order_by: list[ColumnElement | SQLColumnExpression]
@@ -161,3 +161,7 @@ async def _update_addresses(self, existing: Location, new: LocationDTO) -> Locat
existing.addresses.extend(addresses)
await self._session.flush([existing])
return existing

async def get_location_types(self):
scalars = await self._session.scalars(get_distinct_query)
return scalars.all()
1 change: 1 addition & 0 deletions data_rentgen/server/api/v1/router/dataset.py
@@ -39,6 +39,7 @@ async def paginate_datasets(
dataset_ids=query_args.dataset_id,
tag_value_ids=query_args.tag_value_id,
location_id=query_args.location_id,
location_type=query_args.location_type,
search_query=query_args.search_query,
)
return PageResponseV1[DatasetDetailedResponseV1].from_pagination(pagination)
3 changes: 2 additions & 1 deletion data_rentgen/server/api/v1/router/job.py
@@ -44,6 +44,7 @@ async def paginate_jobs(
job_ids=query_args.job_id,
search_query=query_args.search_query,
location_id=query_args.location_id,
location_type=query_args.location_type,
job_type=query_args.job_type,
)
return PageResponseV1[JobDetailedResponseV1].from_pagination(pagination)
@@ -74,4 +75,4 @@ async def get_job_types(
current_user: Annotated[User, Depends(get_user())],
) -> JobTypesResponseV1:
job_types = await job_service.get_job_types()
return JobTypesResponseV1(job_types=sorted(job_types))
return JobTypesResponseV1(job_types=list(job_types))
10 changes: 10 additions & 0 deletions data_rentgen/server/api/v1/router/location.py
@@ -14,6 +14,7 @@
PageResponseV1,
UpdateLocationRequestV1,
)
from data_rentgen.server.schemas.v1.location import LocationTypesResponseV1
from data_rentgen.server.services import LocationService, get_user

router = APIRouter(
@@ -57,3 +58,12 @@ async def update_location(
) -> LocationDetailedResponseV1:
location = await location_service.update_external_id(location_id, location_data.external_id)
return LocationDetailedResponseV1.model_validate(location)


@router.get("/types", summary="Get distinct types of Locations")
async def get_location_types(
location_service: Annotated[LocationService, Depends()],
current_user: Annotated[User, Depends(get_user())],
) -> LocationTypesResponseV1:
location_types = await location_service.get_location_types()
return LocationTypesResponseV1(location_types=list(location_types))
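The new endpoint only exposes the distinct Location.type values returned by the repository. A hypothetical call against a local server (host, port and auth header are assumptions, not part of this change):

    import httpx

    response = httpx.get(
        "http://localhost:8000/v1/locations/types",
        headers={"Authorization": "Bearer <token>"},  # auth scheme is an assumption
    )
    # Expected shape, per LocationTypesResponseV1:
    # {"location_types": ["hdfs", "kafka", "yarn"]}
    print(response.json())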
21 changes: 18 additions & 3 deletions data_rentgen/server/schemas/v1/dataset.py
@@ -55,13 +55,28 @@ class DatasetDetailedResponseV1(BaseModel):
class DatasetPaginateQueryV1(PaginateQueryV1):
"""Query params for Dataset paginate request."""

dataset_id: list[int] = Field(default_factory=list, description="Dataset id")
tag_value_id: list[int] = Field(default_factory=list, description="Tag value id")
location_id: int | None = Field(default=None, description="Location id to filter dataset")
dataset_id: list[int] = Field(
default_factory=list,
description="Get specific datasets by their ids",
)
tag_value_id: list[int] = Field(
default_factory=list,
description="Get datasets with specific tag values (AND)",
)
location_id: int | None = Field(
default=None,
description="Get datasets by location id",
)
location_type: list[str] = Field(
default_factory=list,
description="Get datasets by location types",
examples=[["yarn"]],
)
search_query: str | None = Field(
default=None,
min_length=3,
description="Search query",
examples=[["my dataset"]],
)

model_config = ConfigDict(extra="forbid")
15 changes: 13 additions & 2 deletions data_rentgen/server/schemas/v1/job.py
@@ -27,9 +27,12 @@ class JobDetailedResponseV1(BaseModel):


class JobTypesResponseV1(BaseModel):
"""JobTypes"""
"""Job types"""

job_types: list[str] = Field(description="List of distinct job types")
job_types: list[str] = Field(
description="List of distinct job types",
examples=[["SPARK_APPLICATION", "AIRFLOW_DAG"]],
)

model_config = ConfigDict(from_attributes=True)

@@ -42,14 +45,22 @@ class JobPaginateQueryV1(PaginateQueryV1):
default=None,
min_length=3,
description="Search query",
examples=["my job"],
)
job_type: list[str] = Field(
default_factory=list,
description="Specify job types",
examples=[["SPARK_APPLICATION", "AIRFLOW_DAG"]],
)
location_id: int | None = Field(
default=None,
description="The location id which jobs belong",
examples=[123],
)
location_type: list[str] = Field(
default_factory=list,
description="Specify location types",
examples=[["yarn"]],
)

model_config = ConfigDict(extra="forbid")
18 changes: 17 additions & 1 deletion data_rentgen/server/schemas/v1/location.py
@@ -51,15 +51,31 @@ class LocationDetailedResponseV1(BaseModel):
model_config = ConfigDict(from_attributes=True)


class LocationTypesResponseV1(BaseModel):
"""Location types"""

location_types: list[str] = Field(
description="List of distinct location types",
examples=[["kafka", "hdfs", "yarn"]],
)

model_config = ConfigDict(from_attributes=True)


class LocationPaginateQueryV1(PaginateQueryV1):
"""Query params for Location paginate request."""

location_id: list[int] = Field(default_factory=list, description="Location id")
location_type: str | None = Field(default=None, description="Location type")
location_type: list[str] = Field(
default_factory=list,
description="Location type",
examples=[["kafka", "hdfs"], ["yarn"]],
)
search_query: str | None = Field(
default=None,
min_length=3,
description="Search query",
examples=[["localhost:8123"]],
)

model_config = ConfigDict(extra="forbid")
2 changes: 2 additions & 0 deletions data_rentgen/server/services/dataset.py
@@ -43,6 +43,7 @@ async def paginate(
dataset_ids: Collection[int],
tag_value_ids: Collection[int],
location_id: int | None,
location_type: Collection[str],
search_query: str | None,
) -> DatasetServicePaginatedResult:
pagination = await self._uow.dataset.paginate(
Expand All @@ -51,6 +52,7 @@ async def paginate(
dataset_ids=dataset_ids,
tag_value_ids=tag_value_ids,
location_id=location_id,
location_type=location_type,
search_query=search_query,
)

2 changes: 2 additions & 0 deletions data_rentgen/server/services/job.py
@@ -32,6 +32,7 @@ async def paginate(
job_ids: Collection[int],
search_query: str | None,
location_id: int | None,
location_type: Collection[str],
job_type: Collection[str],
) -> JobServicePaginatedResult:
pagination = await self._uow.job.paginate(
Expand All @@ -40,6 +41,7 @@ async def paginate(
job_ids=job_ids,
search_query=search_query,
location_id=location_id,
location_type=location_type,
job_type=job_type,
)

7 changes: 5 additions & 2 deletions data_rentgen/server/services/location.py
@@ -1,6 +1,6 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from collections.abc import Collection
from collections.abc import Collection, Sequence
from dataclasses import dataclass
from typing import Annotated

@@ -62,7 +62,7 @@ async def paginate(
page: int,
page_size: int,
location_ids: Collection[int],
location_type: str | None,
location_type: Collection[str],
search_query: str | None,
) -> LocationServicePaginatedResult:
pagination = await self._uow.location.paginate(
@@ -109,3 +109,6 @@ async def update_external_id(
jobs=LocationServiceJobStatistics.from_row(job_stats.get(location.id)),
),
)

async def get_location_types(self) -> Sequence[str]:
return await self._uow.location.get_location_types()
1 change: 1 addition & 0 deletions docs/changelog/next_release/328.feature.1.rst
@@ -0,0 +1 @@
Add new ``GET /v1/locations/types`` endpoint returning a list of all known location types.
2 changes: 2 additions & 0 deletions docs/changelog/next_release/328.feature.2.rst
@@ -0,0 +1,2 @@
Add new filter to ``GET /v1/jobs``:
- location_type: ``list[str]``
2 changes: 2 additions & 0 deletions docs/changelog/next_release/328.feature.3.rst
@@ -0,0 +1,2 @@
Add new filter to ``GET /v1/datasets``:
- location_type: ``list[str]``
1 change: 1 addition & 0 deletions docs/changelog/next_release/328.feature.4.rst
@@ -0,0 +1 @@
Allow passing multiple ``location_type`` filters to ``GET /v1/locations``.
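The location_type filters accept repeated query parameters, which are collected into list[str] fields and lowercased server-side before matching. A hypothetical request sketch (base URL is an assumption):

    import httpx

    params = [("location_type", "kafka"), ("location_type", "hdfs")]
    jobs = httpx.get("http://localhost:8000/v1/jobs", params=params)
    datasets = httpx.get("http://localhost:8000/v1/datasets", params=params)
    locations = httpx.get("http://localhost:8000/v1/locations", params=params)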