
Commit 40a8255

[DOP-29593] Add filter by Run.started_at and Run.ended_at

1 parent 14f1e5e commit 40a8255

12 files changed: +604 -196 lines

data_rentgen/db/repositories/run.py

Lines changed: 28 additions & 6 deletions
@@ -9,6 +9,7 @@
     CompoundSelect,
     Select,
     SQLColumnExpression,
+    and_,
     any_,
     bindparam,
     desc,
@@ -19,7 +20,7 @@
 from sqlalchemy.dialects.postgresql import insert
 from sqlalchemy.orm import selectinload
 
-from data_rentgen.db.models import Job, Run, RunStartReason, RunStatus
+from data_rentgen.db.models import Job, Run, RunStartReason, RunStatus, User
 from data_rentgen.db.repositories.base import Repository
 from data_rentgen.db.utils.search import make_tsquery, ts_match, ts_rank
 from data_rentgen.dto import PaginationDTO, RunDTO
@@ -84,7 +85,7 @@
 
 
 class RunRepository(Repository[Run]):
-    async def paginate(
+    async def paginate(  # noqa: PLR0912, C901
         self,
         page: int,
         page_size: int,
@@ -95,7 +96,13 @@ async def paginate(
         parent_run_id: UUID | None,
         search_query: str | None,
         job_type: Collection[str],
-        status: Collection[str],
+        job_location_id: int | None,
+        status: Collection[RunStatus],
+        started_by_user: str | None,
+        started_since: datetime | None,
+        started_until: datetime | None,
+        ended_since: datetime | None,
+        ended_until: datetime | None,
     ) -> PaginationDTO[Run]:
         # do not use `tuple_(Run.created_at, Run.id).in_(...),
         # as this is too complex filter for Postgres to make an optimal query plan
@@ -133,8 +140,15 @@ async def paginate(
         if parent_run_id:
             where.append(Run.parent_run_id == parent_run_id)
         if status:
-            serialize_status: Collection[RunStatus] = [RunStatus[status] for status in status]
-            where.append(Run.status == any_(serialize_status))  # type: ignore[arg-type]
+            where.append(Run.status == any_(status))  # type: ignore[arg-type]
+        if started_since:
+            where.append(Run.started_at >= started_since)
+        if started_until:
+            where.append(Run.started_at <= started_until)
+        if ended_since:
+            where.append(Run.ended_at >= ended_since)
+        if ended_until:
+            where.append(Run.ended_at <= ended_until)
 
         query: Select | CompoundSelect
         order_by: list[ColumnElement | SQLColumnExpression]
@@ -165,8 +179,16 @@ async def paginate(
             query = select(Run).where(*where)
             order_by = [Run.created_at.desc(), Run.id.desc()]
 
+        job_where = []
         if job_type:
-            query = query.join(Job, Run.job_id == Job.id).where(Job.type == any_(job_type))  # type: ignore[arg-type]
+            job_where.append(Job.type == any_(list(job_type)))  # type: ignore[arg-type]
+        if job_location_id is not None:
+            job_where.append(Job.location_id == job_location_id)
+        if job_where:
+            query = query.join(Job, and_(Run.job_id == Job.id, *job_where))
+
+        if started_by_user:
+            query = query.join(User, and_(Run.started_by_user_id == User.id, User.name == started_by_user))
 
         options = [selectinload(Run.started_by_user)]
         return await self._paginate_by_query(
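
The pattern in this hunk: every optional filter appends one predicate to a shared where list, while job-related predicates move into the JOIN's ON clause via and_() instead of a separate .where() call, so they are applied once even when several job filters are combined. A minimal, self-contained sketch of the same composition; ToyRun and build_query are hypothetical stand-ins, not the project's actual Run model:

# Hedged sketch: toy model and table are assumptions, only the pattern is real.
from datetime import datetime, timezone

from sqlalchemy import DateTime, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class ToyRun(Base):  # hypothetical stand-in for data_rentgen.db.models.Run
    __tablename__ = "toy_run"
    id: Mapped[int] = mapped_column(primary_key=True)
    started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
    ended_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))


def build_query(started_since: datetime | None = None, started_until: datetime | None = None):
    where = []  # each optional filter contributes exactly one predicate
    if started_since:
        where.append(ToyRun.started_at >= started_since)
    if started_until:
        where.append(ToyRun.started_at <= started_until)
    # where(*where) AND-s the predicates together; an empty list means "no filter"
    return select(ToyRun).where(*where)


print(build_query(started_since=datetime(2024, 1, 1, tzinfo=timezone.utc)))

An empty where list degrades to an unfiltered SELECT, which is why no special branch is needed for the "no filters" case.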

data_rentgen/server/api/v1/router/run.py

Lines changed: 6 additions & 0 deletions
@@ -40,7 +40,13 @@ async def runs(
         parent_run_id=query_args.parent_run_id,
         search_query=query_args.search_query,
         job_type=query_args.job_type,
+        job_location_id=query_args.job_location_id,
         status=query_args.status,
+        started_by_user=query_args.started_by_user,
+        started_since=query_args.started_since,
+        started_until=query_args.started_until,
+        ended_since=query_args.ended_since,
+        ended_until=query_args.ended_until,
     )
     return PageResponseV1[RunDetailedResponseV1].from_pagination(pagination)

data_rentgen/server/schemas/v1/run.py

Lines changed: 49 additions & 2 deletions
@@ -43,10 +43,19 @@ def __str__(self) -> str:
 
 class RunStatusForQueryV1(StrEnum):
     UNKNOWN = "UNKNOWN"
+    """No data about status"""
+
     STARTED = "STARTED"
+    """Received START event"""
+
     SUCCEEDED = "SUCCEEDED"
+    """Finished successfully"""
+
     FAILED = "FAILED"
+    """Internal failure"""
+
     KILLED = "KILLED"
+    """Killed externally, e.g. by user request or in case of OOM"""
 
 
 class RunResponseV1(BaseModel):
@@ -150,16 +159,54 @@ class RunsQueryV1(PaginateQueryV1):
     job_type: list[str] = Field(
         default_factory=list,
         description="Filter runs by type of a Job",
-        examples=["SPARK_APPLICATION", "AIRFLOW_TASK"],
+        examples=[["SPARK_APPLICATION", "AIRFLOW_TASK"]],
+    )
+    job_location_id: int | None = Field(
+        default=None,
+        description="Filter runs by location of a Job",
+        examples=[123, 234],
+    )
+
+    status: list[RunStatusForQueryV1] = Field(
+        default_factory=list,
+        description="Filter runs by status",
+        examples=[["SUCCEEDED", "FAILED"]],
+    )
+
+    started_by_user: str | None = Field(
+        default=None,
+        description="User who started the Run",
+        examples=["someuser"],
     )
-    status: list[RunStatusForQueryV1] = Field(default_factory=list, description="Filter runs by status")
 
     search_query: str | None = Field(
         default=None,
         min_length=3,
         description="Search query",
     )
 
+    started_since: datetime | None = Field(
+        default=None,
+        description="Minimum value of Run 'started_at' field, in ISO 8601 format",
+        examples=["2008-09-15T15:53:00+05:00"],
+    )
+    started_until: datetime | None = Field(
+        default=None,
+        description="Maximum value of Run 'started_at' field, in ISO 8601 format",
+        examples=["2008-09-15T15:53:00+05:00"],
+    )
+
+    ended_since: datetime | None = Field(
+        default=None,
+        description="Minimum value of Run 'ended_at' field, in ISO 8601 format",
+        examples=["2008-09-15T15:53:00+05:00"],
+    )
+    ended_until: datetime | None = Field(
+        default=None,
+        description="Maximum value of Run 'ended_at' field, in ISO 8601 format",
+        examples=["2008-09-15T15:53:00+05:00"],
+    )
+
     model_config = ConfigDict(extra="forbid")
 
     @field_validator("until", mode="after")
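
Two details above are easy to miss: examples now wraps list values in an outer list, because the examples argument is itself a list of example values; and the datetime | None fields accept ISO 8601 strings with a timezone offset. A tiny hedged sketch of the parsing behavior; ToyRunsQuery is a stand-in, not the real RunsQueryV1:

from datetime import datetime

from pydantic import BaseModel, ConfigDict, Field


class ToyRunsQuery(BaseModel):  # hypothetical stand-in for RunsQueryV1
    started_since: datetime | None = Field(default=None)
    started_until: datetime | None = Field(default=None)

    model_config = ConfigDict(extra="forbid")  # unknown query params are rejected


q = ToyRunsQuery(started_since="2008-09-15T15:53:00+05:00")
print(q.started_since)  # timezone-aware datetime, offset preserved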

data_rentgen/server/services/run.py

Lines changed: 14 additions & 2 deletions
@@ -9,7 +9,7 @@
 from fastapi import Depends
 from sqlalchemy import Row
 
-from data_rentgen.db.models.run import Run
+from data_rentgen.db.models import Run, RunStatus
 from data_rentgen.dto.pagination import PaginationDTO
 from data_rentgen.services.uow import UnitOfWork
 
@@ -81,7 +81,13 @@ async def paginate(
         parent_run_id: UUID | None,
         search_query: str | None,
         job_type: Collection[str],
+        job_location_id: int | None,
         status: Collection[str],
+        started_by_user: str | None,
+        started_since: datetime | None,
+        started_until: datetime | None,
+        ended_since: datetime | None,
+        ended_until: datetime | None,
     ) -> RunServicePaginatedResult:
         pagination = await self._uow.run.paginate(
             page=page,
@@ -93,7 +99,13 @@ async def paginate(
             parent_run_id=parent_run_id,
             search_query=search_query,
             job_type=job_type,
-            status=status,
+            job_location_id=job_location_id,
+            status=[RunStatus[s] for s in status],
+            started_by_user=started_by_user,
+            started_since=started_since,
+            started_until=started_until,
+            ended_since=ended_since,
+            ended_until=ended_until,
         )
         run_ids = [item.id for item in pagination.items]
         input_stats = await self._uow.input.get_stats_by_run_ids(run_ids)
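
Note that the string-to-enum conversion moved from the repository up to the service: the HTTP layer still passes status names as strings, and an Enum["NAME"] lookup turns them into RunStatus members before the repository compares them against the column. A minimal sketch of that lookup; ToyRunStatus is hypothetical, not the real RunStatus:

from enum import Enum


class ToyRunStatus(str, Enum):  # hypothetical stand-in for RunStatus
    STARTED = "STARTED"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


# Enum["NAME"] looks up members by name; unknown names raise KeyError,
# which is why the schema layer validates the values first.
statuses = [ToyRunStatus[s] for s in ["SUCCEEDED", "FAILED"]]
print(statuses)  # [ToyRunStatus.SUCCEEDED, ToyRunStatus.FAILED]
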
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+Add new filters for ``GET /v1/jobs`` endpoint.
+- location_id: ``int`` id to filter by specific location
+- job_type: ``list[str]`` filter by job's type
+
+Add new endpoint GET ``/v1/jobs/types`` - to get distinct job types from DataRentgen.
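
A hedged usage sketch for these filters; the base URL and the httpx client are assumptions, while the paths and parameter names come from the changelog entry above:

import httpx

BASE_URL = "http://localhost:8000"  # assumption: a locally running DataRentgen server

# Distinct job types known to DataRentgen.
job_types = httpx.get(f"{BASE_URL}/v1/jobs/types").json()

# Jobs filtered by location and type.
jobs = httpx.get(
    f"{BASE_URL}/v1/jobs",
    params={"location_id": 123, "job_type": ["SPARK_APPLICATION"]},
).json()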

docs/changelog/next_release/319.improvement.rst

Lines changed: 0 additions & 5 deletions
This file was deleted.
Lines changed: 2 additions & 2 deletions
@@ -1,4 +1,4 @@
-Add new query parameters for ``/api/v1/runs`` endpoint:
+Add new query parameters for ``GET /v1/runs`` endpoint:
 
 - job_type: ``list[str]`` - filter by corresponding job type. For example ``SPARK_APLICATION``. You can use ``/api/v1/jobs/types`` to get all job types.
-- status:``list[RunStatus]`` - filter by runs statuses.
+- status: ``list[RunStatus]`` - filter by runs statuses.
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+Add new query parameters for ``GET /v1/runs`` endpoint:
+
+- started_since: ``datetime | None``
+- started_until: ``datetime | None``
+- ended_since: ``datetime | None``
+- ended_until: ``datetime | None``
+- job_location_id: ``int | None``
+- started_by_user: ``str | None``
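
A matching hedged sketch for the new run filters; again the base URL and the httpx client are assumptions, and the parameter names are those listed above:

import httpx

BASE_URL = "http://localhost:8000"  # assumption: a locally running DataRentgen server

# Runs started by a given user inside a time window.
runs = httpx.get(
    f"{BASE_URL}/v1/runs",
    params={
        "started_by_user": "someuser",
        "started_since": "2008-09-15T15:53:00+05:00",
        "ended_until": "2008-09-16T00:00:00+05:00",
        "job_location_id": 123,
    },
).json()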

tests/test_server/fixtures/factories/run.py

Lines changed: 88 additions & 42 deletions
@@ -12,6 +12,7 @@
 from tests.test_server.fixtures.factories.job import create_job
 from tests.test_server.fixtures.factories.job_type import create_job_type
 from tests.test_server.fixtures.factories.location import create_location
+from tests.test_server.fixtures.factories.user import create_user
 from tests.test_server.utils.delete import clean_db
 
 if TYPE_CHECKING:
@@ -188,50 +189,95 @@ async def runs_with_same_parent(
 @pytest_asyncio.fixture()
 async def runs_search(
     async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]],
-    user: User,
 ) -> AsyncGenerator[dict[str | None, Run], None]:
-    job_kwargs = [
-        {"name": "spark_application_name", "type": "SPARK_APPLICATION"},
-        {"name": "airflow_dag_name", "type": "AIRFLOW_DAG"},
-    ]
-    runs_kwargs = [
-        {"external_id": "application_1638922609021_0001", "status": RunStatus.KILLED},
-        {
-            "external_id": "application_1638922609021_0002",
-            "status": RunStatus.SUCCEEDED,
-        },
-        {"external_id": "extract_task_0001", "status": RunStatus.STARTED},
-        {"external_id": "extract_task_0002", "status": RunStatus.FAILED},
-    ]
-    started_at = datetime.now(tz=UTC)
+    created_at = datetime.now(tz=UTC)
     async with async_session_maker() as async_session:
-        jobs = []
-        for kwargs in job_kwargs:
-            location = await create_location(async_session)
-            job_type = await create_job_type(async_session, job_type_kwargs={"type": kwargs["type"]})
-            jobs.append(
-                await create_job(
-                    async_session,
-                    location_id=location.id,
-                    job_type_id=job_type.id,
-                    job_kwargs=kwargs,
-                ),
-            )
-        runs = [
-            await create_run(
-                async_session,
-                run_kwargs={
-                    "created_at": started_at + timedelta(seconds=0.1 * i),
-                    "job_id": job.id,
-                    "started_by_user_id": user.id,
-                    **kwargs,
-                },
-            )
-            for i, (job, kwargs) in enumerate(zip([job for job in jobs for _ in range(2)], runs_kwargs, strict=False))
-        ]
-
-        async_session.expunge_all()
-
+        spark_location = await create_location(async_session)
+        airflow_location = await create_location(async_session)
+
+        spark_user = await create_user(async_session)
+        airflow_user = await create_user(async_session)
+
+        spark_application_job_type = await create_job_type(async_session, job_type_kwargs={"type": "SPARK_APPLICATION"})
+        airflow_dag_job_type = await create_job_type(async_session, job_type_kwargs={"type": "AIRFLOW_DAG"})
+        airflow_task_job_type = await create_job_type(async_session, job_type_kwargs={"type": "AIRFLOW_TASK"})
+
+        spark_application = await create_job(
+            async_session,
+            location_id=spark_location.id,
+            job_type_id=spark_application_job_type.id,
+            job_kwargs={"name": "spark_application_name"},
+        )
+        airflow_dag = await create_job(
+            async_session,
+            location_id=airflow_location.id,
+            job_type_id=airflow_dag_job_type.id,
+            job_kwargs={"name": "airflow_dag_name"},
+        )
+        airflow_task = await create_job(
+            async_session,
+            location_id=airflow_location.id,
+            job_type_id=airflow_task_job_type.id,
+            job_kwargs={"name": "airflow_task_name"},
+        )
+
+        spark_app_run1 = await create_run(
+            async_session,
+            run_kwargs={
+                "job_id": spark_application.id,
+                "started_by_user_id": spark_user.id,
+                "external_id": "application_1638922609021_0001",
+                "status": RunStatus.KILLED,
+                "created_at": created_at + timedelta(seconds=0.1),
+                "started_at": created_at + timedelta(seconds=1),
+                "ended_at": created_at + timedelta(seconds=60),
+            },
+        )
+        spark_app_run2 = await create_run(
+            async_session,
+            run_kwargs={
+                "job_id": spark_application.id,
+                "started_by_user_id": spark_user.id,
+                "external_id": "application_1638922609021_0002",
+                "status": RunStatus.SUCCEEDED,
+                "created_at": created_at + timedelta(seconds=0.2),
+                "started_at": created_at + timedelta(seconds=2),
+                "ended_at": created_at + timedelta(seconds=120),
+            },
+        )
+
+        airflow_dag_run1 = await create_run(
+            async_session,
+            run_kwargs={
+                "job_id": airflow_dag.id,
+                "started_by_user_id": airflow_user.id,
+                "external_id": "dag_0001",
+                "status": RunStatus.STARTED,
+                "created_at": created_at + timedelta(seconds=0.3),
+                "started_at": created_at + timedelta(seconds=3),
+                "ended_at": None,
+            },
+        )
+        airflow_task_run1 = await create_run(
+            async_session,
+            run_kwargs={
+                "job_id": airflow_task.id,
+                "parent_run_id": airflow_dag_run1.id,
+                "started_by_user_id": airflow_user.id,
+                "external_id": "task_0001",
+                "status": RunStatus.FAILED,
+                "created_at": created_at + timedelta(seconds=0.4),
+                "started_at": created_at + timedelta(seconds=4),
+                "ended_at": created_at + timedelta(seconds=240),
+            },
+        )
+
+        runs = [
+            spark_app_run1,
+            spark_app_run2,
+            airflow_dag_run1,
+            airflow_task_run1,
+        ]
     yield {run.external_id: run for run in runs}
 
     async with async_session_maker() as async_session:
