Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions data_rentgen/db/repositories/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,18 @@ async def paginate(
page_size: int,
dataset_ids: Collection[int],
tag_value_ids: Collection[int],
location_id: int | None,
location_type: Collection[str],
location_ids: Collection[int],
location_types: Collection[str],
search_query: str | None,
) -> PaginationDTO[Dataset]:
where = []
location_join_clause = Location.id == Dataset.location_id
if dataset_ids:
where.append(Dataset.id == any_(list(dataset_ids))) # type: ignore[arg-type]
if location_id:
where.append(Dataset.location_id == location_id)
if location_type:
location_type_lower = [location_type.lower() for location_type in location_type]
if location_ids:
where.append(Dataset.location_id == any_(list(location_ids))) # type: ignore[arg-type]
if location_types:
location_type_lower = [location_type.lower() for location_type in location_types]
where.append(Location.type == any_(location_type_lower)) # type: ignore[arg-type]

if tag_value_ids:
Expand Down
18 changes: 9 additions & 9 deletions data_rentgen/db/repositories/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,21 @@ async def paginate(
page: int,
page_size: int,
job_ids: Collection[int],
job_types: Collection[str],
location_ids: Collection[int],
location_types: Collection[str],
search_query: str | None,
location_id: int | None,
location_type: Collection[str],
job_type: Collection[str],
) -> PaginationDTO[Job]:
where = []
location_join_clause = Location.id == Job.location_id
if job_ids:
where.append(Job.id == any_(list(job_ids))) # type: ignore[arg-type]
if job_type:
where.append(Job.type == any_(list(job_type))) # type: ignore[arg-type]
if location_id:
where.append(Job.location_id == location_id) # type: ignore[arg-type]
if location_type:
location_type_lower = [location_type.lower() for location_type in location_type]
if job_types:
where.append(Job.type == any_(list(job_types))) # type: ignore[arg-type]
if location_ids:
where.append(Job.location_id == any_(list(location_ids))) # type: ignore[arg-type]
if location_types:
location_type_lower = [location_type.lower() for location_type in location_types]
where.append(Location.type == any_(location_type_lower)) # type: ignore[arg-type]

query: Select | CompoundSelect
Expand Down
8 changes: 4 additions & 4 deletions data_rentgen/db/repositories/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async def paginate(
operation_ids: Collection[UUID],
since: datetime | None,
until: datetime | None,
run_id: UUID | None,
run_ids: Collection[UUID],
) -> PaginationDTO[Operation]:
# do not use `tuple_(Operation.created_at, Operation.id).in_(...),
# as this is too complex filter for Postgres to make an optimal query plan
Expand All @@ -119,12 +119,12 @@ async def paginate(
Operation.id == any_(list(operation_ids)), # type: ignore[arg-type]
]

elif run_id:
run_created_at = extract_timestamp_from_uuid(run_id)
elif run_ids:
run_created_at = extract_timestamp_from_uuid(min(run_ids))
# narrow created_at range
min_created_at = max(filter(None, [since, run_created_at]))
where = [
Operation.run_id == run_id,
Operation.run_id == any_(list(run_ids)), # type: ignore[arg-type]
Operation.created_at >= min_created_at,
Operation.id >= get_min_uuid(min_created_at),
]
Expand Down
56 changes: 29 additions & 27 deletions data_rentgen/db/repositories/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
union,
)
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import aliased, selectinload

from data_rentgen.db.models import Job, Run, RunStartReason, RunStatus, User
from data_rentgen.db.repositories.base import Repository
Expand Down Expand Up @@ -85,24 +85,24 @@


