From dc77030874a22b1cf669a0591281a89259752ce5 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Wed, 16 Apr 2025 15:10:18 +0200 Subject: [PATCH 01/19] intro --- api/specs/web-server/_computations.py | 29 +++++ .../api_schemas_directorv2/__init__.py | 11 ++ .../api_schemas_directorv2/comp_runs.py | 41 +++++++ .../api_schemas_webserver/computations.py | 41 ++++++- .../rpc_interfaces/director_v2/__init__.py | 0 .../director_v2/computations.py | 85 ++++++++++++++ .../api/dependencies/database.py | 10 +- .../api/rpc/__init__.py | 0 .../api/rpc/_computations.py | 74 ++++++++++++ .../api/rpc/routes.py | 34 ++++++ .../core/application.py | 2 + .../modules/comp_scheduler/_manager.py | 4 +- .../modules/db/repositories/comp_runs.py | 86 +++++++++++++- .../db/repositories/comp_tasks/_core.py | 84 +++++++++++--- .../modules/rabbitmq.py | 12 +- .../test_db_repositories_comp_runs.py | 24 ++-- .../api/v0/openapi.yaml | 101 +++++++++++++++++ .../director_v2/_computations_rest.py | 107 ++++++++++++++++++ .../director_v2/_computations_rest_schema.py | 45 ++++++++ .../director_v2/_computations_service.py | 65 +++++++++++ .../director_v2/_controller/rest.py | 8 +- ...py => _director_v2_abc_default_service.py} | 2 +- ...ice_abc.py => _director_v2_abc_service.py} | 0 .../{_service.py => _director_v2_service.py} | 0 .../director_v2/director_v2_service.py | 12 +- .../director_v2/plugin.py | 4 +- .../projects/_nodes_service.py | 2 +- .../projects/_projects_service.py | 4 +- 28 files changed, 838 insertions(+), 49 deletions(-) create mode 100644 packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py create mode 100644 packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/__init__.py create mode 100644 packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py create mode 100644 services/director-v2/src/simcore_service_director_v2/api/rpc/__init__.py create mode 100644 services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py create mode 100644 services/director-v2/src/simcore_service_director_v2/api/rpc/routes.py create mode 100644 services/web/server/src/simcore_service_webserver/director_v2/_computations_rest.py create mode 100644 services/web/server/src/simcore_service_webserver/director_v2/_computations_rest_schema.py create mode 100644 services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py rename services/web/server/src/simcore_service_webserver/director_v2/{_service_abc_default.py => _director_v2_abc_default_service.py} (94%) rename services/web/server/src/simcore_service_webserver/director_v2/{_service_abc.py => _director_v2_abc_service.py} (100%) rename services/web/server/src/simcore_service_webserver/director_v2/{_service.py => _director_v2_service.py} (100%) diff --git a/api/specs/web-server/_computations.py b/api/specs/web-server/_computations.py index 59e0e873edc1..a43895d87ff6 100644 --- a/api/specs/web-server/_computations.py +++ b/api/specs/web-server/_computations.py @@ -1,5 +1,6 @@ from typing import Annotated +from _common import as_query from fastapi import APIRouter, Depends, status from models_library.api_schemas_webserver.computations import ( ComputationGet, @@ -9,6 +10,11 @@ ) from models_library.generics import Envelope from simcore_service_webserver._meta import API_VTAG +from simcore_service_webserver.director_v2._computations_rest_schema import ( + ComputationRunListQueryParams, + ComputationTaskListQueryParams, + ComputationTaskPathParams, +) router = 
APIRouter( prefix=f"/{API_VTAG}", @@ -53,3 +59,26 @@ async def start_computation( status_code=status.HTTP_204_NO_CONTENT, ) async def stop_computation(_path: Annotated[ComputationPathParams, Depends()]): ... + + +@router.get( + "/computations/-/iterations/latest", + response_model=Envelope[list[ComputationGet]], + name="list_computations_latest_iteration", + description="Lists the latest iteration of computations", +) +async def list_computations_latest_iteration( + _query: Annotated[as_query(ComputationRunListQueryParams), Depends()], +): ... + + +@router.get( + "/computations/{project_id}/iterations/latest/tasks", + response_model=Envelope[list[ComputationGet]], + name="list_computations_latest_iteration_tasks", + description="Lists the latest iteration tasks for a computation", +) +async def list_computations_latest_iteration_tasks( + _query: Annotated[as_query(ComputationTaskListQueryParams), Depends()], + _path: Annotated[ComputationTaskPathParams, Depends()], +): ... diff --git a/packages/models-library/src/models_library/api_schemas_directorv2/__init__.py b/packages/models-library/src/models_library/api_schemas_directorv2/__init__.py index c96b6f2563e6..6ab84683cdb9 100644 --- a/packages/models-library/src/models_library/api_schemas_directorv2/__init__.py +++ b/packages/models-library/src/models_library/api_schemas_directorv2/__init__.py @@ -1,8 +1,19 @@ +from typing import Final + +from pydantic import TypeAdapter + +from ..rabbitmq_basic_types import RPCNamespace from . import clusters, dynamic_services assert clusters # nosec assert dynamic_services # nosec + +DIRECTOR_V2_RPC_NAMESPACE: Final[RPCNamespace] = TypeAdapter( + RPCNamespace +).validate_python("director-v2") + + __all__: tuple[str, ...] = ( "clusters", "dynamic_services", diff --git a/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py b/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py new file mode 100644 index 000000000000..a237438b8fd0 --- /dev/null +++ b/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py @@ -0,0 +1,41 @@ +from datetime import datetime +from typing import Any, NamedTuple + +from pydantic import ( + BaseModel, + PositiveInt, +) + +from ..projects import ProjectID +from ..projects_nodes_io import NodeID +from ..projects_state import RunningState + + +class ComputationRunRpcGet(BaseModel): + project_uuid: ProjectID + iteration: int + state: RunningState + info: dict[str, Any] + submitted_at: datetime + started_at: datetime | None + ended_at: datetime | None + + +class ComputationRunRpcGetPage(NamedTuple): + items: list[ComputationRunRpcGet] + total: PositiveInt + + +class ComputationTaskRpcGet(BaseModel): + project_uuid: ProjectID + node_id: NodeID + state: RunningState + progress: float + image: dict[str, Any] + started_at: datetime | None + ended_at: datetime | None + + +class ComputationTaskRpcGetPage(NamedTuple): + items: list[ComputationTaskRpcGet] + total: PositiveInt diff --git a/packages/models-library/src/models_library/api_schemas_webserver/computations.py b/packages/models-library/src/models_library/api_schemas_webserver/computations.py index 3a251c6141bc..a4fcf4f978e5 100644 --- a/packages/models-library/src/models_library/api_schemas_webserver/computations.py +++ b/packages/models-library/src/models_library/api_schemas_webserver/computations.py @@ -1,12 +1,19 @@ -from typing import Annotated +from datetime import datetime +from typing import Annotated, Any, NamedTuple from common_library.basic_types import 
DEFAULT_FACTORY -from pydantic import BaseModel, Field +from pydantic import ( + BaseModel, + Field, + PositiveInt, +) from ..api_schemas_directorv2.computations import ( ComputationGet as _DirectorV2ComputationGet, ) from ..projects import CommitID, ProjectID +from ..projects_nodes_io import NodeID +from ..projects_state import RunningState from ._base import InputSchemaWithoutCamelCase, OutputSchemaWithoutCamelCase @@ -41,3 +48,33 @@ class ComputationStarted(OutputSchemaWithoutCamelCase): json_schema_extra={"default": []}, ), ] = DEFAULT_FACTORY + + +class ComputationRunRestGet(BaseModel): + project_uuid: ProjectID + iteration: int + state: RunningState + info: dict[str, Any] + submitted_at: datetime + started_at: datetime | None + ended_at: datetime | None + + +class ComputationRunRestGetPage(NamedTuple): + items: list[ComputationRunRestGet] + total: PositiveInt + + +class ComputationTaskRestGet(BaseModel): + project_uuid: ProjectID + node_id: NodeID + state: RunningState + progress: float + image: dict[str, Any] + started_at: datetime | None + ended_at: datetime | None + + +class ComputationTaskRestGetPage(NamedTuple): + items: list[ComputationTaskRestGet] + total: PositiveInt diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/__init__.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py new file mode 100644 index 000000000000..74ae336188f2 --- /dev/null +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py @@ -0,0 +1,85 @@ +# pylint: disable=too-many-arguments +import logging +from typing import Final + +from models_library.api_schemas_directorv2 import ( + DIRECTOR_V2_RPC_NAMESPACE, +) +from models_library.api_schemas_directorv2.comp_runs import ( + ComputationRunRpcGetPage, + ComputationTaskRpcGetPage, +) +from models_library.products import ProductName +from models_library.projects import ProjectID +from models_library.rabbitmq_basic_types import RPCMethodName +from models_library.rest_ordering import OrderBy +from models_library.users import UserID +from pydantic import NonNegativeInt, TypeAdapter + +from ....logging_utils import log_decorator +from ... 
import RabbitMQRPCClient + +_logger = logging.getLogger(__name__) + + +_DEFAULT_TIMEOUT_S: Final[NonNegativeInt] = 20 + +_RPC_METHOD_NAME_ADAPTER: TypeAdapter[RPCMethodName] = TypeAdapter(RPCMethodName) + + +@log_decorator(_logger, level=logging.DEBUG) +async def list_computations_latest_iteration_page( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + product_name: ProductName, + user_id: UserID, + # pagination + offset: int = 0, + limit: int = 20, + # ordering + order_by: OrderBy | None = None, +) -> ComputationRunRpcGetPage: + result = await rabbitmq_rpc_client.request( + DIRECTOR_V2_RPC_NAMESPACE, + _RPC_METHOD_NAME_ADAPTER.validate_python( + "list_computations_latest_iteration_page" + ), + product_name=product_name, + user_id=user_id, + offset=offset, + limit=limit, + order_by=order_by, + timeout_s=_DEFAULT_TIMEOUT_S, + ) + assert isinstance(result, ComputationRunRpcGetPage) # nosec + return result + + +@log_decorator(_logger, level=logging.DEBUG) +async def list_computations_latest_iteration_tasks_page( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + product_name: ProductName, + user_id: UserID, + project_id: ProjectID, + # pagination + offset: int = 0, + limit: int = 20, + # ordering + order_by: OrderBy | None = None, +) -> ComputationTaskRpcGetPage: + result = await rabbitmq_rpc_client.request( + DIRECTOR_V2_RPC_NAMESPACE, + _RPC_METHOD_NAME_ADAPTER.validate_python( + "list_computations_latest_iteration_tasks_page" + ), + product_name=product_name, + user_id=user_id, + project_id=project_id, + offset=offset, + limit=limit, + order_by=order_by, + timeout_s=_DEFAULT_TIMEOUT_S, + ) + assert isinstance(result, ComputationTaskRpcGetPage) # nosec + return result diff --git a/services/director-v2/src/simcore_service_director_v2/api/dependencies/database.py b/services/director-v2/src/simcore_service_director_v2/api/dependencies/database.py index 576b42a0c59c..aee95be2e150 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/dependencies/database.py +++ b/services/director-v2/src/simcore_service_director_v2/api/dependencies/database.py @@ -3,7 +3,7 @@ from typing import TypeVar, cast from aiopg.sa import Engine -from fastapi import Depends +from fastapi import Depends, FastAPI from fastapi.requests import Request from ...modules.db.repositories import BaseRepository @@ -47,3 +47,11 @@ async def _get_repo( yield get_base_repository(engine=engine, repo_type=repo_type) return _get_repo + + +def get_repository_instance(app: FastAPI, repo_type: type[RepoType]) -> RepoType: + """ + Retrieves an instance of the specified repository type using the database engine from the FastAPI app. 
+ """ + engine = cast(Engine, app.state.engine) + return get_base_repository(engine=engine, repo_type=repo_type) diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/__init__.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py new file mode 100644 index 000000000000..3b76a587a3a8 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py @@ -0,0 +1,74 @@ +# pylint: disable=too-many-arguments +from fastapi import FastAPI +from models_library.api_schemas_directorv2.comp_runs import ( + ComputationRunRpcGetPage, + ComputationTaskRpcGetPage, +) +from models_library.products import ProductName +from models_library.projects import ProjectID +from models_library.rest_ordering import OrderBy +from models_library.users import UserID +from servicelib.rabbitmq import RPCRouter + +from ...modules.db.repositories.comp_runs import CompRunsRepository +from ...modules.db.repositories.comp_tasks import CompTasksRepository +from ..dependencies.database import get_repository_instance + +router = RPCRouter() + + +@router.expose(reraise_if_error_type=()) +async def list_computations_latest_iteration_page( + app: FastAPI, + *, + product_name: ProductName, + user_id: UserID, + # pagination + offset: int = 0, + limit: int = 20, + # ordering + order_by: OrderBy | None = None, +) -> ComputationRunRpcGetPage: + comp_runs_repo = get_repository_instance(app, CompRunsRepository) + total, comp_runs = await comp_runs_repo.list_for_user__only_latest_iterations( + product_name=product_name, + user_id=user_id, + offset=offset, + limit=limit, + order_by=order_by, + ) + return ComputationRunRpcGetPage( + items=comp_runs, + total=total, + ) + + +@router.expose(reraise_if_error_type=()) +async def list_computations_latest_iteration_tasks_page( + app: FastAPI, + *, + product_name: ProductName, + user_id: UserID, + project_id: ProjectID, + # pagination + offset: int = 0, + limit: int = 20, + # ordering + order_by: OrderBy | None = None, +) -> ComputationTaskRpcGetPage: + assert product_name # nosec NOTE: Whether project_id belong to the product_name was checked in the webserver + assert user_id # nosec NOTE: Whether user_id has access to the project was checked in the webserver + + comp_tasks_repo = get_repository_instance(app, CompTasksRepository) + total, comp_runs = ( + await comp_tasks_repo.list_computational_tasks_for_frontend_client( + project_id=project_id, + offset=offset, + limit=limit, + order_by=order_by, + ) + ) + return ComputationTaskRpcGetPage( + items=comp_runs, + total=total, + ) diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/routes.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/routes.py new file mode 100644 index 000000000000..ad6bdba28c75 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/api/rpc/routes.py @@ -0,0 +1,34 @@ +import logging + +from fastapi import FastAPI +from models_library.api_schemas_directorv2 import ( + DIRECTOR_V2_RPC_NAMESPACE, +) +from servicelib.logging_utils import log_context +from servicelib.rabbitmq import RPCRouter + +from ...modules.rabbitmq import get_rabbitmq_rpc_server +from . 
import ( + _computations, +) + +_logger = logging.getLogger(__name__) + + +ROUTERS: list[RPCRouter] = [ + _computations.router, +] + + +def setup_rpc_api_routes(app: FastAPI) -> None: + async def startup() -> None: + with log_context( + _logger, + logging.INFO, + msg="Director-v2 startup RPC API Routes", + ): + rpc_server = get_rabbitmq_rpc_server(app) + for router in ROUTERS: + await rpc_server.register_router(router, DIRECTOR_V2_RPC_NAMESPACE, app) + + app.add_event_handler("startup", startup) diff --git a/services/director-v2/src/simcore_service_director_v2/core/application.py b/services/director-v2/src/simcore_service_director_v2/core/application.py index c53c841183e4..17835676cadd 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/application.py +++ b/services/director-v2/src/simcore_service_director_v2/core/application.py @@ -17,6 +17,7 @@ make_http_error_handler_for_exception, ) from ..api.errors.validation_error import http422_error_handler +from ..api.rpc.routes import setup_rpc_api_routes from ..modules import ( catalog, comp_scheduler, @@ -183,6 +184,7 @@ def init_app(settings: AppSettings | None = None) -> FastAPI: ) if dynamic_scheduler_enabled or computational_backend_enabled: rabbitmq.setup(app) + setup_rpc_api_routes(app) # Requires rabbitmq to be setup first redis.setup(app) if dynamic_scheduler_enabled: diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py index 509df01c27fc..75e7a57aa7ed 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py @@ -119,14 +119,14 @@ async def _get_pipeline_dag(project_id: ProjectID, db_engine: Engine) -> nx.DiGr async def schedule_all_pipelines(app: FastAPI) -> None: with log_context(_logger, logging.DEBUG, msg="scheduling pipelines"): db_engine = get_db_engine(app) - runs_to_schedule = await CompRunsRepository.instance(db_engine).list( + runs_to_schedule = await CompRunsRepository.instance(db_engine).list_( filter_by_state=SCHEDULED_STATES, never_scheduled=True, processed_since=SCHEDULER_INTERVAL, ) possibly_lost_scheduled_pipelines = await CompRunsRepository.instance( db_engine - ).list( + ).list_( filter_by_state=SCHEDULED_STATES, scheduled_since=SCHEDULER_INTERVAL * _LOST_TASKS_FACTOR, ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py index a5dc296aa21e..9f494997557d 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py @@ -1,12 +1,15 @@ import datetime import logging -from typing import Any, Final +from typing import Any, Final, cast import arrow import sqlalchemy as sa from aiopg.sa.result import RowProxy +from models_library.api_schemas_directorv2.comp_runs import ComputationRunRpcGet +from models_library.basic_types import IDStr from models_library.projects import ProjectID from models_library.projects_state import RunningState +from models_library.rest_ordering import OrderBy, OrderDirection from models_library.users import UserID from models_library.utils.fastapi_encoders import jsonable_encoder from pydantic import PositiveInt @@ -67,7 +70,7 @@ async 
def get( raise ComputationalRunNotFoundError return CompRunsAtDB.model_validate(row) - async def list( + async def list_( self, *, filter_by_state: set[RunningState] | None = None, @@ -139,6 +142,85 @@ async def list( ) ] + async def list_for_user__only_latest_iterations( + self, + *, + product_name: str, + user_id: UserID, + # pagination + offset: int, + limit: int, + # ordering + order_by: OrderBy | None = None, + ) -> tuple[int, list[ComputationRunRpcGet]]: + # NOTE: Currently, we list only pipelines created by the user themselves. + # If we want to list all pipelines that the user has read access to + # via project access rights, we need to join the `projects_to_groups` + # and `workspaces_access_rights` tables (which will make it slower, but we do + # the same for listing projects). + if order_by is None: + order_by = OrderBy(field=IDStr("run_id")) # default ordering + + base_select_query = sa.select( + comp_runs.c.project_uuid, + comp_runs.c.iteration, + comp_runs.c.result.label("state"), + comp_runs.c.metadata.label("info"), + comp_runs.c.created.label("submitted_at"), + comp_runs.c.started.label("started_at"), + comp_runs.c.ended.label("ended_at"), + ).select_from( + sa.select( + comp_runs.c.project_uuid, + sa.func.max(comp_runs.c.iteration).label( + "latest_iteration" + ), # <-- NOTE: We might create a boolean column with latest iteration for fast retrieval + ) + .where( + (comp_runs.c.user_id == user_id) + & ( + comp_runs.c.metadata["product_name"].astext == product_name + ) # <-- NOTE: We might create a separate column for this for fast retrieval + ) + .group_by(comp_runs.c.project_uuid) + .subquery("latest_runs") + .join( + comp_runs, + sa.and_( + comp_runs.c.project_uuid + == literal_column("latest_runs.project_uuid"), + comp_runs.c.iteration + == literal_column("latest_runs.latest_iteration"), + ), + ) + ) + + # Select total count from base_query + count_query = sa.select(sa.func.count()).select_from( + base_select_query.subquery() + ) + + # Ordering and pagination + if order_by.direction == OrderDirection.ASC: + list_query = base_select_query.order_by( + sa.asc(getattr(comp_runs.c, order_by.field)), comp_runs.c.run_id + ) + else: + list_query = base_select_query.order_by( + desc(getattr(comp_runs.c, order_by.field)), comp_runs.c.run_id + ) + list_query = list_query.offset(offset).limit(limit) + + async with self.db_engine.acquire() as conn: + total_count = await conn.scalar(count_query) + + result = await conn.execute(list_query) + items: list[ComputationRunRpcGet] = [ + ComputationRunRpcGet.model_validate(row) async for row in result + ] + + return cast(int, total_count), items + async def create( self, *, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py index 5068e144944a..4fbb903aceda 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py @@ -1,15 +1,17 @@ import logging from datetime import datetime -from typing import Any +from typing import Any, cast import arrow import sqlalchemy as sa from aiopg.sa.result import ResultProxy, RowProxy +from models_library.api_schemas_directorv2.comp_runs import ComputationTaskRpcGet from models_library.basic_types import IDStr from models_library.errors import ErrorDict from models_library.projects import ProjectAtDB, 
ProjectID from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState +from models_library.rest_ordering import OrderBy, OrderDirection from models_library.users import UserID from models_library.wallets import WalletInfo from servicelib.logging_utils import log_context @@ -75,6 +77,62 @@ async def list_computational_tasks( tasks.append(task_db) return tasks + async def list_computational_tasks_for_frontend_client( + self, + *, + project_id: ProjectID, + # pagination + offset: int = 0, + limit: int = 20, + # ordering + order_by: OrderBy | None = None, + ) -> tuple[int, list[ComputationTaskRpcGet]]: + if order_by is None: + order_by = OrderBy(field=IDStr("task_id")) # default ordering + + base_select_query = ( + sa.select( + comp_tasks.c.project_id, + comp_tasks.c.node_id, + comp_tasks.c.state, + comp_tasks.c.progress, + comp_tasks.c.image, + comp_tasks.c.start.label("started_at"), + comp_tasks.c.end.label("ended_at"), + ) + .select_from(comp_tasks) + .where( + (comp_tasks.c.project_id == f"{project_id}") + & (comp_tasks.c.node_class == NodeClass.COMPUTATIONAL) + ) + ) + + # Select total count from base_query + count_query = sa.select(sa.func.count()).select_from( + base_select_query.subquery() + ) + + # Ordering and pagination + if order_by.direction == OrderDirection.ASC: + list_query = base_select_query.order_by( + sa.asc(getattr(comp_tasks.c, order_by.field)), comp_tasks.c.task_id + ) + else: + list_query = base_select_query.order_by( + sa.desc(getattr(comp_tasks.c, order_by.field)), comp_tasks.c.task_id + ) + list_query = list_query.offset(offset).limit(limit) + + async with self.db_engine.acquire() as conn: + total_count = await conn.scalar(count_query) + + result = await conn.execute(list_query) + items: list[ComputationTaskRpcGet] = [ + ComputationTaskRpcGet.model_validate(row) async for row in result + ] + + return cast(int, total_count), items + async def task_exists(self, project_id: ProjectID, node_id: NodeID) -> bool: async with self.db_engine.acquire() as conn: nid: str | None = await conn.scalar( @@ -99,18 +157,18 @@ async def upsert_tasks_from_project( ) -> list[CompTaskAtDB]: # NOTE: really do an upsert here because of issue https://github.com/ITISFoundation/osparc-simcore/issues/2125 async with self.db_engine.acquire() as conn: - list_of_comp_tasks_in_project: list[ - CompTaskAtDB - ] = await _utils.generate_tasks_list_from_project( - project=project, - catalog_client=catalog_client, - published_nodes=published_nodes, - user_id=user_id, - product_name=product_name, - connection=conn, - rut_client=rut_client, - wallet_info=wallet_info, - rabbitmq_rpc_client=rabbitmq_rpc_client, + list_of_comp_tasks_in_project: list[CompTaskAtDB] = ( + await _utils.generate_tasks_list_from_project( + project=project, + catalog_client=catalog_client, + published_nodes=published_nodes, + user_id=user_id, + product_name=product_name, + connection=conn, + rut_client=rut_client, + wallet_info=wallet_info, + rabbitmq_rpc_client=rabbitmq_rpc_client, + ) ) # get current tasks result = await conn.execute( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py b/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py index a7cb4e1ba27a..86f6e7037912 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py @@ -58,7 +58,10 @@ async def on_startup() -> None: client_name="director-v2", settings=settings ) 
app.state.rabbitmq_rpc_client = await RabbitMQRPCClient.create( - client_name="director-v2", settings=settings + client_name="director-v2-rpc-client", settings=settings + ) + app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create( + client_name="director-v2-rpc-server", settings=settings ) await app.state.rabbitmq_client.subscribe( @@ -73,6 +76,8 @@ async def on_shutdown() -> None: await app.state.rabbitmq_client.close() if app.state.rabbitmq_rpc_client: await app.state.rabbitmq_rpc_client.close() + if app.state.rabbitmq_rpc_server: + await app.state.rabbitmq_rpc_server.close() app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) @@ -92,3 +97,8 @@ def get_rabbitmq_rpc_client(app: FastAPI) -> RabbitMQRPCClient: ) raise ConfigurationError(msg=msg) return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_client) + + +def get_rabbitmq_rpc_server(app: FastAPI) -> RabbitMQRPCClient: + assert app.state.rabbitmq_rpc_server # nosec + return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_server) diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py index 1dea4f59cbef..b369490a75e4 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py @@ -79,10 +79,10 @@ async def test_list( run_metadata: RunMetadataDict, faker: Faker, ): - assert await CompRunsRepository(aiopg_engine).list() == [] + assert await CompRunsRepository(aiopg_engine).list_() == [] published_project = await publish_project() - assert await CompRunsRepository(aiopg_engine).list() == [] + assert await CompRunsRepository(aiopg_engine).list_() == [] created = await CompRunsRepository(aiopg_engine).create( user_id=published_project.user["id"], @@ -91,7 +91,7 @@ async def test_list( metadata=run_metadata, use_on_demand_clusters=faker.pybool(), ) - assert await CompRunsRepository(aiopg_engine).list() == [created] + assert await CompRunsRepository(aiopg_engine).list_() == [created] created = [created] + await asyncio.gather( *( @@ -106,7 +106,7 @@ async def test_list( ) ) assert sorted( - await CompRunsRepository(aiopg_engine).list(), key=lambda x: x.iteration + await CompRunsRepository(aiopg_engine).list_(), key=lambda x: x.iteration ) == sorted(created, key=lambda x: x.iteration) # test with filter of state @@ -114,13 +114,13 @@ async def test_list( s for s in RunningState if s is not RunningState.PUBLISHED } assert ( - await CompRunsRepository(aiopg_engine).list( + await CompRunsRepository(aiopg_engine).list_( filter_by_state=any_state_but_published ) == [] ) assert sorted( - await CompRunsRepository(aiopg_engine).list( + await CompRunsRepository(aiopg_engine).list_( filter_by_state={RunningState.PUBLISHED} ), key=lambda x: x.iteration, @@ -128,7 +128,7 @@ async def test_list( # test with never scheduled filter, let's create a bunch of scheduled entries, assert sorted( - await CompRunsRepository(aiopg_engine).list(never_scheduled=True), + await CompRunsRepository(aiopg_engine).list_(never_scheduled=True), key=lambda x: x.iteration, ) == sorted(created, key=lambda x: x.iteration) comp_runs_marked_for_scheduling = random.sample(created, k=25) @@ -145,7 +145,7 @@ async def test_list( # filter them away created = [r for r in created if r not in comp_runs_marked_for_scheduling] assert sorted( - await 
CompRunsRepository(aiopg_engine).list(never_scheduled=True), + await CompRunsRepository(aiopg_engine).list_(never_scheduled=True), key=lambda x: x.iteration, ) == sorted(created, key=lambda x: x.iteration) @@ -170,7 +170,7 @@ async def test_list( # since they were just marked as processed now, we will get nothing assert ( sorted( - await CompRunsRepository(aiopg_engine).list( + await CompRunsRepository(aiopg_engine).list_( never_scheduled=False, processed_since=SCHEDULER_INTERVAL ), key=lambda x: x.iteration, @@ -200,7 +200,7 @@ async def test_list( ) # now we should get them assert sorted( - await CompRunsRepository(aiopg_engine).list( + await CompRunsRepository(aiopg_engine).list_( never_scheduled=False, processed_since=SCHEDULER_INTERVAL ), key=lambda x: x.iteration, @@ -233,14 +233,14 @@ async def test_list( ) # so the processed ones shall remain assert sorted( - await CompRunsRepository(aiopg_engine).list( + await CompRunsRepository(aiopg_engine).list_( never_scheduled=False, processed_since=SCHEDULER_INTERVAL ), key=lambda x: x.iteration, ) == sorted(comp_runs_marked_as_processed, key=lambda x: x.iteration) # the ones waiting for scheduling now assert sorted( - await CompRunsRepository(aiopg_engine).list( + await CompRunsRepository(aiopg_engine).list_( never_scheduled=False, scheduled_since=SCHEDULER_INTERVAL ), key=lambda x: x.iteration, diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index 0e11c080bb71..1190403211c6 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -2536,6 +2536,91 @@ paths: responses: '204': description: Successful Response + /v0/computations/-/iterations/latest: + get: + tags: + - computations + - projects + summary: List Computations Latest Iteration + description: Lists the latest iteration of computations + operationId: list_computations_latest_iteration + parameters: + - name: order_by + in: query + required: false + schema: + type: string + contentMediaType: application/json + contentSchema: {} + default: '{"field":"created","direction":"asc"}' + title: Order By + - name: limit + in: query + required: false + schema: + type: integer + default: 20 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + default: 0 + title: Offset + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/Envelope_list_ComputationGet__' + /v0/computations/{project_id}/iterations/latest/tasks: + get: + tags: + - computations + - projects + summary: List Computations Latest Iteration Tasks + description: Lists the latest iteration tasks for a computation + operationId: list_computations_latest_iteration_tasks + parameters: + - name: project_id + in: path + required: true + schema: + type: string + format: uuid + title: Project Id + - name: order_by + in: query + required: false + schema: + type: string + contentMediaType: application/json + contentSchema: {} + default: '{"field":"start","direction":"asc"}' + title: Order By + - name: limit + in: query + required: false + schema: + type: integer + default: 20 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + default: 0 + title: Offset + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: 
'#/components/schemas/Envelope_list_ComputationGet__' /v0/projects/{project_id}:xport: post: tags: @@ -9955,6 +10040,22 @@ components: title: Error type: object title: Envelope[list[ApiKeyGet]] + Envelope_list_ComputationGet__: + properties: + data: + anyOf: + - items: + $ref: '#/components/schemas/ComputationGet' + type: array + - type: 'null' + title: Data + error: + anyOf: + - {} + - type: 'null' + title: Error + type: object + title: Envelope[list[ComputationGet]] Envelope_list_DatasetMetaData__: properties: data: diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest.py new file mode 100644 index 000000000000..a3a9a93e67ce --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest.py @@ -0,0 +1,107 @@ +import logging + +from aiohttp import web +from models_library.api_schemas_webserver.computations import ( + ComputationRunRestGet, + ComputationRunRestGetPage, + ComputationTaskRestGet, + ComputationTaskRestGetPage, +) +from models_library.rest_base import RequestParameters +from models_library.users import UserID +from pydantic import Field +from servicelib.aiohttp.requests_validation import ( + parse_request_path_parameters_as, + parse_request_query_parameters_as, +) +from servicelib.request_keys import RQT_USERID_KEY + +from .._meta import API_VTAG as VTAG +from ..constants import RQ_PRODUCT_KEY +from ..login.decorators import login_required +from ..security.decorators import permission_required +from ..utils_aiohttp import envelope_json_response +from . import _computations_service +from ._computations_rest_schema import ( + ComputationRunListQueryParams, + ComputationTaskListQueryParams, + ComputationTaskPathParams, +) + +_logger = logging.getLogger(__name__) + + +routes = web.RouteTableDef() + + +class ComputationsRequestContext(RequestParameters): + user_id: UserID = Field(..., alias=RQT_USERID_KEY) # type: ignore[literal-required] + product_name: str = Field(..., alias=RQ_PRODUCT_KEY) # type: ignore[literal-required] + + +@routes.get( + f"/{VTAG}/computations/-/iterations/latest", + name="list_computations_latest_iteration", +) +@login_required +@permission_required("services.pipeline.*") +@permission_required("project.read") +async def list_computations_latest_iteration(request: web.Request) -> web.Response: + + req_ctx = ComputationsRequestContext.model_validate(request) + query_params: ComputationRunListQueryParams = parse_request_query_parameters_as( + ComputationRunListQueryParams, request + ) + + _get = await _computations_service.list_computations_latest_iteration( + request.app, + product_name=req_ctx.product_name, + user_id=req_ctx.user_id, + # pagination + offset=query_params.offset, + limit=query_params.limit, + # ordering + order_by=query_params.order_by, + ) + _output = ComputationRunRestGetPage( + items=[ComputationRunRestGet(**task.dict()) for task in _get.items], + total=_get.total, + ) + + return envelope_json_response(_output) + + +@routes.get( + f"/{VTAG}/computations/{{project_id}}/iterations/latest/tasks", + name="list_computations_latest_iteration_tasks", +) +@login_required +@permission_required("services.pipeline.*") +@permission_required("project.read") +async def list_computations_latest_iteration_tasks( + request: web.Request, +) -> web.Response: + + req_ctx = ComputationsRequestContext.model_validate(request) + query_params: ComputationTaskListQueryParams = 
parse_request_query_parameters_as( + ComputationTaskListQueryParams, request + ) + path_params = parse_request_path_parameters_as(ComputationTaskPathParams, request) + + _get = await _computations_service.list_computations_latest_iteration_tasks( + request.app, + product_name=req_ctx.product_name, + user_id=req_ctx.user_id, + project_id=path_params.project_id, + # pagination + offset=query_params.offset, + limit=query_params.limit, + # ordering + order_by=query_params.order_by, + ) + _output = ComputationTaskRestGetPage( + items=[ComputationTaskRestGet(**task.dict()) for task in _get.items], + total=_get.total, + ) + + return envelope_json_response(_output) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest_schema.py b/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest_schema.py new file mode 100644 index 000000000000..42e34ea3474b --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest_schema.py @@ -0,0 +1,45 @@ +from models_library.basic_types import IDStr +from models_library.projects import ProjectID +from models_library.rest_ordering import OrderBy, create_ordering_query_model_class +from models_library.rest_pagination import PageQueryParameters +from pydantic import BaseModel, ConfigDict + +### Computation Run + + +ComputationRunListOrderParams = create_ordering_query_model_class( + ordering_fields={ + "submitted_at", + }, + default=OrderBy(field=IDStr("submitted_at")), + ordering_fields_api_to_column_map={"submitted_at": "created"}, +) + + +class ComputationRunListQueryParams( + PageQueryParameters, + ComputationRunListOrderParams, +): ... + + +### Computation Task + + +class ComputationTaskPathParams(BaseModel): + project_id: ProjectID + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + +ComputationTaskListOrderParams = create_ordering_query_model_class( + ordering_fields={ + "started_at", + }, + default=OrderBy(field=IDStr("started_at")), + ordering_fields_api_to_column_map={"started_at": "start"}, +) + + +class ComputationTaskListQueryParams( + PageQueryParameters, + ComputationTaskListOrderParams, +): ... 
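The two REST endpoints declared above take pagination (`limit`, `offset`) and ordering (`order_by`) query parameters; `order_by` is a JSON-encoded object whose `field` must be one of the ordering fields declared in `_computations_rest_schema.py` (`submitted_at` for runs, `started_at` for tasks). Below is a minimal client sketch, purely illustrative and not part of this patch: the base URL, session cookie and project UUID are placeholders for whatever your deployment uses, and error handling is omitted.

import asyncio
import json

import httpx


async def _list_latest_computations() -> None:
    # Placeholders: adjust host/port and authentication to the actual deployment
    async with httpx.AsyncClient(
        base_url="http://localhost:8080/v0",
        cookies={"<session-cookie-name>": "<session-cookie-value>"},
    ) as client:
        # latest iteration of every computation run by the logged-in user
        runs = await client.get(
            "/computations/-/iterations/latest",
            params={
                "order_by": json.dumps({"field": "submitted_at", "direction": "desc"}),
                "limit": 20,
                "offset": 0,
            },
        )
        runs.raise_for_status()

        # computational tasks of the latest iteration of one project (UUID is a placeholder)
        project_id = "00000000-0000-0000-0000-000000000000"
        tasks = await client.get(
            f"/computations/{project_id}/iterations/latest/tasks",
            params={
                "order_by": json.dumps({"field": "started_at", "direction": "asc"}),
                "limit": 20,
                "offset": 0,
            },
        )
        tasks.raise_for_status()


asyncio.run(_list_latest_computations())

Both responses are enveloped (a `data` list plus an optional `error`), consistent with the envelope schema added to `openapi.yaml` above.
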
diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py new file mode 100644 index 000000000000..1e5d035f83c5 --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py @@ -0,0 +1,65 @@ +from aiohttp import web +from models_library.api_schemas_directorv2.comp_runs import ( + ComputationRunRpcGetPage, + ComputationTaskRpcGetPage, +) +from models_library.products import ProductName +from models_library.projects import ProjectID +from models_library.rest_ordering import OrderBy +from models_library.users import UserID +from pydantic import NonNegativeInt +from servicelib.rabbitmq.rpc_interfaces.director_v2 import computations + +from ..projects.api import check_user_project_permission +from ..rabbitmq import get_rabbitmq_rpc_client + + +async def list_computations_latest_iteration( + app: web.Application, + product_name: ProductName, + user_id: UserID, + # pagination + offset: int, + limit: NonNegativeInt, + # ordering + order_by: OrderBy, +) -> ComputationRunRpcGetPage: + """Returns the list of computations (only latest iterations)""" + rpc_client = get_rabbitmq_rpc_client(app) + return await computations.list_computations_latest_iteration_page( + rpc_client, + product_name=product_name, + user_id=user_id, + offset=offset, + limit=limit, + order_by=order_by, + ) + + +async def list_computations_latest_iteration_tasks( + app: web.Application, + product_name: ProductName, + user_id: UserID, + project_id: ProjectID, + # pagination + offset: int, + limit: NonNegativeInt, + # ordering + order_by: OrderBy, +) -> ComputationTaskRpcGetPage: + """Returns the list of tasks for the latest iteration of a computation""" + + await check_user_project_permission( + app, project_id=project_id, user_id=user_id, product_name=product_name + ) + + rpc_client = get_rabbitmq_rpc_client(app) + return await computations.list_computations_latest_iteration_tasks_page( + rpc_client, + product_name=product_name, + user_id=user_id, + project_id=project_id, + offset=offset, + limit=limit, + order_by=order_by, + ) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/rest.py index fbd6448036d5..c9d5e9f13459 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_controller/rest.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/rest.py @@ -27,9 +27,9 @@ from ...products import products_web from ...security.decorators import permission_required from ...utils_aiohttp import envelope_json_response -from .. import _service +from .. 
import _director_v2_service from .._client import DirectorV2RestClient -from .._service_abc import get_project_run_policy +from .._director_v2_abc_service import get_project_run_policy from ._rest_exceptions import handle_rest_requests_exceptions _logger = logging.getLogger(__name__) @@ -58,13 +58,13 @@ async def start_computation(request: web.Request) -> web.Response: force_restart = body_params.force_restart # Group properties - group_properties = await _service.get_group_properties( + group_properties = await _director_v2_service.get_group_properties( request.app, product_name=req_ctx.product_name, user_id=req_ctx.user_id ) # Get wallet information product = products_web.get_current_product(request) - wallet_info = await _service.get_wallet_info( + wallet_info = await _director_v2_service.get_wallet_info( request.app, product=product, user_id=req_ctx.user_id, diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_service_abc_default.py b/services/web/server/src/simcore_service_webserver/director_v2/_director_v2_abc_default_service.py similarity index 94% rename from services/web/server/src/simcore_service_webserver/director_v2/_service_abc_default.py rename to services/web/server/src/simcore_service_webserver/director_v2/_director_v2_abc_default_service.py index 2a2cab073098..88e340fc3437 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_service_abc_default.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_director_v2_abc_default_service.py @@ -9,7 +9,7 @@ from aiohttp import web from models_library.projects import CommitID, ProjectID -from ._service_abc import AbstractProjectRunPolicy +from ._director_v2_abc_service import AbstractProjectRunPolicy log = logging.getLogger(__name__) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_service_abc.py b/services/web/server/src/simcore_service_webserver/director_v2/_director_v2_abc_service.py similarity index 100% rename from services/web/server/src/simcore_service_webserver/director_v2/_service_abc.py rename to services/web/server/src/simcore_service_webserver/director_v2/_director_v2_abc_service.py diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_service.py b/services/web/server/src/simcore_service_webserver/director_v2/_director_v2_service.py similarity index 100% rename from services/web/server/src/simcore_service_webserver/director_v2/_service.py rename to services/web/server/src/simcore_service_webserver/director_v2/_director_v2_service.py diff --git a/services/web/server/src/simcore_service_webserver/director_v2/director_v2_service.py b/services/web/server/src/simcore_service_webserver/director_v2/director_v2_service.py index 89a42fe0fa27..b750700e5ee8 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/director_v2_service.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/director_v2_service.py @@ -1,5 +1,10 @@ from ._client import is_healthy -from ._service import ( +from ._director_v2_abc_service import ( + AbstractProjectRunPolicy, + get_project_run_policy, + set_project_run_policy, +) +from ._director_v2_service import ( create_or_update_pipeline, delete_pipeline, get_batch_tasks_outputs, @@ -7,11 +12,6 @@ is_pipeline_running, stop_pipeline, ) -from ._service_abc import ( - AbstractProjectRunPolicy, - get_project_run_policy, - set_project_run_policy, -) from .exceptions import DirectorServiceError # director-v2 module internal API diff --git 
a/services/web/server/src/simcore_service_webserver/director_v2/plugin.py b/services/web/server/src/simcore_service_webserver/director_v2/plugin.py index 08df74ee1d84..1566c5b3dbcb 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/plugin.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/plugin.py @@ -11,8 +11,8 @@ from ..rest.plugin import setup_rest from . import _controller from ._client import DirectorV2RestClient, get_directorv2_client, set_directorv2_client -from ._service_abc import set_project_run_policy -from ._service_abc_default import DefaultProjectRunPolicy +from ._director_v2_abc_default_service import DefaultProjectRunPolicy +from ._director_v2_abc_service import set_project_run_policy _logger = logging.getLogger(__name__) diff --git a/services/web/server/src/simcore_service_webserver/projects/_nodes_service.py b/services/web/server/src/simcore_service_webserver/projects/_nodes_service.py index 0206e1315cc8..30bfc2a018da 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_nodes_service.py +++ b/services/web/server/src/simcore_service_webserver/projects/_nodes_service.py @@ -41,7 +41,7 @@ def get_service_start_lock_key(user_id: UserID, project_uuid: ProjectID) -> str: - return f"lock_service_start_limit.{user_id}.{project_uuid}" + return f"lock_service_start_limit.usr_{user_id}.prj_{project_uuid}" def check_num_service_per_projects_limit( diff --git a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py index f079b8fb5f67..5c87fa5f4d60 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py +++ b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py @@ -640,7 +640,7 @@ async def _start_dynamic_service( # noqa: C901 ) ), ) - async def _() -> None: + async def _run() -> None: project_running_nodes = await dynamic_scheduler_service.list_dynamic_services( request.app, user_id=user_id, project_id=project_uuid ) @@ -788,7 +788,7 @@ async def _() -> None: ), ) - await _() + await _run() async def add_project_node( From d47f6a6f54deaee7cb0d96064c3fa5e4d2215306 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Wed, 16 Apr 2025 15:14:06 +0200 Subject: [PATCH 02/19] revert --- .../src/simcore_service_webserver/projects/_nodes_service.py | 2 +- .../simcore_service_webserver/projects/_projects_service.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/projects/_nodes_service.py b/services/web/server/src/simcore_service_webserver/projects/_nodes_service.py index 30bfc2a018da..0206e1315cc8 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_nodes_service.py +++ b/services/web/server/src/simcore_service_webserver/projects/_nodes_service.py @@ -41,7 +41,7 @@ def get_service_start_lock_key(user_id: UserID, project_uuid: ProjectID) -> str: - return f"lock_service_start_limit.usr_{user_id}.prj_{project_uuid}" + return f"lock_service_start_limit.{user_id}.{project_uuid}" def check_num_service_per_projects_limit( diff --git a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py index 5c87fa5f4d60..f079b8fb5f67 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py +++ 
b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py @@ -640,7 +640,7 @@ async def _start_dynamic_service( # noqa: C901 ) ), ) - async def _run() -> None: + async def _() -> None: project_running_nodes = await dynamic_scheduler_service.list_dynamic_services( request.app, user_id=user_id, project_id=project_uuid ) @@ -788,7 +788,7 @@ async def _run() -> None: ), ) - await _run() + await _() async def add_project_node( From 756b22ae641069c9a36a898ff3916b0f0ce77ce6 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Wed, 16 Apr 2025 15:37:39 +0200 Subject: [PATCH 03/19] fix test in director-v2 --- services/director-v2/tests/conftest.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/services/director-v2/tests/conftest.py b/services/director-v2/tests/conftest.py index 6337e7eea884..fb8af9b7e8f1 100644 --- a/services/director-v2/tests/conftest.py +++ b/services/director-v2/tests/conftest.py @@ -278,11 +278,19 @@ def fake_workbench_complete_adjacency( @pytest.fixture def disable_rabbitmq(mocker: MockerFixture) -> None: - def mock_setup(app: FastAPI) -> None: + def rabbitmq_mock_setup(app: FastAPI) -> None: app.state.rabbitmq_client = AsyncMock() + def rpc_api_routes_mock_setup(app: FastAPI) -> None: + app.state.rabbitmq_rpc_server = AsyncMock() + + mocker.patch( + "simcore_service_director_v2.modules.rabbitmq.setup", + side_effect=rabbitmq_mock_setup, + ) mocker.patch( - "simcore_service_director_v2.modules.rabbitmq.setup", side_effect=mock_setup + "simcore_service_director_v2.core.application.setup_rpc_api_routes", + side_effect=rpc_api_routes_mock_setup, ) From 8b5129682d55fc3f7f4e09a0599b5e15338a448b Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Wed, 16 Apr 2025 15:39:54 +0200 Subject: [PATCH 04/19] fix --- .../director_v2/_computations_rest_schema.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest_schema.py b/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest_schema.py index 42e34ea3474b..9ef9c613a04b 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest_schema.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest_schema.py @@ -18,7 +18,7 @@ class ComputationRunListQueryParams( PageQueryParameters, - ComputationRunListOrderParams, + ComputationRunListOrderParams, # type: ignore[misc, valid-type] ): ... @@ -41,5 +41,5 @@ class ComputationTaskPathParams(BaseModel): class ComputationTaskListQueryParams( PageQueryParameters, - ComputationTaskListOrderParams, + ComputationTaskListOrderParams, # type: ignore[misc, valid-type] ): ... From b13813c478adecfa4fbd627ac3e8664d3dbcd9ee Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Wed, 16 Apr 2025 15:48:55 +0200 Subject: [PATCH 05/19] fix --- .../server/src/simcore_service_webserver/director_v2/plugin.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/plugin.py b/services/web/server/src/simcore_service_webserver/director_v2/plugin.py index 1566c5b3dbcb..119446b637d1 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/plugin.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/plugin.py @@ -9,7 +9,7 @@ ) from ..rest.plugin import setup_rest -from . import _controller +from . 
import _computations_rest, _controller from ._client import DirectorV2RestClient, get_directorv2_client, set_directorv2_client from ._director_v2_abc_default_service import DefaultProjectRunPolicy from ._director_v2_abc_service import set_project_run_policy @@ -38,6 +38,7 @@ def setup_director_v2(app: web.Application): if is_setup_completed(setup_rest.metadata()["module_name"], app): set_project_run_policy(app, DefaultProjectRunPolicy()) app.router.add_routes(_controller.rest.routes) + app.router.add_routes(_computations_rest.routes) else: _logger.warning( From d7ece0787bad60a11e4ca945b00c5a26364ec97a Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Wed, 16 Apr 2025 17:02:59 +0200 Subject: [PATCH 06/19] adding tests: --- services/director-v2/tests/conftest.py | 13 ++- .../test_api_rpc_computations.py | 92 +++++++++++++++++++ 2 files changed, 104 insertions(+), 1 deletion(-) create mode 100644 services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py diff --git a/services/director-v2/tests/conftest.py b/services/director-v2/tests/conftest.py index fb8af9b7e8f1..01790674d091 100644 --- a/services/director-v2/tests/conftest.py +++ b/services/director-v2/tests/conftest.py @@ -7,7 +7,7 @@ import json import logging import os -from collections.abc import AsyncIterable, AsyncIterator +from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable from copy import deepcopy from datetime import timedelta from pathlib import Path @@ -30,6 +30,8 @@ setenvs_from_envfile, ) from pytest_simcore.helpers.typing_env import EnvVarsDict +from servicelib.rabbitmq import RabbitMQRPCClient +from settings_library.rabbit import RabbitSettings from simcore_service_director_v2.core.application import init_app from simcore_service_director_v2.core.settings import AppSettings from starlette.testclient import ASGI3App, TestClient @@ -226,6 +228,15 @@ async def async_client(initialized_app: FastAPI) -> AsyncIterable[httpx.AsyncCli yield client +@pytest.fixture +async def rpc_client( + rabbit_service: RabbitSettings, + initialized_app: FastAPI, + rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]], +) -> RabbitMQRPCClient: + return await rabbitmq_rpc_client("client") + + @pytest.fixture() def minimal_app(client: TestClient) -> ASGI3App: # NOTICE that this app triggers events diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py new file mode 100644 index 000000000000..8355bbc628ea --- /dev/null +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py @@ -0,0 +1,92 @@ +# pylint: disable=no-value-for-parameter +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-arguments +# pylint: disable=unused-argument +# pylint: disable=unused-variable +# pylint: disable=too-many-positional-arguments + +from collections.abc import Awaitable, Callable +from typing import Any + +from models_library.api_schemas_directorv2.comp_runs import ComputationRunRpcGetPage +from models_library.projects import ProjectAtDB +from servicelib.rabbitmq import RabbitMQRPCClient +from servicelib.rabbitmq.rpc_interfaces.director_v2 import ( + computations as rpc_computations, +) +from simcore_postgres_database.models.comp_pipeline import StateType +from simcore_service_director_v2.models.comp_pipelines import CompPipelineAtDB +from simcore_service_director_v2.models.comp_runs import 
CompRunsAtDB +from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB + +pytest_simcore_core_services_selection = ["postgres", "rabbit", "redis"] +pytest_simcore_ops_services_selection = [ + "adminer", +] + + +# @pytest.fixture() +# def minimal_configuration( +# mock_env: EnvVarsDict, +# postgres_host_config: dict[str, str], +# rabbit_service: RabbitSettings, +# redis_service: RedisSettings, +# monkeypatch: pytest.MonkeyPatch, +# faker: Faker, +# with_disabled_auto_scheduling: mock.Mock, +# with_disabled_scheduler_publisher: mock.Mock, +# ): +# monkeypatch.setenv("DIRECTOR_V2_DYNAMIC_SIDECAR_ENABLED", "false") +# monkeypatch.setenv("COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED", "1") +# monkeypatch.setenv("COMPUTATIONAL_BACKEND_ENABLED", "1") +# monkeypatch.setenv("R_CLONE_PROVIDER", "MINIO") +# monkeypatch.setenv("S3_ENDPOINT", faker.url()) +# monkeypatch.setenv("S3_ACCESS_KEY", faker.pystr()) +# monkeypatch.setenv("S3_REGION", faker.pystr()) +# monkeypatch.setenv("S3_SECRET_KEY", faker.pystr()) +# monkeypatch.setenv("S3_BUCKET_NAME", faker.pystr()) + + +async def test_get_computation_from_published_computation_task( + # minimal_configuration: None, + fake_workbench_without_outputs: dict[str, Any], + fake_workbench_adjacency: dict[str, Any], + registered_user: Callable[..., dict[str, Any]], + project: Callable[..., Awaitable[ProjectAtDB]], + create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], + create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], + create_comp_run: Callable[..., Awaitable[CompRunsAtDB]], + # async_client: httpx.AsyncClient, + rpc_client: RabbitMQRPCClient, +): + user = registered_user() + proj = await project(user, workbench=fake_workbench_without_outputs) + await create_pipeline( + project_id=f"{proj.uuid}", + dag_adjacency_list=fake_workbench_adjacency, + ) + comp_tasks = await create_tasks( + user=user, project=proj, state=StateType.PUBLISHED, progress=0 + ) + comp_runs = await create_comp_run( + user=user, project=proj, result=StateType.PUBLISHED + ) + assert comp_runs + + output = await rpc_computations.list_computations_latest_iteration_page( + rpc_client, product_name="osparc", user_id=user["id"] + ) + assert output.total == 1 + assert isinstance(output, ComputationRunRpcGetPage) + + # get_computation_url = httpx.URL( + # f"/v2/computations/{proj.uuid}?user_id={user['id']}" + # ) + # response = await async_client.get(get_computation_url) + # assert response.status_code == status.HTTP_200_OK, response.text + # returned_computation = ComputationGet.model_validate(response.json()) + # assert returned_computation + # expected_stop_url = async_client.base_url.join( + # f"/v2/computations/{proj.uuid}:stop?user_id={user['id']}" + # ) From 5eea03ad77e4708efd8b0a494a8d549a1905248f Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Thu, 17 Apr 2025 11:55:56 +0200 Subject: [PATCH 07/19] adding tests --- .../api_schemas_directorv2/comp_runs.py | 11 ++- .../db/repositories/comp_tasks/_core.py | 2 +- .../test_api_rpc_computations.py | 67 ++++++++++++++----- .../tests/unit/with_dbs/conftest.py | 5 +- 4 files changed, 64 insertions(+), 21 deletions(-) diff --git a/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py b/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py index a237438b8fd0..1682ee3f66a5 100644 --- a/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py +++ b/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py @@ -1,8 +1,9 @@ from 
datetime import datetime -from typing import Any, NamedTuple +from typing import Annotated, Any, NamedTuple from pydantic import ( BaseModel, + BeforeValidator, PositiveInt, ) @@ -26,11 +27,17 @@ class ComputationRunRpcGetPage(NamedTuple): total: PositiveInt +def _none_to_zero_float_pre_validator(value: Any): + if value is None: + return 0.0 + return value + + class ComputationTaskRpcGet(BaseModel): project_uuid: ProjectID node_id: NodeID state: RunningState - progress: float + progress: Annotated[float, BeforeValidator(_none_to_zero_float_pre_validator)] image: dict[str, Any] started_at: datetime | None ended_at: datetime | None diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py index 4fbb903aceda..903348ca09ea 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py @@ -92,7 +92,7 @@ async def list_computational_tasks_for_frontend_client( base_select_query = ( sa.select( - comp_tasks.c.project_id, + comp_tasks.c.project_id.label("project_uuid"), comp_tasks.c.node_id, comp_tasks.c.state, comp_tasks.c.progress, diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py index 8355bbc628ea..8e395916a822 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py @@ -7,10 +7,15 @@ # pylint: disable=too-many-positional-arguments from collections.abc import Awaitable, Callable +from datetime import datetime, timezone from typing import Any -from models_library.api_schemas_directorv2.comp_runs import ComputationRunRpcGetPage +from models_library.api_schemas_directorv2.comp_runs import ( + ComputationRunRpcGetPage, + ComputationTaskRpcGetPage, +) from models_library.projects import ProjectAtDB +from models_library.projects_state import RunningState from servicelib.rabbitmq import RabbitMQRPCClient from servicelib.rabbitmq.rpc_interfaces.director_v2 import ( computations as rpc_computations, @@ -48,8 +53,7 @@ # monkeypatch.setenv("S3_BUCKET_NAME", faker.pystr()) -async def test_get_computation_from_published_computation_task( - # minimal_configuration: None, +async def test_rpc_list_computation_runs_and_tasks( fake_workbench_without_outputs: dict[str, Any], fake_workbench_adjacency: dict[str, Any], registered_user: Callable[..., dict[str, Any]], @@ -57,7 +61,6 @@ async def test_get_computation_from_published_computation_task( create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], create_comp_run: Callable[..., Awaitable[CompRunsAtDB]], - # async_client: httpx.AsyncClient, rpc_client: RabbitMQRPCClient, ): user = registered_user() @@ -67,10 +70,10 @@ async def test_get_computation_from_published_computation_task( dag_adjacency_list=fake_workbench_adjacency, ) comp_tasks = await create_tasks( - user=user, project=proj, state=StateType.PUBLISHED, progress=0 + user=user, project=proj, state=StateType.PUBLISHED, progress=None ) comp_runs = await create_comp_run( - user=user, project=proj, result=StateType.PUBLISHED + user=user, project=proj, result=RunningState.PUBLISHED ) assert 
comp_runs @@ -79,14 +82,46 @@ async def test_get_computation_from_published_computation_task( ) assert output.total == 1 assert isinstance(output, ComputationRunRpcGetPage) + assert output.items[0].iteration == 1 - # get_computation_url = httpx.URL( - # f"/v2/computations/{proj.uuid}?user_id={user['id']}" - # ) - # response = await async_client.get(get_computation_url) - # assert response.status_code == status.HTTP_200_OK, response.text - # returned_computation = ComputationGet.model_validate(response.json()) - # assert returned_computation - # expected_stop_url = async_client.base_url.join( - # f"/v2/computations/{proj.uuid}:stop?user_id={user['id']}" - # ) + comp_runs_2 = await create_comp_run( + user=user, + project=proj, + result=RunningState.PENDING, + started=datetime.now(tz=timezone.utc), + iteration=2, + ) + output = await rpc_computations.list_computations_latest_iteration_page( + rpc_client, product_name="osparc", user_id=user["id"] + ) + assert output.total == 1 + assert isinstance(output, ComputationRunRpcGetPage) + assert output.items[0].iteration == 2 + assert output.items[0].started_at is not None + assert output.items[0].ended_at is None + + comp_runs_3 = await create_comp_run( + user=user, + project=proj, + result=RunningState.SUCCESS, + started=datetime.now(tz=timezone.utc), + ended=datetime.now(tz=timezone.utc), + iteration=3, + ) + output = await rpc_computations.list_computations_latest_iteration_page( + rpc_client, product_name="osparc", user_id=user["id"] + ) + assert output.total == 1 + assert isinstance(output, ComputationRunRpcGetPage) + assert output.items[0].iteration == 3 + assert output.items[0].ended_at is not None + + # Tasks + + output = await rpc_computations.list_computations_latest_iteration_tasks_page( + rpc_client, product_name="osparc", user_id=user["id"], project_id=proj.uuid + ) + assert output + assert output.total == 4 + assert isinstance(output, ComputationTaskRpcGetPage) + assert len(output.items) == 4 diff --git a/services/director-v2/tests/unit/with_dbs/conftest.py b/services/director-v2/tests/unit/with_dbs/conftest.py index 9b0d03e6eed5..cbed218aa13b 100644 --- a/services/director-v2/tests/unit/with_dbs/conftest.py +++ b/services/director-v2/tests/unit/with_dbs/conftest.py @@ -18,6 +18,7 @@ from fastapi.encoders import jsonable_encoder from models_library.projects import ProjectAtDB, ProjectID from models_library.projects_nodes_io import NodeID +from models_library.projects_state import RunningState from pydantic.main import BaseModel from simcore_postgres_database.models.comp_pipeline import StateType, comp_pipeline from simcore_postgres_database.models.comp_runs import comp_runs @@ -193,7 +194,7 @@ async def _( "project_uuid": f"{project.uuid}", "user_id": user["id"], "iteration": 1, - "result": StateType.NOT_STARTED, + "result": RunningState.NOT_STARTED, "metadata": jsonable_encoder(run_metadata), "use_on_demand_clusters": False, } @@ -247,7 +248,7 @@ async def _() -> PublishedProject: @pytest.fixture async def published_project( - publish_project: Callable[[], Awaitable[PublishedProject]] + publish_project: Callable[[], Awaitable[PublishedProject]], ) -> PublishedProject: return await publish_project() From 840462a8dd322e2a9982714676c1ee35fe1b4a6c Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Thu, 17 Apr 2025 14:06:33 +0200 Subject: [PATCH 08/19] adding tests to webserver --- .../api_schemas_directorv2/comp_runs.py | 55 ++++++++++++ .../director_v2/_computations_rest.py | 43 +++++++--- .../server/tests/unit/with_dbs/01/conftest.py 
| 27 +++++- .../with_dbs/01/test_director_v2_handlers.py | 83 +++++++++++++++++++ 4 files changed, 194 insertions(+), 14 deletions(-) diff --git a/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py b/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py index 1682ee3f66a5..005c9c95412d 100644 --- a/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py +++ b/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py @@ -4,6 +4,7 @@ from pydantic import ( BaseModel, BeforeValidator, + ConfigDict, PositiveInt, ) @@ -21,6 +22,40 @@ class ComputationRunRpcGet(BaseModel): started_at: datetime | None ended_at: datetime | None + model_config = ConfigDict( + json_schema_extra={ + "examples": [ + { + "project_uuid": "beb16d18-d57d-44aa-a638-9727fa4a72ef", + "iteration": 1, + "state": "SUCCESS", + "info": { + "wallet_id": 9866, + "user_email": "test@example.net", + "wallet_name": "test", + "product_name": "osparc", + "project_name": "test", + "project_metadata": { + "parent_node_id": "12e0c8b2-bad6-40fb-9948-8dec4f65d4d9", + "parent_node_name": "UJyfwFVYySnPCaLuQIaz", + "parent_project_id": "beb16d18-d57d-44aa-a638-9727fa4a72ef", + "parent_project_name": "qTjDmYPxeqAWfCKCQCYF", + "root_parent_node_id": "37176e84-d977-4993-bc49-d76fcfc6e625", + "root_parent_node_name": "UEXExIZVPeFzGRmMglPr", + "root_parent_project_id": "beb16d18-d57d-44aa-a638-9727fa4a72ef", + "root_parent_project_name": "FuDpjjFIyeNTWRUWCuKo", + }, + "node_id_names_map": {}, + "simcore_user_agent": "agent", + }, + "submitted_at": "2023-01-11 13:11:47.293595", + "started_at": "2023-01-11 13:11:47.293595", + "ended_at": "2023-01-11 13:11:47.293595", + } + ] + } + ) + class ComputationRunRpcGetPage(NamedTuple): items: list[ComputationRunRpcGet] @@ -42,6 +77,26 @@ class ComputationTaskRpcGet(BaseModel): started_at: datetime | None ended_at: datetime | None + model_config = ConfigDict( + json_schema_extra={ + "examples": [ + { + "project_uuid": "beb16d18-d57d-44aa-a638-9727fa4a72ef", + "node_id": "12e0c8b2-bad6-40fb-9948-8dec4f65d4d9", + "state": "SUCCESS", + "progress": 0.0, + "image": { + "name": "simcore/services/comp/ti-solutions-optimizer", + "tag": "1.0.19", + "node_requirements": {"CPU": 8.0, "RAM": 25769803776}, + }, + "started_at": "2023-01-11 13:11:47.293595", + "ended_at": "2023-01-11 13:11:47.293595", + } + ] + } + ) + class ComputationTaskRpcGetPage(NamedTuple): items: list[ComputationTaskRpcGet] diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest.py index a3a9a93e67ce..1f367568239d 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest.py @@ -3,24 +3,25 @@ from aiohttp import web from models_library.api_schemas_webserver.computations import ( ComputationRunRestGet, - ComputationRunRestGetPage, ComputationTaskRestGet, - ComputationTaskRestGetPage, ) from models_library.rest_base import RequestParameters +from models_library.rest_pagination import Page +from models_library.rest_pagination_utils import paginate_data from models_library.users import UserID from pydantic import Field from servicelib.aiohttp.requests_validation import ( parse_request_path_parameters_as, parse_request_query_parameters_as, ) +from servicelib.mimetype_constants import MIMETYPE_APPLICATION_JSON from 
servicelib.request_keys import RQT_USERID_KEY +from servicelib.rest_constants import RESPONSE_MODEL_POLICY from .._meta import API_VTAG as VTAG from ..constants import RQ_PRODUCT_KEY from ..login.decorators import login_required from ..security.decorators import permission_required -from ..utils_aiohttp import envelope_json_response from . import _computations_service from ._computations_rest_schema import ( ComputationRunListQueryParams, @@ -63,12 +64,20 @@ async def list_computations_latest_iteration(request: web.Request) -> web.Respon # ordering order_by=query_params.order_by, ) - _output = ComputationRunRestGetPage( - items=[ComputationRunRestGet(**task.dict()) for task in _get.items], - total=_get.total, - ) - return envelope_json_response(_output) + page = Page[ComputationRunRestGet].model_validate( + paginate_data( + chunk=[ComputationRunRestGet(**task.dict()) for task in _get.items], + total=_get.total, + limit=query_params.limit, + offset=query_params.offset, + request_url=request.url, + ) + ) + return web.Response( + text=page.model_dump_json(**RESPONSE_MODEL_POLICY), + content_type=MIMETYPE_APPLICATION_JSON, + ) @routes.get( @@ -99,9 +108,17 @@ async def list_computations_latest_iteration_tasks( # ordering order_by=query_params.order_by, ) - _output = ComputationTaskRestGetPage( - items=[ComputationTaskRestGet(**task.dict()) for task in _get.items], - total=_get.total, - ) - return envelope_json_response(_output) + page = Page[ComputationTaskRestGet].model_validate( + paginate_data( + chunk=[ComputationTaskRestGet(**task.dict()) for task in _get.items], + total=_get.total, + limit=query_params.limit, + offset=query_params.offset, + request_url=request.url, + ) + ) + return web.Response( + text=page.model_dump_json(**RESPONSE_MODEL_POLICY), + content_type=MIMETYPE_APPLICATION_JSON, + ) diff --git a/services/web/server/tests/unit/with_dbs/01/conftest.py b/services/web/server/tests/unit/with_dbs/01/conftest.py index 0d136f206374..67f98989effd 100644 --- a/services/web/server/tests/unit/with_dbs/01/conftest.py +++ b/services/web/server/tests/unit/with_dbs/01/conftest.py @@ -1,9 +1,14 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument # pylint: disable=unused-variable - +from collections.abc import AsyncIterator +from pathlib import Path import pytest +from aiohttp.test_utils import TestClient +from pytest_simcore.helpers.webserver_login import UserInfoDict +from pytest_simcore.helpers.webserver_projects import NewProject +from simcore_service_webserver.projects.models import ProjectDict @pytest.fixture @@ -13,3 +18,23 @@ def app_environment( # NOTE: overrides app_environment monkeypatch.setenv("WEBSERVER_GARBAGE_COLLECTOR", "null") return app_environment | {"WEBSERVER_GARBAGE_COLLECTOR": "null"} + + +@pytest.fixture +async def user_project( + client: TestClient, + fake_project: ProjectDict, + logged_user: UserInfoDict, + tests_data_dir: Path, + osparc_product_name: str, +) -> AsyncIterator[ProjectDict]: + async with NewProject( + fake_project, + client.app, + user_id=logged_user["id"], + product_name=osparc_product_name, + tests_data_dir=tests_data_dir, + ) as project: + print("-----> added project", project["name"]) + yield project + print("<----- removed project", project["name"]) diff --git a/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py b/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py index cb40a779378a..0e51d74ecca4 100644 --- a/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py +++ 
b/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py @@ -6,7 +6,18 @@ import pytest from aiohttp.test_utils import TestClient from faker import Faker +from models_library.api_schemas_directorv2.comp_runs import ( + ComputationRunRpcGet, + ComputationRunRpcGetPage, + ComputationTaskRpcGet, + ComputationTaskRpcGetPage, +) +from models_library.api_schemas_webserver.computations import ( + ComputationRunRestGet, + ComputationTaskRestGet, +) from models_library.projects import ProjectID +from pytest_mock import MockerFixture from pytest_simcore.helpers.assert_checks import assert_status from pytest_simcore.helpers.webserver_login import LoggedUser from pytest_simcore.helpers.webserver_parametrizations import ( @@ -16,6 +27,7 @@ from pytest_simcore.services_api_mocks_for_aiohttp_clients import AioResponsesMock from servicelib.aiohttp import status from simcore_service_webserver.db.models import UserRole +from simcore_service_webserver.projects.models import ProjectDict @pytest.fixture @@ -117,3 +129,74 @@ async def test_stop_computation( else expected.no_content ), ) + + +@pytest.fixture +def mock_rpc_list_computations_latest_iteration_tasks( + mocker: MockerFixture, +) -> ComputationRunRpcGetPage: + return mocker.patch( + "simcore_service_webserver.director_v2._computations_service.computations.list_computations_latest_iteration_page", + spec=True, + return_value=ComputationRunRpcGetPage( + items=[ + ComputationRunRpcGet.model_validate( + ComputationRunRpcGet.model_config["json_schema_extra"]["examples"][ + 0 + ] + ) + ], + total=1, + ), + ) + + +@pytest.fixture +def mock_rpc_list_computations_latest_iteration_tasks_page( + mocker: MockerFixture, +) -> ComputationTaskRpcGetPage: + return mocker.patch( + "simcore_service_webserver.director_v2._computations_service.computations.list_computations_latest_iteration_tasks_page", + spec=True, + return_value=ComputationTaskRpcGetPage( + items=[ + ComputationTaskRpcGet.model_validate( + ComputationTaskRpcGet.model_config["json_schema_extra"]["examples"][ + 0 + ] + ) + ], + total=1, + ), + ) + + +@pytest.mark.parametrize(*standard_role_response(), ids=str) +async def test_list_computations_latest_iteration( + director_v2_service_mock: AioResponsesMock, + user_project: ProjectDict, + client: TestClient, + logged_user: LoggedUser, + user_role: UserRole, + expected: ExpectedResponse, + mock_rpc_list_computations_latest_iteration_tasks: None, + mock_rpc_list_computations_latest_iteration_tasks_page: None, +): + assert client.app + url = client.app.router["list_computations_latest_iteration"].url_for() + resp = await client.get(f"{url}") + data, _ = await assert_status( + resp, status.HTTP_200_OK if user_role == UserRole.GUEST else expected.ok + ) + if user_role != UserRole.ANONYMOUS: + assert ComputationRunRestGet.model_validate(data[0]) + + url = client.app.router["list_computations_latest_iteration_tasks"].url_for( + project_id=f"{user_project['uuid']}" + ) + resp = await client.get(f"{url}") + data, _ = await assert_status( + resp, status.HTTP_200_OK if user_role == UserRole.GUEST else expected.ok + ) + if user_role != UserRole.ANONYMOUS: + assert ComputationTaskRestGet.model_validate(data[0]) From b7394923476f9a2da8b11913519069f6fe80a599 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Thu, 17 Apr 2025 14:24:00 +0200 Subject: [PATCH 09/19] remove comment --- .../test_api_rpc_computations.py | 22 ------------------- 1 file changed, 22 deletions(-) diff --git 
a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py index 8e395916a822..c2180f22e0bb 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py @@ -31,28 +31,6 @@ ] -# @pytest.fixture() -# def minimal_configuration( -# mock_env: EnvVarsDict, -# postgres_host_config: dict[str, str], -# rabbit_service: RabbitSettings, -# redis_service: RedisSettings, -# monkeypatch: pytest.MonkeyPatch, -# faker: Faker, -# with_disabled_auto_scheduling: mock.Mock, -# with_disabled_scheduler_publisher: mock.Mock, -# ): -# monkeypatch.setenv("DIRECTOR_V2_DYNAMIC_SIDECAR_ENABLED", "false") -# monkeypatch.setenv("COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED", "1") -# monkeypatch.setenv("COMPUTATIONAL_BACKEND_ENABLED", "1") -# monkeypatch.setenv("R_CLONE_PROVIDER", "MINIO") -# monkeypatch.setenv("S3_ENDPOINT", faker.url()) -# monkeypatch.setenv("S3_ACCESS_KEY", faker.pystr()) -# monkeypatch.setenv("S3_REGION", faker.pystr()) -# monkeypatch.setenv("S3_SECRET_KEY", faker.pystr()) -# monkeypatch.setenv("S3_BUCKET_NAME", faker.pystr()) - - async def test_rpc_list_computation_runs_and_tasks( fake_workbench_without_outputs: dict[str, Any], fake_workbench_adjacency: dict[str, Any], From ee472d8f660f24ae0914b90ed6ac585896308cf2 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Thu, 17 Apr 2025 14:37:14 +0200 Subject: [PATCH 10/19] small cleanup --- .../{ => _controller}/_computations_rest.py | 10 +++++----- .../{ => _controller}/_computations_rest_schema.py | 10 ++++++++-- 2 files changed, 13 insertions(+), 7 deletions(-) rename services/web/server/src/simcore_service_webserver/director_v2/{ => _controller}/_computations_rest.py (94%) rename services/web/server/src/simcore_service_webserver/director_v2/{ => _controller}/_computations_rest_schema.py (85%) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest.py similarity index 94% rename from services/web/server/src/simcore_service_webserver/director_v2/_computations_rest.py rename to services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest.py index 1f367568239d..47eae927ea5d 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest.py @@ -18,11 +18,11 @@ from servicelib.request_keys import RQT_USERID_KEY from servicelib.rest_constants import RESPONSE_MODEL_POLICY -from .._meta import API_VTAG as VTAG -from ..constants import RQ_PRODUCT_KEY -from ..login.decorators import login_required -from ..security.decorators import permission_required -from . import _computations_service +from ..._meta import API_VTAG as VTAG +from ...constants import RQ_PRODUCT_KEY +from ...login.decorators import login_required +from ...security.decorators import permission_required +from .. 
import _computations_service from ._computations_rest_schema import ( ComputationRunListQueryParams, ComputationTaskListQueryParams, diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest_schema.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest_schema.py similarity index 85% rename from services/web/server/src/simcore_service_webserver/director_v2/_computations_rest_schema.py rename to services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest_schema.py index 9ef9c613a04b..92b08d6c351a 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_computations_rest_schema.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest_schema.py @@ -6,13 +6,19 @@ ### Computation Run - ComputationRunListOrderParams = create_ordering_query_model_class( ordering_fields={ "submitted_at", + "started_at", + "ended_at", + "state", }, default=OrderBy(field=IDStr("submitted_at")), - ordering_fields_api_to_column_map={"submitted_at": "created"}, + ordering_fields_api_to_column_map={ + "submitted_at": "created", + "started_at": "started", + "ended_at": "ended", + }, ) From 72f289ebed821072fbec2b5f79094d358682a51f Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Thu, 17 Apr 2025 14:45:55 +0200 Subject: [PATCH 11/19] add db indexes --- ...f65f7786cd4b_add_indexes_to_comp_tables.py | 53 +++++++++++++++++++ .../models/comp_runs.py | 1 + .../models/comp_tasks.py | 1 + 3 files changed, 55 insertions(+) create mode 100644 packages/postgres-database/src/simcore_postgres_database/migration/versions/f65f7786cd4b_add_indexes_to_comp_tables.py diff --git a/packages/postgres-database/src/simcore_postgres_database/migration/versions/f65f7786cd4b_add_indexes_to_comp_tables.py b/packages/postgres-database/src/simcore_postgres_database/migration/versions/f65f7786cd4b_add_indexes_to_comp_tables.py new file mode 100644 index 000000000000..49c5aae9c92f --- /dev/null +++ b/packages/postgres-database/src/simcore_postgres_database/migration/versions/f65f7786cd4b_add_indexes_to_comp_tables.py @@ -0,0 +1,53 @@ +"""add indexes to comp tables + +Revision ID: f65f7786cd4b +Revises: cf8f743fd0b7 +Create Date: 2025-04-17 12:44:27.577984+00:00 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "f65f7786cd4b" +down_revision = "cf8f743fd0b7" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_index("ix_comp_runs_user_id", "comp_runs", ["user_id"], unique=False) + op.create_index( + "ix_comp_tasks_project_id", "comp_tasks", ["project_id"], unique=False + ) + op.drop_index("idx_projects_last_change_date_desc", table_name="projects") + op.create_index( + "idx_projects_last_change_date_desc", + "projects", + ["last_change_date"], + unique=False, + postgresql_using="btree", + postgresql_ops={"last_change_date": "DESC"}, + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.drop_index( + "idx_projects_last_change_date_desc", + table_name="projects", + postgresql_using="btree", + postgresql_ops={"last_change_date": "DESC"}, + ) + op.create_index( + "idx_projects_last_change_date_desc", + "projects", + [sa.text("last_change_date DESC")], + unique=False, + ) + op.drop_index("ix_comp_tasks_project_id", table_name="comp_tasks") + op.drop_index("ix_comp_runs_user_id", table_name="comp_runs") + # ### end Alembic commands ### diff --git a/packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py b/packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py index 26adb63077e1..af14196d1849 100644 --- a/packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py +++ b/packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py @@ -99,4 +99,5 @@ doc="the run uses on demand clusters", ), sa.UniqueConstraint("project_uuid", "user_id", "iteration"), + sa.Index("ix_comp_runs_user_id", "user_id"), ) diff --git a/packages/postgres-database/src/simcore_postgres_database/models/comp_tasks.py b/packages/postgres-database/src/simcore_postgres_database/models/comp_tasks.py index 096447f7366b..15c3ddbacd1f 100644 --- a/packages/postgres-database/src/simcore_postgres_database/models/comp_tasks.py +++ b/packages/postgres-database/src/simcore_postgres_database/models/comp_tasks.py @@ -110,6 +110,7 @@ class NodeClass(enum.Enum): ), # ------ sa.UniqueConstraint("project_id", "node_id", name="project_node_uniqueness"), + sa.Index("ix_comp_tasks_project_id", "project_id"), ) register_modified_datetime_auto_update_trigger(comp_tasks) From 243c5dccc1ae16f8e24537cfd624eb176b2c878b Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Thu, 17 Apr 2025 14:59:33 +0200 Subject: [PATCH 12/19] fix --- .../director_v2/_controller/__init__.py | 2 +- .../{_computations_rest.py => computations_rest.py} | 0 .../src/simcore_service_webserver/director_v2/plugin.py | 4 ++-- 3 files changed, 3 insertions(+), 3 deletions(-) rename services/web/server/src/simcore_service_webserver/director_v2/_controller/{_computations_rest.py => computations_rest.py} (100%) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/__init__.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/__init__.py index 1bed5207e944..7fa642dff799 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_controller/__init__.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/__init__.py @@ -1 +1 @@ -from . import rest +from . import computations_rest, rest diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py similarity index 100% rename from services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest.py rename to services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py diff --git a/services/web/server/src/simcore_service_webserver/director_v2/plugin.py b/services/web/server/src/simcore_service_webserver/director_v2/plugin.py index 119446b637d1..d847156841b4 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/plugin.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/plugin.py @@ -9,7 +9,7 @@ ) from ..rest.plugin import setup_rest -from . import _computations_rest, _controller +from . 
import _controller from ._client import DirectorV2RestClient, get_directorv2_client, set_directorv2_client from ._director_v2_abc_default_service import DefaultProjectRunPolicy from ._director_v2_abc_service import set_project_run_policy @@ -38,7 +38,7 @@ def setup_director_v2(app: web.Application): if is_setup_completed(setup_rest.metadata()["module_name"], app): set_project_run_policy(app, DefaultProjectRunPolicy()) app.router.add_routes(_controller.rest.routes) - app.router.add_routes(_computations_rest.routes) + app.router.add_routes(_controller.computations_rest.routes) else: _logger.warning( From fb2a8d188eba756f5f9c6ea7f276b2bfe7f27131 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Thu, 17 Apr 2025 17:01:30 +0200 Subject: [PATCH 13/19] update open api specs --- api/specs/web-server/_computations.py | 12 +- .../api/v0/openapi.yaml | 108 ++++++++++++++++-- 2 files changed, 106 insertions(+), 14 deletions(-) diff --git a/api/specs/web-server/_computations.py b/api/specs/web-server/_computations.py index a43895d87ff6..9623012ee3c1 100644 --- a/api/specs/web-server/_computations.py +++ b/api/specs/web-server/_computations.py @@ -5,12 +5,14 @@ from models_library.api_schemas_webserver.computations import ( ComputationGet, ComputationPathParams, + ComputationRunRestGet, ComputationStart, ComputationStarted, + ComputationTaskRestGet, ) from models_library.generics import Envelope from simcore_service_webserver._meta import API_VTAG -from simcore_service_webserver.director_v2._computations_rest_schema import ( +from simcore_service_webserver.director_v2._controller._computations_rest_schema import ( ComputationRunListQueryParams, ComputationTaskListQueryParams, ComputationTaskPathParams, @@ -63,9 +65,7 @@ async def stop_computation(_path: Annotated[ComputationPathParams, Depends()]): @router.get( "/computations/-/iterations/latest", - response_model=Envelope[list[ComputationGet]], - name="list_computations_latest_iteration", - description="Lists the latest iteration of computations", + response_model=Envelope[list[ComputationRunRestGet]], ) async def list_computations_latest_iteration( _query: Annotated[as_query(ComputationRunListQueryParams), Depends()], @@ -74,9 +74,7 @@ async def list_computations_latest_iteration( @router.get( "/computations/{project_id}/iterations/latest/tasks", - response_model=Envelope[list[ComputationGet]], - name="list_computations_latest_iteration_tasks", - description="Lists the latest iteration tasks for a computation", + response_model=Envelope[list[ComputationTaskRestGet]], ) async def list_computations_latest_iteration_tasks( _query: Annotated[as_query(ComputationTaskListQueryParams), Depends()], diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index e3acf2d1f2e6..ed9e97277762 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -2542,7 +2542,6 @@ paths: - computations - projects summary: List Computations Latest Iteration - description: Lists the latest iteration of computations operationId: list_computations_latest_iteration parameters: - name: order_by @@ -2574,14 +2573,13 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/Envelope_list_ComputationGet__' + $ref: '#/components/schemas/Envelope_list_ComputationRunRestGet__' /v0/computations/{project_id}/iterations/latest/tasks: get: tags: - computations - projects 
summary: List Computations Latest Iteration Tasks - description: Lists the latest iteration tasks for a computation operationId: list_computations_latest_iteration_tasks parameters: - name: project_id @@ -2620,7 +2618,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/Envelope_list_ComputationGet__' + $ref: '#/components/schemas/Envelope_list_ComputationTaskRestGet__' /v0/projects/{project_id}:xport: post: tags: @@ -8726,6 +8724,46 @@ components: - submitted - url title: ComputationGet + ComputationRunRestGet: + properties: + project_uuid: + type: string + format: uuid + title: Project Uuid + iteration: + type: integer + title: Iteration + state: + $ref: '#/components/schemas/RunningState' + info: + type: object + title: Info + submitted_at: + type: string + format: date-time + title: Submitted At + started_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Started At + ended_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Ended At + type: object + required: + - project_uuid + - iteration + - state + - info + - submitted_at + - started_at + - ended_at + title: ComputationRunRestGet ComputationStart: properties: force_restart: @@ -8759,6 +8797,46 @@ components: required: - pipeline_id title: ComputationStarted + ComputationTaskRestGet: + properties: + project_uuid: + type: string + format: uuid + title: Project Uuid + node_id: + type: string + format: uuid + title: Node Id + state: + $ref: '#/components/schemas/RunningState' + progress: + type: number + title: Progress + image: + type: object + title: Image + started_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Started At + ended_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Ended At + type: object + required: + - project_uuid + - node_id + - state + - progress + - image + - started_at + - ended_at + title: ComputationTaskRestGet ConnectServiceToPricingPlanBodyParams: properties: serviceKey: @@ -10111,12 +10189,28 @@ components: title: Error type: object title: Envelope[list[ApiKeyGet]] - Envelope_list_ComputationGet__: + Envelope_list_ComputationRunRestGet__: + properties: + data: + anyOf: + - items: + $ref: '#/components/schemas/ComputationRunRestGet' + type: array + - type: 'null' + title: Data + error: + anyOf: + - {} + - type: 'null' + title: Error + type: object + title: Envelope[list[ComputationRunRestGet]] + Envelope_list_ComputationTaskRestGet__: properties: data: anyOf: - items: - $ref: '#/components/schemas/ComputationGet' + $ref: '#/components/schemas/ComputationTaskRestGet' type: array - type: 'null' title: Data @@ -10126,7 +10220,7 @@ components: - type: 'null' title: Error type: object - title: Envelope[list[ComputationGet]] + title: Envelope[list[ComputationTaskRestGet]] Envelope_list_DatasetMetaData__: properties: data: From d786c64d70f4fd6b4db0d36779cbc61258f938c9 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Thu, 17 Apr 2025 17:16:56 +0200 Subject: [PATCH 14/19] review @pcrespov --- .../api_schemas_webserver/computations.py | 71 +++++++++++++++---- .../_controller/_computations_rest_schema.py | 1 + .../_controller/computations_rest.py | 11 ++- services/web/server/tests/unit/conftest.py | 27 ++++++- .../server/tests/unit/with_dbs/01/conftest.py | 26 ------- .../server/tests/unit/with_dbs/02/conftest.py | 20 ------ 6 files changed, 93 insertions(+), 63 deletions(-) diff --git a/packages/models-library/src/models_library/api_schemas_webserver/computations.py 
b/packages/models-library/src/models_library/api_schemas_webserver/computations.py index a4fcf4f978e5..6a49129034fa 100644 --- a/packages/models-library/src/models_library/api_schemas_webserver/computations.py +++ b/packages/models-library/src/models_library/api_schemas_webserver/computations.py @@ -1,20 +1,27 @@ from datetime import datetime -from typing import Annotated, Any, NamedTuple +from typing import Annotated, Any from common_library.basic_types import DEFAULT_FACTORY from pydantic import ( BaseModel, + ConfigDict, Field, - PositiveInt, ) from ..api_schemas_directorv2.computations import ( ComputationGet as _DirectorV2ComputationGet, ) +from ..basic_types import IDStr from ..projects import CommitID, ProjectID from ..projects_nodes_io import NodeID from ..projects_state import RunningState -from ._base import InputSchemaWithoutCamelCase, OutputSchemaWithoutCamelCase +from ..rest_ordering import OrderBy, create_ordering_query_model_class +from ..rest_pagination import PageQueryParameters +from ._base import ( + InputSchemaWithoutCamelCase, + OutputSchema, + OutputSchemaWithoutCamelCase, +) class ComputationPathParams(BaseModel): @@ -50,7 +57,32 @@ class ComputationStarted(OutputSchemaWithoutCamelCase): ] = DEFAULT_FACTORY -class ComputationRunRestGet(BaseModel): +### Computation Run + + +ComputationRunListOrderParams = create_ordering_query_model_class( + ordering_fields={ + "submitted_at", + "started_at", + "ended_at", + "state", + }, + default=OrderBy(field=IDStr("submitted_at")), + ordering_fields_api_to_column_map={ + "submitted_at": "created", + "started_at": "started", + "ended_at": "ended", + }, +) + + +class ComputationRunListQueryParams( + PageQueryParameters, + ComputationRunListOrderParams, # type: ignore[misc, valid-type] +): ... + + +class ComputationRunRestGet(OutputSchema): project_uuid: ProjectID iteration: int state: RunningState @@ -60,12 +92,30 @@ class ComputationRunRestGet(BaseModel): ended_at: datetime | None -class ComputationRunRestGetPage(NamedTuple): - items: list[ComputationRunRestGet] - total: PositiveInt +### Computation Task + + +class ComputationTaskPathParams(BaseModel): + project_id: ProjectID + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + +ComputationTaskListOrderParams = create_ordering_query_model_class( + ordering_fields={ + "started_at", + }, + default=OrderBy(field=IDStr("started_at")), + ordering_fields_api_to_column_map={"started_at": "start"}, +) -class ComputationTaskRestGet(BaseModel): +class ComputationTaskListQueryParams( + PageQueryParameters, + ComputationTaskListOrderParams, # type: ignore[misc, valid-type] +): ... 
+ + +class ComputationTaskRestGet(OutputSchema): project_uuid: ProjectID node_id: NodeID state: RunningState @@ -73,8 +123,3 @@ class ComputationTaskRestGet(BaseModel): image: dict[str, Any] started_at: datetime | None ended_at: datetime | None - - -class ComputationTaskRestGetPage(NamedTuple): - items: list[ComputationTaskRestGet] - total: PositiveInt diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest_schema.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest_schema.py index 92b08d6c351a..4791a1c9743b 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest_schema.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest_schema.py @@ -6,6 +6,7 @@ ### Computation Run + ComputationRunListOrderParams = create_ordering_query_model_class( ordering_fields={ "submitted_at", diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py index 47eae927ea5d..cc3e915e97ee 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py @@ -67,13 +67,17 @@ async def list_computations_latest_iteration(request: web.Request) -> web.Respon page = Page[ComputationRunRestGet].model_validate( paginate_data( - chunk=[ComputationRunRestGet(**task.dict()) for task in _get.items], + chunk=[ + ComputationRunRestGet.model_validate(task, from_attributes=True) + for task in _get.items + ], total=_get.total, limit=query_params.limit, offset=query_params.offset, request_url=request.url, ) ) + return web.Response( text=page.model_dump_json(**RESPONSE_MODEL_POLICY), content_type=MIMETYPE_APPLICATION_JSON, @@ -111,7 +115,10 @@ async def list_computations_latest_iteration_tasks( page = Page[ComputationTaskRestGet].model_validate( paginate_data( - chunk=[ComputationTaskRestGet(**task.dict()) for task in _get.items], + chunk=[ + ComputationTaskRestGet.model_validate(task, from_attributes=True) + for task in _get.items + ], total=_get.total, limit=query_params.limit, offset=query_params.offset, diff --git a/services/web/server/tests/unit/conftest.py b/services/web/server/tests/unit/conftest.py index 7f3d23819916..d99275c43573 100644 --- a/services/web/server/tests/unit/conftest.py +++ b/services/web/server/tests/unit/conftest.py @@ -7,17 +7,20 @@ import json import sys -from collections.abc import Callable, Iterable +from collections.abc import AsyncIterator, Callable, Iterable from pathlib import Path from typing import Any import pytest import yaml +from aiohttp.test_utils import TestClient from models_library.products import ProductName from pytest_mock import MockFixture, MockType -from pytest_simcore.helpers.webserver_projects import empty_project_data +from pytest_simcore.helpers.webserver_login import UserInfoDict +from pytest_simcore.helpers.webserver_projects import NewProject, empty_project_data from simcore_service_webserver.application_settings_utils import AppConfigDict from simcore_service_webserver.constants import FRONTEND_APP_DEFAULT +from simcore_service_webserver.projects.models import ProjectDict CURRENT_DIR = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent @@ -93,3 +96,23 @@ def 
disabled_setup_garbage_collector(mocker: MockFixture) -> MockType: @pytest.fixture(scope="session") def product_name() -> ProductName: return ProductName(FRONTEND_APP_DEFAULT) + + +@pytest.fixture +async def user_project( + client: TestClient, + fake_project: ProjectDict, + logged_user: UserInfoDict, + tests_data_dir: Path, + osparc_product_name: str, +) -> AsyncIterator[ProjectDict]: + async with NewProject( + fake_project, + client.app, + user_id=logged_user["id"], + product_name=osparc_product_name, + tests_data_dir=tests_data_dir, + ) as project: + print("-----> added project", project["name"]) + yield project + print("<----- removed project", project["name"]) diff --git a/services/web/server/tests/unit/with_dbs/01/conftest.py b/services/web/server/tests/unit/with_dbs/01/conftest.py index 67f98989effd..5e77d24abf3f 100644 --- a/services/web/server/tests/unit/with_dbs/01/conftest.py +++ b/services/web/server/tests/unit/with_dbs/01/conftest.py @@ -1,14 +1,8 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument # pylint: disable=unused-variable -from collections.abc import AsyncIterator -from pathlib import Path import pytest -from aiohttp.test_utils import TestClient -from pytest_simcore.helpers.webserver_login import UserInfoDict -from pytest_simcore.helpers.webserver_projects import NewProject -from simcore_service_webserver.projects.models import ProjectDict @pytest.fixture @@ -18,23 +12,3 @@ def app_environment( # NOTE: overrides app_environment monkeypatch.setenv("WEBSERVER_GARBAGE_COLLECTOR", "null") return app_environment | {"WEBSERVER_GARBAGE_COLLECTOR": "null"} - - -@pytest.fixture -async def user_project( - client: TestClient, - fake_project: ProjectDict, - logged_user: UserInfoDict, - tests_data_dir: Path, - osparc_product_name: str, -) -> AsyncIterator[ProjectDict]: - async with NewProject( - fake_project, - client.app, - user_id=logged_user["id"], - product_name=osparc_product_name, - tests_data_dir=tests_data_dir, - ) as project: - print("-----> added project", project["name"]) - yield project - print("<----- removed project", project["name"]) diff --git a/services/web/server/tests/unit/with_dbs/02/conftest.py b/services/web/server/tests/unit/with_dbs/02/conftest.py index 674da4193f42..339e154e95e5 100644 --- a/services/web/server/tests/unit/with_dbs/02/conftest.py +++ b/services/web/server/tests/unit/with_dbs/02/conftest.py @@ -104,26 +104,6 @@ def mock_catalog_api( } -@pytest.fixture -async def user_project( - client: TestClient, - fake_project: ProjectDict, - logged_user: UserInfoDict, - tests_data_dir: Path, - osparc_product_name: str, -) -> AsyncIterator[ProjectDict]: - async with NewProject( - fake_project, - client.app, - user_id=logged_user["id"], - product_name=osparc_product_name, - tests_data_dir=tests_data_dir, - ) as project: - print("-----> added project", project["name"]) - yield project - print("<----- removed project", project["name"]) - - @pytest.fixture async def shared_project( client, From f552f42c85732b611487b495443b8fbac3c629fc Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Thu, 17 Apr 2025 21:41:41 +0200 Subject: [PATCH 15/19] fix --- .../src/models_library/projects_state.py | 4 +- .../_controller/_computations_rest_schema.py | 52 ------------------- .../_controller/computations_rest.py | 13 +++-- 3 files changed, 9 insertions(+), 60 deletions(-) delete mode 100644 services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest_schema.py diff --git 
a/packages/models-library/src/models_library/projects_state.py b/packages/models-library/src/models_library/projects_state.py index ca5698ed6b2c..8080881006ed 100644 --- a/packages/models-library/src/models_library/projects_state.py +++ b/packages/models-library/src/models_library/projects_state.py @@ -1,5 +1,5 @@ """ - Models both project and node states +Models both project and node states """ from enum import Enum, unique @@ -30,6 +30,7 @@ class RunningState(str, Enum): PENDING = "PENDING" WAITING_FOR_RESOURCES = "WAITING_FOR_RESOURCES" STARTED = "STARTED" + RUNNING = "RUNNING" SUCCESS = "SUCCESS" FAILED = "FAILED" ABORTED = "ABORTED" @@ -41,6 +42,7 @@ def is_running(self) -> bool: RunningState.PENDING, RunningState.WAITING_FOR_RESOURCES, RunningState.STARTED, + RunningState.RUNNING, RunningState.WAITING_FOR_CLUSTER, ) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest_schema.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest_schema.py deleted file mode 100644 index 4791a1c9743b..000000000000 --- a/services/web/server/src/simcore_service_webserver/director_v2/_controller/_computations_rest_schema.py +++ /dev/null @@ -1,52 +0,0 @@ -from models_library.basic_types import IDStr -from models_library.projects import ProjectID -from models_library.rest_ordering import OrderBy, create_ordering_query_model_class -from models_library.rest_pagination import PageQueryParameters -from pydantic import BaseModel, ConfigDict - -### Computation Run - - -ComputationRunListOrderParams = create_ordering_query_model_class( - ordering_fields={ - "submitted_at", - "started_at", - "ended_at", - "state", - }, - default=OrderBy(field=IDStr("submitted_at")), - ordering_fields_api_to_column_map={ - "submitted_at": "created", - "started_at": "started", - "ended_at": "ended", - }, -) - - -class ComputationRunListQueryParams( - PageQueryParameters, - ComputationRunListOrderParams, # type: ignore[misc, valid-type] -): ... - - -### Computation Task - - -class ComputationTaskPathParams(BaseModel): - project_id: ProjectID - model_config = ConfigDict(populate_by_name=True, extra="forbid") - - -ComputationTaskListOrderParams = create_ordering_query_model_class( - ordering_fields={ - "started_at", - }, - default=OrderBy(field=IDStr("started_at")), - ordering_fields_api_to_column_map={"started_at": "start"}, -) - - -class ComputationTaskListQueryParams( - PageQueryParameters, - ComputationTaskListOrderParams, # type: ignore[misc, valid-type] -): ... 
diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py index cc3e915e97ee..9631d10f27ed 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py @@ -2,10 +2,14 @@ from aiohttp import web from models_library.api_schemas_webserver.computations import ( + ComputationRunListQueryParams, ComputationRunRestGet, + ComputationTaskListQueryParams, + ComputationTaskPathParams, ComputationTaskRestGet, ) from models_library.rest_base import RequestParameters +from models_library.rest_ordering import OrderBy from models_library.rest_pagination import Page from models_library.rest_pagination_utils import paginate_data from models_library.users import UserID @@ -23,11 +27,6 @@ from ...login.decorators import login_required from ...security.decorators import permission_required from .. import _computations_service -from ._computations_rest_schema import ( - ComputationRunListQueryParams, - ComputationTaskListQueryParams, - ComputationTaskPathParams, -) _logger = logging.getLogger(__name__) @@ -62,7 +61,7 @@ async def list_computations_latest_iteration(request: web.Request) -> web.Respon offset=query_params.offset, limit=query_params.limit, # ordering - order_by=query_params.order_by, + order_by=OrderBy.model_construct(**query_params.order_by.model_dump()), ) page = Page[ComputationRunRestGet].model_validate( @@ -110,7 +109,7 @@ async def list_computations_latest_iteration_tasks( offset=query_params.offset, limit=query_params.limit, # ordering - order_by=query_params.order_by, + order_by=OrderBy.model_construct(**query_params.order_by.model_dump()), ) page = Page[ComputationTaskRestGet].model_validate( From 9e58ce35ad061547955ef3efaee1ab83564003db Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 22 Apr 2025 11:38:30 +0200 Subject: [PATCH 16/19] review @sanderegg --- .../src/models_library/projects_state.py | 2 -- .../api/dependencies/database.py | 10 +--------- .../api/rpc/_computations.py | 17 +++++++---------- .../modules/db/repositories/comp_runs.py | 12 +++++++++--- .../modules/db/repositories/comp_tasks/_core.py | 15 ++++++++++----- .../director-v2/tests/unit/with_dbs/conftest.py | 3 +-- 6 files changed, 28 insertions(+), 31 deletions(-) diff --git a/packages/models-library/src/models_library/projects_state.py b/packages/models-library/src/models_library/projects_state.py index 8080881006ed..25d437cc2d57 100644 --- a/packages/models-library/src/models_library/projects_state.py +++ b/packages/models-library/src/models_library/projects_state.py @@ -30,7 +30,6 @@ class RunningState(str, Enum): PENDING = "PENDING" WAITING_FOR_RESOURCES = "WAITING_FOR_RESOURCES" STARTED = "STARTED" - RUNNING = "RUNNING" SUCCESS = "SUCCESS" FAILED = "FAILED" ABORTED = "ABORTED" @@ -42,7 +41,6 @@ def is_running(self) -> bool: RunningState.PENDING, RunningState.WAITING_FOR_RESOURCES, RunningState.STARTED, - RunningState.RUNNING, RunningState.WAITING_FOR_CLUSTER, ) diff --git a/services/director-v2/src/simcore_service_director_v2/api/dependencies/database.py b/services/director-v2/src/simcore_service_director_v2/api/dependencies/database.py index aee95be2e150..576b42a0c59c 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/dependencies/database.py +++ 
b/services/director-v2/src/simcore_service_director_v2/api/dependencies/database.py @@ -3,7 +3,7 @@ from typing import TypeVar, cast from aiopg.sa import Engine -from fastapi import Depends, FastAPI +from fastapi import Depends from fastapi.requests import Request from ...modules.db.repositories import BaseRepository @@ -47,11 +47,3 @@ async def _get_repo( yield get_base_repository(engine=engine, repo_type=repo_type) return _get_repo - - -def get_repository_instance(app: FastAPI, repo_type: type[RepoType]) -> RepoType: - """ - Retrieves an instance of the specified repository type using the database engine from the FastAPI app. - """ - engine = cast(Engine, app.state.engine) - return get_base_repository(engine=engine, repo_type=repo_type) diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py index 3b76a587a3a8..dbc32828e36c 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py @@ -12,7 +12,6 @@ from ...modules.db.repositories.comp_runs import CompRunsRepository from ...modules.db.repositories.comp_tasks import CompTasksRepository -from ..dependencies.database import get_repository_instance router = RPCRouter() @@ -29,7 +28,7 @@ async def list_computations_latest_iteration_page( # ordering order_by: OrderBy | None = None, ) -> ComputationRunRpcGetPage: - comp_runs_repo = get_repository_instance(app, CompRunsRepository) + comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine) total, comp_runs = await comp_runs_repo.list_for_user__only_latest_iterations( product_name=product_name, user_id=user_id, @@ -59,14 +58,12 @@ async def list_computations_latest_iteration_tasks_page( assert product_name # nosec NOTE: Whether project_id belong to the product_name was checked in the webserver assert user_id # nosec NOTE: Whether user_id has access to the project was checked in the webserver - comp_tasks_repo = get_repository_instance(app, CompTasksRepository) - total, comp_runs = ( - await comp_tasks_repo.list_computational_tasks_for_frontend_client( - project_id=project_id, - offset=offset, - limit=limit, - order_by=order_by, - ) + comp_tasks_repo = CompTasksRepository.instance(db_engine=app.state.engine) + total, comp_runs = await comp_tasks_repo.list_computational_tasks_rpc_domain( + project_id=project_id, + offset=offset, + limit=limit, + order_by=order_by, ) return ComputationTaskRpcGetPage( items=comp_runs, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py index 9f494997557d..8b26c0448345 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py @@ -25,7 +25,7 @@ UserNotFoundError, ) from ....models.comp_runs import CompRunsAtDB, RunMetadataDict -from ....utils.db import RUNNING_STATE_TO_DB +from ....utils.db import DB_TO_RUNNING_STATE, RUNNING_STATE_TO_DB from ..tables import comp_runs from ._base import BaseRepository @@ -215,8 +215,14 @@ async def list_for_user__only_latest_iterations( total_count = await conn.scalar(count_query) result = await conn.execute(list_query) - items: list[ComputationRunRpcGet] = [ - ComputationRunRpcGet.model_validate(row) async for row 
in result + items = [ + ComputationRunRpcGet.model_validate( + { + **row, + "state": DB_TO_RUNNING_STATE[row["state"]], + } + ) + async for row in result ] return cast(int, total_count), items diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py index 903348ca09ea..93f39295feda 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py @@ -24,7 +24,7 @@ from .....models.comp_tasks import CompTaskAtDB from .....modules.resource_usage_tracker_client import ResourceUsageTrackerClient from .....utils.computations import to_node_class -from .....utils.db import RUNNING_STATE_TO_DB +from .....utils.db import DB_TO_RUNNING_STATE, RUNNING_STATE_TO_DB from ....catalog import CatalogClient from ...tables import NodeClass, StateType, comp_tasks from .._base import BaseRepository @@ -77,7 +77,7 @@ async def list_computational_tasks( tasks.append(task_db) return tasks - async def list_computational_tasks_for_frontend_client( + async def list_computational_tasks_rpc_domain( self, *, project_id: ProjectID, @@ -127,10 +127,15 @@ async def list_computational_tasks_for_frontend_client( total_count = await conn.scalar(count_query) result = await conn.execute(list_query) - items: list[ComputationTaskRpcGet] = [ - ComputationTaskRpcGet.model_validate(row) async for row in result + items = [ + ComputationTaskRpcGet.model_validate( + { + **row, + "state": DB_TO_RUNNING_STATE[row["state"]], # Convert the state + } + ) + async for row in result ] - return cast(int, total_count), items async def task_exists(self, project_id: ProjectID, node_id: NodeID) -> bool: diff --git a/services/director-v2/tests/unit/with_dbs/conftest.py b/services/director-v2/tests/unit/with_dbs/conftest.py index cbed218aa13b..fb5056ad2e70 100644 --- a/services/director-v2/tests/unit/with_dbs/conftest.py +++ b/services/director-v2/tests/unit/with_dbs/conftest.py @@ -18,7 +18,6 @@ from fastapi.encoders import jsonable_encoder from models_library.projects import ProjectAtDB, ProjectID from models_library.projects_nodes_io import NodeID -from models_library.projects_state import RunningState from pydantic.main import BaseModel from simcore_postgres_database.models.comp_pipeline import StateType, comp_pipeline from simcore_postgres_database.models.comp_runs import comp_runs @@ -194,7 +193,7 @@ async def _( "project_uuid": f"{project.uuid}", "user_id": user["id"], "iteration": 1, - "result": RunningState.NOT_STARTED, + "result": StateType.NOT_STARTED, "metadata": jsonable_encoder(run_metadata), "use_on_demand_clusters": False, } From 871f2628efad1afb4b13dff20ef09584dc6a91f9 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 22 Apr 2025 11:47:59 +0200 Subject: [PATCH 17/19] update open api specs --- api/specs/web-server/_computations.py | 2 +- .../api/v0/openapi.yaml | 48 +++++++++---------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/api/specs/web-server/_computations.py b/api/specs/web-server/_computations.py index 9623012ee3c1..8c7038fed20d 100644 --- a/api/specs/web-server/_computations.py +++ b/api/specs/web-server/_computations.py @@ -12,7 +12,7 @@ ) from models_library.generics import Envelope from simcore_service_webserver._meta import API_VTAG -from 
simcore_service_webserver.director_v2._controller._computations_rest_schema import ( +from simcore_service_webserver.director_v2._controller.computations_restp import ( ComputationRunListQueryParams, ComputationTaskListQueryParams, ComputationTaskPathParams, diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index ed9e97277762..73a7abf032c0 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -8726,10 +8726,10 @@ components: title: ComputationGet ComputationRunRestGet: properties: - project_uuid: + projectUuid: type: string format: uuid - title: Project Uuid + title: Projectuuid iteration: type: integer title: Iteration @@ -8738,31 +8738,31 @@ components: info: type: object title: Info - submitted_at: + submittedAt: type: string format: date-time - title: Submitted At - started_at: + title: Submittedat + startedAt: anyOf: - type: string format: date-time - type: 'null' - title: Started At - ended_at: + title: Startedat + endedAt: anyOf: - type: string format: date-time - type: 'null' - title: Ended At + title: Endedat type: object required: - - project_uuid + - projectUuid - iteration - state - info - - submitted_at - - started_at - - ended_at + - submittedAt + - startedAt + - endedAt title: ComputationRunRestGet ComputationStart: properties: @@ -8799,14 +8799,14 @@ components: title: ComputationStarted ComputationTaskRestGet: properties: - project_uuid: + projectUuid: type: string format: uuid - title: Project Uuid - node_id: + title: Projectuuid + nodeId: type: string format: uuid - title: Node Id + title: Nodeid state: $ref: '#/components/schemas/RunningState' progress: @@ -8815,27 +8815,27 @@ components: image: type: object title: Image - started_at: + startedAt: anyOf: - type: string format: date-time - type: 'null' - title: Started At - ended_at: + title: Startedat + endedAt: anyOf: - type: string format: date-time - type: 'null' - title: Ended At + title: Endedat type: object required: - - project_uuid - - node_id + - projectUuid + - nodeId - state - progress - image - - started_at - - ended_at + - startedAt + - endedAt title: ComputationTaskRestGet ConnectServiceToPricingPlanBodyParams: properties: From 8db4e6336626e0d36c55d3df2131cfc31334aedb Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 22 Apr 2025 11:52:31 +0200 Subject: [PATCH 18/19] update open api specs --- api/specs/web-server/_computations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/specs/web-server/_computations.py b/api/specs/web-server/_computations.py index 8c7038fed20d..a1858091106a 100644 --- a/api/specs/web-server/_computations.py +++ b/api/specs/web-server/_computations.py @@ -12,7 +12,7 @@ ) from models_library.generics import Envelope from simcore_service_webserver._meta import API_VTAG -from simcore_service_webserver.director_v2._controller.computations_restp import ( +from simcore_service_webserver.director_v2._controller.computations_rest import ( ComputationRunListQueryParams, ComputationTaskListQueryParams, ComputationTaskPathParams, From 9bc685a34adb6be71c5a437b291fff53ce653c8c Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 22 Apr 2025 16:27:37 +0200 Subject: [PATCH 19/19] fix ordering of tasks --- .../modules/comp_scheduler/_scheduler_base.py | 18 ++++++++++++------ .../comp_scheduler/test_scheduler_dask.py 
| 3 +-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py index 22b8677611fa..28b9373cec43 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py @@ -116,7 +116,7 @@ class SortedTasks: async def _triage_changed_tasks( - changed_tasks: list[tuple[_Previous, _Current]] + changed_tasks: list[tuple[_Previous, _Current]], ) -> SortedTasks: started_tasks = [ current @@ -242,9 +242,13 @@ async def _set_schedule_done( async def _set_states_following_failed_to_aborted( self, project_id: ProjectID, dag: nx.DiGraph ) -> dict[NodeIDStr, CompTaskAtDB]: - tasks: dict[NodeIDStr, CompTaskAtDB] = await self._get_pipeline_tasks( - project_id, dag - ) + tasks = await self._get_pipeline_tasks(project_id, dag) + # Perform a reverse topological sort to ensure tasks are ordered from last to first + sorted_node_ids = list(reversed(list(nx.topological_sort(dag)))) + tasks = { + node_id: tasks[node_id] for node_id in sorted_node_ids if node_id in tasks + } + # we need the tasks ordered from the last task to the first node_ids_to_set_as_aborted: set[NodeIDStr] = set() for task in tasks.values(): if task.state == RunningState.FAILED: @@ -651,8 +655,10 @@ async def _schedule_tasks_to_stop( ) -> None: # get any running task and stop them comp_tasks_repo = CompTasksRepository.instance(self.db_engine) - await comp_tasks_repo.mark_project_published_waiting_for_cluster_tasks_as_aborted( - project_id + await ( + comp_tasks_repo.mark_project_published_waiting_for_cluster_tasks_as_aborted( + project_id + ) ) # stop any remaining running task, these are already submitted if tasks_to_stop := [ diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py index fb06b116c70b..bb4adba21357 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py @@ -1451,7 +1451,6 @@ async def mocked_get_task_result(_job_id: str) -> TaskOutputData: project_uuid=running_project.project.uuid, task_ids=[ running_project.tasks[1].node_id, - running_project.tasks[2].node_id, running_project.tasks[3].node_id, ], expected_state=reboot_state.expected_task_state_group1, @@ -1460,7 +1459,7 @@ async def mocked_get_task_result(_job_id: str) -> TaskOutputData: await assert_comp_tasks( sqlalchemy_async_engine, project_uuid=running_project.project.uuid, - task_ids=[running_project.tasks[4].node_id], + task_ids=[running_project.tasks[2].node_id, running_project.tasks[4].node_id], expected_state=reboot_state.expected_task_state_group2, expected_progress=reboot_state.expected_task_progress_group2, )
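
Reviewer note (illustrative only, not part of the patch): the last commit re-keys the task mapping by a reversed topological sort so the scheduler visits the pipeline from its terminal tasks back to the root. The short Python sketch below, with made-up node names and task values, shows why that idiom yields the intended "last task to first" order; it assumes only networkx and the same reversed(topological_sort(...)) call used in the diff above.

    import networkx as nx

    # Toy pipeline: a feeds b, b feeds c, and a also feeds d (terminal tasks: c and d).
    dag = nx.DiGraph([("a", "b"), ("b", "c"), ("a", "d")])

    # topological_sort() yields every node before its successors; reversing that
    # sequence therefore lists the terminal tasks first and the root task last.
    reverse_topo = list(reversed(list(nx.topological_sort(dag))))
    print(reverse_topo)  # one valid result: ['d', 'c', 'b', 'a']

    # Re-keying a task mapping in that order, as the patched code does, carries the
    # ordering over because Python dicts preserve insertion order.
    tasks = {node: f"task-{node}" for node in ("a", "b", "c", "d")}
    tasks = {node: tasks[node] for node in reverse_topo if node in tasks}
    print(list(tasks))  # ['d', 'c', 'b', 'a']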