Skip to content

Commit d91591c

Browse files
committed
[DOP-22578] Return statistics in GET /operations
1 parent 7db3272 commit d91591c

File tree

10 files changed

+402
-58
lines changed

10 files changed

+402
-58
lines changed

data_rentgen/db/repositories/input.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from typing import Literal, Sequence
66
from uuid import UUID
77

8-
from sqlalchemy import ColumnElement, Select, any_, func, literal_column, select
8+
from sqlalchemy import ColumnElement, Row, Select, any_, func, literal_column, select
99
from sqlalchemy.dialects.postgresql import insert
1010

1111
from data_rentgen.db.models import Input
@@ -162,7 +162,6 @@ async def _get_inputs(
162162
if granularity == "RUN":
163163
query = select(
164164
func.max(Input.created_at).label("created_at"),
165-
literal_column("NULL").label("id"),
166165
literal_column("NULL").label("operation_id"),
167166
Input.run_id,
168167
Input.job_id,
@@ -180,7 +179,6 @@ async def _get_inputs(
180179
else:
181180
query = select(
182181
func.max(Input.created_at).label("created_at"),
183-
literal_column("NULL").label("id"),
184182
literal_column("NULL").label("operation_id"),
185183
literal_column("NULL").label("run_id"),
186184
Input.job_id,
@@ -212,3 +210,32 @@ async def _get_inputs(
212210
)
213211
for row in query_result.all()
214212
]
213+
214+
async def get_stats_by_operation_ids(self, operation_ids: Sequence[UUID]) -> dict[UUID, Row]:
    """Aggregate input statistics per operation.

    Returns a mapping ``operation_id -> row``; every row exposes
    ``operation_id``, ``total_datasets``, ``total_bytes``, ``total_rows``
    and ``total_files``. Operations without matching input rows are simply
    absent from the mapping. An empty ``operation_ids`` short-circuits to an
    empty dict without touching the database.
    """
    if not operation_ids:
        return {}

    # Input created_at is always the same as operation's created_at, so the
    # time window can be derived from the smallest and largest operation id.
    # Do not use `tuple_(Input.created_at, Input.operation_id).in_(...)`,
    # as this is too complex filter for Postgres to make an optimal query plan.
    window_start = extract_timestamp_from_uuid(min(operation_ids))
    window_end = extract_timestamp_from_uuid(max(operation_ids))

    columns = (
        Input.operation_id.label("operation_id"),
        func.count(Input.dataset_id.distinct()).label("total_datasets"),
        func.sum(Input.num_bytes).label("total_bytes"),
        func.sum(Input.num_rows).label("total_rows"),
        func.sum(Input.num_files).label("total_files"),
    )
    filters = (
        Input.created_at >= window_start,
        Input.created_at <= window_end,
        Input.operation_id == any_(operation_ids),  # type: ignore[arg-type]
    )
    statement = select(*columns).where(*filters).group_by(Input.operation_id)

    result = await self._session.execute(statement)
    stats_by_operation: dict[UUID, Row] = {}
    for stats_row in result.all():
        stats_by_operation[stats_row.operation_id] = stats_row
    return stats_by_operation

data_rentgen/db/repositories/output.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from typing import Literal, Sequence
66
from uuid import UUID
77

8-
from sqlalchemy import ColumnElement, Select, any_, func, literal_column, select
8+
from sqlalchemy import ColumnElement, Row, Select, any_, func, literal_column, select
99
from sqlalchemy.dialects.postgresql import insert
1010

1111
from data_rentgen.db.models import Output, OutputType
@@ -219,3 +219,32 @@ async def _get_outputs(
219219
)
220220
for row in results.all()
221221
]
222+
223+
async def get_stats_by_operation_ids(self, operation_ids: Sequence[UUID]) -> dict[UUID, Row]:
    """Aggregate output statistics per operation.

    Returns a mapping ``operation_id -> row``; every row exposes
    ``operation_id``, ``total_datasets``, ``total_bytes``, ``total_rows``
    and ``total_files``. Operations without matching output rows are simply
    absent from the mapping. An empty ``operation_ids`` short-circuits to an
    empty dict without touching the database.
    """
    if not operation_ids:
        return {}

    # Output created_at is always the same as operation's created_at, so the
    # time window can be derived from the smallest and largest operation id.
    # Do not use `tuple_(Output.created_at, Output.operation_id).in_(...)`,
    # as this is too complex filter for Postgres to make an optimal query plan.
    window_start = extract_timestamp_from_uuid(min(operation_ids))
    window_end = extract_timestamp_from_uuid(max(operation_ids))

    columns = (
        Output.operation_id.label("operation_id"),
        func.count(Output.dataset_id.distinct()).label("total_datasets"),
        func.sum(Output.num_bytes).label("total_bytes"),
        func.sum(Output.num_rows).label("total_rows"),
        func.sum(Output.num_files).label("total_files"),
    )
    filters = (
        Output.created_at >= window_start,
        Output.created_at <= window_end,
        Output.operation_id == any_(operation_ids),  # type: ignore[arg-type]
    )
    statement = select(*columns).where(*filters).group_by(Output.operation_id)

    result = await self._session.execute(statement)
    stats_by_operation: dict[UUID, Row] = {}
    for stats_row in result.all():
        stats_by_operation[stats_row.operation_id] = stats_row
    return stats_by_operation

data_rentgen/server/api/v1/router/operation.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@
99
from data_rentgen.server.errors.schemas import InvalidRequestSchema
1010
from data_rentgen.server.schemas.v1 import (
1111
LineageResponseV1,
12+
OperationDetailedResponseV1,
1213
OperationLineageQueryV1,
1314
OperationQueryV1,
14-
OperationResponseV1,
1515
PageResponseV1,
1616
)
1717
from data_rentgen.server.services import get_user
1818
from data_rentgen.server.services.lineage import LineageService
19+
from data_rentgen.server.services.operation import OperationService
1920
from data_rentgen.server.utils.lineage_response import build_lineage_response
2021
from data_rentgen.services import UnitOfWork
2122

@@ -30,17 +31,18 @@
3031
async def operations(
3132
query_args: Annotated[OperationQueryV1, Depends()],
3233
unit_of_work: Annotated[UnitOfWork, Depends()],
34+
operation_service: Annotated[OperationService, Depends()],
3335
current_user: User = Depends(get_user()),
34-
) -> PageResponseV1[OperationResponseV1]:
35-
pagination = await unit_of_work.operation.paginate(
36+
) -> PageResponseV1[OperationDetailedResponseV1]:
37+
pagination = await operation_service.paginate(
3638
page=query_args.page,
3739
page_size=query_args.page_size,
3840
since=query_args.since,
3941
until=query_args.until,
4042
operation_ids=query_args.operation_id,
4143
run_id=query_args.run_id,
4244
)
43-
return PageResponseV1[OperationResponseV1].from_pagination(pagination)
45+
return PageResponseV1[OperationDetailedResponseV1].from_pagination(pagination)
4446

4547

4648
@router.get("/lineage", summary="Get Operation lineage graph")

data_rentgen/server/schemas/v1/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@
2626
UpdateLocationRequestV1,
2727
)
2828
from data_rentgen.server.schemas.v1.operation import (
29+
OperationDetailedResponseV1,
30+
OperationIOStatisticsReponseV1,
2931
OperationQueryV1,
3032
OperationResponseV1,
33+
OperationStatisticsReponseV1,
3134
)
3235
from data_rentgen.server.schemas.v1.pagination import (
3336
PageMetaResponseV1,
@@ -59,9 +62,12 @@
5962
"UpdateLocationRequestV1",
6063
"PageMetaResponseV1",
6164
"PageResponseV1",
65+
"OperationDetailedResponseV1",
66+
"OperationIOStatisticsReponseV1",
67+
"OperationLineageQueryV1",
6268
"OperationQueryV1",
6369
"OperationResponseV1",
64-
"OperationLineageQueryV1",
70+
"OperationStatisticsReponseV1",
6571
"JobResponseV1",
6672
"JobPaginateQueryV1",
6773
"JobLineageQueryV1",

data_rentgen/server/schemas/v1/operation.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,37 @@ def _serialize_status(self, value: OperationStatusV1) -> str:
6464
return str(value)
6565

6666

67+
class OperationIOStatisticsReponseV1(BaseModel):
    """Operation IO statistics response."""

    # Allows building the model from attribute-bearing objects, not only dicts.
    model_config = ConfigDict(from_attributes=True)

    # Every counter falls back to 0 when the source does not provide it.
    total_datasets: int = Field(default=0, description="Total number of datasets")
    total_bytes: int = Field(default=0, description="Total number of bytes")
    total_rows: int = Field(default=0, description="Total number of rows")
    total_files: int = Field(default=0, description="Total number of files")
76+
77+
78+
class OperationStatisticsReponseV1(BaseModel):
    """Operation statistics response."""

    # Allows building the model from attribute-bearing objects, not only dicts.
    model_config = ConfigDict(from_attributes=True)

    # Both sides share one schema; field order is kept as declared originally.
    outputs: OperationIOStatisticsReponseV1 = Field(description="Output statistics")
    inputs: OperationIOStatisticsReponseV1 = Field(description="Input statistics")
85+
86+
87+
class OperationDetailedResponseV1(BaseModel):
    """Operation response."""

    # Allows building the model from attribute-bearing objects, not only dicts.
    model_config = ConfigDict(from_attributes=True)

    # The operation itself, plus its aggregated input/output statistics.
    data: OperationResponseV1 = Field(description="Operation data")
    statistics: OperationStatisticsReponseV1 = Field(description="Operation statistics")
96+
97+
6798
class OperationQueryV1(PaginateQueryV1):
6899
"""Query params for Operations paginate request."""
69100

data_rentgen/server/services/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from data_rentgen.server.services.get_user import get_user
44
from data_rentgen.server.services.lineage import LineageService
5+
from data_rentgen.server.services.operation import OperationService
56

6-
__all__ = ["LineageService", "get_user"]
7+
__all__ = ["get_user", "LineageService", "OperationService"]
data_rentgen/server/services/operation.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
from dataclasses import dataclass
4+
from datetime import datetime
5+
from typing import Annotated
6+
from uuid import UUID
7+
8+
from fastapi import Depends
9+
from sqlalchemy import Row
10+
11+
from data_rentgen.db.models.operation import Operation
12+
from data_rentgen.dto.pagination import PaginationDTO
13+
from data_rentgen.services.uow import UnitOfWork
14+
15+
16+
@dataclass
class OperationServiceIOStatistics:
    """Aggregated input or output statistics of a single operation.

    All counters default to zero, so an instance created without arguments
    represents an operation with no recorded inputs/outputs.
    """

    total_datasets: int = 0
    total_bytes: int = 0
    total_rows: int = 0
    total_files: int = 0

    @classmethod
    def from_row(cls, row: "Row | None") -> "OperationServiceIOStatistics":
        """Build statistics from an aggregated query row.

        Args:
            row: Object exposing ``total_datasets``, ``total_bytes``,
                ``total_rows`` and ``total_files`` attributes, or ``None``
                when there is no statistics row for the operation.

        Returns:
            Statistics with every counter set to an integer; a missing row
            or a missing (``None``) aggregate is reported as ``0``.
        """
        if row is None:
            return cls()

        # SQL SUM() yields NULL when every aggregated value is NULL, so an
        # aggregate may arrive as None here; normalize it to 0 to keep the
        # declared ``int`` contract of the fields.
        return cls(
            total_datasets=row.total_datasets or 0,
            total_bytes=row.total_bytes or 0,
            total_rows=row.total_rows or 0,
            total_files=row.total_files or 0,
        )
34+
35+
36+
@dataclass
class OperationServiceStatistics:
    """Input and output statistics of one operation, kept side by side."""

    # Statistics aggregated over the operation's inputs.
    inputs: OperationServiceIOStatistics
    # Statistics aggregated over the operation's outputs.
    outputs: OperationServiceIOStatistics
40+
41+
42+
@dataclass
class OperationServicePageItem:
    """Single page entry: an operation together with its statistics."""

    # The operation as returned by the repository.
    data: Operation
    # Input/output statistics computed for that operation.
    statistics: OperationServiceStatistics
46+
47+
48+
class OperationServicePaginatedResult(PaginationDTO[OperationServicePageItem]):
    """Pagination result whose items are ``OperationServicePageItem`` objects."""
50+
51+
52+
class OperationService:
    """Service returning paginated operations enriched with IO statistics."""

    def __init__(self, uow: Annotated[UnitOfWork, Depends()]):
        self._uow = uow

    async def paginate(
        self,
        page: int,
        page_size: int,
        since: datetime | None,
        until: datetime | None,
        operation_ids: list[UUID],
        run_id: UUID | None,
    ) -> OperationServicePaginatedResult:
        """Return one page of operations, each paired with its statistics.

        All arguments are forwarded unchanged to the operation repository's
        ``paginate``. Input and output statistics are then fetched only for
        the operations present on the returned page; operations without a
        statistics row are handled by ``OperationServiceIOStatistics.from_row``.
        """
        operations_page = await self._uow.operation.paginate(
            page=page,
            page_size=page_size,
            since=since,
            until=until,
            operation_ids=operation_ids,
            run_id=run_id,
        )

        # Only ids from the current page are needed for the statistics lookups.
        page_operation_ids = [operation.id for operation in operations_page.items]
        inputs_by_operation = await self._uow.input.get_stats_by_operation_ids(page_operation_ids)
        outputs_by_operation = await self._uow.output.get_stats_by_operation_ids(page_operation_ids)

        page_items = []
        for operation in operations_page.items:
            statistics = OperationServiceStatistics(
                inputs=OperationServiceIOStatistics.from_row(inputs_by_operation.get(operation.id)),
                outputs=OperationServiceIOStatistics.from_row(outputs_by_operation.get(operation.id)),
            )
            page_items.append(OperationServicePageItem(data=operation, statistics=statistics))

        return OperationServicePaginatedResult(
            page=operations_page.page,
            page_size=operations_page.page_size,
            total_count=operations_page.total_count,
            items=page_items,
        )

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,8 @@ ignore = [
438438
"B008",
439439
# Found `__init__.py` module with logic
440440
"WPS412",
441+
# Found incorrect order of methods in a class
442+
"WPS338",
441443
]
442444

443445
per-file-ignores = [

0 commit comments

Comments
 (0)