Commit 11946fa

[DOP-29112] add status and job_type filters for run search
1 parent 8dfdc63 commit 11946fa

9 files changed, +111 −8 lines changed

data_rentgen/db/repositories/run.py

Lines changed: 8 additions & 0 deletions
@@ -94,6 +94,8 @@ async def paginate(
         job_id: int | None,
         parent_run_id: UUID | None,
         search_query: str | None,
+        job_types: Collection[str],
+        statuses: Collection[str],
     ) -> PaginationDTO[Run]:
         # do not use `tuple_(Run.created_at, Run.id).in_(...),
         # as this is too complex filter for Postgres to make an optimal query plan
@@ -130,6 +132,9 @@ async def paginate(
             where.append(Run.job_id == job_id)
         if parent_run_id:
             where.append(Run.parent_run_id == parent_run_id)
+        if statuses:
+            serialize_statuses: Collection[RunStatus] = [RunStatus[status] for status in statuses]
+            where.append(Run.status == any_(serialize_statuses))  # type: ignore[arg-type]

         query: Select | CompoundSelect
         order_by: list[ColumnElement | SQLColumnExpression]
@@ -160,6 +165,9 @@ async def paginate(
             query = select(Run).where(*where)
             order_by = [Run.created_at.desc(), Run.id.desc()]

+        if job_types:
+            query = query.join(Job, Run.job_id == Job.id).where(Job.type == any_(job_types))  # type: ignore[arg-type]
+
         options = [selectinload(Run.started_by_user)]
         return await self._paginate_by_query(
             query=query,

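Both new filters are expressed with SQLAlchemy's any_() comparison, with the Python list passed straight to it (hence the type: ignore markers). A minimal standalone sketch of the same pattern, assuming SQLAlchemy 2.x and the PostgreSQL dialect, using a made-up table and an explicit array bind parameter for clarity:

from sqlalchemy import ARRAY, Column, Integer, MetaData, Table, any_, bindparam, select
from sqlalchemy.dialects import postgresql

metadata = MetaData()
runs = Table("runs", metadata, Column("status", Integer))  # stand-in table, not the project's model

# `col == any_(<array parameter>)` keeps a single array placeholder no matter
# how many status values are passed, instead of expanding one placeholder per value.
statuses = bindparam("statuses", [1, 2], type_=ARRAY(Integer))
stmt = select(runs).where(runs.c.status == any_(statuses))
print(stmt.compile(dialect=postgresql.dialect()))
# roughly: SELECT runs.status FROM runs WHERE runs.status = ANY (%(statuses)s)
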
data_rentgen/server/api/v1/router/run.py

Lines changed: 2 additions & 0 deletions
@@ -39,6 +39,8 @@ async def runs(
         job_id=query_args.job_id,
         parent_run_id=query_args.parent_run_id,
         search_query=query_args.search_query,
+        job_types=query_args.job_types,
+        statuses=query_args.statuses,
     )
     return PageResponseV1[RunDetailedResponseV1].from_pagination(pagination)

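For illustration only, a hypothetical client call exercising the new parameters could look like the snippet below; the path, parameter names, and auth header mirror the tests in this commit, while the host, timestamp, and token are invented.

import httpx

# Both filters accept repeated values; httpx renders list params as repeated
# query-string keys, e.g. ?statuses=SUCCEEDED&statuses=STARTED.
response = httpx.get(
    "http://localhost:8000/v1/runs",  # hypothetical host
    params={
        "since": "2024-01-01T00:00:00+00:00",  # hypothetical timestamp
        "job_types": ["SPARK_APPLICATION"],
        "statuses": ["SUCCEEDED", "STARTED"],
    },
    headers={"Authorization": "Bearer <token>"},
)
print(response.status_code, response.json()["meta"]["total_count"])
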
data_rentgen/server/schemas/v1/run.py

Lines changed: 16 additions & 1 deletion
@@ -3,7 +3,7 @@
 from __future__ import annotations

 from datetime import datetime
-from enum import IntEnum
+from enum import IntEnum, StrEnum
 from uuid import UUID

 from pydantic import (
@@ -41,6 +41,14 @@ def __str__(self) -> str:
         return self.name


+class RunStatusForQueryV1(StrEnum):
+    UNKNOWN = "UNKNOWN"
+    STARTED = "STARTED"
+    SUCCEEDED = "SUCCEEDED"
+    FAILED = "FAILED"
+    KILLED = "KILLED"
+
+
 class RunResponseV1(BaseModel):
     """Run response"""

@@ -139,6 +147,13 @@ class RunsQueryV1(PaginateQueryV1):
         examples=["01913217-b761-7b1a-bb52-489da9c8b9c8"],
     )

+    job_types: list[str] = Field(
+        default_factory=list,
+        description="Filter runs by type of a Job",
+        examples=["SPARK_APPLICATION", "AIRFLOW_TASK"],
+    )
+    statuses: list[RunStatusForQueryV1] = Field(default_factory=list, description="Filter runs by status")
+
     search_query: str | None = Field(
         default=None,
         min_length=3,

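Because statuses is typed as a list of the new RunStatusForQueryV1 values while job_types stays a plain list of strings, unknown status names should be rejected at request validation, before the query ever reaches the repository. A rough standalone sketch of that behaviour, assuming pydantic v2 and Python 3.11+ (RunsQuerySketch is a stand-in, not the project's RunsQueryV1):

from enum import StrEnum

from pydantic import BaseModel, Field, ValidationError


class RunStatusForQueryV1(StrEnum):
    UNKNOWN = "UNKNOWN"
    STARTED = "STARTED"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    KILLED = "KILLED"


class RunsQuerySketch(BaseModel):  # stand-in for RunsQueryV1
    job_types: list[str] = Field(default_factory=list)
    statuses: list[RunStatusForQueryV1] = Field(default_factory=list)


print(RunsQuerySketch(statuses=["SUCCEEDED", "STARTED"]).statuses)  # strings coerced to enum members

try:
    RunsQuerySketch(statuses=["DONE"])  # "DONE" is not a RunStatusForQueryV1 member
except ValidationError as exc:
    print(f"rejected with {exc.error_count()} validation error")
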
data_rentgen/server/services/run.py

Lines changed: 4 additions & 0 deletions
@@ -80,6 +80,8 @@ async def paginate(
         job_id: int | None,
         parent_run_id: UUID | None,
         search_query: str | None,
+        job_types: Collection[str],
+        statuses: Collection[str],
     ) -> RunServicePaginatedResult:
         pagination = await self._uow.run.paginate(
             page=page,
@@ -90,6 +92,8 @@ async def paginate(
             job_id=job_id,
             parent_run_id=parent_run_id,
             search_query=search_query,
+            job_types=job_types,
+            statuses=statuses,
         )
         run_ids = [item.id for item in pagination.items]
         input_stats = await self._uow.input.get_stats_by_run_ids(run_ids)

tests/test_server/fixtures/factories/run.py

Lines changed: 8 additions & 6 deletions
@@ -135,7 +135,8 @@ async def runs_with_same_job(
             async_session,
             run_kwargs={
                 "job_id": job.id,
-                "created_at": started_at + timedelta(seconds=s),  # To be sure runs has different timestamps
+                # To be sure runs has different timestamps
+                "created_at": started_at + timedelta(seconds=s),
                 "started_by_user_id": user.id,
                 **params,
             },
@@ -167,7 +168,8 @@ async def runs_with_same_parent(
             async_session,
             run_kwargs={
                 "parent_run_id": parent_run_id,
-                "created_at": started_at + timedelta(seconds=s),  # To be sure runs has different timestamps
+                # To be sure runs has different timestamps
+                "created_at": started_at + timedelta(seconds=s),
                 "started_by_user_id": user.id,
                 **params,
             },
@@ -193,10 +195,10 @@ async def runs_search(
         {"name": "airflow_dag_name", "type": "AIRFLOW_DAG"},
     ]
     runs_kwargs = [
-        {"external_id": "application_1638922609021_0001"},
-        {"external_id": "application_1638922609021_0002"},
-        {"external_id": "extract_task_0001"},
-        {"external_id": "extract_task_0002"},
+        {"external_id": "application_1638922609021_0001", "status": 3},
+        {"external_id": "application_1638922609021_0002", "status": 1},
+        {"external_id": "extract_task_0001", "status": 0},
+        {"external_id": "extract_task_0002", "status": 2},
     ]
     started_at = datetime.now(tz=UTC)
     async with async_session_maker() as async_session:

tests/test_server/test_runs/test_get_runs.py

Lines changed: 2 additions & 0 deletions
@@ -36,6 +36,8 @@ async def test_get_runs_missing_fields(
             "message": "Value error, input should contain either 'since', 'run_id', 'job_id', 'parent_run_id' or 'search_query' field",
             "context": {},
             "input": {
+                "statuses": [],
+                "job_types": [],
                 "page": 1,
                 "page_size": 20,
                 "run_id": [],

tests/test_server/test_runs/test_get_runs_by_job_id.py

Lines changed: 2 additions & 0 deletions
@@ -38,6 +38,8 @@ async def test_get_runs_by_job_id_missing_since(
             "message": "Value error, 'job_id' can be passed only with 'since'",
             "context": {},
             "input": {
+                "statuses": [],
+                "job_types": [],
                 "page": 1,
                 "page_size": 20,
                 "job_id": str(new_run.job_id),

tests/test_server/test_runs/test_get_runs_by_parent_run_id.py

Lines changed: 2 additions & 0 deletions
@@ -38,6 +38,8 @@ async def test_get_runs_by_job_id_missing_since(
             "message": "Value error, 'parent_run_id' can be passed only with 'since'",
             "context": {},
             "input": {
+                "statuses": [],
+                "job_types": [],
                 "page": 1,
                 "page_size": 20,
                 "parent_run_id": str(new_run.parent_run_id),

tests/test_server/test_runs/test_search_runs.py

Lines changed: 67 additions & 1 deletion
@@ -37,6 +37,8 @@ async def test_search_runs_missing_since(
             "message": "Value error, 'search_query' can be passed only with 'since'",
             "context": {},
             "input": {
+                "statuses": [],
+                "job_types": [],
                 "page": 1,
                 "page_size": 20,
                 "run_id": [],
@@ -198,7 +200,71 @@ async def test_search_runs_by_job_type(
         headers={"Authorization": f"Bearer {mocked_user.access_token}"},
         params={
             "since": since.isoformat(),
-            "search_query": "SPARK",
+            "job_types": ["SPARK_APPLICATION"],
+        },
+    )
+
+    assert response.status_code == HTTPStatus.OK, response.json()
+    assert response.json() == {
+        "meta": {
+            "has_next": False,
+            "has_previous": False,
+            "next_page": None,
+            "page": 1,
+            "page_size": 20,
+            "pages_count": 1,
+            "previous_page": None,
+            "total_count": 2,
+        },
+        "items": [
+            {
+                "id": str(run.id),
+                "data": run_to_json(run),
+                "statistics": {
+                    "inputs": {
+                        "total_datasets": 0,
+                        "total_bytes": 0,
+                        "total_rows": 0,
+                        "total_files": 0,
+                    },
+                    "outputs": {
+                        "total_datasets": 0,
+                        "total_bytes": 0,
+                        "total_rows": 0,
+                        "total_files": 0,
+                    },
+                    "operations": {
+                        "total_operations": 0,
+                    },
+                },
+            }
+            for run in runs
+        ],
+    }
+
+
+async def test_search_runs_by_status(
+    test_client: AsyncClient,
+    async_session: AsyncSession,
+    runs_search: dict[str, Run],
+    mocked_user: MockedUser,
+) -> None:
+    runs = await enrich_runs(
+        [
+            # runs sorted by id in descending order
+            runs_search["extract_task_0001"],
+            runs_search["application_1638922609021_0002"],
+        ],
+        async_session,
+    )
+    since = min(run.created_at for run in runs)
+
+    response = await test_client.get(
+        "/v1/runs",
+        headers={"Authorization": f"Bearer {mocked_user.access_token}"},
+        params={
+            "since": since.isoformat(),
+            "statuses": ["SUCCEEDED", "STARTED"],
         },
     )