Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions data_rentgen/db/repositories/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Literal, Sequence
from uuid import UUID

from sqlalchemy import ColumnElement, Select, any_, func, literal_column, select
from sqlalchemy import ColumnElement, Row, Select, any_, func, literal_column, select
from sqlalchemy.dialects.postgresql import insert

from data_rentgen.db.models import Input
Expand Down Expand Up @@ -162,7 +162,6 @@ async def _get_inputs(
if granularity == "RUN":
query = select(
func.max(Input.created_at).label("created_at"),
literal_column("NULL").label("id"),
literal_column("NULL").label("operation_id"),
Input.run_id,
Input.job_id,
Expand All @@ -180,7 +179,6 @@ async def _get_inputs(
else:
query = select(
func.max(Input.created_at).label("created_at"),
literal_column("NULL").label("id"),
literal_column("NULL").label("operation_id"),
literal_column("NULL").label("run_id"),
Input.job_id,
Expand Down Expand Up @@ -212,3 +210,32 @@ async def _get_inputs(
)
for row in query_result.all()
]

async def get_stats_by_operation_ids(self, operation_ids: Sequence[UUID]) -> dict[UUID, Row]:
    """Aggregate input statistics for each of the given operations.

    Returns a mapping from operation id to a row exposing ``operation_id``,
    ``total_datasets``, ``total_bytes``, ``total_rows`` and ``total_files``.
    Operations without matching input rows are absent from the mapping.
    An empty ``operation_ids`` yields an empty dict without running a query.
    """
    if not operation_ids:
        return {}

    # Input created_at is always the same as operation's created_at,
    # so the created_at range is taken from the smallest and largest operation id.
    # Do not use `tuple_(Input.created_at, Input.operation_id).in_(...)`,
    # as this is too complex filter for Postgres to make an optimal query plan.
    created_since = extract_timestamp_from_uuid(min(operation_ids))
    created_until = extract_timestamp_from_uuid(max(operation_ids))

    aggregates = (
        Input.operation_id.label("operation_id"),
        func.count(Input.dataset_id.distinct()).label("total_datasets"),
        func.sum(Input.num_bytes).label("total_bytes"),
        func.sum(Input.num_rows).label("total_rows"),
        func.sum(Input.num_files).label("total_files"),
    )
    conditions = (
        Input.created_at >= created_since,
        Input.created_at <= created_until,
        Input.operation_id == any_(operation_ids),  # type: ignore[arg-type]
    )
    statement = select(*aggregates).where(*conditions).group_by(Input.operation_id)

    result = await self._session.execute(statement)
    stats_by_operation: dict[UUID, Row] = {}
    for stats_row in result.all():
        stats_by_operation[stats_row.operation_id] = stats_row
    return stats_by_operation
31 changes: 30 additions & 1 deletion data_rentgen/db/repositories/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Literal, Sequence
from uuid import UUID

from sqlalchemy import ColumnElement, Select, any_, func, literal_column, select
from sqlalchemy import ColumnElement, Row, Select, any_, func, literal_column, select
from sqlalchemy.dialects.postgresql import insert

from data_rentgen.db.models import Output, OutputType
Expand Down Expand Up @@ -219,3 +219,32 @@ async def _get_outputs(
)
for row in results.all()
]

async def get_stats_by_operation_ids(self, operation_ids: Sequence[UUID]) -> dict[UUID, Row]:
    """Aggregate output statistics for each of the given operations.

    Returns a mapping from operation id to a row exposing ``operation_id``,
    ``total_datasets``, ``total_bytes``, ``total_rows`` and ``total_files``.
    Operations without matching output rows are absent from the mapping.
    An empty ``operation_ids`` yields an empty dict without running a query.
    """
    if not operation_ids:
        return {}

    # Output created_at is always the same as operation's created_at,
    # so the created_at range is taken from the smallest and largest operation id.
    # Do not use `tuple_(Output.created_at, Output.operation_id).in_(...)`,
    # as this is too complex filter for Postgres to make an optimal query plan.
    created_since = extract_timestamp_from_uuid(min(operation_ids))
    created_until = extract_timestamp_from_uuid(max(operation_ids))

    aggregates = (
        Output.operation_id.label("operation_id"),
        func.count(Output.dataset_id.distinct()).label("total_datasets"),
        func.sum(Output.num_bytes).label("total_bytes"),
        func.sum(Output.num_rows).label("total_rows"),
        func.sum(Output.num_files).label("total_files"),
    )
    conditions = (
        Output.created_at >= created_since,
        Output.created_at <= created_until,
        Output.operation_id == any_(operation_ids),  # type: ignore[arg-type]
    )
    statement = select(*aggregates).where(*conditions).group_by(Output.operation_id)

    result = await self._session.execute(statement)
    stats_by_operation: dict[UUID, Row] = {}
    for stats_row in result.all():
        stats_by_operation[stats_row.operation_id] = stats_row
    return stats_by_operation
10 changes: 6 additions & 4 deletions data_rentgen/server/api/v1/router/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
from data_rentgen.server.errors.schemas import InvalidRequestSchema
from data_rentgen.server.schemas.v1 import (
LineageResponseV1,
OperationDetailedResponseV1,
OperationLineageQueryV1,
OperationQueryV1,
OperationResponseV1,
PageResponseV1,
)
from data_rentgen.server.services import get_user
from data_rentgen.server.services.lineage import LineageService
from data_rentgen.server.services.operation import OperationService
from data_rentgen.server.utils.lineage_response import build_lineage_response
from data_rentgen.services import UnitOfWork

Expand All @@ -30,17 +31,18 @@
async def operations(
query_args: Annotated[OperationQueryV1, Depends()],
unit_of_work: Annotated[UnitOfWork, Depends()],
operation_service: Annotated[OperationService, Depends()],
current_user: User = Depends(get_user()),
) -> PageResponseV1[OperationResponseV1]:
pagination = await unit_of_work.operation.paginate(
) -> PageResponseV1[OperationDetailedResponseV1]:
pagination = await operation_service.paginate(
page=query_args.page,
page_size=query_args.page_size,
since=query_args.since,
until=query_args.until,
operation_ids=query_args.operation_id,
run_id=query_args.run_id,
)
return PageResponseV1[OperationResponseV1].from_pagination(pagination)
return PageResponseV1[OperationDetailedResponseV1].from_pagination(pagination)


@router.get("/lineage", summary="Get Operation lineage graph")
Expand Down
8 changes: 7 additions & 1 deletion data_rentgen/server/schemas/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
UpdateLocationRequestV1,
)
from data_rentgen.server.schemas.v1.operation import (
OperationDetailedResponseV1,
OperationIOStatisticsReponseV1,
OperationQueryV1,
OperationResponseV1,
OperationStatisticsReponseV1,
)
from data_rentgen.server.schemas.v1.pagination import (
PageMetaResponseV1,
Expand Down Expand Up @@ -59,9 +62,12 @@
"UpdateLocationRequestV1",
"PageMetaResponseV1",
"PageResponseV1",
"OperationDetailedResponseV1",
"OperationIOStatisticsReponseV1",
"OperationLineageQueryV1",
"OperationQueryV1",
"OperationResponseV1",
"OperationLineageQueryV1",
"OperationStatisticsReponseV1",
"JobResponseV1",
"JobPaginateQueryV1",
"JobLineageQueryV1",
Expand Down
31 changes: 31 additions & 0 deletions data_rentgen/server/schemas/v1/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,37 @@ def _serialize_status(self, value: OperationStatusV1) -> str:
return str(value)


class OperationIOStatisticsReponseV1(BaseModel):
    """Operation IO statistics response."""

    # Let the model be populated from attribute-bearing objects, not only from mappings.
    model_config = ConfigDict(from_attributes=True)

    # Every counter falls back to 0 when the source provides no value for it.
    total_datasets: int = Field(default=0, description="Total number of datasets")
    total_bytes: int = Field(default=0, description="Total number of bytes")
    total_rows: int = Field(default=0, description="Total number of rows")
    total_files: int = Field(default=0, description="Total number of files")


class OperationStatisticsReponseV1(BaseModel):
    """Operation statistics response."""

    # Let the model be populated from attribute-bearing objects, not only from mappings.
    model_config = ConfigDict(from_attributes=True)

    # Both directions are required and share the same IO statistics schema.
    outputs: OperationIOStatisticsReponseV1 = Field(description="Output statistics")
    inputs: OperationIOStatisticsReponseV1 = Field(description="Input statistics")


class OperationDetailedResponseV1(BaseModel):
    """Operation response."""

    # Let the model be populated from attribute-bearing objects, not only from mappings.
    model_config = ConfigDict(from_attributes=True)

    # The operation itself, paired with its aggregated input/output statistics.
    data: OperationResponseV1 = Field(description="Operation data")
    statistics: OperationStatisticsReponseV1 = Field(description="Operation statistics")


class OperationQueryV1(PaginateQueryV1):
"""Query params for Operations paginate request."""

Expand Down
3 changes: 2 additions & 1 deletion data_rentgen/server/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
# SPDX-License-Identifier: Apache-2.0
from data_rentgen.server.services.get_user import get_user
from data_rentgen.server.services.lineage import LineageService
from data_rentgen.server.services.operation import OperationService

__all__ = ["LineageService", "get_user"]
__all__ = ["get_user", "LineageService", "OperationService"]
91 changes: 91 additions & 0 deletions data_rentgen/server/services/operation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from dataclasses import dataclass
from datetime import datetime
from typing import Annotated
from uuid import UUID

from fastapi import Depends
from sqlalchemy import Row

from data_rentgen.db.models.operation import Operation
from data_rentgen.dto.pagination import PaginationDTO
from data_rentgen.services.uow import UnitOfWork


@dataclass
class OperationServiceIOStatistics:
    """Aggregated input or output statistics of a single operation.

    All counters are plain integers and default to zero, so an instance
    built without arguments represents "no inputs/outputs recorded".
    """

    total_datasets: int = 0
    total_bytes: int = 0
    total_rows: int = 0
    total_files: int = 0

    @classmethod
    def from_row(cls, row: Row | None) -> "OperationServiceIOStatistics":
        """Build statistics from an aggregate query row.

        Args:
            row: Object exposing ``total_datasets``, ``total_bytes``,
                ``total_rows`` and ``total_files`` attributes, or ``None``
                when the operation has no matching rows at all.

        Returns:
            Statistics with every counter set; missing values become ``0``.
        """
        if row is None:
            return cls()

        # SQL SUM() yields NULL when every summed value is NULL, so an aggregate
        # may arrive as None even though the row exists. Normalise it to 0 to
        # keep the counters integers, as the field annotations promise.
        return cls(
            total_datasets=row.total_datasets or 0,
            total_bytes=row.total_bytes or 0,
            total_rows=row.total_rows or 0,
            total_files=row.total_files or 0,
        )


@dataclass
class OperationServiceStatistics:
    """Input and output statistics of a single operation."""

    # Statistics aggregated over the operation's inputs.
    inputs: OperationServiceIOStatistics
    # Statistics aggregated over the operation's outputs.
    outputs: OperationServiceIOStatistics


@dataclass
class OperationServicePageItem:
    """Single page entry: an operation paired with its statistics."""

    # The operation model as returned by the operation repository.
    data: Operation
    # Input/output statistics computed for that operation.
    statistics: OperationServiceStatistics


class OperationServicePaginatedResult(PaginationDTO[OperationServicePageItem]):
    """Pagination result whose items are ``OperationServicePageItem`` objects."""


class OperationService:
    """Serve pages of operations enriched with input/output statistics."""

    def __init__(self, uow: Annotated[UnitOfWork, Depends()]):
        self._uow = uow

    async def paginate(
        self,
        page: int,
        page_size: int,
        since: datetime | None,
        until: datetime | None,
        operation_ids: list[UUID],
        run_id: UUID | None,
    ) -> OperationServicePaginatedResult:
        """Return one page of operations together with their statistics.

        The filters are forwarded unchanged to the operation repository.
        Statistics are then fetched only for the operations that ended up
        on the page; operations without statistics rows get zeroed counters.
        """
        operations_page = await self._uow.operation.paginate(
            page=page,
            page_size=page_size,
            since=since,
            until=until,
            operation_ids=operation_ids,
            run_id=run_id,
        )

        # Look up statistics by the ids actually present on the page,
        # not by the ids passed in as a filter.
        page_operation_ids = [operation.id for operation in operations_page.items]
        inputs_by_operation = await self._uow.input.get_stats_by_operation_ids(page_operation_ids)
        outputs_by_operation = await self._uow.output.get_stats_by_operation_ids(page_operation_ids)

        page_items = []
        for operation in operations_page.items:
            statistics = OperationServiceStatistics(
                inputs=OperationServiceIOStatistics.from_row(inputs_by_operation.get(operation.id)),
                outputs=OperationServiceIOStatistics.from_row(outputs_by_operation.get(operation.id)),
            )
            page_items.append(OperationServicePageItem(data=operation, statistics=statistics))

        return OperationServicePaginatedResult(
            page=operations_page.page,
            page_size=operations_page.page_size,
            total_count=operations_page.total_count,
            items=page_items,
        )
47 changes: 47 additions & 0 deletions docs/changelog/next_release/158.breaking.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
Change response schema of ``GET /operations`` from:

.. code:: python

{
"meta": {...},
"items": [
{
"kind": "OPERATION",
"id": ...,
# ...
}
],
}

to:

.. code:: python

{
"meta": {...},
"items": [
{
"data": {
"kind": "OPERATION",
"id": ...,
# ...
},
"statistics": {
"inputs": {
"total_datasets": 2,
"total_bytes": 123456,
"total_rows": 100,
"total_files": 0,
},
"outputs": {
"total_datasets": 2,
"total_bytes": 123456,
"total_rows": 100,
"total_files": 0,
},
},
}
],
}

This allows showing operation statistics in the UI without building the lineage graph.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ ignore = [
"B008",
# Found `__init__.py` module with logic
"WPS412",
# Found incorrect order of methods in a class
"WPS338",
]

per-file-ignores = [
Expand Down
Loading
Loading