Skip to content

Commit 4aa4ee8

Browse files
committed
[DOP-22530] Implement multi value filters by id
1 parent b6d0c58 commit 4aa4ee8

File tree

26 files changed: +196 additions, -157 deletions

data_rentgen/db/repositories/dataset.py

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -98,18 +98,18 @@ async def paginate(
9898
page_size: int,
9999
dataset_ids: Collection[int],
100100
tag_value_ids: Collection[int],
101-
location_id: int | None,
102-
location_type: Collection[str],
101+
location_ids: Collection[int],
102+
location_types: Collection[str],
103103
search_query: str | None,
104104
) -> PaginationDTO[Dataset]:
105105
where = []
106106
location_join_clause = Location.id == Dataset.location_id
107107
if dataset_ids:
108108
where.append(Dataset.id == any_(list(dataset_ids))) # type: ignore[arg-type]
109-
if location_id:
110-
where.append(Dataset.location_id == location_id)
111-
if location_type:
112-
location_type_lower = [location_type.lower() for location_type in location_type]
109+
if location_ids:
110+
where.append(Dataset.location_id == any_(list(location_ids))) # type: ignore[arg-type]
111+
if location_types:
112+
location_type_lower = [location_type.lower() for location_type in location_types]
113113
where.append(Location.type == any_(location_type_lower)) # type: ignore[arg-type]
114114

115115
if tag_value_ids:

data_rentgen/db/repositories/job.py

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -75,21 +75,21 @@ async def paginate(
7575
page: int,
7676
page_size: int,
7777
job_ids: Collection[int],
78+
job_types: Collection[str],
79+
location_ids: Collection[int],
80+
location_types: Collection[str],
7881
search_query: str | None,
79-
location_id: int | None,
80-
location_type: Collection[str],
81-
job_type: Collection[str],
8282
) -> PaginationDTO[Job]:
8383
where = []
8484
location_join_clause = Location.id == Job.location_id
8585
if job_ids:
8686
where.append(Job.id == any_(list(job_ids))) # type: ignore[arg-type]
87-
if job_type:
88-
where.append(Job.type == any_(list(job_type))) # type: ignore[arg-type]
89-
if location_id:
90-
where.append(Job.location_id == location_id) # type: ignore[arg-type]
91-
if location_type:
92-
location_type_lower = [location_type.lower() for location_type in location_type]
87+
if job_types:
88+
where.append(Job.type == any_(list(job_types))) # type: ignore[arg-type]
89+
if location_ids:
90+
where.append(Job.location_id == any_(list(location_ids))) # type: ignore[arg-type]
91+
if location_types:
92+
location_type_lower = [location_type.lower() for location_type in location_types]
9393
where.append(Location.type == any_(location_type_lower)) # type: ignore[arg-type]
9494

9595
query: Select | CompoundSelect

data_rentgen/db/repositories/operation.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -98,7 +98,7 @@ async def paginate(
9898
operation_ids: Collection[UUID],
9999
since: datetime | None,
100100
until: datetime | None,
101-
run_id: UUID | None,
101+
run_ids: Collection[UUID],
102102
) -> PaginationDTO[Operation]:
103103
# do not use `tuple_(Operation.created_at, Operation.id).in_(...),
104104
# as this is too complex filter for Postgres to make an optimal query plan
@@ -119,12 +119,12 @@ async def paginate(
119119
Operation.id == any_(list(operation_ids)), # type: ignore[arg-type]
120120
]
121121

122-
elif run_id:
123-
run_created_at = extract_timestamp_from_uuid(run_id)
122+
elif run_ids:
123+
run_created_at = extract_timestamp_from_uuid(min(run_ids))
124124
# narrow created_at range
125125
min_created_at = max(filter(None, [since, run_created_at]))
126126
where = [
127-
Operation.run_id == run_id,
127+
Operation.run_id == any_(list(run_ids)), # type: ignore[arg-type]
128128
Operation.created_at >= min_created_at,
129129
Operation.id >= get_min_uuid(min_created_at),
130130
]

data_rentgen/db/repositories/run.py

Lines changed: 29 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818
union,
1919
)
2020
from sqlalchemy.dialects.postgresql import insert
21-
from sqlalchemy.orm import selectinload
21+
from sqlalchemy.orm import aliased, selectinload
2222

2323
from data_rentgen.db.models import Job, Run, RunStartReason, RunStatus, User
2424
from data_rentgen.db.repositories.base import Repository
@@ -85,24 +85,24 @@
8585

8686

