diff --git a/packages/models-library/src/models_library/api_schemas_directorv2/computations.py b/packages/models-library/src/models_library/api_schemas_directorv2/computations.py index 8a892676f63..00a6549b3fa 100644 --- a/packages/models-library/src/models_library/api_schemas_directorv2/computations.py +++ b/packages/models-library/src/models_library/api_schemas_directorv2/computations.py @@ -13,7 +13,7 @@ from ..basic_types import IDStr from ..projects import ProjectID -from ..projects_nodes_io import NodeID +from ..projects_nodes_io import NodeID, SimcoreS3FileID from ..projects_pipeline import ComputationTask from ..users import UserID from ..wallets import WalletInfo @@ -126,6 +126,30 @@ class TaskLogFileGet(BaseModel): ] = None +class TaskLogFileIdGet(BaseModel): + task_id: NodeID + file_id: SimcoreS3FileID | None + + model_config = ConfigDict( + json_schema_extra={ + "examples": [ + { + "task_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "file_id": "1c46752c-b096-11ea-a3c4-02420a00392e/3fa85f64-5717-4562-b3fc-2c963f66afa6/logs/task_logs.txt", + }, + { + "task_id": "6ba7b810-9dad-11d1-80b4-00c04fd430c8", + "file_id": "1c46752c-b096-11ea-a3c4-02420a00392e/6ba7b810-9dad-11d1-80b4-00c04fd430c8/logs/debug.log", + }, + { + "task_id": "6ba7b811-9dad-11d1-80b4-00c04fd430c8", + "file_id": None, + }, + ] + } + ) + + class TasksSelection(BaseModel): nodes_ids: list[NodeID] diff --git a/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py b/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py index 6c6c0609328..4c37ad32df4 100644 --- a/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py +++ b/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py @@ -30,6 +30,17 @@ class AsyncJobResult(BaseModel): class AsyncJobGet(BaseModel): + model_config = ConfigDict( + json_schema_extra={ + "examples": [ + { + "job_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "job_name": "export_data_task", + } + ] + } + ) + job_id: AsyncJobId job_name: AsyncJobName @@ -42,6 +53,18 @@ class AsyncJobAbort(BaseModel): class AsyncJobFilter(AsyncJobFilterBase): """Data for controlling access to an async job""" + model_config = ConfigDict( + json_schema_extra={ + "examples": [ + { + "product_name": "osparc", + "user_id": 123, + "client_name": "web_client", + } + ] + }, + ) + product_name: ProductName user_id: UserID client_name: Annotated[ diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/director_v2_rpc_server.py b/packages/pytest-simcore/src/pytest_simcore/helpers/director_v2_rpc_server.py new file mode 100644 index 00000000000..8b8d9b6cc4b --- /dev/null +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/director_v2_rpc_server.py @@ -0,0 +1,30 @@ +# pylint: disable=no-self-use +# pylint: disable=not-context-manager +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable + + +from models_library.api_schemas_directorv2.computations import TaskLogFileIdGet +from models_library.projects import ProjectID +from pydantic import TypeAdapter, validate_call +from pytest_mock import MockType +from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient + + +class DirectorV2SideEffects: + # pylint: disable=no-self-use + @validate_call(config={"arbitrary_types_allowed": True}) + async def get_computation_task_log_file_ids( + self, + rpc_client: RabbitMQRPCClient | MockType, + *, + project_id: ProjectID, + 
) -> list[TaskLogFileIdGet]: + assert rpc_client + assert project_id + + return TypeAdapter(list[TaskLogFileIdGet]).validate_python( + TaskLogFileIdGet.model_json_schema()["examples"], + ) diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/storage_rpc_server.py b/packages/pytest-simcore/src/pytest_simcore/helpers/storage_rpc_server.py new file mode 100644 index 00000000000..72ce830784a --- /dev/null +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/storage_rpc_server.py @@ -0,0 +1,48 @@ +# pylint: disable=no-self-use +# pylint: disable=not-context-manager +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable + + +from typing import Literal + +from models_library.api_schemas_rpc_async_jobs.async_jobs import ( + AsyncJobFilter, + AsyncJobGet, +) +from models_library.api_schemas_webserver.storage import PathToExport +from models_library.products import ProductName +from models_library.users import UserID +from pydantic import TypeAdapter, validate_call +from pytest_mock import MockType +from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient + + +class StorageSideEffects: + # pylint: disable=no-self-use + @validate_call(config={"arbitrary_types_allowed": True}) + async def start_export_data( + self, + rabbitmq_rpc_client: RabbitMQRPCClient | MockType, + *, + user_id: UserID, + product_name: ProductName, + paths_to_export: list[PathToExport], + export_as: Literal["path", "download_link"], + ) -> tuple[AsyncJobGet, AsyncJobFilter]: + assert rabbitmq_rpc_client + assert user_id + assert product_name + assert paths_to_export + assert export_as + + async_job_get = TypeAdapter(AsyncJobGet).validate_python( + AsyncJobGet.model_json_schema()["examples"][0], + ) + async_job_filter = TypeAdapter(AsyncJobFilter).validate_python( + AsyncJobFilter.model_json_schema()["examples"][0], + ) + + return async_job_get, async_job_filter diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations_tasks.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations_tasks.py new file mode 100644 index 00000000000..5d12960444c --- /dev/null +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations_tasks.py @@ -0,0 +1,41 @@ +# pylint: disable=too-many-arguments +import logging +from typing import Final + +from models_library.api_schemas_directorv2 import ( + DIRECTOR_V2_RPC_NAMESPACE, +) +from models_library.api_schemas_directorv2.computations import TaskLogFileIdGet +from models_library.projects import ProjectID +from models_library.rabbitmq_basic_types import RPCMethodName +from pydantic import TypeAdapter + +from ....logging_utils import log_decorator +from ... 
import RabbitMQRPCClient + +_logger = logging.getLogger(__name__) + + +_RPC_METHOD_NAME_ADAPTER: TypeAdapter[RPCMethodName] = TypeAdapter(RPCMethodName) + +_GET_COMPUTATION_TASK_LOG_FILE_IDS: Final[RPCMethodName] = ( + _RPC_METHOD_NAME_ADAPTER.validate_python("get_computation_task_log_file_ids") +) + + +@log_decorator(_logger, level=logging.DEBUG) +async def get_computation_task_log_file_ids( + rabbitmq_rpc_client: RabbitMQRPCClient, *, project_id: ProjectID +) -> list[TaskLogFileIdGet]: + """ + Raises: + ComputationalTaskMissingError + """ + result = await rabbitmq_rpc_client.request( + DIRECTOR_V2_RPC_NAMESPACE, + _GET_COMPUTATION_TASK_LOG_FILE_IDS, + project_id=project_id, + ) + assert isinstance(result, list) # nosec + assert all(isinstance(item, TaskLogFileIdGet) for item in result) # nosec + return result diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/errors.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/errors.py new file mode 100644 index 00000000000..31e8dfb90bb --- /dev/null +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/errors.py @@ -0,0 +1,9 @@ +from ..._errors import RPCInterfaceError + + +class BaseRpcError(RPCInterfaceError): + pass + + +class ComputationalTaskMissingError(BaseRpcError): + msg_template = "Computational run not found for project {project_id}" diff --git a/services/api-server/VERSION b/services/api-server/VERSION index 2003b639c40..78bc1abd14f 100644 --- a/services/api-server/VERSION +++ b/services/api-server/VERSION @@ -1 +1 @@ -0.9.2 +0.10.0 diff --git a/services/api-server/openapi.json b/services/api-server/openapi.json index 2543e1d0a79..40b9594f3e5 100644 --- a/services/api-server/openapi.json +++ b/services/api-server/openapi.json @@ -3,7 +3,7 @@ "info": { "title": "osparc.io public API", "description": "osparc-simcore public API specifications", - "version": "0.9.2" + "version": "0.10.0" }, "paths": { "/v0/meta": { @@ -5704,6 +5704,65 @@ } } }, + "/v0/function_jobs/{function_job_id}/log": { + "post": { + "tags": [ + "function_jobs" + ], + "summary": "Get Function Job Logs Task", + "description": "Get function job logs task\n\nNew in *version 0.10-rc1*", + "operationId": "get_function_job_logs_task", + "security": [ + { + "HTTPBasic": [] + } + ], + "parameters": [ + { + "name": "function_job_id", + "in": "path", + "required": true, + "schema": { + "type": "string", + "format": "uuid", + "title": "Function Job Id" + } + } + ], + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/TaskGet" + } + } + } + }, + "404": { + "description": "Function job not found", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorGet" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } + }, "/v0/function_job_collections": { "get": { "tags": [ @@ -7833,10 +7892,253 @@ } } } + }, + "/v0/tasks": { + "get": { + "tags": [ + "tasks" + ], + "summary": "List Tasks", + "description": "List all tasks\n\nNew in *version 0.10-rc1*", + "operationId": "list_tasks", + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ApiServerEnvelope_list_TaskGet__" + } + } + } + }, + "500": { + "description": "Internal 
server error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorGet" + } + } + } + } + }, + "security": [ + { + "HTTPBasic": [] + } + ] + } + }, + "/v0/tasks/{task_id}": { + "get": { + "tags": [ + "tasks" + ], + "summary": "Get Task Status", + "description": "Get task status\n\nNew in *version 0.10-rc1*", + "operationId": "get_task_status", + "security": [ + { + "HTTPBasic": [] + } + ], + "parameters": [ + { + "name": "task_id", + "in": "path", + "required": true, + "schema": { + "type": "string", + "format": "uuid", + "title": "Task Id" + } + } + ], + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/TaskStatus" + } + } + } + }, + "500": { + "description": "Internal server error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorGet" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } + }, + "/v0/tasks/{task_id}:cancel": { + "post": { + "tags": [ + "tasks" + ], + "summary": "Cancel Task", + "description": "Cancel task\n\nNew in *version 0.10-rc1*", + "operationId": "cancel_task", + "security": [ + { + "HTTPBasic": [] + } + ], + "parameters": [ + { + "name": "task_id", + "in": "path", + "required": true, + "schema": { + "type": "string", + "format": "uuid", + "title": "Task Id" + } + } + ], + "responses": { + "204": { + "description": "Successful Response" + }, + "500": { + "description": "Internal server error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorGet" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } + }, + "/v0/tasks/{task_id}/result": { + "get": { + "tags": [ + "tasks" + ], + "summary": "Get Task Result", + "description": "Get task result\n\nNew in *version 0.10-rc1*", + "operationId": "get_task_result", + "security": [ + { + "HTTPBasic": [] + } + ], + "parameters": [ + { + "name": "task_id", + "in": "path", + "required": true, + "schema": { + "type": "string", + "format": "uuid", + "title": "Task Id" + } + } + ], + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/TaskResult" + } + } + } + }, + "404": { + "description": "Task result not found", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorGet" + } + } + } + }, + "409": { + "description": "Task is cancelled", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorGet" + } + } + } + }, + "500": { + "description": "Internal server error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorGet" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } } }, "components": { "schemas": { + "ApiServerEnvelope_list_TaskGet__": { + "properties": { + "data": { + "items": { + "$ref": "#/components/schemas/TaskGet" + }, + "type": "array", + "title": "Data" + } + }, + "type": "object", + "required": [ + "data" + ], + "title": "ApiServerEnvelope[list[TaskGet]]" + }, 
"Body_abort_multipart_upload_v0_files__file_id__abort_post": { "properties": { "client_file": { @@ -11152,6 +11454,127 @@ "kind": "input" } }, + "TaskGet": { + "properties": { + "task_id": { + "type": "string", + "title": "Task Id" + }, + "task_name": { + "type": "string", + "title": "Task Name" + }, + "status_href": { + "type": "string", + "title": "Status Href" + }, + "result_href": { + "type": "string", + "title": "Result Href" + }, + "abort_href": { + "type": "string", + "title": "Abort Href" + } + }, + "type": "object", + "required": [ + "task_id", + "task_name", + "status_href", + "result_href", + "abort_href" + ], + "title": "TaskGet" + }, + "TaskProgress": { + "properties": { + "task_id": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Task Id" + }, + "message": { + "type": "string", + "title": "Message", + "default": "" + }, + "percent": { + "type": "number", + "maximum": 1.0, + "minimum": 0.0, + "title": "Percent", + "default": 0.0 + } + }, + "type": "object", + "title": "TaskProgress", + "description": "Helps the user to keep track of the progress. Progress is expected to be\ndefined as a float bound between 0.0 and 1.0" + }, + "TaskResult": { + "properties": { + "result": { + "anyOf": [ + {}, + { + "type": "null" + } + ], + "title": "Result" + }, + "error": { + "anyOf": [ + {}, + { + "type": "null" + } + ], + "title": "Error" + } + }, + "type": "object", + "required": [ + "result", + "error" + ], + "title": "TaskResult" + }, + "TaskStatus": { + "properties": { + "task_progress": { + "$ref": "#/components/schemas/TaskProgress" + }, + "done": { + "type": "boolean", + "title": "Done" + }, + "started": { + "anyOf": [ + { + "type": "string", + "format": "date-time" + }, + { + "type": "null" + } + ], + "title": "Started" + } + }, + "type": "object", + "required": [ + "task_progress", + "done", + "started" + ], + "title": "TaskStatus" + }, "UnitExtraInfoTier": { "properties": { "CPU": { diff --git a/services/api-server/setup.cfg b/services/api-server/setup.cfg index 6bd6174290b..013330e985d 100644 --- a/services/api-server/setup.cfg +++ b/services/api-server/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.9.2 +current_version = 0.10.0 commit = True message = services/api-server version: {current_version} → {new_version} tag = False @@ -11,12 +11,12 @@ commit_args = --no-verify asyncio_mode = auto asyncio_default_fixture_loop_scope = function addopts = --strict-markers -markers = +markers = slow: marks tests as slow (deselect with '-m "not slow"') acceptance_test: "marks tests as 'acceptance tests' i.e. does the system do what the user expects? Typically those are workflows." 
testit: "marks test to run during development" [mypy] -plugins = +plugins = pydantic.mypy sqlalchemy.ext.mypy.plugin diff --git a/services/api-server/src/simcore_service_api_server/_service_jobs.py b/services/api-server/src/simcore_service_api_server/_service_jobs.py index faac214897f..a92ccb41fa0 100644 --- a/services/api-server/src/simcore_service_api_server/_service_jobs.py +++ b/services/api-server/src/simcore_service_api_server/_service_jobs.py @@ -1,8 +1,10 @@ import logging from collections.abc import Callable from dataclasses import dataclass +from pathlib import Path from common_library.exclude import as_dict_exclude_none +from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobGet from models_library.api_schemas_webserver.projects import ProjectCreateNew, ProjectGet from models_library.products import ProductName from models_library.projects import ProjectID @@ -15,9 +17,9 @@ from models_library.users import UserID from pydantic import HttpUrl from servicelib.logging_utils import log_context -from simcore_service_api_server.models.basic_types import NameValueTuple -from .models.schemas.jobs import Job, JobInputs +from .models.basic_types import NameValueTuple +from .models.schemas.jobs import Job, JobID, JobInputs from .models.schemas.programs import Program from .models.schemas.solvers import Solver from .services_http.solver_job_models_converters import ( @@ -26,6 +28,8 @@ create_new_project_for_job, ) from .services_http.webserver import AuthSession +from .services_rpc.director_v2 import DirectorV2Service +from .services_rpc.storage import StorageService from .services_rpc.wb_api_server import WbApiRpcClient _logger = logging.getLogger(__name__) @@ -35,6 +39,8 @@ class JobService: _web_rest_client: AuthSession _web_rpc_client: WbApiRpcClient + _storage_rpc_client: StorageService + _directorv2_rpc_client: DirectorV2Service user_id: UserID product_name: ProductName @@ -147,3 +153,17 @@ async def create_job( job_id=job.id, ) return job, new_project + + async def start_log_export( + self, + job_id: JobID, + ) -> AsyncJobGet: + file_ids = await self._directorv2_rpc_client.get_computation_task_log_file_ids( + project_id=job_id + ) + async_job_get = await self._storage_rpc_client.start_data_export( + paths_to_export=[ + Path(elm.file_id) for elm in file_ids if elm.file_id is not None + ], + ) + return async_job_get diff --git a/services/api-server/src/simcore_service_api_server/api/dependencies/services.py b/services/api-server/src/simcore_service_api_server/api/dependencies/services.py index 35ba2ab74c7..0dd5c4911c6 100644 --- a/services/api-server/src/simcore_service_api_server/api/dependencies/services.py +++ b/services/api-server/src/simcore_service_api_server/api/dependencies/services.py @@ -14,6 +14,8 @@ from ..._service_studies import StudyService from ...services_http.webserver import AuthSession from ...services_rpc.catalog import CatalogService +from ...services_rpc.director_v2 import DirectorV2Service +from ...services_rpc.storage import StorageService from ...services_rpc.wb_api_server import WbApiRpcClient from ...utils.client_base import BaseServiceClientApi from .authentication import get_current_user_id, get_product_name @@ -61,9 +63,29 @@ def get_catalog_service( ) +def get_storage_service( + rpc_client: Annotated[RabbitMQRPCClient, Depends(get_rabbitmq_rpc_client)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], +) -> StorageService: + return StorageService( + 
_rpc_client=rpc_client, + _user_id=user_id, + _product_name=product_name, + ) + + +def get_directorv2_service( + rpc_client: Annotated[RabbitMQRPCClient, Depends(get_rabbitmq_rpc_client)], +) -> DirectorV2Service: + return DirectorV2Service(_rpc_client=rpc_client) + + def get_job_service( web_rest_api: Annotated[AuthSession, Depends(get_webserver_session)], web_rpc_api: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + storage_service: Annotated[StorageService, Depends(get_storage_service)], + directorv2_service: Annotated[DirectorV2Service, Depends(get_directorv2_service)], user_id: Annotated[UserID, Depends(get_current_user_id)], product_name: Annotated[ProductName, Depends(get_product_name)], ) -> JobService: @@ -74,6 +96,8 @@ def get_job_service( return JobService( _web_rest_client=web_rest_api, _web_rpc_client=web_rpc_api, + _storage_rpc_client=storage_service, + _directorv2_rpc_client=directorv2_service, user_id=user_id, product_name=product_name, ) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py index cd461b95fb3..3c3f7cec2c3 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py @@ -1,7 +1,8 @@ from typing import Annotated, Final -from fastapi import APIRouter, Depends, status +from fastapi import APIRouter, Depends, FastAPI, status from fastapi_pagination.api import create_page +from models_library.api_schemas_long_running_tasks.tasks import TaskGet from models_library.api_schemas_webserver.functions import ( Function, FunctionClass, @@ -17,8 +18,10 @@ ) from models_library.products import ProductName from models_library.users import UserID +from servicelib.fastapi.dependencies import get_app from sqlalchemy.ext.asyncio import AsyncEngine +from ..._service_jobs import JobService from ...models.pagination import Page, PaginationParams from ...models.schemas.errors import ErrorGet from ...services_http.director_v2 import DirectorV2Api @@ -27,7 +30,7 @@ from ...services_rpc.wb_api_server import WbApiRpcClient from ..dependencies.authentication import get_current_user_id, get_product_name from ..dependencies.database import get_db_asyncpg_engine -from ..dependencies.services import get_api_client +from ..dependencies.services import get_api_client, get_job_service from ..dependencies.webserver_http import get_webserver_session from ..dependencies.webserver_rpc import get_wb_api_rpc_client from . 
import solvers_jobs, solvers_jobs_read, studies_jobs @@ -297,3 +300,64 @@ async def function_job_outputs( ).results ) raise UnsupportedFunctionClassError(function_class=function.function_class) + + +@function_job_router.post( + "/{function_job_id:uuid}/log", + response_model=TaskGet, + responses={**_COMMON_FUNCTION_JOB_ERROR_RESPONSES}, + description=create_route_description( + base="Get function job logs task", + changelog=[ + FMSG_CHANGELOG_NEW_IN_VERSION.format("0.10-rc1"), + ], + ), +) +async def get_function_job_logs_task( + function_job_id: FunctionJobID, + app: Annotated[FastAPI, Depends(get_app)], + job_service: Annotated[JobService, Depends(get_job_service)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + product_name: Annotated[ProductName, Depends(get_product_name)], +): + function, function_job = await get_function_from_functionjobid( + wb_api_rpc=wb_api_rpc, + function_job_id=function_job_id, + user_id=user_id, + product_name=product_name, + ) + app_router = app.router + + if ( + function.function_class == FunctionClass.PROJECT + and function_job.function_class == FunctionClass.PROJECT + ): + async_job_get = await job_service.start_log_export( + job_id=function_job.project_job_id, + ) + _task_id = f"{async_job_get.job_id}" + return TaskGet( + task_id=_task_id, + task_name=async_job_get.job_name, + status_href=app_router.url_path_for("get_task_status", task_id=_task_id), + abort_href=app_router.url_path_for("cancel_task", task_id=_task_id), + result_href=app_router.url_path_for("get_task_result", task_id=_task_id), + ) + + if ( + function.function_class == FunctionClass.SOLVER + and function_job.function_class == FunctionClass.SOLVER + ): + async_job_get = await job_service.start_log_export( + job_id=function_job.solver_job_id, + ) + _task_id = f"{async_job_get.job_id}" + return TaskGet( + task_id=_task_id, + task_name=async_job_get.job_name, + status_href=app_router.url_path_for("get_task_status", task_id=_task_id), + abort_href=app_router.url_path_for("cancel_task", task_id=_task_id), + result_href=app_router.url_path_for("get_task_result", task_id=_task_id), + ) + raise UnsupportedFunctionClassError(function_class=function.function_class) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/tasks.py b/services/api-server/src/simcore_service_api_server/api/routes/tasks.py index ad80cb5daf4..ff0f12f2d69 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/tasks.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/tasks.py @@ -56,7 +56,7 @@ def _get_job_filter(user_id: UserID, product_name: ProductName) -> AsyncJobFilte FMSG_CHANGELOG_NEW_IN_VERSION.format("0.10-rc1"), ], ), - include_in_schema=False, # TO BE RELEASED in 0.10-rc1 + include_in_schema=True, ) async def list_tasks( app: Annotated[FastAPI, Depends(get_app)], @@ -96,7 +96,7 @@ async def list_tasks( FMSG_CHANGELOG_NEW_IN_VERSION.format("0.10-rc1"), ], ), - include_in_schema=False, # TO BE RELEASED in 0.10-rc1 + include_in_schema=True, ) async def get_task_status( task_id: AsyncJobId, @@ -128,7 +128,7 @@ async def get_task_status( FMSG_CHANGELOG_NEW_IN_VERSION.format("0.10-rc1"), ], ), - include_in_schema=False, # TO BE RELEASED in 0.10-rc1 + include_in_schema=True, ) async def cancel_task( task_id: AsyncJobId, @@ -162,7 +162,7 @@ async def cancel_task( FMSG_CHANGELOG_NEW_IN_VERSION.format("0.10-rc1"), ], ), - include_in_schema=False, # TO BE RELEASED in 0.10-rc1 + 
include_in_schema=True, ) async def get_task_result( task_id: AsyncJobId, diff --git a/services/api-server/src/simcore_service_api_server/services_rpc/director_v2.py b/services/api-server/src/simcore_service_api_server/services_rpc/director_v2.py new file mode 100644 index 00000000000..cb3315c8116 --- /dev/null +++ b/services/api-server/src/simcore_service_api_server/services_rpc/director_v2.py @@ -0,0 +1,27 @@ +from dataclasses import dataclass +from functools import partial + +from models_library.projects import ProjectID +from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient +from servicelib.rabbitmq.rpc_interfaces.director_v2 import computations_tasks +from servicelib.rabbitmq.rpc_interfaces.director_v2.errors import ( + ComputationalTaskMissingError, +) + +from ..exceptions.backend_errors import JobNotFoundError +from ..exceptions.service_errors_utils import service_exception_mapper + +_exception_mapper = partial(service_exception_mapper, service_name="DirectorV2") + + +@dataclass(frozen=True, kw_only=True) +class DirectorV2Service: + _rpc_client: RabbitMQRPCClient + + @_exception_mapper( + rpc_exception_map={ComputationalTaskMissingError: JobNotFoundError} + ) + async def get_computation_task_log_file_ids(self, *, project_id: ProjectID): + return await computations_tasks.get_computation_task_log_file_ids( + self._rpc_client, project_id=project_id + ) diff --git a/services/api-server/src/simcore_service_api_server/services_rpc/storage.py b/services/api-server/src/simcore_service_api_server/services_rpc/storage.py new file mode 100644 index 00000000000..32c8d38e49b --- /dev/null +++ b/services/api-server/src/simcore_service_api_server/services_rpc/storage.py @@ -0,0 +1,34 @@ +from dataclasses import dataclass +from functools import partial + +from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobGet +from models_library.api_schemas_webserver.storage import PathToExport +from models_library.products import ProductName +from models_library.users import UserID +from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient +from servicelib.rabbitmq.rpc_interfaces.storage import simcore_s3 as storage_rpc + +from ..exceptions.service_errors_utils import service_exception_mapper + +_exception_mapper = partial(service_exception_mapper, service_name="Storage") + + +@dataclass(frozen=True, kw_only=True) +class StorageService: + _rpc_client: RabbitMQRPCClient + _user_id: UserID + _product_name: ProductName + + @_exception_mapper(rpc_exception_map={}) + async def start_data_export( + self, + paths_to_export: list[PathToExport], + ) -> AsyncJobGet: + async_job_get, _ = await storage_rpc.start_export_data( + self._rpc_client, + user_id=self._user_id, + product_name=self._product_name, + paths_to_export=paths_to_export, + export_as="download_link", + ) + return async_job_get diff --git a/services/api-server/tests/unit/api_functions/conftest.py b/services/api-server/tests/unit/api_functions/conftest.py index 1df264f0e39..f25334c5a28 100644 --- a/services/api-server/tests/unit/api_functions/conftest.py +++ b/services/api-server/tests/unit/api_functions/conftest.py @@ -29,6 +29,8 @@ from models_library.functions import ( RegisteredFunctionJobCollection, RegisteredSolverFunction, + RegisteredSolverFunctionJob, + SolverFunctionJob, ) from models_library.functions_errors import FunctionIDNotFoundError from models_library.projects import ProjectID @@ -155,7 +157,7 @@ def mock_registered_solver_function( @pytest.fixture -def mock_function_job( +def mock_project_function_job( 
mock_registered_project_function: RegisteredFunction, ) -> FunctionJob: mock_function_job = { @@ -171,12 +173,40 @@ def mock_function_job( @pytest.fixture -def mock_registered_function_job( - mock_function_job: FunctionJob, +def mock_registered_project_function_job( + mock_project_function_job: FunctionJob, ) -> RegisteredFunctionJob: return RegisteredProjectFunctionJob( **{ - **mock_function_job.dict(), + **mock_project_function_job.dict(), + "uid": str(uuid4()), + "created_at": datetime.datetime.now(datetime.UTC), + } + ) + + +@pytest.fixture +def mock_solver_function_job( + mock_registered_solver_function: RegisteredFunction, +) -> FunctionJob: + return SolverFunctionJob( + title="Test Function Job", + description="A test function job", + function_uid=mock_registered_solver_function.uid, + inputs={"key": "value"}, + outputs=None, + function_class=FunctionClass.SOLVER, + solver_job_id=ProjectID(f"{uuid4()}"), + ) + + +@pytest.fixture +def mock_registered_solver_function_job( + mock_solver_function_job: FunctionJob, +) -> RegisteredFunctionJob: + return RegisteredSolverFunctionJob( + **{ + **mock_solver_function_job.dict(), "uid": str(uuid4()), "created_at": datetime.datetime.now(datetime.UTC), } @@ -185,15 +215,17 @@ def mock_registered_function_job( @pytest.fixture def mock_function_job_collection( - mock_registered_function_job: RegisteredFunctionJob, + mock_registered_project_function_job: RegisteredFunctionJob, ) -> FunctionJobCollection: mock_function_job_collection = { "title": "Test Function Job Collection", "description": "A test function job collection", - "function_uid": mock_registered_function_job.function_uid, + "function_uid": mock_registered_project_function_job.function_uid, "function_class": FunctionClass.PROJECT, "project_id": str(uuid4()), - "function_job_ids": [mock_registered_function_job.uid for _ in range(5)], + "function_job_ids": [ + mock_registered_project_function_job.uid for _ in range(5) + ], } return FunctionJobCollection(**mock_function_job_collection) diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_functions.py b/services/api-server/tests/unit/api_functions/test_api_routers_functions.py index 98acc73352d..72751aacc6b 100644 --- a/services/api-server/tests/unit/api_functions/test_api_routers_functions.py +++ b/services/api-server/tests/unit/api_functions/test_api_routers_functions.py @@ -16,19 +16,19 @@ import respx from faker import Faker from httpx import AsyncClient -from models_library.api_schemas_webserver.functions import ( +from models_library.api_schemas_long_running_tasks.tasks import TaskGet +from models_library.functions import ( FunctionJobCollection, + FunctionUserAccessRights, + FunctionUserApiAccessRights, ProjectFunction, ProjectFunctionJob, + RegisteredFunction, + RegisteredFunctionJob, RegisteredFunctionJobCollection, RegisteredProjectFunction, RegisteredProjectFunctionJob, ) -from models_library.functions import ( - FunctionUserAccessRights, - FunctionUserApiAccessRights, - RegisteredFunctionJob, -) from models_library.functions_errors import ( FunctionIDNotFoundError, FunctionReadAccessDeniedError, @@ -320,62 +320,63 @@ async def test_delete_function( async def test_register_function_job( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_function_job: ProjectFunctionJob, - mock_registered_function_job: RegisteredProjectFunctionJob, + mock_project_function_job: ProjectFunctionJob, + mock_registered_project_function_job: RegisteredProjectFunctionJob, auth: 
httpx.BasicAuth, ) -> None: """Test the register_function_job endpoint.""" mock_handler_in_functions_rpc_interface( - "register_function_job", mock_registered_function_job + "register_function_job", mock_registered_project_function_job ) response = await client.post( f"{API_VTAG}/function_jobs", - json=mock_function_job.model_dump(mode="json"), + json=mock_project_function_job.model_dump(mode="json"), auth=auth, ) assert response.status_code == status.HTTP_200_OK assert ( RegisteredProjectFunctionJob.model_validate(response.json()) - == mock_registered_function_job + == mock_registered_project_function_job ) async def test_get_function_job( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job: RegisteredProjectFunctionJob, + mock_registered_project_function_job: RegisteredProjectFunctionJob, auth: httpx.BasicAuth, ) -> None: mock_handler_in_functions_rpc_interface( - "get_function_job", mock_registered_function_job + "get_function_job", mock_registered_project_function_job ) # Now, get the function job response = await client.get( - f"{API_VTAG}/function_jobs/{mock_registered_function_job.uid}", auth=auth + f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}", + auth=auth, ) assert response.status_code == status.HTTP_200_OK assert ( RegisteredProjectFunctionJob.model_validate(response.json()) - == mock_registered_function_job + == mock_registered_project_function_job ) async def test_list_function_jobs( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job: RegisteredProjectFunctionJob, + mock_registered_project_function_job: RegisteredProjectFunctionJob, auth: httpx.BasicAuth, ) -> None: mock_handler_in_functions_rpc_interface( "list_function_jobs", ( - [mock_registered_function_job for _ in range(5)], + [mock_registered_project_function_job for _ in range(5)], PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), ), ) @@ -387,14 +388,14 @@ async def test_list_function_jobs( assert len(data) == 5 assert ( RegisteredProjectFunctionJob.model_validate(data[0]) - == mock_registered_function_job + == mock_registered_project_function_job ) async def test_list_function_jobs_with_function_filter( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job: RegisteredProjectFunctionJob, + mock_registered_project_function_job: RegisteredProjectFunctionJob, mock_registered_project_function: RegisteredProjectFunction, auth: httpx.BasicAuth, ) -> None: @@ -402,7 +403,7 @@ async def test_list_function_jobs_with_function_filter( mock_handler_in_functions_rpc_interface( "list_function_jobs", ( - [mock_registered_function_job for _ in range(5)], + [mock_registered_project_function_job for _ in range(5)], PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), ), ) @@ -417,14 +418,14 @@ async def test_list_function_jobs_with_function_filter( assert len(data) == 5 assert ( RegisteredProjectFunctionJob.model_validate(data[0]) - == mock_registered_function_job + == mock_registered_project_function_job ) async def test_delete_function_job( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job: RegisteredProjectFunctionJob, + mock_registered_project_function_job: RegisteredProjectFunctionJob, auth: httpx.BasicAuth, ) -> None: @@ -432,7 +433,8 @@ async def test_delete_function_job( # Now, delete the function job response 
= await client.delete( - f"{API_VTAG}/function_jobs/{mock_registered_function_job.uid}", auth=auth + f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}", + auth=auth, ) assert response.status_code == status.HTTP_200_OK @@ -696,7 +698,7 @@ async def test_run_project_function_parent_info( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], mock_registered_project_function: RegisteredProjectFunction, - mock_registered_function_job: RegisteredFunctionJob, + mock_registered_project_function_job: RegisteredFunctionJob, auth: httpx.BasicAuth, user_id: UserID, mocked_webserver_rest_api_base: respx.MockRouter, @@ -746,7 +748,7 @@ def _default_side_effect( ) mock_handler_in_functions_rpc_interface("find_cached_function_jobs", []) mock_handler_in_functions_rpc_interface( - "register_function_job", mock_registered_function_job + "register_function_job", mock_registered_project_function_job ) mock_handler_in_functions_rpc_interface( "get_functions_user_api_access_rights", @@ -788,7 +790,7 @@ async def test_map_function_parent_info( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], mock_registered_project_function: RegisteredProjectFunction, - mock_registered_function_job: RegisteredFunctionJob, + mock_registered_project_function_job: RegisteredFunctionJob, auth: httpx.BasicAuth, user_id: UserID, mocked_webserver_rest_api_base: respx.MockRouter, @@ -842,7 +844,7 @@ def _default_side_effect( ) mock_handler_in_functions_rpc_interface("find_cached_function_jobs", []) mock_handler_in_functions_rpc_interface( - "register_function_job", mock_registered_function_job + "register_function_job", mock_registered_project_function_job ) mock_handler_in_functions_rpc_interface( "get_functions_user_api_access_rights", @@ -879,3 +881,55 @@ def _default_side_effect( if expected_status_code == status.HTTP_200_OK: assert side_effect_checks["headers_checked"] == True assert response.status_code == expected_status_code + + +async def test_export_logs_project_function_job( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_project_function: RegisteredFunction, + mock_registered_project_function_job: RegisteredFunctionJob, + mocked_directorv2_rpc_api: dict[str, MockType], + mocked_storage_rpc_api: dict[str, MockType], + auth: httpx.BasicAuth, + user_id: UserID, +): + mock_handler_in_functions_rpc_interface( + "get_function", mock_registered_project_function + ) + mock_handler_in_functions_rpc_interface( + "get_function_job", mock_registered_project_function_job + ) + + response = await client.post( + f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}/log", + auth=auth, + ) + + assert response.status_code == status.HTTP_200_OK + TaskGet.model_validate(response.json()) + + +async def test_export_logs_solver_function_job( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_solver_function: RegisteredFunction, + mock_registered_solver_function_job: RegisteredFunctionJob, + mocked_directorv2_rpc_api: dict[str, MockType], + mocked_storage_rpc_api: dict[str, MockType], + auth: httpx.BasicAuth, + user_id: UserID, +): + mock_handler_in_functions_rpc_interface( + "get_function", mock_registered_solver_function + ) + mock_handler_in_functions_rpc_interface( + "get_function_job", mock_registered_solver_function_job + ) + + response = await client.post( + 
f"{API_VTAG}/function_jobs/{mock_registered_solver_function_job.uid}/log", + auth=auth, + ) + + assert response.status_code == status.HTTP_200_OK + TaskGet.model_validate(response.json()) diff --git a/services/api-server/tests/unit/conftest.py b/services/api-server/tests/unit/conftest.py index 84837222595..eb84d42623e 100644 --- a/services/api-server/tests/unit/conftest.py +++ b/services/api-server/tests/unit/conftest.py @@ -48,8 +48,10 @@ from pydantic import EmailStr, HttpUrl, TypeAdapter from pytest_mock import MockerFixture, MockType from pytest_simcore.helpers.catalog_rpc_server import CatalogRpcSideEffects +from pytest_simcore.helpers.director_v2_rpc_server import DirectorV2SideEffects from pytest_simcore.helpers.host import get_localhost_ip from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict +from pytest_simcore.helpers.storage_rpc_server import StorageSideEffects from pytest_simcore.helpers.webserver_rpc_server import WebserverRpcSideEffects from pytest_simcore.simcore_webserver_projects_rest_api import GET_PROJECT from requests.auth import HTTPBasicAuth @@ -611,6 +613,92 @@ def mocked_catalog_rpc_api( return mocks +@pytest.fixture +def directorv2_rpc_side_effects(request) -> Any: + if "param" in dir(request) and request.param is not None: + return request.param + return DirectorV2SideEffects() + + +@pytest.fixture +def mocked_directorv2_rpc_api( + mocked_app_dependencies: None, + mocker: MockerFixture, + directorv2_rpc_side_effects: Any, +) -> dict[str, MockType]: + """ + Mocks the director-v2's simcore service RPC API for testing purposes. + """ + from servicelib.rabbitmq.rpc_interfaces.director_v2 import ( + computations_tasks as directorv2_rpc, # keep import here + ) + + mocks = {} + + # Get all callable methods from the side effects class that are not built-ins + side_effect_methods = [ + method_name + for method_name in dir(directorv2_rpc_side_effects) + if not method_name.startswith("_") + and callable(getattr(directorv2_rpc_side_effects, method_name)) + ] + + # Create mocks for each method in directorv2_rpc that has a corresponding side effect + for method_name in side_effect_methods: + if hasattr(directorv2_rpc, method_name): + mocks[method_name] = mocker.patch.object( + directorv2_rpc, + method_name, + autospec=True, + side_effect=getattr(directorv2_rpc_side_effects, method_name), + ) + + return mocks + + +@pytest.fixture +def storage_rpc_side_effects(request) -> Any: + if "param" in dir(request) and request.param is not None: + return request.param + return StorageSideEffects() + + +@pytest.fixture +def mocked_storage_rpc_api( + mocked_app_dependencies: None, + mocker: MockerFixture, + storage_rpc_side_effects: Any, +) -> dict[str, MockType]: + """ + Mocks the storage's simcore service RPC API for testing purposes. 
+ """ + from servicelib.rabbitmq.rpc_interfaces.storage import ( + simcore_s3 as storage_rpc, # keep import here + ) + + mocks = {} + + # Get all callable methods from the side effects class that are not built-ins + side_effect_methods = [ + method_name + for method_name in dir(storage_rpc_side_effects) + if not method_name.startswith("_") + and callable(getattr(storage_rpc_side_effects, method_name)) + ] + + # Create mocks for each method in storage_rpc that has a corresponding side effect + for method_name in side_effect_methods: + if hasattr(storage_rpc, method_name): + mocks[method_name] = mocker.patch.object( + storage_rpc, + method_name, + autospec=True, + side_effect=getattr(storage_rpc_side_effects, method_name), + ) + + return mocks + + # # Other Mocks # diff --git a/services/api-server/tests/unit/service/conftest.py b/services/api-server/tests/unit/service/conftest.py index 542f234d69e..10fc78b4474 100644 --- a/services/api-server/tests/unit/service/conftest.py +++ b/services/api-server/tests/unit/service/conftest.py @@ -22,6 +22,8 @@ from simcore_service_api_server._service_studies import StudyService from simcore_service_api_server.services_http.webserver import AuthSession from simcore_service_api_server.services_rpc.catalog import CatalogService +from simcore_service_api_server.services_rpc.director_v2 import DirectorV2Service +from simcore_service_api_server.services_rpc.storage import StorageService from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient @@ -71,6 +73,24 @@ def wb_api_rpc_client( return WbApiRpcClient(_client=mocked_rpc_client) +@pytest.fixture +def director_v2_rpc_client( + mocked_rpc_client: MockType, +) -> DirectorV2Service: + return DirectorV2Service(_rpc_client=mocked_rpc_client) + + +@pytest.fixture +def storage_rpc_client( + mocked_rpc_client: MockType, + user_id: UserID, + product_name: ProductName, +) -> StorageService: + return StorageService( + _rpc_client=mocked_rpc_client, _user_id=user_id, _product_name=product_name + ) + + @pytest.fixture def auth_session( mocker: MockerFixture, @@ -91,6 +111,8 @@ async def _create_project(project: ProjectCreateNew, **kwargs): @pytest.fixture def job_service( auth_session: AuthSession, + director_v2_rpc_client: DirectorV2Service, + storage_rpc_client: StorageService, wb_api_rpc_client: WbApiRpcClient, product_name: ProductName, user_id: UserID, @@ -98,6 +120,8 @@ def job_service( return JobService( _web_rest_client=auth_session, _web_rpc_client=wb_api_rpc_client, + _storage_rpc_client=storage_rpc_client, + _directorv2_rpc_client=director_v2_rpc_client, user_id=user_id, product_name=product_name, ) diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py index 0b4a71301e0..30cb838be89 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py @@ -53,6 +53,7 @@ ComputationalRunNotFoundError, ComputationalSchedulerError, ConfigurationError, + PipelineTaskMissingError, PricingPlanUnitNotFoundError, ProjectNotFoundError, WalletNotEnoughCreditsError, @@ -70,6 +71,7 @@ from ...modules.db.repositories.users import UsersRepository from ...modules.resource_usage_tracker_client import ResourceUsageTrackerClient from ...utils import computations as utils +from ...utils.computations_tasks import validate_pipeline from ...utils.dags import ( 
compute_pipeline_details, compute_pipeline_started_timestamp, @@ -83,7 +85,6 @@ from ..dependencies.database import get_repository from ..dependencies.rabbitmq import rabbitmq_rpc_client from ..dependencies.rut_client import get_rut_client -from .computations_tasks import analyze_pipeline _PIPELINE_ABORT_TIMEOUT_S: Final[timedelta] = timedelta(seconds=30) @@ -453,9 +454,15 @@ async def get_computation( # check that project actually exists await project_repo.get_project(project_id) - pipeline_dag, all_tasks, _filtered_tasks = await analyze_pipeline( - project_id, comp_pipelines_repo, comp_tasks_repo - ) + try: + pipeline_dag, all_tasks, _filtered_tasks = await validate_pipeline( + project_id, comp_pipelines_repo, comp_tasks_repo + ) + except PipelineTaskMissingError as exc: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="The tasks referenced by the pipeline are missing", + ) from exc # create the complete DAG graph complete_dag = create_complete_dag_from_tasks(all_tasks) diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/computations_tasks.py b/services/director-v2/src/simcore_service_director_v2/api/routes/computations_tasks.py index 45f24d13835..a97b2b60f65 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/computations_tasks.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/computations_tasks.py @@ -7,9 +7,8 @@ """ import logging -from typing import Annotated, NamedTuple +from typing import Annotated -import networkx as nx from fastapi import APIRouter, Depends, HTTPException from models_library.api_schemas_directorv2.computations import ( TaskLogFileGet, @@ -22,11 +21,11 @@ from servicelib.utils import logged_gather from starlette import status -from ...models.comp_pipelines import CompPipelineAtDB -from ...models.comp_tasks import CompTaskAtDB +from ...core.errors import PipelineTaskMissingError from ...modules.db.repositories.comp_pipelines import CompPipelinesRepository from ...modules.db.repositories.comp_tasks import CompTasksRepository from ...utils import dask as dask_utils +from ...utils.computations_tasks import validate_pipeline from ..dependencies.database import get_repository log = logging.getLogger(__name__) @@ -37,48 +36,6 @@ # HELPERS ------------------------------------------------------------------- -class PipelineInfo(NamedTuple): - # NOTE: kept old names for legacy but should rename for clarity - pipeline_dag: nx.DiGraph - all_tasks: list[CompTaskAtDB] # all nodes in pipeline - filtered_tasks: list[CompTaskAtDB] # nodes that actually run i.e. 
part of the dag - - -async def analyze_pipeline( - project_id: ProjectID, - comp_pipelines_repo: CompPipelinesRepository, - comp_tasks_repo: CompTasksRepository, -) -> PipelineInfo: - """ - Loads and validates data from pipelines and tasks tables and - reports it back as PipelineInfo - """ - - # NOTE: Here it is assumed the project exists in comp_tasks/comp_pipeline - # get the project pipeline - pipeline_at_db: CompPipelineAtDB = await comp_pipelines_repo.get_pipeline( - project_id - ) - pipeline_dag: nx.DiGraph = pipeline_at_db.get_graph() - - # get the project task states - all_tasks: list[CompTaskAtDB] = await comp_tasks_repo.list_tasks(project_id) - - # filter the tasks by the effective pipeline - filtered_tasks = [ - t for t in all_tasks if f"{t.node_id}" in set(pipeline_dag.nodes()) - ] - - # check that we have the expected tasks - if len(filtered_tasks) != len(pipeline_dag): - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail="The tasks referenced by the pipeline are missing", - ) - - return PipelineInfo(pipeline_dag, all_tasks, filtered_tasks) - - # ROUTES HANDLERS -------------------------------------------------------------- @@ -101,7 +58,13 @@ async def get_all_tasks_log_files( Each log is only available when the corresponding task is done """ # gets computation task ids - info = await analyze_pipeline(project_id, comp_pipelines_repo, comp_tasks_repo) + try: + info = await validate_pipeline(project_id, comp_pipelines_repo, comp_tasks_repo) + except PipelineTaskMissingError as exc: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="The tasks referenced by the pipeline are missing", + ) from exc iter_task_ids = (t.node_id for t in info.filtered_tasks) tasks_logs_files: list[TaskLogFileGet] = await logged_gather( diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations_tasks.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations_tasks.py new file mode 100644 index 00000000000..203f2134946 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations_tasks.py @@ -0,0 +1,48 @@ +from pathlib import Path + +from fastapi import FastAPI +from models_library.api_schemas_directorv2.computations import TaskLogFileIdGet +from models_library.projects import ProjectID +from servicelib.rabbitmq import RPCRouter +from servicelib.rabbitmq.rpc_interfaces.director_v2.errors import ( + ComputationalTaskMissingError, +) +from simcore_sdk.node_ports_common import data_items_utils + +from ...constants import LOGS_FILE_NAME +from ...core.errors import PipelineNotFoundError, PipelineTaskMissingError +from ...modules.db.repositories.comp_pipelines import CompPipelinesRepository +from ...modules.db.repositories.comp_tasks import CompTasksRepository +from ...utils.computations_tasks import validate_pipeline + +router = RPCRouter() + + +@router.expose(reraise_if_error_type=(ComputationalTaskMissingError,)) +async def get_computation_task_log_file_ids( + app: FastAPI, + project_id: ProjectID, +) -> list[TaskLogFileIdGet]: + comp_pipelines_repo = CompPipelinesRepository.instance(db_engine=app.state.engine) + comp_tasks_repo = CompTasksRepository.instance(db_engine=app.state.engine) + + try: + info = await validate_pipeline( + project_id=project_id, + comp_pipelines_repo=comp_pipelines_repo, + comp_tasks_repo=comp_tasks_repo, + ) + except (PipelineNotFoundError, PipelineTaskMissingError) as exc: + raise ComputationalTaskMissingError(project_id=project_id) from exc + + iter_task_ids 
= (t.node_id for t in info.filtered_tasks) + + return [ + TaskLogFileIdGet( + task_id=node_id, + file_id=data_items_utils.create_simcore_file_id( + Path(LOGS_FILE_NAME), f"{project_id}", f"{node_id}" + ), + ) + for node_id in iter_task_ids + ] diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/routes.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/routes.py index ad6bdba28c7..c7cd84acd05 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/rpc/routes.py +++ b/services/director-v2/src/simcore_service_director_v2/api/rpc/routes.py @@ -8,16 +8,12 @@ from servicelib.rabbitmq import RPCRouter from ...modules.rabbitmq import get_rabbitmq_rpc_server -from . import ( - _computations, -) +from . import _computations, _computations_tasks _logger = logging.getLogger(__name__) -ROUTERS: list[RPCRouter] = [ - _computations.router, -] +ROUTERS: list[RPCRouter] = [_computations.router, _computations_tasks.router] def setup_rpc_api_routes(app: FastAPI) -> None: diff --git a/services/director-v2/src/simcore_service_director_v2/constants.py b/services/director-v2/src/simcore_service_director_v2/constants.py index d4a5690d9bb..a5a3a8f72d4 100644 --- a/services/director-v2/src/simcore_service_director_v2/constants.py +++ b/services/director-v2/src/simcore_service_director_v2/constants.py @@ -15,13 +15,17 @@ # - local # - itisfoundation # - 10.0.0.0:8473 (IP & Port) -DYNAMIC_SIDECAR_DOCKER_IMAGE_RE = ( +DYNAMIC_SIDECAR_DOCKER_IMAGE_RE: Final[str] = ( r"^(([_a-zA-Z0-9:.-]+)/)?(dynamic-sidecar):([_a-zA-Z0-9.-]+)$" ) -REGEX_DY_SERVICE_SIDECAR = rf"^{DYNAMIC_SIDECAR_SERVICE_PREFIX}_[a-zA-Z0-9-_]*" -REGEX_DY_SERVICE_PROXY = rf"^{DYNAMIC_PROXY_SERVICE_PREFIX}_[a-zA-Z0-9-_]*" +LOGS_FILE_NAME: Final[str] = "logs.zip" -UNDEFINED_STR_METADATA = "undefined-metadata" -UNDEFINED_DOCKER_LABEL = "undefined-label" -UNDEFINED_API_BASE_URL = "https://api.local" +REGEX_DY_SERVICE_SIDECAR: Final[str] = ( + rf"^{DYNAMIC_SIDECAR_SERVICE_PREFIX}_[a-zA-Z0-9-_]*" +) +REGEX_DY_SERVICE_PROXY: Final[str] = rf"^{DYNAMIC_PROXY_SERVICE_PREFIX}_[a-zA-Z0-9-_]*" + +UNDEFINED_STR_METADATA: Final[str] = "undefined-metadata" +UNDEFINED_DOCKER_LABEL: Final[str] = "undefined-label" +UNDEFINED_API_BASE_URL: Final[str] = "https://api.local" diff --git a/services/director-v2/src/simcore_service_director_v2/core/errors.py b/services/director-v2/src/simcore_service_director_v2/core/errors.py index cd992d5155f..2e5f556c4ed 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/errors.py +++ b/services/director-v2/src/simcore_service_director_v2/core/errors.py @@ -67,6 +67,10 @@ class WalletNotEnoughCreditsError(DirectorError): msg_template = "Wallet '{wallet_name}' has {wallet_credit_amount} credits." 
+class PipelineTaskMissingError(DirectorError): + msg_template = "Pipeline associated with project_id {project_id} is missing task(s)" + + # # SCHEDULER ERRORS # diff --git a/services/director-v2/src/simcore_service_director_v2/utils/computations_tasks.py b/services/director-v2/src/simcore_service_director_v2/utils/computations_tasks.py new file mode 100644 index 00000000000..f37ec66936c --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/utils/computations_tasks.py @@ -0,0 +1,66 @@ +from typing import NamedTuple + +import networkx as nx +from models_library.projects import ProjectID +from simcore_service_director_v2.core.errors import PipelineTaskMissingError + +from ..models.comp_pipelines import CompPipelineAtDB +from ..models.comp_tasks import CompTaskAtDB +from ..modules.db.repositories.comp_pipelines import CompPipelinesRepository +from ..modules.db.repositories.comp_tasks import CompTasksRepository + + +class PipelineInfo(NamedTuple): + pipeline_dag: nx.DiGraph + all_tasks: list[CompTaskAtDB] + filtered_tasks: list[CompTaskAtDB] + + +async def _get_pipeline_info( + *, + project_id: ProjectID, + comp_pipelines_repo: CompPipelinesRepository, + comp_tasks_repo: CompTasksRepository, +) -> PipelineInfo: + + # NOTE: Here it is assumed the project exists in comp_tasks/comp_pipeline + # get the project pipeline + pipeline_at_db: CompPipelineAtDB = await comp_pipelines_repo.get_pipeline( + project_id + ) + pipeline_dag: nx.DiGraph = pipeline_at_db.get_graph() + + # get the project task states + all_tasks: list[CompTaskAtDB] = await comp_tasks_repo.list_tasks(project_id) + + # filter the tasks by the effective pipeline + filtered_tasks = [ + t for t in all_tasks if f"{t.node_id}" in set(pipeline_dag.nodes()) + ] + + return PipelineInfo(pipeline_dag, all_tasks, filtered_tasks) + + +async def validate_pipeline( + project_id: ProjectID, + comp_pipelines_repo: CompPipelinesRepository, + comp_tasks_repo: CompTasksRepository, +) -> PipelineInfo: + """ + Loads and validates data from pipelines and tasks tables and + reports it back as PipelineInfo + + raises PipelineTaskMissingError + """ + + pipeline_info = await _get_pipeline_info( + project_id=project_id, + comp_pipelines_repo=comp_pipelines_repo, + comp_tasks_repo=comp_tasks_repo, + ) + + # check that we have the expected tasks + if len(pipeline_info.filtered_tasks) != len(pipeline_info.pipeline_dag): + raise PipelineTaskMissingError(project_id=project_id) + + return pipeline_info diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask.py b/services/director-v2/src/simcore_service_director_v2/utils/dask.py index 7ffbcd6e9f8..e170adf3e6e 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/dask.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dask.py @@ -40,7 +40,7 @@ from simcore_sdk.node_ports_v2.links import ItemValue as _NPItemValue from sqlalchemy.ext.asyncio import AsyncEngine -from ..constants import UNDEFINED_API_BASE_URL, UNDEFINED_DOCKER_LABEL +from ..constants import LOGS_FILE_NAME, UNDEFINED_API_BASE_URL, UNDEFINED_DOCKER_LABEL from ..core.errors import ( ComputationalBackendNotConnectedError, ComputationalSchedulerChangedError, @@ -261,9 +261,6 @@ async def compute_output_data_schema( return TaskOutputDataSchema.model_validate(output_data_schema) -_LOGS_FILE_NAME = "logs.zip" - - async def compute_service_log_file_upload_link( user_id: UserID, project_id: ProjectID, @@ -274,7 +271,7 @@ async def compute_service_log_file_upload_link( user_id=user_id, 
project_id=f"{project_id}", node_id=f"{node_id}", - file_name=_LOGS_FILE_NAME, + file_name=LOGS_FILE_NAME, link_type=file_link_type, file_size=ByteSize(0), # will create a single presigned link sha256_checksum=None, @@ -375,7 +372,7 @@ async def _get_service_log_file_download_link( user_id=user_id, project_id=f"{project_id}", node_id=f"{node_id}", - file_name=_LOGS_FILE_NAME, + file_name=LOGS_FILE_NAME, link_type=file_link_type, ) return value_link @@ -444,10 +441,10 @@ async def clean_task_output_and_log_files_if_invalid( user_id=user_id, project_id=f"{project_id}", node_id=f"{node_id}", - file_name=_LOGS_FILE_NAME, + file_name=LOGS_FILE_NAME, ): await port_utils.delete_target_link( - user_id, f"{project_id}", f"{node_id}", _LOGS_FILE_NAME + user_id, f"{project_id}", f"{node_id}", LOGS_FILE_NAME ) diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations_tasks.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations_tasks.py new file mode 100644 index 00000000000..8117b89e04a --- /dev/null +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations_tasks.py @@ -0,0 +1,75 @@ +# pylint: disable=unused-argument + +from collections.abc import Awaitable, Callable +from typing import Any + +import pytest +from faker import Faker +from models_library.api_schemas_directorv2.computations import TaskLogFileIdGet +from models_library.projects import ProjectAtDB, ProjectID +from models_library.projects_state import RunningState +from servicelib.rabbitmq import RabbitMQRPCClient +from servicelib.rabbitmq.rpc_interfaces.director_v2 import ( + computations_tasks as rpc_computations_tasks, +) +from servicelib.rabbitmq.rpc_interfaces.director_v2.errors import ( + ComputationalTaskMissingError, +) +from simcore_postgres_database.models.comp_pipeline import StateType +from simcore_service_director_v2.models.comp_pipelines import CompPipelineAtDB +from simcore_service_director_v2.models.comp_runs import CompRunsAtDB +from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB + +_faker = Faker() + +pytest_simcore_core_services_selection = ["postgres", "rabbit", "redis"] +pytest_simcore_ops_services_selection = [ + "adminer", +] + + +async def test_get_computation_task_log_file_ids( + fake_workbench_without_outputs: dict[str, Any], + fake_workbench_adjacency: dict[str, Any], + create_registered_user: Callable[..., dict[str, Any]], + create_project: Callable[..., Awaitable[ProjectAtDB]], + create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], + create_tasks_from_project: Callable[..., Awaitable[list[CompTaskAtDB]]], + create_comp_run: Callable[..., Awaitable[CompRunsAtDB]], + rpc_client: RabbitMQRPCClient, + with_product: dict[str, Any], +): + user = create_registered_user() + proj = await create_project(user, workbench=fake_workbench_without_outputs) + await create_pipeline( + project_id=f"{proj.uuid}", + dag_adjacency_list=fake_workbench_adjacency, + ) + comp_tasks = await create_tasks_from_project( + user=user, project=proj, state=StateType.PUBLISHED, progress=None + ) + comp_runs = await create_comp_run( + user=user, + project=proj, + result=RunningState.PUBLISHED, + dag_adjacency_list=fake_workbench_adjacency, + ) + assert comp_runs + + output = await rpc_computations_tasks.get_computation_task_log_file_ids( + rpc_client, project_id=proj.uuid + ) + assert isinstance(output, list) + assert len(output) <= len( + comp_tasks + ) # output doesn't contain e.g. 
filepickers and dynamic services + assert all(isinstance(elm, TaskLogFileIdGet) for elm in output) + + +async def test_get_computation_task_log_file_ids_no_pipeline( + rpc_client: RabbitMQRPCClient, +): + with pytest.raises(ComputationalTaskMissingError): + await rpc_computations_tasks.get_computation_task_log_file_ids( + rpc_client, project_id=ProjectID(_faker.uuid4()) + ) diff --git a/services/director-v2/tests/unit/with_dbs/test_utils_dask.py b/services/director-v2/tests/unit/with_dbs/test_utils_dask.py index 66035ccfcda..e7924b6ab4e 100644 --- a/services/director-v2/tests/unit/with_dbs/test_utils_dask.py +++ b/services/director-v2/tests/unit/with_dbs/test_utils_dask.py @@ -48,7 +48,7 @@ from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB from simcore_service_director_v2.modules.dask_clients_pool import DaskClientsPool from simcore_service_director_v2.utils.dask import ( - _LOGS_FILE_NAME, + LOGS_FILE_NAME, _to_human_readable_resource_values, check_if_cluster_is_able_to_run_pipeline, clean_task_output_and_log_files_if_invalid, @@ -438,7 +438,7 @@ async def test_clean_task_output_and_log_files_if_invalid( mock.call( user_id=user_id, store_id=0, - s3_object=f"{published_project.project.uuid}/{sleeper_task.node_id}/{_LOGS_FILE_NAME}", + s3_object=f"{published_project.project.uuid}/{sleeper_task.node_id}/{LOGS_FILE_NAME}", ) ]
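

---

A minimal client-side sketch (not part of the patch) of the end-to-end flow this diff enables, using only the endpoints and schemas added to `openapi.json` above (`POST /v0/function_jobs/{function_job_id}/log`, the `/v0/tasks` family, `TaskGet`, `TaskStatus`, `TaskResult`). The base URL, credentials, and function-job id are hypothetical placeholders:

```python
import time

import httpx

# All three values below are assumptions for illustration only.
OSPARC_API_URL = "https://api.osparc.io"
AUTH = httpx.BasicAuth("my-api-key", "my-api-secret")
FUNCTION_JOB_ID = "3fa85f64-5717-4562-b3fc-2c963f66afa6"


def export_function_job_logs(function_job_id: str) -> dict:
    with httpx.Client(base_url=OSPARC_API_URL, auth=AUTH) as client:
        # 1. start the log-export task for the function job
        response = client.post(f"/v0/function_jobs/{function_job_id}/log")
        response.raise_for_status()
        task = response.json()  # TaskGet: task_id, status_href, result_href, abort_href

        # 2. poll the generic task-status endpoint until the export is done
        while True:
            status = client.get(task["status_href"])
            status.raise_for_status()
            if status.json()["done"]:  # TaskStatus.done
                break
            time.sleep(1.0)

        # 3. fetch the TaskResult; since StorageService.start_data_export is
        #    called with export_as="download_link", the result should carry a link
        result = client.get(task["result_href"])
        result.raise_for_status()
        return result.json()


if __name__ == "__main__":
    print(export_function_job_logs(FUNCTION_JOB_ID))
```

Note that `TaskGet` returns HATEOAS-style hrefs (`status_href`, `result_href`, `abort_href`), so a client never needs to build `/v0/tasks/...` URLs itself.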
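For the service-side counterpart, a sketch of consuming the new director-v2 RPC method from another simcore service. It assumes an already-connected `RabbitMQRPCClient`; the import paths, the `TaskLogFileIdGet` shape, and the raised `ComputationalTaskMissingError` all come from the modules added in this diff:

```python
from models_library.projects import ProjectID
from servicelib.rabbitmq import RabbitMQRPCClient
from servicelib.rabbitmq.rpc_interfaces.director_v2 import computations_tasks
from servicelib.rabbitmq.rpc_interfaces.director_v2.errors import (
    ComputationalTaskMissingError,
)


async def collect_log_file_ids(
    rpc_client: RabbitMQRPCClient, project_id: ProjectID
) -> list[str]:
    """Return the S3 file ids of all task log files in a project's pipeline."""
    try:
        items = await computations_tasks.get_computation_task_log_file_ids(
            rpc_client, project_id=project_id
        )
    except ComputationalTaskMissingError:
        # raised when the project has no pipeline or its tasks are missing
        return []
    # file_id may be None (see the TaskLogFileIdGet examples in this diff)
    return [f"{item.file_id}" for item in items if item.file_id is not None]
```

This mirrors what `JobService.start_log_export` does in the api-server: collect the non-`None` file ids, then hand them to storage as `paths_to_export`.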