diff --git a/data_rentgen/db/repositories/input.py b/data_rentgen/db/repositories/input.py index 8feac25f..9322e1e6 100644 --- a/data_rentgen/db/repositories/input.py +++ b/data_rentgen/db/repositories/input.py @@ -5,7 +5,7 @@ from typing import Literal, Sequence from uuid import UUID -from sqlalchemy import ColumnElement, Select, any_, func, literal_column, select +from sqlalchemy import ColumnElement, Row, Select, any_, func, literal_column, select from sqlalchemy.dialects.postgresql import insert from data_rentgen.db.models import Input @@ -162,7 +162,6 @@ async def _get_inputs( if granularity == "RUN": query = select( func.max(Input.created_at).label("created_at"), - literal_column("NULL").label("id"), literal_column("NULL").label("operation_id"), Input.run_id, Input.job_id, @@ -180,7 +179,6 @@ async def _get_inputs( else: query = select( func.max(Input.created_at).label("created_at"), - literal_column("NULL").label("id"), literal_column("NULL").label("operation_id"), literal_column("NULL").label("run_id"), Input.job_id, @@ -212,3 +210,32 @@ async def _get_inputs( ) for row in query_result.all() ] + + async def get_stats_by_operation_ids(self, operation_ids: Sequence[UUID]) -> dict[UUID, Row]: + if not operation_ids: + return {} + + # Input created_at is always the same as operation's created_at + # do not use `tuple_(Input.created_at, Input.operation_id).in_(...), + # as this is too complex filter for Postgres to make an optimal query plan + min_created_at = extract_timestamp_from_uuid(min(operation_ids)) + max_created_at = extract_timestamp_from_uuid(max(operation_ids)) + + query = ( + select( + Input.operation_id.label("operation_id"), + func.count(Input.dataset_id.distinct()).label("total_datasets"), + func.sum(Input.num_bytes).label("total_bytes"), + func.sum(Input.num_rows).label("total_rows"), + func.sum(Input.num_files).label("total_files"), + ) + .where( + Input.created_at >= min_created_at, + Input.created_at <= max_created_at, + Input.operation_id == 
any_(operation_ids), # type: ignore[arg-type] + ) + .group_by(Input.operation_id) + ) + + query_result = await self._session.execute(query) + return {row.operation_id: row for row in query_result.all()} diff --git a/data_rentgen/db/repositories/output.py b/data_rentgen/db/repositories/output.py index 987ef472..742a35d5 100644 --- a/data_rentgen/db/repositories/output.py +++ b/data_rentgen/db/repositories/output.py @@ -5,7 +5,7 @@ from typing import Literal, Sequence from uuid import UUID -from sqlalchemy import ColumnElement, Select, any_, func, literal_column, select +from sqlalchemy import ColumnElement, Row, Select, any_, func, literal_column, select from sqlalchemy.dialects.postgresql import insert from data_rentgen.db.models import Output, OutputType @@ -219,3 +219,32 @@ async def _get_outputs( ) for row in results.all() ] + + async def get_stats_by_operation_ids(self, operation_ids: Sequence[UUID]) -> dict[UUID, Row]: + if not operation_ids: + return {} + + # Output created_at is always the same as operation's created_at + # do not use `tuple_(Output.created_at, Output.operation_id).in_(...)`, + # as this is too complex filter for Postgres to make an optimal query plan + min_created_at = extract_timestamp_from_uuid(min(operation_ids)) + max_created_at = extract_timestamp_from_uuid(max(operation_ids)) + + query = ( + select( + Output.operation_id.label("operation_id"), + func.count(Output.dataset_id.distinct()).label("total_datasets"), + func.sum(Output.num_bytes).label("total_bytes"), + func.sum(Output.num_rows).label("total_rows"), + func.sum(Output.num_files).label("total_files"), + ) + .where( + Output.created_at >= min_created_at, + Output.created_at <= max_created_at, + Output.operation_id == any_(operation_ids), # type: ignore[arg-type] + ) + .group_by(Output.operation_id) + ) + + query_result = await self._session.execute(query) + return {row.operation_id: row for row in query_result.all()} diff --git a/data_rentgen/server/api/v1/router/operation.py 
b/data_rentgen/server/api/v1/router/operation.py index 9bafdabb..70744dfe 100644 --- a/data_rentgen/server/api/v1/router/operation.py +++ b/data_rentgen/server/api/v1/router/operation.py @@ -9,13 +9,14 @@ from data_rentgen.server.errors.schemas import InvalidRequestSchema from data_rentgen.server.schemas.v1 import ( LineageResponseV1, + OperationDetailedResponseV1, OperationLineageQueryV1, OperationQueryV1, - OperationResponseV1, PageResponseV1, ) from data_rentgen.server.services import get_user from data_rentgen.server.services.lineage import LineageService +from data_rentgen.server.services.operation import OperationService from data_rentgen.server.utils.lineage_response import build_lineage_response from data_rentgen.services import UnitOfWork @@ -30,9 +31,10 @@ async def operations( query_args: Annotated[OperationQueryV1, Depends()], unit_of_work: Annotated[UnitOfWork, Depends()], + operation_service: Annotated[OperationService, Depends()], current_user: User = Depends(get_user()), -) -> PageResponseV1[OperationResponseV1]: - pagination = await unit_of_work.operation.paginate( +) -> PageResponseV1[OperationDetailedResponseV1]: + pagination = await operation_service.paginate( page=query_args.page, page_size=query_args.page_size, since=query_args.since, @@ -40,7 +42,7 @@ operation_ids=query_args.operation_id, run_id=query_args.run_id, ) - return PageResponseV1[OperationResponseV1].from_pagination(pagination) + return PageResponseV1[OperationDetailedResponseV1].from_pagination(pagination) @router.get("/lineage", summary="Get Operation lineage graph") diff --git a/data_rentgen/server/schemas/v1/__init__.py b/data_rentgen/server/schemas/v1/__init__.py index 68aa9226..a01eb340 100644 --- a/data_rentgen/server/schemas/v1/__init__.py +++ b/data_rentgen/server/schemas/v1/__init__.py @@ -26,8 +26,11 @@ UpdateLocationRequestV1, ) from data_rentgen.server.schemas.v1.operation import ( + OperationDetailedResponseV1, + OperationIOStatisticsResponseV1, 
OperationQueryV1, OperationResponseV1, + OperationStatisticsResponseV1, ) from data_rentgen.server.schemas.v1.pagination import ( PageMetaResponseV1, @@ -59,9 +62,12 @@ "UpdateLocationRequestV1", "PageMetaResponseV1", "PageResponseV1", + "OperationDetailedResponseV1", + "OperationIOStatisticsResponseV1", + "OperationLineageQueryV1", "OperationQueryV1", "OperationResponseV1", - "OperationLineageQueryV1", + "OperationStatisticsResponseV1", "JobResponseV1", "JobPaginateQueryV1", "JobLineageQueryV1", diff --git a/data_rentgen/server/schemas/v1/operation.py b/data_rentgen/server/schemas/v1/operation.py index e9dbe419..5b0a53cf 100644 --- a/data_rentgen/server/schemas/v1/operation.py +++ b/data_rentgen/server/schemas/v1/operation.py @@ -64,6 +64,37 @@ def _serialize_status(self, value: OperationStatusV1) -> str: return str(value) +class OperationIOStatisticsResponseV1(BaseModel): + """Operation IO statistics response.""" + + total_datasets: int = Field(default=0, description="Total number of datasets") + total_bytes: int = Field(default=0, description="Total number of bytes") + total_rows: int = Field(default=0, description="Total number of rows") + total_files: int = Field(default=0, description="Total number of files") + + model_config = ConfigDict(from_attributes=True) + + +class OperationStatisticsResponseV1(BaseModel): + """Operation statistics response.""" + + outputs: OperationIOStatisticsResponseV1 = Field(description="Output statistics") + inputs: OperationIOStatisticsResponseV1 = Field(description="Input statistics") + + model_config = ConfigDict(from_attributes=True) + + +class OperationDetailedResponseV1(BaseModel): + """Detailed operation response: operation data plus input/output statistics.""" + + data: OperationResponseV1 = Field(description="Operation data") + statistics: OperationStatisticsResponseV1 = Field( + description="Operation statistics", + ) + + model_config = ConfigDict(from_attributes=True) + + +class OperationQueryV1(PaginateQueryV1): + """Query params for Operations paginate request.""" diff --git 
a/data_rentgen/server/services/__init__.py b/data_rentgen/server/services/__init__.py index 51faa94a..64fbf826 100644 --- a/data_rentgen/server/services/__init__.py +++ b/data_rentgen/server/services/__init__.py @@ -2,5 +2,6 @@ # SPDX-License-Identifier: Apache-2.0 from data_rentgen.server.services.get_user import get_user from data_rentgen.server.services.lineage import LineageService +from data_rentgen.server.services.operation import OperationService -__all__ = ["LineageService", "get_user"] +__all__ = ["get_user", "LineageService", "OperationService"] diff --git a/data_rentgen/server/services/operation.py b/data_rentgen/server/services/operation.py new file mode 100644 index 00000000..300a8813 --- /dev/null +++ b/data_rentgen/server/services/operation.py @@ -0,0 +1,91 @@ +# SPDX-FileCopyrightText: 2024-2025 MTS PJSC +# SPDX-License-Identifier: Apache-2.0 +from dataclasses import dataclass +from datetime import datetime +from typing import Annotated +from uuid import UUID + +from fastapi import Depends +from sqlalchemy import Row + +from data_rentgen.db.models.operation import Operation +from data_rentgen.dto.pagination import PaginationDTO +from data_rentgen.services.uow import UnitOfWork + + +@dataclass +class OperationServiceIOStatistics: + total_datasets: int = 0 + total_bytes: int = 0 + total_rows: int = 0 + total_files: int = 0 + + @classmethod + def from_row(cls, row: Row | None): + if not row: + return cls() + + return cls( + total_datasets=row.total_datasets, + total_bytes=row.total_bytes, + total_rows=row.total_rows, + total_files=row.total_files, + ) + + +@dataclass +class OperationServiceStatistics: + inputs: OperationServiceIOStatistics + outputs: OperationServiceIOStatistics + + +@dataclass +class OperationServicePageItem: + data: Operation + statistics: OperationServiceStatistics + + +class OperationServicePaginatedResult(PaginationDTO[OperationServicePageItem]): + pass + + +class OperationService: + def __init__(self, uow: Annotated[UnitOfWork, 
Depends()]): + self._uow = uow + + async def paginate( + self, + page: int, + page_size: int, + since: datetime | None, + until: datetime | None, + operation_ids: list[UUID], + run_id: UUID | None, + ) -> OperationServicePaginatedResult: + pagination = await self._uow.operation.paginate( + page=page, + page_size=page_size, + since=since, + until=until, + operation_ids=operation_ids, + run_id=run_id, + ) + operation_ids = [item.id for item in pagination.items] + input_stats = await self._uow.input.get_stats_by_operation_ids(operation_ids) + output_stats = await self._uow.output.get_stats_by_operation_ids(operation_ids) + + return OperationServicePaginatedResult( + page=pagination.page, + page_size=pagination.page_size, + total_count=pagination.total_count, + items=[ + OperationServicePageItem( + data=operation, + statistics=OperationServiceStatistics( + inputs=OperationServiceIOStatistics.from_row(input_stats.get(operation.id)), + outputs=OperationServiceIOStatistics.from_row(output_stats.get(operation.id)), + ), + ) + for operation in pagination.items + ], + ) diff --git a/docs/changelog/next_release/158.breaking.rst b/docs/changelog/next_release/158.breaking.rst new file mode 100644 index 00000000..d022124c --- /dev/null +++ b/docs/changelog/next_release/158.breaking.rst @@ -0,0 +1,47 @@ +Change response schema of ``GET /operations`` from: + +.. code:: python + + { + "meta": {...}, + "items": [ + { + "kind": "OPERATION", + "id": ..., + # ... + } + ], + } + +to: + +.. code:: python + + { + "meta": {...}, + "items": [ + { + "data": { + "kind": "OPERATION", + "id": ..., + # ... + }, + "statistics": { + "inputs": { + "total_datasets": 2, + "total_bytes": 123456, + "total_rows": 100, + "total_files": 0, + }, + "outputs": { + "total_datasets": 2, + "total_bytes": 123456, + "total_rows": 100, + "total_files": 0, + }, + }, + } + ], + } + +This allows to show operation statistics in UI without building up lineage graph. 
diff --git a/pyproject.toml b/pyproject.toml index ae26597f..ccf48da8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -438,6 +438,8 @@ ignore = [ "B008", # Found `__init__.py` module with logic "WPS412", +# Found incorrect order of methods in a class + "WPS338", ] per-file-ignores = [ diff --git a/tests/test_server/test_operations/test_get_operations_by_id.py b/tests/test_server/test_operations/test_get_operations_by_id.py index 6e13769a..c109eceb 100644 --- a/tests/test_server/test_operations/test_get_operations_by_id.py +++ b/tests/test_server/test_operations/test_get_operations_by_id.py @@ -1,3 +1,4 @@ +from collections import defaultdict from http import HTTPStatus import pytest @@ -5,6 +6,7 @@ from data_rentgen.db.models import Operation from tests.fixtures.mocks import MockedUser +from tests.test_server.utils.lineage_result import LineageResult pytestmark = [pytest.mark.server, pytest.mark.asyncio] @@ -61,18 +63,34 @@ async def test_get_operations_by_one_id( }, "items": [ { - "kind": "OPERATION", - "id": str(operation.id), - "created_at": operation.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), - "run_id": str(operation.run_id), - "name": operation.name, - "status": operation.status.name, - "type": operation.type.value, - "position": operation.position, - "group": operation.group, - "description": operation.description, - "started_at": operation.started_at.strftime("%Y-%m-%dT%H:%M:%SZ"), - "ended_at": operation.ended_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + "data": { + "kind": "OPERATION", + "id": str(operation.id), + "created_at": operation.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "run_id": str(operation.run_id), + "name": operation.name, + "status": operation.status.name, + "type": operation.type.value, + "position": operation.position, + "group": operation.group, + "description": operation.description, + "started_at": operation.started_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + "ended_at": operation.ended_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + }, + 
"statistics": { + "inputs": { + "total_datasets": 0, + "total_bytes": 0, + "total_rows": 0, + "total_files": 0, + }, + "outputs": { + "total_datasets": 0, + "total_bytes": 0, + "total_rows": 0, + "total_files": 0, + }, + }, }, ], } @@ -108,19 +126,124 @@ async def test_get_operations_by_multiple_ids( }, "items": [ { - "kind": "OPERATION", - "id": str(operation.id), - "created_at": operation.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), - "run_id": str(operation.run_id), - "name": operation.name, - "status": operation.status.name, - "type": operation.type.value, - "position": operation.position, - "group": operation.group, - "description": operation.description, - "started_at": operation.started_at.strftime("%Y-%m-%dT%H:%M:%SZ"), - "ended_at": operation.ended_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + "data": { + "kind": "OPERATION", + "id": str(operation.id), + "created_at": operation.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "run_id": str(operation.run_id), + "name": operation.name, + "status": operation.status.name, + "type": operation.type.value, + "position": operation.position, + "group": operation.group, + "description": operation.description, + "started_at": operation.started_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + "ended_at": operation.ended_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + }, + "statistics": { + "inputs": { + "total_datasets": 0, + "total_bytes": 0, + "total_rows": 0, + "total_files": 0, + }, + "outputs": { + "total_datasets": 0, + "total_bytes": 0, + "total_rows": 0, + "total_files": 0, + }, + }, } for operation in sorted(selected_operations, key=lambda x: (x.run_id.int, -x.id.int)) ], } + + +async def test_get_operations_by_multiple_ids_with_stats( + test_client: AsyncClient, + simple_lineage: LineageResult, + mocked_user: MockedUser, +): + input_stats = defaultdict(dict) + output_stats = defaultdict(dict) + for operation in simple_lineage.operations: + input_bytes = 0 + input_rows = 0 + input_files = 0 + output_bytes = 0 + output_rows = 0 + 
output_files = 0 + + for input in simple_lineage.inputs: + if input.operation_id != operation.id: + continue + input_bytes += input.num_bytes or 0 + input_rows += input.num_rows or 0 + input_files += input.num_files or 0 + + for output in simple_lineage.outputs: + if output.operation_id != operation.id: + continue + output_bytes += output.num_bytes or 0 + output_rows += output.num_rows or 0 + output_files += output.num_files or 0 + + input_stats[operation.id] = { + "total_datasets": 1, + "total_bytes": input_bytes, + "total_rows": input_rows, + "total_files": input_files, + } + + output_stats[operation.id] = { + "total_datasets": 1, + "total_bytes": output_bytes, + "total_rows": output_rows, + "total_files": output_files, + } + + response = await test_client.get( + "v1/operations", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={ + "operation_id": [str(operation.id) for operation in simple_lineage.operations], + }, + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "meta": { + "page": 1, + "page_size": 20, + "total_count": len(simple_lineage.operations), + "pages_count": 1, + "has_next": False, + "has_previous": False, + "next_page": None, + "previous_page": None, + }, + "items": [ + { + "data": { + "kind": "OPERATION", + "id": str(operation.id), + "created_at": operation.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "run_id": str(operation.run_id), + "name": operation.name, + "status": operation.status.name, + "type": operation.type.value, + "position": operation.position, + "group": operation.group, + "description": operation.description, + "started_at": operation.started_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + "ended_at": operation.ended_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + }, + "statistics": { + "inputs": input_stats[operation.id], + "outputs": output_stats[operation.id], + }, + } + for operation in sorted(simple_lineage.operations, key=lambda x: (x.run_id.int, -x.id.int)) + ], + } 
diff --git a/tests/test_server/test_operations/test_get_operations_by_run_id.py b/tests/test_server/test_operations/test_get_operations_by_run_id.py index 7e877369..ea00089f 100644 --- a/tests/test_server/test_operations/test_get_operations_by_run_id.py +++ b/tests/test_server/test_operations/test_get_operations_by_run_id.py @@ -114,18 +114,34 @@ async def test_get_operations_by_run_id( }, "items": [ { - "kind": "OPERATION", - "id": str(operation.id), - "created_at": operation.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), - "run_id": str(operation.run_id), - "name": operation.name, - "status": operation.status.name, - "type": operation.type.value, - "position": operation.position, - "group": operation.group, - "description": operation.description, - "started_at": operation.started_at.strftime("%Y-%m-%dT%H:%M:%SZ"), - "ended_at": operation.ended_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + "data": { + "kind": "OPERATION", + "id": str(operation.id), + "created_at": operation.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "run_id": str(operation.run_id), + "name": operation.name, + "status": operation.status.name, + "type": operation.type.value, + "position": operation.position, + "group": operation.group, + "description": operation.description, + "started_at": operation.started_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + "ended_at": operation.ended_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + }, + "statistics": { + "inputs": { + "total_datasets": 0, + "total_bytes": 0, + "total_rows": 0, + "total_files": 0, + }, + "outputs": { + "total_datasets": 0, + "total_bytes": 0, + "total_rows": 0, + "total_files": 0, + }, + }, } for operation in sorted(selected_operations, key=lambda x: x.id, reverse=True) ], @@ -168,18 +184,34 @@ async def test_get_operations_by_run_id_with_until( }, "items": [ { - "kind": "OPERATION", - "id": str(operation.id), - "created_at": operation.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), - "run_id": str(operation.run_id), - "name": operation.name, - "status": 
operation.status.name, - "type": operation.type.value, - "position": operation.position, - "group": operation.group, - "description": operation.description, - "started_at": operation.started_at.strftime("%Y-%m-%dT%H:%M:%SZ"), - "ended_at": operation.ended_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + "data": { + "kind": "OPERATION", + "id": str(operation.id), + "created_at": operation.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "run_id": str(operation.run_id), + "name": operation.name, + "status": operation.status.name, + "type": operation.type.value, + "position": operation.position, + "group": operation.group, + "description": operation.description, + "started_at": operation.started_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + "ended_at": operation.ended_at.strftime("%Y-%m-%dT%H:%M:%SZ"), + }, + "statistics": { + "inputs": { + "total_datasets": 0, + "total_bytes": 0, + "total_rows": 0, + "total_files": 0, + }, + "outputs": { + "total_datasets": 0, + "total_bytes": 0, + "total_rows": 0, + "total_files": 0, + }, + }, } for operation in sorted(selected_operations, key=lambda x: x.id, reverse=True) ],