8787
class RunRepository(Repository[Run]):
88-
async def paginate( # noqa: PLR0912, C901
88+
async def paginate( # noqa: PLR0912, C901, PLR0915
8989
self,
9090
page: int,
9191
page_size: int,
9292
since: datetime | None,
9393
until: datetime | None,
9494
run_ids: Collection[UUID],
95-
job_id: int | None,
96-
parent_run_id: UUID | None,
97-
search_query: str | None,
98-
job_type: Collection[str],
99-
job_location_id: int | None,
100-
status: Collection[RunStatus],
101-
started_by_user: Collection[str] | None,
95+
parent_run_ids: Collection[UUID],
96+
job_ids: Collection[int],
97+
job_types: Collection[str],
98+
job_location_ids: Collection[int],
99+
statuses: Collection[RunStatus],
100+
started_by_users: Collection[str],
102101
started_since: datetime | None,
103102
started_until: datetime | None,
104103
ended_since: datetime | None,
105104
ended_until: datetime | None,
105+
search_query: str | None,
106106
) -> PaginationDTO[Run]:
107107
# do not use `tuple_(Run.created_at, Run.id).in_(...),
108108
# as this is too complex filter for Postgres to make an optimal query plan
@@ -135,12 +135,12 @@ async def paginate( # noqa: PLR0912, C901
135135
Run.id <= get_max_uuid(until),
136136
]
137137

138-
if job_id:
139-
where.append(Run.job_id == job_id)
140-
if parent_run_id:
141-
where.append(Run.parent_run_id == parent_run_id)
142-
if status:
143-
where.append(Run.status == any_(status)) # type: ignore[arg-type]
138+
if job_ids:
139+
where.append(Run.job_id == any_(list(job_ids))) # type: ignore[arg-type]
140+
if parent_run_ids:
141+
where.append(Run.parent_run_id == any_(list(parent_run_ids))) # type: ignore[arg-type]
142+
if statuses:
143+
where.append(Run.status == any_(statuses)) # type: ignore[arg-type]
144144
if started_since:
145145
where.append(Run.started_at >= started_since)
146146
if started_until:
@@ -159,10 +159,11 @@ async def paginate( # noqa: PLR0912, C901
159159
ts_match(Run.search_vector, tsquery),
160160
*where,
161161
)
162+
job_search = aliased(Job, name="job_search")
162163
job_stmt = (
163-
select(Run, ts_rank(Job.search_vector, tsquery).label("search_rank"))
164-
.join(Job, Job.id == Run.job_id)
165-
.where(ts_match(Job.search_vector, tsquery), *where)
164+
select(Run, ts_rank(job_search.search_vector, tsquery).label("search_rank"))
165+
.join(job_search, job_search.id == Run.job_id)
166+
.where(ts_match(job_search.search_vector, tsquery), *where)
166167
)
167168

168169
union_cte = union(run_stmt, job_stmt).cte()
@@ -179,20 +180,21 @@ async def paginate( # noqa: PLR0912, C901
179180
query = select(Run).where(*where)
180181
order_by = [Run.created_at.desc(), Run.id.desc()]
181182

182-
job_where = []
183-
if job_type:
184-
job_where.append(Job.type == any_(list(job_type))) # type: ignore[arg-type]
185-
if job_location_id is not None:
186-
job_where.append(Job.location_id == job_location_id)
187-
if job_where:
188-
query = query.join(Job, and_(Run.job_id == Job.id, *job_where))
183+
if job_types or job_location_ids:
184+
job = aliased(Job, name="job_type")
185+
query = query.join(job, and_(Run.job_id == job.id))
186+
if job_types:
187+
query = query.where(job.type == any_(list(job_types))) # type: ignore[arg-type]
188+
if job_location_ids:
189+
query = query.where(job.location_id == any_(list(job_location_ids))) # type: ignore[arg-type]
189190

190-
if started_by_user:
191+
if started_by_users:
192+
usernames_lower = [name.lower() for name in started_by_users]
191193
query = query.join(
192194
User,
193195
and_(
194196
Run.started_by_user_id == User.id,
195-
func.lower(User.name) == any_([name.lower() for name in started_by_user]), # type: ignore[arg-type]
197+
func.lower(User.name) == any_(usernames_lower), # type: ignore[arg-type]
196198
),
197199
)
198200

data_rentgen/server/api/v1/router/dataset.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -38,8 +38,8 @@ async def paginate_datasets(
3838
page_size=query_args.page_size,
3939
dataset_ids=query_args.dataset_id,
4040
tag_value_ids=query_args.tag_value_id,
41-
location_id=query_args.location_id,
42-
location_type=query_args.location_type,
41+
location_ids=query_args.location_id,
42+
location_types=query_args.location_type,
4343
search_query=query_args.search_query,
4444
)
4545
return PageResponseV1[DatasetDetailedResponseV1].from_pagination(pagination)

data_rentgen/server/api/v1/router/job.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -42,10 +42,10 @@ async def paginate_jobs(
4242
page=query_args.page,
4343
page_size=query_args.page_size,
4444
job_ids=query_args.job_id,
45+
job_types=query_args.job_type,
46+
location_ids=query_args.location_id,
47+
location_types=query_args.location_type,
4548
search_query=query_args.search_query,
46-
location_id=query_args.location_id,
47-
location_type=query_args.location_type,
48-
job_type=query_args.job_type,
4949
)
5050
return PageResponseV1[JobDetailedResponseV1].from_pagination(pagination)
5151

data_rentgen/server/api/v1/router/operation.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@ async def operations(
3838
since=query_args.since,
3939
until=query_args.until,
4040
operation_ids=query_args.operation_id,
41-
run_id=query_args.run_id,
41+
run_ids=query_args.run_id,
4242
)
4343
return PageResponseV1[OperationDetailedResponseV1].from_pagination(pagination)
4444

data_rentgen/server/api/v1/router/run.py

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@
1212
PageResponseV1,
1313
RunDetailedResponseV1,
1414
RunLineageQueryV1,
15-
RunsQueryV1,
15+
RunsPaginateQueryV1,
1616
)
1717
from data_rentgen.server.services import LineageService, RunService, get_user
1818
from data_rentgen.server.utils.lineage_response import build_lineage_response
@@ -26,7 +26,7 @@
2626

2727
@router.get("", summary="Paginated list of Runs")
2828
async def runs(
29-
query_args: Annotated[RunsQueryV1, Query()],
29+
query_args: Annotated[RunsPaginateQueryV1, Query()],
3030
run_service: Annotated[RunService, Depends()],
3131
current_user: Annotated[User, Depends(get_user())],
3232
) -> PageResponseV1[RunDetailedResponseV1]:
@@ -36,13 +36,13 @@ async def runs(
3636
since=query_args.since,
3737
until=query_args.until,
3838
run_ids=query_args.run_id,
39-
job_id=query_args.job_id,
40-
parent_run_id=query_args.parent_run_id,
39+
parent_run_ids=query_args.parent_run_id,
40+
job_ids=query_args.job_id,
41+
job_types=query_args.job_type,
42+
job_location_ids=query_args.job_location_id,
4143
search_query=query_args.search_query,
42-
job_type=query_args.job_type,
43-
job_location_id=query_args.job_location_id,
44-
status=query_args.status,
45-
started_by_user=query_args.started_by_user,
44+
statuses=query_args.status,
45+
started_by_users=query_args.started_by_user,
4646
started_since=query_args.started_since,
4747
started_until=query_args.started_until,
4848
ended_since=query_args.ended_since,

data_rentgen/server/schemas/v1/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -67,7 +67,7 @@
6767
RunIOStatisticsReponseV1,
6868
RunOperationStatisticsReponseV1,
6969
RunResponseV1,
70-
RunsQueryV1,
70+
RunsPaginateQueryV1,
7171
RunStatisticsReponseV1,
7272
)
7373
from data_rentgen.server.schemas.v1.tag import TagDetailedResponseV1
@@ -126,7 +126,7 @@
126126
"RunOperationStatisticsReponseV1",
127127
"RunResponseV1",
128128
"RunStatisticsReponseV1",
129-
"RunsQueryV1",
129+
"RunsPaginateQueryV1",
130130
"TagDetailedResponseV1",
131131
"UpdateLocationRequestV1",
132132
"UserResponseV1",

data_rentgen/server/schemas/v1/dataset.py

Lines changed: 17 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -57,26 +57,31 @@ class DatasetPaginateQueryV1(PaginateQueryV1):
5757

5858
dataset_id: list[int] = Field(
5959
default_factory=list,
60-
description="Get specific datasets by their ids",
60+
description="Ids of datasets to fetch specific items only",
61+
)
62+
search_query: str | None = Field(
63+
default=None,
64+
min_length=3,
65+
description="Search query, partial matching by dataset name/location",
66+
examples=["my dataset"],
6167
)
6268
tag_value_id: list[int] = Field(
6369
default_factory=list,
64-
description="Get datasets with specific tag values (AND)",
70+
description=(
71+
"Get datasets having specific tag values assigned. "
72+
"If multiple values are passed, dataset should have all of them (AND, not OR)"
73+
),
74+
examples=[[123]],
6575
)
66-
location_id: int | None = Field(
67-
default=None,
68-
description="Get datasets by location id",
76+
location_id: list[int] = Field(
77+
default_factory=list,
78+
description="Ids of locations the dataset belongs to",
79+
examples=[[123]],
6980
)
7081
location_type: list[str] = Field(
7182
default_factory=list,
72-
description="Get datasets by location types",
83+
description="Types of locations the dataset belongs to",
7384
examples=[["yarn"]],
7485
)
75-
search_query: str | None = Field(
76-
default=None,
77-
min_length=3,
78-
description="Search query",
79-
examples=[["my dataset"]],
80-
)
8186

8287
model_config = ConfigDict(extra="forbid")

0 commit comments

Comments
 (0)