diff --git a/api/specs/web-server/_computations.py b/api/specs/web-server/_computations.py index 59e0e873edc1..a1858091106a 100644 --- a/api/specs/web-server/_computations.py +++ b/api/specs/web-server/_computations.py @@ -1,14 +1,22 @@ from typing import Annotated +from _common import as_query from fastapi import APIRouter, Depends, status from models_library.api_schemas_webserver.computations import ( ComputationGet, ComputationPathParams, + ComputationRunRestGet, ComputationStart, ComputationStarted, + ComputationTaskRestGet, ) from models_library.generics import Envelope from simcore_service_webserver._meta import API_VTAG +from simcore_service_webserver.director_v2._controller.computations_rest import ( + ComputationRunListQueryParams, + ComputationTaskListQueryParams, + ComputationTaskPathParams, +) router = APIRouter( prefix=f"/{API_VTAG}", @@ -53,3 +61,22 @@ async def start_computation( status_code=status.HTTP_204_NO_CONTENT, ) async def stop_computation(_path: Annotated[ComputationPathParams, Depends()]): ... + + +@router.get( + "/computations/-/iterations/latest", + response_model=Envelope[list[ComputationRunRestGet]], +) +async def list_computations_latest_iteration( + _query: Annotated[as_query(ComputationRunListQueryParams), Depends()], +): ... + + +@router.get( + "/computations/{project_id}/iterations/latest/tasks", + response_model=Envelope[list[ComputationTaskRestGet]], +) +async def list_computations_latest_iteration_tasks( + _query: Annotated[as_query(ComputationTaskListQueryParams), Depends()], + _path: Annotated[ComputationTaskPathParams, Depends()], +): ... diff --git a/packages/models-library/src/models_library/api_schemas_directorv2/__init__.py b/packages/models-library/src/models_library/api_schemas_directorv2/__init__.py index c96b6f2563e6..6ab84683cdb9 100644 --- a/packages/models-library/src/models_library/api_schemas_directorv2/__init__.py +++ b/packages/models-library/src/models_library/api_schemas_directorv2/__init__.py @@ -1,8 +1,19 @@ +from typing import Final + +from pydantic import TypeAdapter + +from ..rabbitmq_basic_types import RPCNamespace from . import clusters, dynamic_services assert clusters # nosec assert dynamic_services # nosec + +DIRECTOR_V2_RPC_NAMESPACE: Final[RPCNamespace] = TypeAdapter( + RPCNamespace +).validate_python("director-v2") + + __all__: tuple[str, ...] = ( "clusters", "dynamic_services", diff --git a/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py b/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py new file mode 100644 index 000000000000..005c9c95412d --- /dev/null +++ b/packages/models-library/src/models_library/api_schemas_directorv2/comp_runs.py @@ -0,0 +1,103 @@ +from datetime import datetime +from typing import Annotated, Any, NamedTuple + +from pydantic import ( + BaseModel, + BeforeValidator, + ConfigDict, + PositiveInt, +) + +from ..projects import ProjectID +from ..projects_nodes_io import NodeID +from ..projects_state import RunningState + + +class ComputationRunRpcGet(BaseModel): + project_uuid: ProjectID + iteration: int + state: RunningState + info: dict[str, Any] + submitted_at: datetime + started_at: datetime | None + ended_at: datetime | None + + model_config = ConfigDict( + json_schema_extra={ + "examples": [ + { + "project_uuid": "beb16d18-d57d-44aa-a638-9727fa4a72ef", + "iteration": 1, + "state": "SUCCESS", + "info": { + "wallet_id": 9866, + "user_email": "test@example.net", + "wallet_name": "test", + "product_name": "osparc", + "project_name": "test", + "project_metadata": { + "parent_node_id": "12e0c8b2-bad6-40fb-9948-8dec4f65d4d9", + "parent_node_name": "UJyfwFVYySnPCaLuQIaz", + "parent_project_id": "beb16d18-d57d-44aa-a638-9727fa4a72ef", + "parent_project_name": "qTjDmYPxeqAWfCKCQCYF", + "root_parent_node_id": "37176e84-d977-4993-bc49-d76fcfc6e625", + "root_parent_node_name": "UEXExIZVPeFzGRmMglPr", + "root_parent_project_id": "beb16d18-d57d-44aa-a638-9727fa4a72ef", + "root_parent_project_name": "FuDpjjFIyeNTWRUWCuKo", + }, + "node_id_names_map": {}, + "simcore_user_agent": "agent", + }, + "submitted_at": "2023-01-11 13:11:47.293595", + "started_at": "2023-01-11 13:11:47.293595", + "ended_at": "2023-01-11 13:11:47.293595", + } + ] + } + ) + + +class ComputationRunRpcGetPage(NamedTuple): + items: list[ComputationRunRpcGet] + total: PositiveInt + + +def _none_to_zero_float_pre_validator(value: Any): + if value is None: + return 0.0 + return value + + +class ComputationTaskRpcGet(BaseModel): + project_uuid: ProjectID + node_id: NodeID + state: RunningState + progress: Annotated[float, BeforeValidator(_none_to_zero_float_pre_validator)] + image: dict[str, Any] + started_at: datetime | None + ended_at: datetime | None + + model_config = ConfigDict( + json_schema_extra={ + "examples": [ + { + "project_uuid": "beb16d18-d57d-44aa-a638-9727fa4a72ef", + "node_id": "12e0c8b2-bad6-40fb-9948-8dec4f65d4d9", + "state": "SUCCESS", + "progress": 0.0, + "image": { + "name": "simcore/services/comp/ti-solutions-optimizer", + "tag": "1.0.19", + "node_requirements": {"CPU": 8.0, "RAM": 25769803776}, + }, + "started_at": "2023-01-11 13:11:47.293595", + "ended_at": "2023-01-11 13:11:47.293595", + } + ] + } + ) + + +class ComputationTaskRpcGetPage(NamedTuple): + items: list[ComputationTaskRpcGet] + total: PositiveInt diff --git a/packages/models-library/src/models_library/api_schemas_webserver/computations.py b/packages/models-library/src/models_library/api_schemas_webserver/computations.py index 3a251c6141bc..6a49129034fa 100644 --- a/packages/models-library/src/models_library/api_schemas_webserver/computations.py +++ b/packages/models-library/src/models_library/api_schemas_webserver/computations.py @@ -1,13 +1,27 @@ -from typing import Annotated +from datetime import datetime +from typing import Annotated, Any from common_library.basic_types import DEFAULT_FACTORY -from pydantic import BaseModel, Field +from pydantic import ( + BaseModel, + ConfigDict, + Field, +) from ..api_schemas_directorv2.computations import ( ComputationGet as _DirectorV2ComputationGet, ) +from ..basic_types import IDStr from ..projects import CommitID, ProjectID -from ._base import InputSchemaWithoutCamelCase, OutputSchemaWithoutCamelCase +from ..projects_nodes_io import NodeID +from ..projects_state import RunningState +from ..rest_ordering import OrderBy, create_ordering_query_model_class +from ..rest_pagination import PageQueryParameters +from ._base import ( + InputSchemaWithoutCamelCase, + OutputSchema, + OutputSchemaWithoutCamelCase, +) class ComputationPathParams(BaseModel): @@ -41,3 +55,71 @@ class ComputationStarted(OutputSchemaWithoutCamelCase): json_schema_extra={"default": []}, ), ] = DEFAULT_FACTORY + + +### Computation Run + + +ComputationRunListOrderParams = create_ordering_query_model_class( + ordering_fields={ + "submitted_at", + "started_at", + "ended_at", + "state", + }, + default=OrderBy(field=IDStr("submitted_at")), + ordering_fields_api_to_column_map={ + "submitted_at": "created", + "started_at": "started", + "ended_at": "ended", + }, +) + + +class ComputationRunListQueryParams( + PageQueryParameters, + ComputationRunListOrderParams, # type: ignore[misc, valid-type] +): ... + + +class ComputationRunRestGet(OutputSchema): + project_uuid: ProjectID + iteration: int + state: RunningState + info: dict[str, Any] + submitted_at: datetime + started_at: datetime | None + ended_at: datetime | None + + +### Computation Task + + +class ComputationTaskPathParams(BaseModel): + project_id: ProjectID + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + +ComputationTaskListOrderParams = create_ordering_query_model_class( + ordering_fields={ + "started_at", + }, + default=OrderBy(field=IDStr("started_at")), + ordering_fields_api_to_column_map={"started_at": "start"}, +) + + +class ComputationTaskListQueryParams( + PageQueryParameters, + ComputationTaskListOrderParams, # type: ignore[misc, valid-type] +): ... + + +class ComputationTaskRestGet(OutputSchema): + project_uuid: ProjectID + node_id: NodeID + state: RunningState + progress: float + image: dict[str, Any] + started_at: datetime | None + ended_at: datetime | None diff --git a/packages/models-library/src/models_library/projects_state.py b/packages/models-library/src/models_library/projects_state.py index ca5698ed6b2c..25d437cc2d57 100644 --- a/packages/models-library/src/models_library/projects_state.py +++ b/packages/models-library/src/models_library/projects_state.py @@ -1,5 +1,5 @@ """ - Models both project and node states +Models both project and node states """ from enum import Enum, unique diff --git a/packages/postgres-database/src/simcore_postgres_database/migration/versions/f65f7786cd4b_add_indexes_to_comp_tables.py b/packages/postgres-database/src/simcore_postgres_database/migration/versions/f65f7786cd4b_add_indexes_to_comp_tables.py new file mode 100644 index 000000000000..49c5aae9c92f --- /dev/null +++ b/packages/postgres-database/src/simcore_postgres_database/migration/versions/f65f7786cd4b_add_indexes_to_comp_tables.py @@ -0,0 +1,53 @@ +"""add indexes to comp tables + +Revision ID: f65f7786cd4b +Revises: cf8f743fd0b7 +Create Date: 2025-04-17 12:44:27.577984+00:00 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "f65f7786cd4b" +down_revision = "cf8f743fd0b7" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_index("ix_comp_runs_user_id", "comp_runs", ["user_id"], unique=False) + op.create_index( + "ix_comp_tasks_project_id", "comp_tasks", ["project_id"], unique=False + ) + op.drop_index("idx_projects_last_change_date_desc", table_name="projects") + op.create_index( + "idx_projects_last_change_date_desc", + "projects", + ["last_change_date"], + unique=False, + postgresql_using="btree", + postgresql_ops={"last_change_date": "DESC"}, + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index( + "idx_projects_last_change_date_desc", + table_name="projects", + postgresql_using="btree", + postgresql_ops={"last_change_date": "DESC"}, + ) + op.create_index( + "idx_projects_last_change_date_desc", + "projects", + [sa.text("last_change_date DESC")], + unique=False, + ) + op.drop_index("ix_comp_tasks_project_id", table_name="comp_tasks") + op.drop_index("ix_comp_runs_user_id", table_name="comp_runs") + # ### end Alembic commands ### diff --git a/packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py b/packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py index 26adb63077e1..af14196d1849 100644 --- a/packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py +++ b/packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py @@ -99,4 +99,5 @@ doc="the run uses on demand clusters", ), sa.UniqueConstraint("project_uuid", "user_id", "iteration"), + sa.Index("ix_comp_runs_user_id", "user_id"), ) diff --git a/packages/postgres-database/src/simcore_postgres_database/models/comp_tasks.py b/packages/postgres-database/src/simcore_postgres_database/models/comp_tasks.py index 096447f7366b..15c3ddbacd1f 100644 --- a/packages/postgres-database/src/simcore_postgres_database/models/comp_tasks.py +++ b/packages/postgres-database/src/simcore_postgres_database/models/comp_tasks.py @@ -110,6 +110,7 @@ class NodeClass(enum.Enum): ), # ------ sa.UniqueConstraint("project_id", "node_id", name="project_node_uniqueness"), + sa.Index("ix_comp_tasks_project_id", "project_id"), ) register_modified_datetime_auto_update_trigger(comp_tasks) diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/__init__.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py new file mode 100644 index 000000000000..74ae336188f2 --- /dev/null +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py @@ -0,0 +1,85 @@ +# pylint: disable=too-many-arguments +import logging +from typing import Final + +from models_library.api_schemas_directorv2 import ( + DIRECTOR_V2_RPC_NAMESPACE, +) +from models_library.api_schemas_directorv2.comp_runs import ( + ComputationRunRpcGetPage, + ComputationTaskRpcGetPage, +) +from models_library.products import ProductName +from models_library.projects import ProjectID +from models_library.rabbitmq_basic_types import RPCMethodName +from models_library.rest_ordering import OrderBy +from models_library.users import UserID +from pydantic import NonNegativeInt, TypeAdapter + +from ....logging_utils import log_decorator +from ... import RabbitMQRPCClient + +_logger = logging.getLogger(__name__) + + +_DEFAULT_TIMEOUT_S: Final[NonNegativeInt] = 20 + +_RPC_METHOD_NAME_ADAPTER: TypeAdapter[RPCMethodName] = TypeAdapter(RPCMethodName) + + +@log_decorator(_logger, level=logging.DEBUG) +async def list_computations_latest_iteration_page( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + product_name: ProductName, + user_id: UserID, + # pagination + offset: int = 0, + limit: int = 20, + # ordering + order_by: OrderBy | None = None, +) -> ComputationRunRpcGetPage: + result = await rabbitmq_rpc_client.request( + DIRECTOR_V2_RPC_NAMESPACE, + _RPC_METHOD_NAME_ADAPTER.validate_python( + "list_computations_latest_iteration_page" + ), + product_name=product_name, + user_id=user_id, + offset=offset, + limit=limit, + order_by=order_by, + timeout_s=_DEFAULT_TIMEOUT_S, + ) + assert isinstance(result, ComputationRunRpcGetPage) # nosec + return result + + +@log_decorator(_logger, level=logging.DEBUG) +async def list_computations_latest_iteration_tasks_page( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + product_name: ProductName, + user_id: UserID, + project_id: ProjectID, + # pagination + offset: int = 0, + limit: int = 20, + # ordering + order_by: OrderBy | None = None, +) -> ComputationTaskRpcGetPage: + result = await rabbitmq_rpc_client.request( + DIRECTOR_V2_RPC_NAMESPACE, + _RPC_METHOD_NAME_ADAPTER.validate_python( + "list_computations_latest_iteration_tasks_page" + ), + product_name=product_name, + user_id=user_id, + project_id=project_id, + offset=offset, + limit=limit, + order_by=order_by, + timeout_s=_DEFAULT_TIMEOUT_S, + ) + assert isinstance(result, ComputationTaskRpcGetPage) # nosec + return result diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/__init__.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py new file mode 100644 index 000000000000..dbc32828e36c --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py @@ -0,0 +1,71 @@ +# pylint: disable=too-many-arguments +from fastapi import FastAPI +from models_library.api_schemas_directorv2.comp_runs import ( + ComputationRunRpcGetPage, + ComputationTaskRpcGetPage, +) +from models_library.products import ProductName +from models_library.projects import ProjectID +from models_library.rest_ordering import OrderBy +from models_library.users import UserID +from servicelib.rabbitmq import RPCRouter + +from ...modules.db.repositories.comp_runs import CompRunsRepository +from ...modules.db.repositories.comp_tasks import CompTasksRepository + +router = RPCRouter() + + +@router.expose(reraise_if_error_type=()) +async def list_computations_latest_iteration_page( + app: FastAPI, + *, + product_name: ProductName, + user_id: UserID, + # pagination + offset: int = 0, + limit: int = 20, + # ordering + order_by: OrderBy | None = None, +) -> ComputationRunRpcGetPage: + comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine) + total, comp_runs = await comp_runs_repo.list_for_user__only_latest_iterations( + product_name=product_name, + user_id=user_id, + offset=offset, + limit=limit, + order_by=order_by, + ) + return ComputationRunRpcGetPage( + items=comp_runs, + total=total, + ) + + +@router.expose(reraise_if_error_type=()) +async def list_computations_latest_iteration_tasks_page( + app: FastAPI, + *, + product_name: ProductName, + user_id: UserID, + project_id: ProjectID, + # pagination + offset: int = 0, + limit: int = 20, + # ordering + order_by: OrderBy | None = None, +) -> ComputationTaskRpcGetPage: + assert product_name # nosec NOTE: Whether project_id belong to the product_name was checked in the webserver + assert user_id # nosec NOTE: Whether user_id has access to the project was checked in the webserver + + comp_tasks_repo = CompTasksRepository.instance(db_engine=app.state.engine) + total, comp_runs = await comp_tasks_repo.list_computational_tasks_rpc_domain( + project_id=project_id, + offset=offset, + limit=limit, + order_by=order_by, + ) + return ComputationTaskRpcGetPage( + items=comp_runs, + total=total, + ) diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/routes.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/routes.py new file mode 100644 index 000000000000..ad6bdba28c75 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/api/rpc/routes.py @@ -0,0 +1,34 @@ +import logging + +from fastapi import FastAPI +from models_library.api_schemas_directorv2 import ( + DIRECTOR_V2_RPC_NAMESPACE, +) +from servicelib.logging_utils import log_context +from servicelib.rabbitmq import RPCRouter + +from ...modules.rabbitmq import get_rabbitmq_rpc_server +from . import ( + _computations, +) + +_logger = logging.getLogger(__name__) + + +ROUTERS: list[RPCRouter] = [ + _computations.router, +] + + +def setup_rpc_api_routes(app: FastAPI) -> None: + async def startup() -> None: + with log_context( + _logger, + logging.INFO, + msg="Director-v2 startup RPC API Routes", + ): + rpc_server = get_rabbitmq_rpc_server(app) + for router in ROUTERS: + await rpc_server.register_router(router, DIRECTOR_V2_RPC_NAMESPACE, app) + + app.add_event_handler("startup", startup) diff --git a/services/director-v2/src/simcore_service_director_v2/core/application.py b/services/director-v2/src/simcore_service_director_v2/core/application.py index c53c841183e4..17835676cadd 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/application.py +++ b/services/director-v2/src/simcore_service_director_v2/core/application.py @@ -17,6 +17,7 @@ make_http_error_handler_for_exception, ) from ..api.errors.validation_error import http422_error_handler +from ..api.rpc.routes import setup_rpc_api_routes from ..modules import ( catalog, comp_scheduler, @@ -183,6 +184,7 @@ def init_app(settings: AppSettings | None = None) -> FastAPI: ) if dynamic_scheduler_enabled or computational_backend_enabled: rabbitmq.setup(app) + setup_rpc_api_routes(app) # Requires rabbitmq to be setup first redis.setup(app) if dynamic_scheduler_enabled: diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py index 509df01c27fc..75e7a57aa7ed 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py @@ -119,14 +119,14 @@ async def _get_pipeline_dag(project_id: ProjectID, db_engine: Engine) -> nx.DiGr async def schedule_all_pipelines(app: FastAPI) -> None: with log_context(_logger, logging.DEBUG, msg="scheduling pipelines"): db_engine = get_db_engine(app) - runs_to_schedule = await CompRunsRepository.instance(db_engine).list( + runs_to_schedule = await CompRunsRepository.instance(db_engine).list_( filter_by_state=SCHEDULED_STATES, never_scheduled=True, processed_since=SCHEDULER_INTERVAL, ) possibly_lost_scheduled_pipelines = await CompRunsRepository.instance( db_engine - ).list( + ).list_( filter_by_state=SCHEDULED_STATES, scheduled_since=SCHEDULER_INTERVAL * _LOST_TASKS_FACTOR, ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py index 22b8677611fa..28b9373cec43 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py @@ -116,7 +116,7 @@ class SortedTasks: async def _triage_changed_tasks( - changed_tasks: list[tuple[_Previous, _Current]] + changed_tasks: list[tuple[_Previous, _Current]], ) -> SortedTasks: started_tasks = [ current @@ -242,9 +242,13 @@ async def _set_schedule_done( async def _set_states_following_failed_to_aborted( self, project_id: ProjectID, dag: nx.DiGraph ) -> dict[NodeIDStr, CompTaskAtDB]: - tasks: dict[NodeIDStr, CompTaskAtDB] = await self._get_pipeline_tasks( - project_id, dag - ) + tasks = await self._get_pipeline_tasks(project_id, dag) + # Perform a reverse topological sort to ensure tasks are ordered from last to first + sorted_node_ids = list(reversed(list(nx.topological_sort(dag)))) + tasks = { + node_id: tasks[node_id] for node_id in sorted_node_ids if node_id in tasks + } + # we need the tasks ordered from the last task to the first node_ids_to_set_as_aborted: set[NodeIDStr] = set() for task in tasks.values(): if task.state == RunningState.FAILED: @@ -651,8 +655,10 @@ async def _schedule_tasks_to_stop( ) -> None: # get any running task and stop them comp_tasks_repo = CompTasksRepository.instance(self.db_engine) - await comp_tasks_repo.mark_project_published_waiting_for_cluster_tasks_as_aborted( - project_id + await ( + comp_tasks_repo.mark_project_published_waiting_for_cluster_tasks_as_aborted( + project_id + ) ) # stop any remaining running task, these are already submitted if tasks_to_stop := [ diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py index a5dc296aa21e..8b26c0448345 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py @@ -1,12 +1,15 @@ import datetime import logging -from typing import Any, Final +from typing import Any, Final, cast import arrow import sqlalchemy as sa from aiopg.sa.result import RowProxy +from models_library.api_schemas_directorv2.comp_runs import ComputationRunRpcGet +from models_library.basic_types import IDStr from models_library.projects import ProjectID from models_library.projects_state import RunningState +from models_library.rest_ordering import OrderBy, OrderDirection from models_library.users import UserID from models_library.utils.fastapi_encoders import jsonable_encoder from pydantic import PositiveInt @@ -22,7 +25,7 @@ UserNotFoundError, ) from ....models.comp_runs import CompRunsAtDB, RunMetadataDict -from ....utils.db import RUNNING_STATE_TO_DB +from ....utils.db import DB_TO_RUNNING_STATE, RUNNING_STATE_TO_DB from ..tables import comp_runs from ._base import BaseRepository @@ -67,7 +70,7 @@ async def get( raise ComputationalRunNotFoundError return CompRunsAtDB.model_validate(row) - async def list( + async def list_( self, *, filter_by_state: set[RunningState] | None = None, @@ -139,6 +142,91 @@ async def list( ) ] + async def list_for_user__only_latest_iterations( + self, + *, + product_name: str, + user_id: UserID, + # pagination + offset: int, + limit: int, + # ordering + order_by: OrderBy | None = None, + ) -> tuple[int, list[ComputationRunRpcGet]]: + # NOTE: Currently, we list only pipelines created by the user themselves. + # If we want to list all pipelines that the user has read access to + # via project access rights, we need to join the `projects_to_groups` + # and `workspaces_access_rights` tables (which will make it slower, but we do + # the same for listing projects). + if order_by is None: + order_by = OrderBy(field=IDStr("run_id")) # default ordering + + base_select_query = sa.select( + comp_runs.c.project_uuid, + comp_runs.c.iteration, + comp_runs.c.result.label("state"), + comp_runs.c.metadata.label("info"), + comp_runs.c.created.label("submitted_at"), + comp_runs.c.started.label("started_at"), + comp_runs.c.ended.label("ended_at"), + ).select_from( + sa.select( + comp_runs.c.project_uuid, + sa.func.max(comp_runs.c.iteration).label( + "latest_iteration" + ), # <-- NOTE: We might create a boolean column with latest iteration for fast retrieval + ) + .where( + (comp_runs.c.user_id == user_id) + & ( + comp_runs.c.metadata["product_name"].astext == product_name + ) # <-- NOTE: We might create a separate column for this for fast retrieval + ) + .group_by(comp_runs.c.project_uuid) + .subquery("latest_runs") + .join( + comp_runs, + sa.and_( + comp_runs.c.project_uuid + == literal_column("latest_runs.project_uuid"), + comp_runs.c.iteration + == literal_column("latest_runs.latest_iteration"), + ), + ) + ) + + # Select total count from base_query + count_query = sa.select(sa.func.count()).select_from( + base_select_query.subquery() + ) + + # Ordering and pagination + if order_by.direction == OrderDirection.ASC: + list_query = base_select_query.order_by( + sa.asc(getattr(comp_runs.c, order_by.field)), comp_runs.c.run_id + ) + else: + list_query = base_select_query.order_by( + desc(getattr(comp_runs.c, order_by.field)), comp_runs.c.run_id + ) + list_query = list_query.offset(offset).limit(limit) + + async with self.db_engine.acquire() as conn: + total_count = await conn.scalar(count_query) + + result = await conn.execute(list_query) + items = [ + ComputationRunRpcGet.model_validate( + { + **row, + "state": DB_TO_RUNNING_STATE[row["state"]], + } + ) + async for row in result + ] + + return cast(int, total_count), items + async def create( self, *, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py index 5068e144944a..93f39295feda 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py @@ -1,15 +1,17 @@ import logging from datetime import datetime -from typing import Any +from typing import Any, cast import arrow import sqlalchemy as sa from aiopg.sa.result import ResultProxy, RowProxy +from models_library.api_schemas_directorv2.comp_runs import ComputationTaskRpcGet from models_library.basic_types import IDStr from models_library.errors import ErrorDict from models_library.projects import ProjectAtDB, ProjectID from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState +from models_library.rest_ordering import OrderBy, OrderDirection from models_library.users import UserID from models_library.wallets import WalletInfo from servicelib.logging_utils import log_context @@ -22,7 +24,7 @@ from .....models.comp_tasks import CompTaskAtDB from .....modules.resource_usage_tracker_client import ResourceUsageTrackerClient from .....utils.computations import to_node_class -from .....utils.db import RUNNING_STATE_TO_DB +from .....utils.db import DB_TO_RUNNING_STATE, RUNNING_STATE_TO_DB from ....catalog import CatalogClient from ...tables import NodeClass, StateType, comp_tasks from .._base import BaseRepository @@ -75,6 +77,67 @@ async def list_computational_tasks( tasks.append(task_db) return tasks + async def list_computational_tasks_rpc_domain( + self, + *, + project_id: ProjectID, + # pagination + offset: int = 0, + limit: int = 20, + # ordering + order_by: OrderBy | None = None, + ) -> tuple[int, list[ComputationTaskRpcGet]]: + if order_by is None: + order_by = OrderBy(field=IDStr("task_id")) # default ordering + + base_select_query = ( + sa.select( + comp_tasks.c.project_id.label("project_uuid"), + comp_tasks.c.node_id, + comp_tasks.c.state, + comp_tasks.c.progress, + comp_tasks.c.image, + comp_tasks.c.start.label("started_at"), + comp_tasks.c.end.label("ended_at"), + ) + .select_from(comp_tasks) + .where( + (comp_tasks.c.project_id == f"{project_id}") + & (comp_tasks.c.node_class == NodeClass.COMPUTATIONAL) + ) + ) + + # Select total count from base_query + count_query = sa.select(sa.func.count()).select_from( + base_select_query.subquery() + ) + + # Ordering and pagination + if order_by.direction == OrderDirection.ASC: + list_query = base_select_query.order_by( + sa.asc(getattr(comp_tasks.c, order_by.field)), comp_tasks.c.task_id + ) + else: + list_query = base_select_query.order_by( + sa.desc(getattr(comp_tasks.c, order_by.field)), comp_tasks.c.task_id + ) + list_query = list_query.offset(offset).limit(limit) + + async with self.db_engine.acquire() as conn: + total_count = await conn.scalar(count_query) + + result = await conn.execute(list_query) + items = [ + ComputationTaskRpcGet.model_validate( + { + **row, + "state": DB_TO_RUNNING_STATE[row["state"]], # Convert the state + } + ) + async for row in result + ] + return cast(int, total_count), items + async def task_exists(self, project_id: ProjectID, node_id: NodeID) -> bool: async with self.db_engine.acquire() as conn: nid: str | None = await conn.scalar( @@ -99,18 +162,18 @@ async def upsert_tasks_from_project( ) -> list[CompTaskAtDB]: # NOTE: really do an upsert here because of issue https://github.com/ITISFoundation/osparc-simcore/issues/2125 async with self.db_engine.acquire() as conn: - list_of_comp_tasks_in_project: list[ - CompTaskAtDB - ] = await _utils.generate_tasks_list_from_project( - project=project, - catalog_client=catalog_client, - published_nodes=published_nodes, - user_id=user_id, - product_name=product_name, - connection=conn, - rut_client=rut_client, - wallet_info=wallet_info, - rabbitmq_rpc_client=rabbitmq_rpc_client, + list_of_comp_tasks_in_project: list[CompTaskAtDB] = ( + await _utils.generate_tasks_list_from_project( + project=project, + catalog_client=catalog_client, + published_nodes=published_nodes, + user_id=user_id, + product_name=product_name, + connection=conn, + rut_client=rut_client, + wallet_info=wallet_info, + rabbitmq_rpc_client=rabbitmq_rpc_client, + ) ) # get current tasks result = await conn.execute( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py b/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py index a7cb4e1ba27a..86f6e7037912 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py @@ -58,7 +58,10 @@ async def on_startup() -> None: client_name="director-v2", settings=settings ) app.state.rabbitmq_rpc_client = await RabbitMQRPCClient.create( - client_name="director-v2", settings=settings + client_name="director-v2-rpc-client", settings=settings + ) + app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create( + client_name="director-v2-rpc-server", settings=settings ) await app.state.rabbitmq_client.subscribe( @@ -73,6 +76,8 @@ async def on_shutdown() -> None: await app.state.rabbitmq_client.close() if app.state.rabbitmq_rpc_client: await app.state.rabbitmq_rpc_client.close() + if app.state.rabbitmq_rpc_server: + await app.state.rabbitmq_rpc_server.close() app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) @@ -92,3 +97,8 @@ def get_rabbitmq_rpc_client(app: FastAPI) -> RabbitMQRPCClient: ) raise ConfigurationError(msg=msg) return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_client) + + +def get_rabbitmq_rpc_server(app: FastAPI) -> RabbitMQRPCClient: + assert app.state.rabbitmq_rpc_server # nosec + return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_server) diff --git a/services/director-v2/tests/conftest.py b/services/director-v2/tests/conftest.py index 6337e7eea884..01790674d091 100644 --- a/services/director-v2/tests/conftest.py +++ b/services/director-v2/tests/conftest.py @@ -7,7 +7,7 @@ import json import logging import os -from collections.abc import AsyncIterable, AsyncIterator +from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable from copy import deepcopy from datetime import timedelta from pathlib import Path @@ -30,6 +30,8 @@ setenvs_from_envfile, ) from pytest_simcore.helpers.typing_env import EnvVarsDict +from servicelib.rabbitmq import RabbitMQRPCClient +from settings_library.rabbit import RabbitSettings from simcore_service_director_v2.core.application import init_app from simcore_service_director_v2.core.settings import AppSettings from starlette.testclient import ASGI3App, TestClient @@ -226,6 +228,15 @@ async def async_client(initialized_app: FastAPI) -> AsyncIterable[httpx.AsyncCli yield client +@pytest.fixture +async def rpc_client( + rabbit_service: RabbitSettings, + initialized_app: FastAPI, + rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]], +) -> RabbitMQRPCClient: + return await rabbitmq_rpc_client("client") + + @pytest.fixture() def minimal_app(client: TestClient) -> ASGI3App: # NOTICE that this app triggers events @@ -278,11 +289,19 @@ def fake_workbench_complete_adjacency( @pytest.fixture def disable_rabbitmq(mocker: MockerFixture) -> None: - def mock_setup(app: FastAPI) -> None: + def rabbitmq_mock_setup(app: FastAPI) -> None: app.state.rabbitmq_client = AsyncMock() + def rpc_api_routes_mock_setup(app: FastAPI) -> None: + app.state.rabbitmq_rpc_server = AsyncMock() + + mocker.patch( + "simcore_service_director_v2.modules.rabbitmq.setup", + side_effect=rabbitmq_mock_setup, + ) mocker.patch( - "simcore_service_director_v2.modules.rabbitmq.setup", side_effect=mock_setup + "simcore_service_director_v2.core.application.setup_rpc_api_routes", + side_effect=rpc_api_routes_mock_setup, ) diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py new file mode 100644 index 000000000000..c2180f22e0bb --- /dev/null +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py @@ -0,0 +1,105 @@ +# pylint: disable=no-value-for-parameter +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-arguments +# pylint: disable=unused-argument +# pylint: disable=unused-variable +# pylint: disable=too-many-positional-arguments + +from collections.abc import Awaitable, Callable +from datetime import datetime, timezone +from typing import Any + +from models_library.api_schemas_directorv2.comp_runs import ( + ComputationRunRpcGetPage, + ComputationTaskRpcGetPage, +) +from models_library.projects import ProjectAtDB +from models_library.projects_state import RunningState +from servicelib.rabbitmq import RabbitMQRPCClient +from servicelib.rabbitmq.rpc_interfaces.director_v2 import ( + computations as rpc_computations, +) +from simcore_postgres_database.models.comp_pipeline import StateType +from simcore_service_director_v2.models.comp_pipelines import CompPipelineAtDB +from simcore_service_director_v2.models.comp_runs import CompRunsAtDB +from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB + +pytest_simcore_core_services_selection = ["postgres", "rabbit", "redis"] +pytest_simcore_ops_services_selection = [ + "adminer", +] + + +async def test_rpc_list_computation_runs_and_tasks( + fake_workbench_without_outputs: dict[str, Any], + fake_workbench_adjacency: dict[str, Any], + registered_user: Callable[..., dict[str, Any]], + project: Callable[..., Awaitable[ProjectAtDB]], + create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], + create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], + create_comp_run: Callable[..., Awaitable[CompRunsAtDB]], + rpc_client: RabbitMQRPCClient, +): + user = registered_user() + proj = await project(user, workbench=fake_workbench_without_outputs) + await create_pipeline( + project_id=f"{proj.uuid}", + dag_adjacency_list=fake_workbench_adjacency, + ) + comp_tasks = await create_tasks( + user=user, project=proj, state=StateType.PUBLISHED, progress=None + ) + comp_runs = await create_comp_run( + user=user, project=proj, result=RunningState.PUBLISHED + ) + assert comp_runs + + output = await rpc_computations.list_computations_latest_iteration_page( + rpc_client, product_name="osparc", user_id=user["id"] + ) + assert output.total == 1 + assert isinstance(output, ComputationRunRpcGetPage) + assert output.items[0].iteration == 1 + + comp_runs_2 = await create_comp_run( + user=user, + project=proj, + result=RunningState.PENDING, + started=datetime.now(tz=timezone.utc), + iteration=2, + ) + output = await rpc_computations.list_computations_latest_iteration_page( + rpc_client, product_name="osparc", user_id=user["id"] + ) + assert output.total == 1 + assert isinstance(output, ComputationRunRpcGetPage) + assert output.items[0].iteration == 2 + assert output.items[0].started_at is not None + assert output.items[0].ended_at is None + + comp_runs_3 = await create_comp_run( + user=user, + project=proj, + result=RunningState.SUCCESS, + started=datetime.now(tz=timezone.utc), + ended=datetime.now(tz=timezone.utc), + iteration=3, + ) + output = await rpc_computations.list_computations_latest_iteration_page( + rpc_client, product_name="osparc", user_id=user["id"] + ) + assert output.total == 1 + assert isinstance(output, ComputationRunRpcGetPage) + assert output.items[0].iteration == 3 + assert output.items[0].ended_at is not None + + # Tasks + + output = await rpc_computations.list_computations_latest_iteration_tasks_page( + rpc_client, product_name="osparc", user_id=user["id"], project_id=proj.uuid + ) + assert output + assert output.total == 4 + assert isinstance(output, ComputationTaskRpcGetPage) + assert len(output.items) == 4 diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py index 1dea4f59cbef..b369490a75e4 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py @@ -79,10 +79,10 @@ async def test_list( run_metadata: RunMetadataDict, faker: Faker, ): - assert await CompRunsRepository(aiopg_engine).list() == [] + assert await CompRunsRepository(aiopg_engine).list_() == [] published_project = await publish_project() - assert await CompRunsRepository(aiopg_engine).list() == [] + assert await CompRunsRepository(aiopg_engine).list_() == [] created = await CompRunsRepository(aiopg_engine).create( user_id=published_project.user["id"], @@ -91,7 +91,7 @@ async def test_list( metadata=run_metadata, use_on_demand_clusters=faker.pybool(), ) - assert await CompRunsRepository(aiopg_engine).list() == [created] + assert await CompRunsRepository(aiopg_engine).list_() == [created] created = [created] + await asyncio.gather( *( @@ -106,7 +106,7 @@ async def test_list( ) ) assert sorted( - await CompRunsRepository(aiopg_engine).list(), key=lambda x: x.iteration + await CompRunsRepository(aiopg_engine).list_(), key=lambda x: x.iteration ) == sorted(created, key=lambda x: x.iteration) # test with filter of state @@ -114,13 +114,13 @@ async def test_list( s for s in RunningState if s is not RunningState.PUBLISHED } assert ( - await CompRunsRepository(aiopg_engine).list( + await CompRunsRepository(aiopg_engine).list_( filter_by_state=any_state_but_published ) == [] ) assert sorted( - await CompRunsRepository(aiopg_engine).list( + await CompRunsRepository(aiopg_engine).list_( filter_by_state={RunningState.PUBLISHED} ), key=lambda x: x.iteration, @@ -128,7 +128,7 @@ async def test_list( # test with never scheduled filter, let's create a bunch of scheduled entries, assert sorted( - await CompRunsRepository(aiopg_engine).list(never_scheduled=True), + await CompRunsRepository(aiopg_engine).list_(never_scheduled=True), key=lambda x: x.iteration, ) == sorted(created, key=lambda x: x.iteration) comp_runs_marked_for_scheduling = random.sample(created, k=25) @@ -145,7 +145,7 @@ async def test_list( # filter them away created = [r for r in created if r not in comp_runs_marked_for_scheduling] assert sorted( - await CompRunsRepository(aiopg_engine).list(never_scheduled=True), + await CompRunsRepository(aiopg_engine).list_(never_scheduled=True), key=lambda x: x.iteration, ) == sorted(created, key=lambda x: x.iteration) @@ -170,7 +170,7 @@ async def test_list( # since they were just marked as processed now, we will get nothing assert ( sorted( - await CompRunsRepository(aiopg_engine).list( + await CompRunsRepository(aiopg_engine).list_( never_scheduled=False, processed_since=SCHEDULER_INTERVAL ), key=lambda x: x.iteration, @@ -200,7 +200,7 @@ async def test_list( ) # now we should get them assert sorted( - await CompRunsRepository(aiopg_engine).list( + await CompRunsRepository(aiopg_engine).list_( never_scheduled=False, processed_since=SCHEDULER_INTERVAL ), key=lambda x: x.iteration, @@ -233,14 +233,14 @@ async def test_list( ) # so the processed ones shall remain assert sorted( - await CompRunsRepository(aiopg_engine).list( + await CompRunsRepository(aiopg_engine).list_( never_scheduled=False, processed_since=SCHEDULER_INTERVAL ), key=lambda x: x.iteration, ) == sorted(comp_runs_marked_as_processed, key=lambda x: x.iteration) # the ones waiting for scheduling now assert sorted( - await CompRunsRepository(aiopg_engine).list( + await CompRunsRepository(aiopg_engine).list_( never_scheduled=False, scheduled_since=SCHEDULER_INTERVAL ), key=lambda x: x.iteration, diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py index fb06b116c70b..bb4adba21357 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py @@ -1451,7 +1451,6 @@ async def mocked_get_task_result(_job_id: str) -> TaskOutputData: project_uuid=running_project.project.uuid, task_ids=[ running_project.tasks[1].node_id, - running_project.tasks[2].node_id, running_project.tasks[3].node_id, ], expected_state=reboot_state.expected_task_state_group1, @@ -1460,7 +1459,7 @@ async def mocked_get_task_result(_job_id: str) -> TaskOutputData: await assert_comp_tasks( sqlalchemy_async_engine, project_uuid=running_project.project.uuid, - task_ids=[running_project.tasks[4].node_id], + task_ids=[running_project.tasks[2].node_id, running_project.tasks[4].node_id], expected_state=reboot_state.expected_task_state_group2, expected_progress=reboot_state.expected_task_progress_group2, ) diff --git a/services/director-v2/tests/unit/with_dbs/conftest.py b/services/director-v2/tests/unit/with_dbs/conftest.py index 9b0d03e6eed5..fb5056ad2e70 100644 --- a/services/director-v2/tests/unit/with_dbs/conftest.py +++ b/services/director-v2/tests/unit/with_dbs/conftest.py @@ -247,7 +247,7 @@ async def _() -> PublishedProject: @pytest.fixture async def published_project( - publish_project: Callable[[], Awaitable[PublishedProject]] + publish_project: Callable[[], Awaitable[PublishedProject]], ) -> PublishedProject: return await publish_project() diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index 4ba28dd4bee8..73a7abf032c0 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -2536,6 +2536,89 @@ paths: responses: '204': description: Successful Response + /v0/computations/-/iterations/latest: + get: + tags: + - computations + - projects + summary: List Computations Latest Iteration + operationId: list_computations_latest_iteration + parameters: + - name: order_by + in: query + required: false + schema: + type: string + contentMediaType: application/json + contentSchema: {} + default: '{"field":"created","direction":"asc"}' + title: Order By + - name: limit + in: query + required: false + schema: + type: integer + default: 20 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + default: 0 + title: Offset + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/Envelope_list_ComputationRunRestGet__' + /v0/computations/{project_id}/iterations/latest/tasks: + get: + tags: + - computations + - projects + summary: List Computations Latest Iteration Tasks + operationId: list_computations_latest_iteration_tasks + parameters: + - name: project_id + in: path + required: true + schema: + type: string + format: uuid + title: Project Id + - name: order_by + in: query + required: false + schema: + type: string + contentMediaType: application/json + contentSchema: {} + default: '{"field":"start","direction":"asc"}' + title: Order By + - name: limit + in: query + required: false + schema: + type: integer + default: 20 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + default: 0 + title: Offset + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/Envelope_list_ComputationTaskRestGet__' /v0/projects/{project_id}:xport: post: tags: @@ -8641,6 +8724,46 @@ components: - submitted - url title: ComputationGet + ComputationRunRestGet: + properties: + projectUuid: + type: string + format: uuid + title: Projectuuid + iteration: + type: integer + title: Iteration + state: + $ref: '#/components/schemas/RunningState' + info: + type: object + title: Info + submittedAt: + type: string + format: date-time + title: Submittedat + startedAt: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Startedat + endedAt: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Endedat + type: object + required: + - projectUuid + - iteration + - state + - info + - submittedAt + - startedAt + - endedAt + title: ComputationRunRestGet ComputationStart: properties: force_restart: @@ -8674,6 +8797,46 @@ components: required: - pipeline_id title: ComputationStarted + ComputationTaskRestGet: + properties: + projectUuid: + type: string + format: uuid + title: Projectuuid + nodeId: + type: string + format: uuid + title: Nodeid + state: + $ref: '#/components/schemas/RunningState' + progress: + type: number + title: Progress + image: + type: object + title: Image + startedAt: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Startedat + endedAt: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Endedat + type: object + required: + - projectUuid + - nodeId + - state + - progress + - image + - startedAt + - endedAt + title: ComputationTaskRestGet ConnectServiceToPricingPlanBodyParams: properties: serviceKey: @@ -10026,6 +10189,38 @@ components: title: Error type: object title: Envelope[list[ApiKeyGet]] + Envelope_list_ComputationRunRestGet__: + properties: + data: + anyOf: + - items: + $ref: '#/components/schemas/ComputationRunRestGet' + type: array + - type: 'null' + title: Data + error: + anyOf: + - {} + - type: 'null' + title: Error + type: object + title: Envelope[list[ComputationRunRestGet]] + Envelope_list_ComputationTaskRestGet__: + properties: + data: + anyOf: + - items: + $ref: '#/components/schemas/ComputationTaskRestGet' + type: array + - type: 'null' + title: Data + error: + anyOf: + - {} + - type: 'null' + title: Error + type: object + title: Envelope[list[ComputationTaskRestGet]] Envelope_list_DatasetMetaData__: properties: data: diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py new file mode 100644 index 000000000000..1e5d035f83c5 --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py @@ -0,0 +1,65 @@ +from aiohttp import web +from models_library.api_schemas_directorv2.comp_runs import ( + ComputationRunRpcGetPage, + ComputationTaskRpcGetPage, +) +from models_library.products import ProductName +from models_library.projects import ProjectID +from models_library.rest_ordering import OrderBy +from models_library.users import UserID +from pydantic import NonNegativeInt +from servicelib.rabbitmq.rpc_interfaces.director_v2 import computations + +from ..projects.api import check_user_project_permission +from ..rabbitmq import get_rabbitmq_rpc_client + + +async def list_computations_latest_iteration( + app: web.Application, + product_name: ProductName, + user_id: UserID, + # pagination + offset: int, + limit: NonNegativeInt, + # ordering + order_by: OrderBy, +) -> ComputationRunRpcGetPage: + """Returns the list of computations (only latest iterations)""" + rpc_client = get_rabbitmq_rpc_client(app) + return await computations.list_computations_latest_iteration_page( + rpc_client, + product_name=product_name, + user_id=user_id, + offset=offset, + limit=limit, + order_by=order_by, + ) + + +async def list_computations_latest_iteration_tasks( + app: web.Application, + product_name: ProductName, + user_id: UserID, + project_id: ProjectID, + # pagination + offset: int, + limit: NonNegativeInt, + # ordering + order_by: OrderBy, +) -> ComputationTaskRpcGetPage: + """Returns the list of tasks for the latest iteration of a computation""" + + await check_user_project_permission( + app, project_id=project_id, user_id=user_id, product_name=product_name + ) + + rpc_client = get_rabbitmq_rpc_client(app) + return await computations.list_computations_latest_iteration_tasks_page( + rpc_client, + product_name=product_name, + user_id=user_id, + project_id=project_id, + offset=offset, + limit=limit, + order_by=order_by, + ) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/__init__.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/__init__.py index 1bed5207e944..7fa642dff799 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_controller/__init__.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/__init__.py @@ -1 +1 @@ -from . import rest +from . import computations_rest, rest diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py new file mode 100644 index 000000000000..9631d10f27ed --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py @@ -0,0 +1,130 @@ +import logging + +from aiohttp import web +from models_library.api_schemas_webserver.computations import ( + ComputationRunListQueryParams, + ComputationRunRestGet, + ComputationTaskListQueryParams, + ComputationTaskPathParams, + ComputationTaskRestGet, +) +from models_library.rest_base import RequestParameters +from models_library.rest_ordering import OrderBy +from models_library.rest_pagination import Page +from models_library.rest_pagination_utils import paginate_data +from models_library.users import UserID +from pydantic import Field +from servicelib.aiohttp.requests_validation import ( + parse_request_path_parameters_as, + parse_request_query_parameters_as, +) +from servicelib.mimetype_constants import MIMETYPE_APPLICATION_JSON +from servicelib.request_keys import RQT_USERID_KEY +from servicelib.rest_constants import RESPONSE_MODEL_POLICY + +from ..._meta import API_VTAG as VTAG +from ...constants import RQ_PRODUCT_KEY +from ...login.decorators import login_required +from ...security.decorators import permission_required +from .. import _computations_service + +_logger = logging.getLogger(__name__) + + +routes = web.RouteTableDef() + + +class ComputationsRequestContext(RequestParameters): + user_id: UserID = Field(..., alias=RQT_USERID_KEY) # type: ignore[literal-required] + product_name: str = Field(..., alias=RQ_PRODUCT_KEY) # type: ignore[literal-required] + + +@routes.get( + f"/{VTAG}/computations/-/iterations/latest", + name="list_computations_latest_iteration", +) +@login_required +@permission_required("services.pipeline.*") +@permission_required("project.read") +async def list_computations_latest_iteration(request: web.Request) -> web.Response: + + req_ctx = ComputationsRequestContext.model_validate(request) + query_params: ComputationRunListQueryParams = parse_request_query_parameters_as( + ComputationRunListQueryParams, request + ) + + _get = await _computations_service.list_computations_latest_iteration( + request.app, + product_name=req_ctx.product_name, + user_id=req_ctx.user_id, + # pagination + offset=query_params.offset, + limit=query_params.limit, + # ordering + order_by=OrderBy.model_construct(**query_params.order_by.model_dump()), + ) + + page = Page[ComputationRunRestGet].model_validate( + paginate_data( + chunk=[ + ComputationRunRestGet.model_validate(task, from_attributes=True) + for task in _get.items + ], + total=_get.total, + limit=query_params.limit, + offset=query_params.offset, + request_url=request.url, + ) + ) + + return web.Response( + text=page.model_dump_json(**RESPONSE_MODEL_POLICY), + content_type=MIMETYPE_APPLICATION_JSON, + ) + + +@routes.get( + f"/{VTAG}/computations/{{project_id}}/iterations/latest/tasks", + name="list_computations_latest_iteration_tasks", +) +@login_required +@permission_required("services.pipeline.*") +@permission_required("project.read") +async def list_computations_latest_iteration_tasks( + request: web.Request, +) -> web.Response: + + req_ctx = ComputationsRequestContext.model_validate(request) + query_params: ComputationTaskListQueryParams = parse_request_query_parameters_as( + ComputationTaskListQueryParams, request + ) + path_params = parse_request_path_parameters_as(ComputationTaskPathParams, request) + + _get = await _computations_service.list_computations_latest_iteration_tasks( + request.app, + product_name=req_ctx.product_name, + user_id=req_ctx.user_id, + project_id=path_params.project_id, + # pagination + offset=query_params.offset, + limit=query_params.limit, + # ordering + order_by=OrderBy.model_construct(**query_params.order_by.model_dump()), + ) + + page = Page[ComputationTaskRestGet].model_validate( + paginate_data( + chunk=[ + ComputationTaskRestGet.model_validate(task, from_attributes=True) + for task in _get.items + ], + total=_get.total, + limit=query_params.limit, + offset=query_params.offset, + request_url=request.url, + ) + ) + return web.Response( + text=page.model_dump_json(**RESPONSE_MODEL_POLICY), + content_type=MIMETYPE_APPLICATION_JSON, + ) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/rest.py index fbd6448036d5..c9d5e9f13459 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_controller/rest.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/rest.py @@ -27,9 +27,9 @@ from ...products import products_web from ...security.decorators import permission_required from ...utils_aiohttp import envelope_json_response -from .. import _service +from .. import _director_v2_service from .._client import DirectorV2RestClient -from .._service_abc import get_project_run_policy +from .._director_v2_abc_service import get_project_run_policy from ._rest_exceptions import handle_rest_requests_exceptions _logger = logging.getLogger(__name__) @@ -58,13 +58,13 @@ async def start_computation(request: web.Request) -> web.Response: force_restart = body_params.force_restart # Group properties - group_properties = await _service.get_group_properties( + group_properties = await _director_v2_service.get_group_properties( request.app, product_name=req_ctx.product_name, user_id=req_ctx.user_id ) # Get wallet information product = products_web.get_current_product(request) - wallet_info = await _service.get_wallet_info( + wallet_info = await _director_v2_service.get_wallet_info( request.app, product=product, user_id=req_ctx.user_id, diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_service_abc_default.py b/services/web/server/src/simcore_service_webserver/director_v2/_director_v2_abc_default_service.py similarity index 94% rename from services/web/server/src/simcore_service_webserver/director_v2/_service_abc_default.py rename to services/web/server/src/simcore_service_webserver/director_v2/_director_v2_abc_default_service.py index 2a2cab073098..88e340fc3437 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_service_abc_default.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_director_v2_abc_default_service.py @@ -9,7 +9,7 @@ from aiohttp import web from models_library.projects import CommitID, ProjectID -from ._service_abc import AbstractProjectRunPolicy +from ._director_v2_abc_service import AbstractProjectRunPolicy log = logging.getLogger(__name__) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_service_abc.py b/services/web/server/src/simcore_service_webserver/director_v2/_director_v2_abc_service.py similarity index 100% rename from services/web/server/src/simcore_service_webserver/director_v2/_service_abc.py rename to services/web/server/src/simcore_service_webserver/director_v2/_director_v2_abc_service.py diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_service.py b/services/web/server/src/simcore_service_webserver/director_v2/_director_v2_service.py similarity index 100% rename from services/web/server/src/simcore_service_webserver/director_v2/_service.py rename to services/web/server/src/simcore_service_webserver/director_v2/_director_v2_service.py diff --git a/services/web/server/src/simcore_service_webserver/director_v2/director_v2_service.py b/services/web/server/src/simcore_service_webserver/director_v2/director_v2_service.py index 89a42fe0fa27..b750700e5ee8 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/director_v2_service.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/director_v2_service.py @@ -1,5 +1,10 @@ from ._client import is_healthy -from ._service import ( +from ._director_v2_abc_service import ( + AbstractProjectRunPolicy, + get_project_run_policy, + set_project_run_policy, +) +from ._director_v2_service import ( create_or_update_pipeline, delete_pipeline, get_batch_tasks_outputs, @@ -7,11 +12,6 @@ is_pipeline_running, stop_pipeline, ) -from ._service_abc import ( - AbstractProjectRunPolicy, - get_project_run_policy, - set_project_run_policy, -) from .exceptions import DirectorServiceError # director-v2 module internal API diff --git a/services/web/server/src/simcore_service_webserver/director_v2/plugin.py b/services/web/server/src/simcore_service_webserver/director_v2/plugin.py index 08df74ee1d84..d847156841b4 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/plugin.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/plugin.py @@ -11,8 +11,8 @@ from ..rest.plugin import setup_rest from . import _controller from ._client import DirectorV2RestClient, get_directorv2_client, set_directorv2_client -from ._service_abc import set_project_run_policy -from ._service_abc_default import DefaultProjectRunPolicy +from ._director_v2_abc_default_service import DefaultProjectRunPolicy +from ._director_v2_abc_service import set_project_run_policy _logger = logging.getLogger(__name__) @@ -38,6 +38,7 @@ def setup_director_v2(app: web.Application): if is_setup_completed(setup_rest.metadata()["module_name"], app): set_project_run_policy(app, DefaultProjectRunPolicy()) app.router.add_routes(_controller.rest.routes) + app.router.add_routes(_controller.computations_rest.routes) else: _logger.warning( diff --git a/services/web/server/tests/unit/conftest.py b/services/web/server/tests/unit/conftest.py index 7f3d23819916..d99275c43573 100644 --- a/services/web/server/tests/unit/conftest.py +++ b/services/web/server/tests/unit/conftest.py @@ -7,17 +7,20 @@ import json import sys -from collections.abc import Callable, Iterable +from collections.abc import AsyncIterator, Callable, Iterable from pathlib import Path from typing import Any import pytest import yaml +from aiohttp.test_utils import TestClient from models_library.products import ProductName from pytest_mock import MockFixture, MockType -from pytest_simcore.helpers.webserver_projects import empty_project_data +from pytest_simcore.helpers.webserver_login import UserInfoDict +from pytest_simcore.helpers.webserver_projects import NewProject, empty_project_data from simcore_service_webserver.application_settings_utils import AppConfigDict from simcore_service_webserver.constants import FRONTEND_APP_DEFAULT +from simcore_service_webserver.projects.models import ProjectDict CURRENT_DIR = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent @@ -93,3 +96,23 @@ def disabled_setup_garbage_collector(mocker: MockFixture) -> MockType: @pytest.fixture(scope="session") def product_name() -> ProductName: return ProductName(FRONTEND_APP_DEFAULT) + + +@pytest.fixture +async def user_project( + client: TestClient, + fake_project: ProjectDict, + logged_user: UserInfoDict, + tests_data_dir: Path, + osparc_product_name: str, +) -> AsyncIterator[ProjectDict]: + async with NewProject( + fake_project, + client.app, + user_id=logged_user["id"], + product_name=osparc_product_name, + tests_data_dir=tests_data_dir, + ) as project: + print("-----> added project", project["name"]) + yield project + print("<----- removed project", project["name"]) diff --git a/services/web/server/tests/unit/with_dbs/01/conftest.py b/services/web/server/tests/unit/with_dbs/01/conftest.py index 0d136f206374..5e77d24abf3f 100644 --- a/services/web/server/tests/unit/with_dbs/01/conftest.py +++ b/services/web/server/tests/unit/with_dbs/01/conftest.py @@ -2,7 +2,6 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable - import pytest diff --git a/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py b/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py index cb40a779378a..0e51d74ecca4 100644 --- a/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py +++ b/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py @@ -6,7 +6,18 @@ import pytest from aiohttp.test_utils import TestClient from faker import Faker +from models_library.api_schemas_directorv2.comp_runs import ( + ComputationRunRpcGet, + ComputationRunRpcGetPage, + ComputationTaskRpcGet, + ComputationTaskRpcGetPage, +) +from models_library.api_schemas_webserver.computations import ( + ComputationRunRestGet, + ComputationTaskRestGet, +) from models_library.projects import ProjectID +from pytest_mock import MockerFixture from pytest_simcore.helpers.assert_checks import assert_status from pytest_simcore.helpers.webserver_login import LoggedUser from pytest_simcore.helpers.webserver_parametrizations import ( @@ -16,6 +27,7 @@ from pytest_simcore.services_api_mocks_for_aiohttp_clients import AioResponsesMock from servicelib.aiohttp import status from simcore_service_webserver.db.models import UserRole +from simcore_service_webserver.projects.models import ProjectDict @pytest.fixture @@ -117,3 +129,74 @@ async def test_stop_computation( else expected.no_content ), ) + + +@pytest.fixture +def mock_rpc_list_computations_latest_iteration_tasks( + mocker: MockerFixture, +) -> ComputationRunRpcGetPage: + return mocker.patch( + "simcore_service_webserver.director_v2._computations_service.computations.list_computations_latest_iteration_page", + spec=True, + return_value=ComputationRunRpcGetPage( + items=[ + ComputationRunRpcGet.model_validate( + ComputationRunRpcGet.model_config["json_schema_extra"]["examples"][ + 0 + ] + ) + ], + total=1, + ), + ) + + +@pytest.fixture +def mock_rpc_list_computations_latest_iteration_tasks_page( + mocker: MockerFixture, +) -> ComputationTaskRpcGetPage: + return mocker.patch( + "simcore_service_webserver.director_v2._computations_service.computations.list_computations_latest_iteration_tasks_page", + spec=True, + return_value=ComputationTaskRpcGetPage( + items=[ + ComputationTaskRpcGet.model_validate( + ComputationTaskRpcGet.model_config["json_schema_extra"]["examples"][ + 0 + ] + ) + ], + total=1, + ), + ) + + +@pytest.mark.parametrize(*standard_role_response(), ids=str) +async def test_list_computations_latest_iteration( + director_v2_service_mock: AioResponsesMock, + user_project: ProjectDict, + client: TestClient, + logged_user: LoggedUser, + user_role: UserRole, + expected: ExpectedResponse, + mock_rpc_list_computations_latest_iteration_tasks: None, + mock_rpc_list_computations_latest_iteration_tasks_page: None, +): + assert client.app + url = client.app.router["list_computations_latest_iteration"].url_for() + resp = await client.get(f"{url}") + data, _ = await assert_status( + resp, status.HTTP_200_OK if user_role == UserRole.GUEST else expected.ok + ) + if user_role != UserRole.ANONYMOUS: + assert ComputationRunRestGet.model_validate(data[0]) + + url = client.app.router["list_computations_latest_iteration_tasks"].url_for( + project_id=f"{user_project['uuid']}" + ) + resp = await client.get(f"{url}") + data, _ = await assert_status( + resp, status.HTTP_200_OK if user_role == UserRole.GUEST else expected.ok + ) + if user_role != UserRole.ANONYMOUS: + assert ComputationTaskRestGet.model_validate(data[0]) diff --git a/services/web/server/tests/unit/with_dbs/02/conftest.py b/services/web/server/tests/unit/with_dbs/02/conftest.py index 674da4193f42..339e154e95e5 100644 --- a/services/web/server/tests/unit/with_dbs/02/conftest.py +++ b/services/web/server/tests/unit/with_dbs/02/conftest.py @@ -104,26 +104,6 @@ def mock_catalog_api( } -@pytest.fixture -async def user_project( - client: TestClient, - fake_project: ProjectDict, - logged_user: UserInfoDict, - tests_data_dir: Path, - osparc_product_name: str, -) -> AsyncIterator[ProjectDict]: - async with NewProject( - fake_project, - client.app, - user_id=logged_user["id"], - product_name=osparc_product_name, - tests_data_dir=tests_data_dir, - ) as project: - print("-----> added project", project["name"]) - yield project - print("<----- removed project", project["name"]) - - @pytest.fixture async def shared_project( client,