class RunRepository(Repository[Run]):
async def paginate( # noqa: PLR0912, C901
async def paginate( # noqa: PLR0912, C901, PLR0915
self,
page: int,
page_size: int,
since: datetime | None,
until: datetime | None,
run_ids: Collection[UUID],
job_id: int | None,
parent_run_id: UUID | None,
search_query: str | None,
job_type: Collection[str],
job_location_id: int | None,
status: Collection[RunStatus],
started_by_user: Collection[str] | None,
parent_run_ids: Collection[UUID],
job_ids: Collection[int],
job_types: Collection[str],
job_location_ids: Collection[int],
statuses: Collection[RunStatus],
started_by_users: Collection[str],
started_since: datetime | None,
started_until: datetime | None,
ended_since: datetime | None,
ended_until: datetime | None,
search_query: str | None,
) -> PaginationDTO[Run]:
# do not use `tuple_(Run.created_at, Run.id).in_(...),
# as this is too complex filter for Postgres to make an optimal query plan
Expand Down Expand Up @@ -135,12 +135,12 @@ async def paginate( # noqa: PLR0912, C901
Run.id <= get_max_uuid(until),
]

if job_id:
where.append(Run.job_id == job_id)
if parent_run_id:
where.append(Run.parent_run_id == parent_run_id)
if status:
where.append(Run.status == any_(status)) # type: ignore[arg-type]
if job_ids:
where.append(Run.job_id == any_(list(job_ids))) # type: ignore[arg-type]
if parent_run_ids:
where.append(Run.parent_run_id == any_(list(parent_run_ids))) # type: ignore[arg-type]
if statuses:
where.append(Run.status == any_(statuses)) # type: ignore[arg-type]
if started_since:
where.append(Run.started_at >= started_since)
if started_until:
Expand All @@ -159,10 +159,11 @@ async def paginate( # noqa: PLR0912, C901
ts_match(Run.search_vector, tsquery),
*where,
)
job_search = aliased(Job, name="job_search")
job_stmt = (
select(Run, ts_rank(Job.search_vector, tsquery).label("search_rank"))
.join(Job, Job.id == Run.job_id)
.where(ts_match(Job.search_vector, tsquery), *where)
select(Run, ts_rank(job_search.search_vector, tsquery).label("search_rank"))
.join(job_search, job_search.id == Run.job_id)
.where(ts_match(job_search.search_vector, tsquery), *where)
)

union_cte = union(run_stmt, job_stmt).cte()
Expand All @@ -179,20 +180,21 @@ async def paginate( # noqa: PLR0912, C901
query = select(Run).where(*where)
order_by = [Run.created_at.desc(), Run.id.desc()]

job_where = []
if job_type:
job_where.append(Job.type == any_(list(job_type))) # type: ignore[arg-type]
if job_location_id is not None:
job_where.append(Job.location_id == job_location_id)
if job_where:
query = query.join(Job, and_(Run.job_id == Job.id, *job_where))
if job_types or job_location_ids:
job = aliased(Job, name="job_type")
query = query.join(job, and_(Run.job_id == job.id))
if job_types:
query = query.where(job.type == any_(list(job_types))) # type: ignore[arg-type]
if job_location_ids:
query = query.where(job.location_id == any_(list(job_location_ids))) # type: ignore[arg-type]

if started_by_user:
if started_by_users:
usernames_lower = [name.lower() for name in started_by_users]
query = query.join(
User,
and_(
Run.started_by_user_id == User.id,
func.lower(User.name) == any_([name.lower() for name in started_by_user]), # type: ignore[arg-type]
func.lower(User.name) == any_(usernames_lower), # type: ignore[arg-type]
),
)

Expand Down
4 changes: 2 additions & 2 deletions data_rentgen/server/api/v1/router/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ async def paginate_datasets(
page_size=query_args.page_size,
dataset_ids=query_args.dataset_id,
tag_value_ids=query_args.tag_value_id,
location_id=query_args.location_id,
location_type=query_args.location_type,
location_ids=query_args.location_id,
location_types=query_args.location_type,
search_query=query_args.search_query,
)
return PageResponseV1[DatasetDetailedResponseV1].from_pagination(pagination)
Expand Down
6 changes: 3 additions & 3 deletions data_rentgen/server/api/v1/router/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ async def paginate_jobs(
page=query_args.page,
page_size=query_args.page_size,
job_ids=query_args.job_id,
job_types=query_args.job_type,
location_ids=query_args.location_id,
location_types=query_args.location_type,
search_query=query_args.search_query,
location_id=query_args.location_id,
location_type=query_args.location_type,
job_type=query_args.job_type,
)
return PageResponseV1[JobDetailedResponseV1].from_pagination(pagination)

Expand Down
2 changes: 1 addition & 1 deletion data_rentgen/server/api/v1/router/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def operations(
since=query_args.since,
until=query_args.until,
operation_ids=query_args.operation_id,
run_id=query_args.run_id,
run_ids=query_args.run_id,
)
return PageResponseV1[OperationDetailedResponseV1].from_pagination(pagination)

Expand Down
16 changes: 8 additions & 8 deletions data_rentgen/server/api/v1/router/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
PageResponseV1,
RunDetailedResponseV1,
RunLineageQueryV1,
RunsQueryV1,
RunsPaginateQueryV1,
)
from data_rentgen.server.services import LineageService, RunService, get_user
from data_rentgen.server.utils.lineage_response import build_lineage_response
Expand All @@ -26,7 +26,7 @@

@router.get("", summary="Paginated list of Runs")
async def runs(
query_args: Annotated[RunsQueryV1, Query()],
query_args: Annotated[RunsPaginateQueryV1, Query()],
run_service: Annotated[RunService, Depends()],
current_user: Annotated[User, Depends(get_user())],
) -> PageResponseV1[RunDetailedResponseV1]:
Expand All @@ -36,13 +36,13 @@ async def runs(
since=query_args.since,
until=query_args.until,
run_ids=query_args.run_id,
job_id=query_args.job_id,
parent_run_id=query_args.parent_run_id,
parent_run_ids=query_args.parent_run_id,
job_ids=query_args.job_id,
job_types=query_args.job_type,
job_location_ids=query_args.job_location_id,
search_query=query_args.search_query,
job_type=query_args.job_type,
job_location_id=query_args.job_location_id,
status=query_args.status,
started_by_user=query_args.started_by_user,
statuses=query_args.status,
started_by_users=query_args.started_by_user,
started_since=query_args.started_since,
started_until=query_args.started_until,
ended_since=query_args.ended_since,
Expand Down
4 changes: 2 additions & 2 deletions data_rentgen/server/schemas/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
RunIOStatisticsReponseV1,
RunOperationStatisticsReponseV1,
RunResponseV1,
RunsQueryV1,
RunsPaginateQueryV1,
RunStatisticsReponseV1,
)
from data_rentgen.server.schemas.v1.tag import TagDetailedResponseV1
Expand Down Expand Up @@ -126,7 +126,7 @@
"RunOperationStatisticsReponseV1",
"RunResponseV1",
"RunStatisticsReponseV1",
"RunsQueryV1",
"RunsPaginateQueryV1",
"TagDetailedResponseV1",
"UpdateLocationRequestV1",
"UserResponseV1",
Expand Down
29 changes: 17 additions & 12 deletions data_rentgen/server/schemas/v1/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,31 @@ class DatasetPaginateQueryV1(PaginateQueryV1):

dataset_id: list[int] = Field(
default_factory=list,
description="Get specific datasets by their ids",
description="Ids of datasets to fetch specific items only",
)
search_query: str | None = Field(
default=None,
min_length=3,
description="Search query, partial matching by dataset name/location",
examples=["my dataset"],
)
tag_value_id: list[int] = Field(
default_factory=list,
description="Get datasets with specific tag values (AND)",
description=(
"Get datasets having specific tag values assigned. "
"If multiple values are passed, dataset should have all of them (AND, not OR)"
),
examples=[[123]],
)
location_id: int | None = Field(
default=None,
description="Get datasets by location id",
location_id: list[int] = Field(
default_factory=list,
description="Ids of locations the dataset belongs to",
examples=[[123]],
)
location_type: list[str] = Field(
default_factory=list,
description="Get datasets by location types",
description="Types of locations the dataset belongs to",
examples=[["yarn"]],
)
search_query: str | None = Field(
default=None,
min_length=3,
description="Search query",
examples=[["my dataset"]],
)

model_config = ConfigDict(extra="forbid")
19 changes: 11 additions & 8 deletions data_rentgen/server/schemas/v1/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,29 @@ class JobTypesResponseV1(BaseModel):
class JobPaginateQueryV1(PaginateQueryV1):
"""Query params for Jobs paginate request."""

job_id: list[int] = Field(default_factory=list, description="Job id")
job_id: list[int] = Field(
default_factory=list,
description="Ids of jobs to fetch specific items only",
)
search_query: str | None = Field(
default=None,
min_length=3,
description="Search query",
description="Search query, partial match by job name or location name/address",
examples=["my job"],
)
job_type: list[str] = Field(
default_factory=list,
description="Specify job types",
description="Types of jobs",
examples=[["SPARK_APPLICATION", "AIRFLOW_DAG"]],
)
location_id: int | None = Field(
default=None,
description="The location id which jobs belong",
examples=[123],
location_id: list[int] = Field(
default_factory=list,
description="Ids of locations the job started at",
examples=[[123]],
)
location_type: list[str] = Field(
default_factory=list,
description="Specify location types",
description="Types of location the job started at",
examples=[["yarn"]],
)

Expand Down
9 changes: 6 additions & 3 deletions data_rentgen/server/schemas/v1/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ class LocationTypesResponseV1(BaseModel):
class LocationPaginateQueryV1(PaginateQueryV1):
"""Query params for Location paginate request."""

location_id: list[int] = Field(default_factory=list, description="Location id")
location_id: list[int] = Field(
default_factory=list,
description="Ids of locations to fetch specific items only",
)
location_type: list[str] = Field(
default_factory=list,
description="Location type",
Expand All @@ -74,8 +77,8 @@ class LocationPaginateQueryV1(PaginateQueryV1):
search_query: str | None = Field(
default=None,
min_length=3,
description="Search query",
examples=[["localhost:8123"]],
description="Search query, partial matching by location name or any address",
examples=["clickhouse://localhost:8123"],
)

model_config = ConfigDict(extra="forbid")
Expand Down
8 changes: 5 additions & 3 deletions data_rentgen/server/schemas/v1/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,12 @@ class OperationQueryV1(PaginateQueryV1):
operation_id: list[UUID7] = Field(
default_factory=list,
description="Operation ids, for exact match",
examples=[["01913217-b761-7b1a-bb52-489da9c8b9c8"]],
)
run_id: UUID7 | None = Field(
default=None,
description="Run id, can be used only with 'since'",
run_id: list[UUID7] = Field(
default_factory=list,
description="Run ids, can be used only with 'since'",
examples=[["01913217-b761-7b1a-bb52-489da9c8b9c8"]],
)

model_config = ConfigDict(extra="forbid")
Expand Down
Loading
Loading