Skip to content

Commit 624e58b

Browse files
committed
[DOP-29593] Add filter by Run.started_at and Run.ended_at
1 parent 14f1e5e commit 624e58b

File tree

7 files changed

+349
-140
lines changed

7 files changed

+349
-140
lines changed

data_rentgen/db/repositories/run.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484

8585

8686
class RunRepository(Repository[Run]):
87-
async def paginate(
87+
async def paginate( # noqa: PLR0912, C901
8888
self,
8989
page: int,
9090
page_size: int,
@@ -95,7 +95,11 @@ async def paginate(
9595
parent_run_id: UUID | None,
9696
search_query: str | None,
9797
job_type: Collection[str],
98-
status: Collection[str],
98+
status: Collection[RunStatus],
99+
started_since: datetime | None,
100+
started_until: datetime | None,
101+
ended_since: datetime | None,
102+
ended_until: datetime | None,
99103
) -> PaginationDTO[Run]:
100104
# do not use `tuple_(Run.created_at, Run.id).in_(...),
101105
# as this is too complex filter for Postgres to make an optimal query plan
@@ -133,8 +137,15 @@ async def paginate(
133137
if parent_run_id:
134138
where.append(Run.parent_run_id == parent_run_id)
135139
if status:
136-
serialize_status: Collection[RunStatus] = [RunStatus[status] for status in status]
137-
where.append(Run.status == any_(serialize_status)) # type: ignore[arg-type]
140+
where.append(Run.status == any_(status)) # type: ignore[arg-type]
141+
if started_since:
142+
where.append(Run.started_at >= started_since)
143+
if started_until:
144+
where.append(Run.started_at <= started_until)
145+
if ended_since:
146+
where.append(Run.ended_at >= ended_since)
147+
if ended_until:
148+
where.append(Run.ended_at <= ended_until)
138149

139150
query: Select | CompoundSelect
140151
order_by: list[ColumnElement | SQLColumnExpression]

data_rentgen/server/api/v1/router/run.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ async def runs(
4141
search_query=query_args.search_query,
4242
job_type=query_args.job_type,
4343
status=query_args.status,
44+
started_since=query_args.started_since,
45+
started_until=query_args.started_until,
46+
ended_since=query_args.ended_since,
47+
ended_until=query_args.ended_until,
4448
)
4549
return PageResponseV1[RunDetailedResponseV1].from_pagination(pagination)
4650

data_rentgen/server/schemas/v1/run.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,19 @@ def __str__(self) -> str:
4343

4444
class RunStatusForQueryV1(StrEnum):
4545
UNKNOWN = "UNKNOWN"
46+
"""No data about status"""
47+
4648
STARTED = "STARTED"
49+
"""Received START event"""
50+
4751
SUCCEEDED = "SUCCEEDED"
52+
"""Finished successfully"""
53+
4854
FAILED = "FAILED"
55+
"""Internal failure"""
56+
4957
KILLED = "KILLED"
58+
"""Killed externally, e.g. by user request or in case of OOM"""
5059

5160

5261
class RunResponseV1(BaseModel):
@@ -150,16 +159,42 @@ class RunsQueryV1(PaginateQueryV1):
150159
job_type: list[str] = Field(
151160
default_factory=list,
152161
description="Filter runs by type of a Job",
153-
examples=["SPARK_APPLICATION", "AIRFLOW_TASK"],
162+
examples=[["SPARK_APPLICATION", "AIRFLOW_TASK"]],
163+
)
164+
status: list[RunStatusForQueryV1] = Field(
165+
default_factory=list,
166+
description="Filter runs by status",
167+
examples=[["SUCCEEDED", "FAILED"]],
154168
)
155-
status: list[RunStatusForQueryV1] = Field(default_factory=list, description="Filter runs by status")
156169

157170
search_query: str | None = Field(
158171
default=None,
159172
min_length=3,
160173
description="Search query",
161174
)
162175

176+
started_since: datetime | None = Field(
177+
default=None,
178+
description="Minimum value of Run 'started_at' field, in ISO 8601 format",
179+
examples=["2008-09-15T15:53:00+05:00"],
180+
)
181+
started_until: datetime | None = Field(
182+
default=None,
183+
description="Maximum value of Run 'started_at' field, in ISO 8601 format",
184+
examples=["2008-09-15T15:53:00+05:00"],
185+
)
186+
187+
ended_since: datetime | None = Field(
188+
default=None,
189+
description="Minimum value of Run 'ended_at' field, in ISO 8601 format",
190+
examples=["2008-09-15T15:53:00+05:00"],
191+
)
192+
ended_until: datetime | None = Field(
193+
default=None,
194+
description="Maximum value of Run 'ended_at' field, in ISO 8601 format",
195+
examples=["2008-09-15T15:53:00+05:00"],
196+
)
197+
163198
model_config = ConfigDict(extra="forbid")
164199

165200
@field_validator("until", mode="after")

data_rentgen/server/services/run.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from fastapi import Depends
1010
from sqlalchemy import Row
1111

12-
from data_rentgen.db.models.run import Run
12+
from data_rentgen.db.models import Run, RunStatus
1313
from data_rentgen.dto.pagination import PaginationDTO
1414
from data_rentgen.services.uow import UnitOfWork
1515

@@ -82,6 +82,10 @@ async def paginate(
8282
search_query: str | None,
8383
job_type: Collection[str],
8484
status: Collection[str],
85+
started_since: datetime | None,
86+
started_until: datetime | None,
87+
ended_since: datetime | None,
88+
ended_until: datetime | None,
8589
) -> RunServicePaginatedResult:
8690
pagination = await self._uow.run.paginate(
8791
page=page,
@@ -93,7 +97,11 @@ async def paginate(
9397
parent_run_id=parent_run_id,
9498
search_query=search_query,
9599
job_type=job_type,
96-
status=status,
100+
status=[RunStatus[s] for s in status],
101+
started_since=started_since,
102+
started_until=started_until,
103+
ended_since=ended_since,
104+
ended_until=ended_until,
97105
)
98106
run_ids = [item.id for item in pagination.items]
99107
input_stats = await self._uow.input.get_stats_by_run_ids(run_ids)

tests/test_server/fixtures/factories/run.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -194,16 +194,38 @@ async def runs_search(
194194
{"name": "spark_application_name", "type": "SPARK_APPLICATION"},
195195
{"name": "airflow_dag_name", "type": "AIRFLOW_DAG"},
196196
]
197+
198+
created_at = datetime.now(tz=UTC)
197199
runs_kwargs = [
198-
{"external_id": "application_1638922609021_0001", "status": RunStatus.KILLED},
200+
{
201+
"external_id": "application_1638922609021_0001",
202+
"status": RunStatus.KILLED,
203+
"created_at": created_at + timedelta(seconds=0.1),
204+
"started_at": created_at + timedelta(seconds=1),
205+
"ended_at": created_at + timedelta(seconds=60),
206+
},
199207
{
200208
"external_id": "application_1638922609021_0002",
201209
"status": RunStatus.SUCCEEDED,
210+
"created_at": created_at + timedelta(seconds=0.2),
211+
"started_at": created_at + timedelta(seconds=2),
212+
"ended_at": created_at + timedelta(seconds=120),
213+
},
214+
{
215+
"external_id": "extract_task_0001",
216+
"status": RunStatus.STARTED,
217+
"created_at": created_at + timedelta(seconds=0.3),
218+
"started_at": created_at + timedelta(seconds=3),
219+
"ended_at": None,
220+
},
221+
{
222+
"external_id": "extract_task_0002",
223+
"status": RunStatus.FAILED,
224+
"created_at": created_at + timedelta(seconds=0.4),
225+
"started_at": created_at + timedelta(seconds=4),
226+
"ended_at": created_at + timedelta(seconds=240),
202227
},
203-
{"external_id": "extract_task_0001", "status": RunStatus.STARTED},
204-
{"external_id": "extract_task_0002", "status": RunStatus.FAILED},
205228
]
206-
started_at = datetime.now(tz=UTC)
207229
async with async_session_maker() as async_session:
208230
jobs = []
209231
for kwargs in job_kwargs:
@@ -221,7 +243,6 @@ async def runs_search(
221243
await create_run(
222244
async_session,
223245
run_kwargs={
224-
"created_at": started_at + timedelta(seconds=0.1 * i),
225246
"job_id": job.id,
226247
"started_by_user_id": user.id,
227248
**kwargs,

0 commit comments

Comments
 (0)