diff --git a/packages/models-library/src/models_library/functions.py b/packages/models-library/src/models_library/functions.py index df99fcc41891..f8a0f68d9255 100644 --- a/packages/models-library/src/models_library/functions.py +++ b/packages/models-library/src/models_library/functions.py @@ -190,21 +190,12 @@ class FunctionJobBase(BaseModel): function_class: FunctionClass -class RegisteredFunctionJobBase(FunctionJobBase): - uid: FunctionJobID - created_at: datetime.datetime - - class ProjectFunctionJob(FunctionJobBase): function_class: Literal[FunctionClass.PROJECT] = FunctionClass.PROJECT project_job_id: ProjectID | None job_creation_task_id: TaskID | None -class RegisteredProjectFunctionJob(ProjectFunctionJob, RegisteredFunctionJobBase): - pass - - class RegisteredProjectFunctionJobPatch(BaseModel): function_class: Literal[FunctionClass.PROJECT] = FunctionClass.PROJECT title: str | None @@ -221,10 +212,6 @@ class SolverFunctionJob(FunctionJobBase): job_creation_task_id: TaskID | None -class RegisteredSolverFunctionJob(SolverFunctionJob, RegisteredFunctionJobBase): - pass - - class RegisteredSolverFunctionJobPatch(BaseModel): function_class: Literal[FunctionClass.SOLVER] = FunctionClass.SOLVER title: str | None @@ -239,10 +226,6 @@ class PythonCodeFunctionJob(FunctionJobBase): function_class: Literal[FunctionClass.PYTHON_CODE] = FunctionClass.PYTHON_CODE -class RegisteredPythonCodeFunctionJob(PythonCodeFunctionJob, RegisteredFunctionJobBase): - pass - - class RegisteredPythonCodeFunctionJobPatch(BaseModel): function_class: Literal[FunctionClass.PYTHON_CODE] = FunctionClass.PYTHON_CODE title: str | None @@ -256,6 +239,24 @@ class RegisteredPythonCodeFunctionJobPatch(BaseModel): Field(discriminator="function_class"), ] + +class RegisteredFunctionJobBase(FunctionJobBase): + uid: FunctionJobID + created_at: datetime.datetime + + +class RegisteredProjectFunctionJob(ProjectFunctionJob, RegisteredFunctionJobBase): + pass + + +class RegisteredSolverFunctionJob(SolverFunctionJob, RegisteredFunctionJobBase): + pass + + +class RegisteredPythonCodeFunctionJob(PythonCodeFunctionJob, RegisteredFunctionJobBase): + pass + + RegisteredFunctionJob: TypeAlias = Annotated[ RegisteredProjectFunctionJob | RegisteredPythonCodeFunctionJob @@ -275,6 +276,36 @@ class FunctionJobStatus(BaseModel): status: str +class RegisteredFunctionJobWithStatusBase(RegisteredFunctionJobBase, FunctionJobBase): + status: FunctionJobStatus + + +class RegisteredProjectFunctionJobWithStatus( + RegisteredProjectFunctionJob, RegisteredFunctionJobWithStatusBase +): + pass + + +class RegisteredSolverFunctionJobWithStatus( + RegisteredSolverFunctionJob, RegisteredFunctionJobWithStatusBase +): + pass + + +class RegisteredPythonCodeFunctionJobWithStatus( + RegisteredPythonCodeFunctionJob, RegisteredFunctionJobWithStatusBase +): + pass + + +RegisteredFunctionJobWithStatus: TypeAlias = Annotated[ + RegisteredProjectFunctionJobWithStatus + | RegisteredPythonCodeFunctionJobWithStatus + | RegisteredSolverFunctionJobWithStatus, + Field(discriminator="function_class"), +] + + class FunctionJobCollection(BaseModel): """Model for a collection of function jobs""" @@ -309,6 +340,12 @@ class RegisteredFunctionJobDB(FunctionJobDB): created: datetime.datetime +class RegisteredFunctionJobWithStatusDB(FunctionJobDB): + uuid: FunctionJobID + created: datetime.datetime + status: str + + class FunctionDB(BaseModel): function_class: FunctionClass title: str = "" diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py index 0dea1ae55861..448532fda4ca 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py @@ -23,6 +23,7 @@ FunctionUserAccessRights, FunctionUserApiAccessRights, RegisteredFunctionJobPatch, + RegisteredFunctionJobWithStatus, ) from models_library.products import ProductName from models_library.rabbitmq_basic_types import RPCMethodName @@ -192,6 +193,40 @@ async def list_function_jobs( ).validate_python(result) +@log_decorator(_logger, level=logging.DEBUG) +async def list_function_jobs_with_status( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + user_id: UserID, + product_name: ProductName, + pagination_offset: int, + pagination_limit: int, + filter_by_function_id: FunctionID | None = None, + filter_by_function_job_ids: list[FunctionJobID] | None = None, + filter_by_function_job_collection_id: FunctionJobCollectionID | None = None, +) -> tuple[ + list[RegisteredFunctionJobWithStatus], + PageMetaInfoLimitOffset, +]: + result = await rabbitmq_rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + TypeAdapter(RPCMethodName).validate_python("list_function_jobs_with_status"), + user_id=user_id, + product_name=product_name, + pagination_offset=pagination_offset, + pagination_limit=pagination_limit, + filter_by_function_id=filter_by_function_id, + filter_by_function_job_ids=filter_by_function_job_ids, + filter_by_function_job_collection_id=filter_by_function_job_collection_id, + ) + return TypeAdapter( + tuple[ + list[RegisteredFunctionJobWithStatus], + PageMetaInfoLimitOffset, + ] + ).validate_python(result) + + @log_decorator(_logger, level=logging.DEBUG) async def list_function_job_collections( rabbitmq_rpc_client: RabbitMQRPCClient, diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/__init__.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/__init__.py index 9da18133af8f..83c7435b4eeb 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/__init__.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/__init__.py @@ -41,9 +41,9 @@ async def ports( __all__ = ( "DBManager", - "exceptions", "FileLinkType", "Nodeports", "Port", + "exceptions", "ports", ) diff --git a/services/api-server/VERSION b/services/api-server/VERSION index ac454c6a1fc3..54d1a4f2a4a7 100644 --- a/services/api-server/VERSION +++ b/services/api-server/VERSION @@ -1 +1 @@ -0.12.0 +0.13.0 diff --git a/services/api-server/openapi.json b/services/api-server/openapi.json index 0c303480c223..4bd02be61c35 100644 --- a/services/api-server/openapi.json +++ b/services/api-server/openapi.json @@ -3,7 +3,7 @@ "info": { "title": "osparc.io public API", "description": "osparc-simcore public API specifications", - "version": "0.12.0" + "version": "0.13.0" }, "paths": { "/v0/meta": { @@ -5457,7 +5457,7 @@ "function_jobs" ], "summary": "List Function Jobs", - "description": "List function jobs\n\nNew in *version 0.8.0*\n\nAdded in *version 0.9.0*: add `created_at` field in the registered function-related objects\n\nAdded in *version 0.11.0*: add filter by `function_id`, `function_job_ids` and `function_job_collection_id`", + "description": "List function jobs\n\nNew in *version 0.8.0*\n\nAdded in *version 0.9.0*: add `created_at` field in the registered function-related objects\n\nAdded in *version 0.11.0*: add filter by `function_id`, `function_job_ids` and `function_job_collection_id`\n\nAdded in *version 0.13.0*: add include_status bool query parameter to list function jobs with their status", "operationId": "list_function_jobs", "security": [ { @@ -5465,6 +5465,18 @@ } ], "parameters": [ + { + "name": "include_status", + "in": "query", + "required": false, + "schema": { + "type": "boolean", + "description": "Include job status in response", + "default": false, + "title": "Include Status" + }, + "description": "Include job status in response" + }, { "name": "limit", "in": "query", @@ -5555,7 +5567,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/Page_Annotated_Union_RegisteredProjectFunctionJob__RegisteredPythonCodeFunctionJob__RegisteredSolverFunctionJob___FieldInfo_annotation_NoneType__required_True__discriminator__function_class____" + "$ref": "#/components/schemas/Page_Union_Annotated_Union_RegisteredProjectFunctionJobWithStatus__RegisteredPythonCodeFunctionJobWithStatus__RegisteredSolverFunctionJobWithStatus___FieldInfo_annotation_NoneType__required_True__discriminator__function_class_____Annotated_Union_RegisteredProjectFunctionJob__RegisteredPythonCodeFunctionJob__RegisteredSolverFunctionJob___FieldInfo_annotation_NoneType__required_True__discriminator__function_class_____" } } } @@ -10383,6 +10395,108 @@ ], "title": "Page[Study]" }, + "Page_Union_Annotated_Union_RegisteredProjectFunctionJobWithStatus__RegisteredPythonCodeFunctionJobWithStatus__RegisteredSolverFunctionJobWithStatus___FieldInfo_annotation_NoneType__required_True__discriminator__function_class_____Annotated_Union_RegisteredProjectFunctionJob__RegisteredPythonCodeFunctionJob__RegisteredSolverFunctionJob___FieldInfo_annotation_NoneType__required_True__discriminator__function_class_____": { + "properties": { + "items": { + "items": { + "anyOf": [ + { + "oneOf": [ + { + "$ref": "#/components/schemas/RegisteredProjectFunctionJobWithStatus" + }, + { + "$ref": "#/components/schemas/RegisteredPythonCodeFunctionJobWithStatus" + }, + { + "$ref": "#/components/schemas/RegisteredSolverFunctionJobWithStatus" + } + ], + "discriminator": { + "propertyName": "function_class", + "mapping": { + "PROJECT": "#/components/schemas/RegisteredProjectFunctionJobWithStatus", + "PYTHON_CODE": "#/components/schemas/RegisteredPythonCodeFunctionJobWithStatus", + "SOLVER": "#/components/schemas/RegisteredSolverFunctionJobWithStatus" + } + } + }, + { + "oneOf": [ + { + "$ref": "#/components/schemas/RegisteredProjectFunctionJob" + }, + { + "$ref": "#/components/schemas/RegisteredPythonCodeFunctionJob" + }, + { + "$ref": "#/components/schemas/RegisteredSolverFunctionJob" + } + ], + "discriminator": { + "propertyName": "function_class", + "mapping": { + "PROJECT": "#/components/schemas/RegisteredProjectFunctionJob", + "PYTHON_CODE": "#/components/schemas/RegisteredPythonCodeFunctionJob", + "SOLVER": "#/components/schemas/RegisteredSolverFunctionJob" + } + } + } + ] + }, + "type": "array", + "title": "Items" + }, + "total": { + "anyOf": [ + { + "type": "integer", + "minimum": 0 + }, + { + "type": "null" + } + ], + "title": "Total" + }, + "limit": { + "anyOf": [ + { + "type": "integer", + "minimum": 1 + }, + { + "type": "null" + } + ], + "title": "Limit" + }, + "offset": { + "anyOf": [ + { + "type": "integer", + "minimum": 0 + }, + { + "type": "null" + } + ], + "title": "Offset" + }, + "links": { + "$ref": "#/components/schemas/Links" + } + }, + "type": "object", + "required": [ + "items", + "total", + "limit", + "offset", + "links" + ], + "title": "Page[Union[Annotated[Union[RegisteredProjectFunctionJobWithStatus, RegisteredPythonCodeFunctionJobWithStatus, RegisteredSolverFunctionJobWithStatus], FieldInfo(annotation=NoneType, required=True, discriminator='function_class')], Annotated[Union[RegisteredProjectFunctionJob, RegisteredPythonCodeFunctionJob, RegisteredSolverFunctionJob], FieldInfo(annotation=NoneType, required=True, discriminator='function_class')]]]" + }, "PricingPlanClassification": { "type": "string", "enum": [ @@ -11133,6 +11247,103 @@ ], "title": "RegisteredProjectFunctionJob" }, + "RegisteredProjectFunctionJobWithStatus": { + "properties": { + "title": { + "type": "string", + "title": "Title", + "default": "" + }, + "description": { + "type": "string", + "title": "Description", + "default": "" + }, + "function_uid": { + "type": "string", + "format": "uuid", + "title": "Function Uid" + }, + "inputs": { + "anyOf": [ + { + "additionalProperties": true, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Inputs" + }, + "outputs": { + "anyOf": [ + { + "additionalProperties": true, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Outputs" + }, + "function_class": { + "type": "string", + "const": "PROJECT", + "title": "Function Class", + "default": "PROJECT" + }, + "uid": { + "type": "string", + "format": "uuid", + "title": "Uid" + }, + "created_at": { + "type": "string", + "format": "date-time", + "title": "Created At" + }, + "status": { + "$ref": "#/components/schemas/FunctionJobStatus" + }, + "project_job_id": { + "anyOf": [ + { + "type": "string", + "format": "uuid" + }, + { + "type": "null" + } + ], + "title": "Project Job Id" + }, + "job_creation_task_id": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Job Creation Task Id" + } + }, + "type": "object", + "required": [ + "function_uid", + "inputs", + "outputs", + "uid", + "created_at", + "status", + "project_job_id", + "job_creation_task_id" + ], + "title": "RegisteredProjectFunctionJobWithStatus" + }, "RegisteredPythonCodeFunction": { "properties": { "function_class": { @@ -11291,6 +11502,78 @@ ], "title": "RegisteredPythonCodeFunctionJob" }, + "RegisteredPythonCodeFunctionJobWithStatus": { + "properties": { + "title": { + "type": "string", + "title": "Title", + "default": "" + }, + "description": { + "type": "string", + "title": "Description", + "default": "" + }, + "function_uid": { + "type": "string", + "format": "uuid", + "title": "Function Uid" + }, + "inputs": { + "anyOf": [ + { + "additionalProperties": true, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Inputs" + }, + "outputs": { + "anyOf": [ + { + "additionalProperties": true, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Outputs" + }, + "function_class": { + "type": "string", + "const": "PYTHON_CODE", + "title": "Function Class", + "default": "PYTHON_CODE" + }, + "uid": { + "type": "string", + "format": "uuid", + "title": "Uid" + }, + "created_at": { + "type": "string", + "format": "date-time", + "title": "Created At" + }, + "status": { + "$ref": "#/components/schemas/FunctionJobStatus" + } + }, + "type": "object", + "required": [ + "function_uid", + "inputs", + "outputs", + "uid", + "created_at", + "status" + ], + "title": "RegisteredPythonCodeFunctionJobWithStatus" + }, "RegisteredSolverFunction": { "properties": { "function_class": { @@ -11481,6 +11764,103 @@ ], "title": "RegisteredSolverFunctionJob" }, + "RegisteredSolverFunctionJobWithStatus": { + "properties": { + "title": { + "type": "string", + "title": "Title", + "default": "" + }, + "description": { + "type": "string", + "title": "Description", + "default": "" + }, + "function_uid": { + "type": "string", + "format": "uuid", + "title": "Function Uid" + }, + "inputs": { + "anyOf": [ + { + "additionalProperties": true, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Inputs" + }, + "outputs": { + "anyOf": [ + { + "additionalProperties": true, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Outputs" + }, + "function_class": { + "type": "string", + "const": "SOLVER", + "title": "Function Class", + "default": "SOLVER" + }, + "uid": { + "type": "string", + "format": "uuid", + "title": "Uid" + }, + "created_at": { + "type": "string", + "format": "date-time", + "title": "Created At" + }, + "status": { + "$ref": "#/components/schemas/FunctionJobStatus" + }, + "solver_job_id": { + "anyOf": [ + { + "type": "string", + "format": "uuid" + }, + { + "type": "null" + } + ], + "title": "Solver Job Id" + }, + "job_creation_task_id": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Job Creation Task Id" + } + }, + "type": "object", + "required": [ + "function_uid", + "inputs", + "outputs", + "uid", + "created_at", + "status", + "solver_job_id", + "job_creation_task_id" + ], + "title": "RegisteredSolverFunctionJobWithStatus" + }, "RunningState": { "type": "string", "enum": [ diff --git a/services/api-server/setup.cfg b/services/api-server/setup.cfg index b6c8e79440d3..e263a7a4602e 100644 --- a/services/api-server/setup.cfg +++ b/services/api-server/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.12.0 +current_version = 0.13.0 commit = True message = services/api-server version: {current_version} → {new_version} tag = False diff --git a/services/api-server/src/simcore_service_api_server/_service_function_jobs.py b/services/api-server/src/simcore_service_api_server/_service_function_jobs.py index 4038ab9b6fc6..e1b3728e7eb1 100644 --- a/services/api-server/src/simcore_service_api_server/_service_function_jobs.py +++ b/services/api-server/src/simcore_service_api_server/_service_function_jobs.py @@ -10,11 +10,13 @@ FunctionJobCollectionID, FunctionJobID, FunctionJobStatus, + FunctionOutputs, FunctionSchemaClass, ProjectFunctionJob, RegisteredFunction, RegisteredFunctionJob, RegisteredFunctionJobPatch, + RegisteredFunctionJobWithStatus, RegisteredProjectFunctionJobPatch, RegisteredSolverFunctionJobPatch, SolverFunctionJob, @@ -36,6 +38,9 @@ from models_library.rpc_pagination import PageLimitInt from models_library.users import UserID from pydantic import ValidationError +from simcore_service_api_server._service_functions import FunctionService +from simcore_service_api_server.services_rpc.storage import StorageService +from sqlalchemy.ext.asyncio import AsyncEngine from ._service_jobs import JobService from .exceptions.function_errors import ( @@ -44,10 +49,8 @@ ) from .models.api_resources import JobLinks from .models.domain.functions import PreRegisteredFunctionJobData -from .models.schemas.jobs import ( - JobInputs, - JobPricingSpecification, -) +from .models.schemas.jobs import JobInputs, JobPricingSpecification +from .services_http.webserver import AuthSession from .services_rpc.wb_api_server import WbApiRpcClient @@ -70,7 +73,10 @@ class FunctionJobService: user_id: UserID product_name: ProductName _web_rpc_client: WbApiRpcClient + _storage_client: StorageService _job_service: JobService + _function_service: FunctionService + _webserver_api: AuthSession async def list_function_jobs( self, @@ -96,6 +102,33 @@ async def list_function_jobs( **pagination_kwargs, ) + async def list_function_jobs_with_status( + self, + *, + filter_by_function_id: FunctionID | None = None, + filter_by_function_job_ids: list[FunctionJobID] | None = None, + filter_by_function_job_collection_id: FunctionJobCollectionID | None = None, + pagination_offset: PageOffsetInt | None = None, + pagination_limit: PageLimitInt | None = None, + ) -> tuple[ + list[RegisteredFunctionJobWithStatus], + PageMetaInfoLimitOffset, + ]: + """Lists all function jobs for a user with pagination""" + + pagination_kwargs = as_dict_exclude_none( + pagination_offset=pagination_offset, pagination_limit=pagination_limit + ) + + return await self._web_rpc_client.list_function_jobs_with_status( + user_id=self.user_id, + product_name=self.product_name, + filter_by_function_id=filter_by_function_id, + filter_by_function_job_ids=filter_by_function_job_ids, + filter_by_function_job_collection_id=filter_by_function_job_collection_id, + **pagination_kwargs, + ) + async def validate_function_inputs( self, *, function_id: FunctionID, inputs: FunctionInputs ) -> tuple[bool, str]: @@ -143,7 +176,7 @@ async def inspect_function_job( and function_job.function_class == FunctionClass.PROJECT ): if function_job.project_job_id is None: - raise FunctionJobProjectMissingError() + raise FunctionJobProjectMissingError job_status = await self._job_service.inspect_study_job( job_id=function_job.project_job_id, ) @@ -151,7 +184,7 @@ async def inspect_function_job( function_job.function_class == FunctionClass.SOLVER ): if function_job.solver_job_id is None: - raise FunctionJobProjectMissingError() + raise FunctionJobProjectMissingError job_status = await self._job_service.inspect_solver_job( solver_key=function.solver_key, version=function.solver_version, @@ -236,7 +269,7 @@ async def get_cached_function_job( if job_status.status == RunningState.SUCCESS: return cached_function_job - raise FunctionJobCacheNotFoundError() + raise FunctionJobCacheNotFoundError async def pre_register_function_job( self, @@ -456,3 +489,68 @@ async def map_function( x_simcore_parent_project_uuid=x_simcore_parent_project_uuid, x_simcore_parent_node_id=x_simcore_parent_node_id, ) + + async def function_job_outputs( + self, + *, + function: RegisteredFunction, + function_job: RegisteredFunctionJob, + user_id: UserID, + product_name: ProductName, + stored_job_outputs: FunctionOutputs | None, + async_pg_engine: AsyncEngine, + ) -> FunctionOutputs: + + if stored_job_outputs is not None: + return stored_job_outputs + + try: + job_status = await self.inspect_function_job( + function=function, + function_job=function_job, + ) + except FunctionJobProjectMissingError: + return None + + if job_status.status != RunningState.SUCCESS: + return None + + if ( + function.function_class == FunctionClass.PROJECT + and function_job.function_class == FunctionClass.PROJECT + ): + if function_job.project_job_id is None: + return None + new_outputs = dict( + ( + await self._job_service.get_study_job_outputs( + study_id=function.project_id, + job_id=function_job.project_job_id, + ) + ).results + ) + elif ( + function.function_class == FunctionClass.SOLVER + and function_job.function_class == FunctionClass.SOLVER + ): + if function_job.solver_job_id is None: + return None + new_outputs = dict( + ( + await self._job_service.get_solver_job_outputs( + solver_key=function.solver_key, + version=function.solver_version, + job_id=function_job.solver_job_id, + async_pg_engine=async_pg_engine, + ) + ).results + ) + else: + raise UnsupportedFunctionClassError(function_class=function.function_class) + + return await self._web_rpc_client.update_function_job_outputs( + function_job_id=function_job.uid, + user_id=user_id, + product_name=product_name, + outputs=new_outputs, + ) diff --git a/services/api-server/src/simcore_service_api_server/_service_functions.py b/services/api-server/src/simcore_service_api_server/_service_functions.py index 4b6ba663bbe6..708ba0149be9 100644 --- a/services/api-server/src/simcore_service_api_server/_service_functions.py +++ b/services/api-server/src/simcore_service_api_server/_service_functions.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from common_library.exclude import as_dict_exclude_none -from models_library.functions import FunctionClass, RegisteredFunction +from models_library.functions import FunctionClass, FunctionID, RegisteredFunction from models_library.functions_errors import UnsupportedFunctionClassError from models_library.products import ProductName from models_library.rest_pagination import ( @@ -68,3 +68,11 @@ async def get_function_job_links( raise UnsupportedFunctionClassError( function_class=function.function_class, ) + + async def get_function(self, function_id: FunctionID) -> RegisteredFunction: + """Fetch a function by its ID""" + return await self._web_rpc_client.get_function( + user_id=self.user_id, + product_name=self.product_name, + function_id=function_id, + ) diff --git a/services/api-server/src/simcore_service_api_server/_service_jobs.py b/services/api-server/src/simcore_service_api_server/_service_jobs.py index 0300665e74d7..c01eac1fa23a 100644 --- a/services/api-server/src/simcore_service_api_server/_service_jobs.py +++ b/services/api-server/src/simcore_service_api_server/_service_jobs.py @@ -1,9 +1,13 @@ +# pylint: disable=too-many-instance-attributes + import logging from dataclasses import dataclass from pathlib import Path from uuid import UUID from common_library.exclude import as_dict_exclude_none +from fastapi import status +from fastapi.exceptions import HTTPException from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobGet from models_library.api_schemas_webserver.projects import ( ProjectCreateNew, @@ -15,31 +19,32 @@ from models_library.products import ProductName from models_library.projects import ProjectID from models_library.projects_nodes import InputID, InputTypes -from models_library.projects_nodes_io import NodeID -from models_library.rest_pagination import ( - PageMetaInfoLimitOffset, - PageOffsetInt, -) +from models_library.projects_nodes_io import BaseFileLink, NodeID +from models_library.rest_pagination import PageMetaInfoLimitOffset, PageOffsetInt from models_library.rpc.webserver.projects import ProjectJobRpcGet from models_library.rpc_pagination import PageLimitInt from models_library.users import UserID +from models_library.wallets import ZERO_CREDITS from servicelib.logging_utils import log_context +from sqlalchemy.ext.asyncio import AsyncEngine -from ._service_solvers import ( - SolverService, -) +from ._service_solvers import SolverService from .exceptions.backend_errors import JobAssetsMissingError -from .exceptions.custom_errors import SolverServiceListJobsFiltersError -from .models.api_resources import ( - JobLinks, - RelativeResourceName, - compose_resource_name, +from .exceptions.custom_errors import ( + InsufficientCreditsError, + MissingWalletError, + SolverServiceListJobsFiltersError, ) +from .models.api_resources import JobLinks, RelativeResourceName, compose_resource_name from .models.basic_types import NameValueTuple, VersionStr +from .models.domain.files import File as DomainFile +from .models.schemas.files import File as SchemaFile from .models.schemas.jobs import ( + ArgumentTypes, Job, JobID, JobInputs, + JobOutputs, JobPricingSpecification, JobStatus, ) @@ -54,9 +59,11 @@ create_jobstatus_from_task, create_new_project_for_job, ) -from .services_http.storage import StorageApi +from .services_http.solver_job_outputs import ResultsTypes, get_solver_output_results +from .services_http.storage import StorageApi, to_file_api_model from .services_http.study_job_models_converters import ( create_job_from_study, + create_job_outputs_from_project_outputs, get_project_and_file_inputs_from_job_inputs, ) from .services_http.webserver import AuthSession @@ -92,6 +99,7 @@ class JobService: _storage_rest_client: StorageApi _directorv2_rpc_client: DirectorV2Service _solver_service: SolverService + user_id: UserID product_name: ProductName @@ -273,12 +281,11 @@ async def start_log_export( file_ids = await self._directorv2_rpc_client.get_computation_task_log_file_ids( project_id=job_id ) - async_job_get = await self._storage_rpc_client.start_data_export( + return await self._storage_rpc_client.start_data_export( paths_to_export=[ Path(elm.file_id) for elm in file_ids if elm.file_id is not None ], ) - return async_job_get async def get_job( self, job_parent_resource_name: RelativeResourceName, job_id: JobID @@ -291,6 +298,94 @@ async def get_job( job_parent_resource_name=job_parent_resource_name, ) + async def get_solver_job_outputs( + self, + solver_key: SolverKeyId, + version: VersionStr, + job_id: JobID, + async_pg_engine: AsyncEngine, + ) -> JobOutputs: + job_name = compose_solver_job_resource_name(solver_key, version, job_id) + _logger.debug("Get Job '%s' outputs", job_name) + + project_marked_as_job = await self.get_job( + job_id=job_id, + job_parent_resource_name=Solver.compose_resource_name( + key=solver_key, version=version + ), + ) + node_ids = list(project_marked_as_job.workbench.keys()) + assert len(node_ids) == 1 # nosec + + if project_marked_as_job.storage_assets_deleted: + _logger.warning("Storage data for job '%s' has been deleted", job_name) + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, detail="Assets have been deleted" + ) + + product_price = await self._web_rest_client.get_product_price() + if product_price.usd_per_credit is not None: + wallet = await self._web_rest_client.get_project_wallet( + project_id=project_marked_as_job.uuid + ) + if wallet is None: + raise MissingWalletError(job_id=project_marked_as_job.uuid) + wallet_with_credits = await self._web_rest_client.get_wallet( + wallet_id=wallet.wallet_id + ) + if wallet_with_credits.available_credits <= ZERO_CREDITS: + raise InsufficientCreditsError( + wallet_name=wallet_with_credits.name, + wallet_credit_amount=wallet_with_credits.available_credits, + ) + + outputs: dict[str, ResultsTypes] = await get_solver_output_results( + user_id=self.user_id, + project_uuid=job_id, + node_uuid=UUID(node_ids[0]), + db_engine=async_pg_engine, + ) + + results: dict[str, ArgumentTypes] = {} + for name, value in outputs.items(): + if isinstance(value, BaseFileLink): + file_id: UUID = DomainFile.create_id(*value.path.split("/")) + + found = await self._storage_rest_client.search_owned_files( + user_id=self.user_id, file_id=file_id, limit=1 + ) + if found: + assert len(found) == 1 # nosec + results[name] = SchemaFile.from_domain_model( + to_file_api_model(found[0]) + ) + else: + api_file = await self._storage_rest_client.create_soft_link( + user_id=self.user_id, + target_s3_path=value.path, + as_file_id=file_id, + ) + results[name] = SchemaFile.from_domain_model(api_file) + else: + results[name] = value + + return JobOutputs(job_id=job_id, results=results) + + async def get_study_job_outputs( + self, + study_id: StudyID, + job_id: JobID, + ) -> JobOutputs: + job_name = compose_study_job_resource_name(study_id, job_id) + _logger.debug("Getting Job Outputs for '%s'", job_name) + + project_outputs = await self._web_rest_client.get_project_outputs( + project_id=job_id + ) + return await create_job_outputs_from_project_outputs( + job_id, project_outputs, self.user_id, self._storage_rest_client + ) + async def delete_job_assets( self, job_parent_resource_name: RelativeResourceName, job_id: JobID ) -> None: diff --git a/services/api-server/src/simcore_service_api_server/api/dependencies/services.py b/services/api-server/src/simcore_service_api_server/api/dependencies/services.py index 1b722ad182b2..bb8142f1386c 100644 --- a/services/api-server/src/simcore_service_api_server/api/dependencies/services.py +++ b/services/api-server/src/simcore_service_api_server/api/dependencies/services.py @@ -151,12 +151,19 @@ def get_function_service( def get_function_job_service( web_rpc_api: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], job_service: Annotated[JobService, Depends(get_job_service)], + function_service: Annotated[FunctionService, Depends(get_function_service)], user_id: Annotated[UserID, Depends(get_current_user_id)], product_name: Annotated[ProductName, Depends(get_product_name)], + webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], + storage_service: Annotated[StorageService, Depends(get_storage_service)], + # async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)], ) -> FunctionJobService: return FunctionJobService( _web_rpc_client=web_rpc_api, _job_service=job_service, + _function_service=function_service, + _storage_client=storage_service, + _webserver_api=webserver_api, user_id=user_id, product_name=product_name, ) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py index b43f5840ce0c..f578d8243d42 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py @@ -1,10 +1,10 @@ +from collections.abc import Sequence from logging import getLogger from typing import Annotated, Final from common_library.error_codes import create_error_code -from fastapi import APIRouter, Depends, FastAPI, HTTPException, status +from fastapi import APIRouter, Depends, FastAPI, HTTPException, Query, status from fastapi_pagination.api import create_page -from fastapi_pagination.bases import AbstractPage from models_library.api_schemas_long_running_tasks.tasks import TaskGet from models_library.api_schemas_webserver.functions import ( FunctionClass, @@ -14,28 +14,26 @@ FunctionOutputs, RegisteredFunctionJob, ) -from models_library.functions import RegisteredFunction +from models_library.functions import RegisteredFunction, RegisteredFunctionJobWithStatus from models_library.functions_errors import ( UnsupportedFunctionClassError, UnsupportedFunctionFunctionJobClassCombinationError, ) from models_library.products import ProductName +from models_library.projects_state import RunningState from models_library.users import UserID from servicelib.celery.models import TaskUUID from servicelib.fastapi.dependencies import get_app from servicelib.logging_errors import create_troubleshootting_log_kwargs -from simcore_service_api_server.models.schemas.functions_filters import ( - FunctionJobsListFilters, -) from sqlalchemy.ext.asyncio import AsyncEngine from ..._service_function_jobs import FunctionJobService +from ..._service_functions import FunctionService from ..._service_jobs import JobService from ...exceptions.function_errors import FunctionJobProjectMissingError from ...models.pagination import Page, PaginationParams from ...models.schemas.errors import ErrorGet -from ...services_http.storage import StorageApi -from ...services_http.webserver import AuthSession +from ...models.schemas.functions_filters import FunctionJobsListFilters from ...services_rpc.wb_api_server import WbApiRpcClient from ..dependencies.authentication import get_current_user_id, get_product_name from ..dependencies.celery import get_task_manager @@ -47,13 +45,11 @@ ) from ..dependencies.models_schemas_function_filters import get_function_jobs_filters from ..dependencies.services import ( - get_api_client, get_function_job_service, + get_function_service, get_job_service, ) -from ..dependencies.webserver_http import get_webserver_session from ..dependencies.webserver_rpc import get_wb_api_rpc_client -from . import solvers_jobs_read, studies_jobs from ._constants import ( FMSG_CHANGELOG_ADDED_IN_VERSION, FMSG_CHANGELOG_NEW_IN_VERSION, @@ -69,7 +65,7 @@ JOB_LIST_FILTER_PAGE_RELEASE_VERSION = "0.11.0" JOB_LOG_RELEASE_VERSION = "0.11.0" - +WITH_STATUS_RELEASE_VERSION = "0.13.0" _JOB_CREATION_TASK_STATUS_PREFIX: Final[str] = "JOB_CREATION_TASK_STATUS_" function_job_router = APIRouter() @@ -109,28 +105,92 @@ ) ) + if endpoint in ["list_function_jobs"]: + CHANGE_LOGS[endpoint].append( + FMSG_CHANGELOG_ADDED_IN_VERSION.format( + WITH_STATUS_RELEASE_VERSION, + "add include_status bool query parameter to list function jobs with their status", + ) + ) + @function_job_router.get( "", - response_model=Page[RegisteredFunctionJob], + response_model=Page[ + RegisteredFunctionJobWithStatus | RegisteredFunctionJob + ], # left-right order is important here description=create_route_description( base="List function jobs", changelog=CHANGE_LOGS["list_function_jobs"] ), ) async def list_function_jobs( + app: Annotated[FastAPI, Depends(get_app)], page_params: Annotated[PaginationParams, Depends()], function_job_service: Annotated[ FunctionJobService, Depends(get_function_job_service) ], + function_service: Annotated[FunctionService, Depends(get_function_service)], filters: Annotated[FunctionJobsListFilters, Depends(get_function_jobs_filters)], -) -> AbstractPage[RegisteredFunctionJob]: - function_jobs_list, meta = await function_job_service.list_function_jobs( - pagination_offset=page_params.offset, - pagination_limit=page_params.limit, - filter_by_function_job_ids=filters.function_job_ids, - filter_by_function_job_collection_id=filters.function_job_collection_id, - filter_by_function_id=filters.function_id, - ) + async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], + include_status: Annotated[ # noqa: FBT002 + bool, Query(description="Include job status in response") + ] = False, +): + function_jobs_list: Sequence[ + RegisteredFunctionJobWithStatus | RegisteredFunctionJob + ] = [] + if include_status: + function_jobs_list, meta = ( + await function_job_service.list_function_jobs_with_status( + pagination_offset=page_params.offset, + pagination_limit=page_params.limit, + filter_by_function_job_ids=filters.function_job_ids, + filter_by_function_job_collection_id=filters.function_job_collection_id, + filter_by_function_id=filters.function_id, + ) + ) + # the code below should ideally be in the service layer, but this can only be done if the + # celery status resolution is done in the service layer too + for function_job_wso in function_jobs_list: + if ( + function_job_wso.status.status + not in ( + RunningState.SUCCESS, + RunningState.FAILED, + ) + ) or function_job_wso.outputs is None: + function = await function_service.get_function( + function_id=function_job_wso.function_uid + ) + function_job_wso.status = await function_job_status( + app=app, + function=function, + function_job=function_job_wso, + function_job_service=function_job_service, + user_id=user_id, + product_name=product_name, + ) + if function_job_wso.status.status == RunningState.SUCCESS: + function_job_wso.outputs = ( + await function_job_service.function_job_outputs( + function_job=function_job_wso, + function=function, + user_id=user_id, + product_name=product_name, + stored_job_outputs=None, + async_pg_engine=async_pg_engine, + ) + ) + else: + function_jobs_list, meta = await function_job_service.list_function_jobs( + pagination_offset=page_params.offset, + pagination_limit=page_params.limit, + filter_by_function_job_ids=filters.function_job_ids, + filter_by_function_job_collection_id=filters.function_job_collection_id, + filter_by_function_id=filters.function_id, + ) return create_page( function_jobs_list, @@ -261,6 +321,7 @@ async def function_job_status( async def get_function_from_functionjobid( wb_api_rpc: WbApiRpcClient, function_job_id: FunctionJobID, + function_service: Annotated[FunctionService, Depends(get_function_service)], user_id: Annotated[UserID, Depends(get_current_user_id)], product_name: Annotated[ProductName, Depends(get_product_name)], ) -> tuple[RegisteredFunction, RegisteredFunctionJob]: @@ -271,14 +332,9 @@ async def get_function_from_functionjobid( product_name=product_name, ) - from .functions_routes import get_function - return ( - await get_function( - wb_api_rpc=wb_api_rpc, + await function_service.get_function( function_id=function_job.function_uid, - user_id=user_id, - product_name=product_name, ), function_job, ) @@ -297,70 +353,22 @@ async def function_job_outputs( function_job: Annotated[ RegisteredFunctionJob, Depends(get_function_job_dependency) ], + function_job_service: Annotated[ + FunctionJobService, Depends(get_function_job_service) + ], function: Annotated[RegisteredFunction, Depends(get_function_from_functionjob)], - webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], user_id: Annotated[UserID, Depends(get_current_user_id)], product_name: Annotated[ProductName, Depends(get_product_name)], - storage_client: Annotated[StorageApi, Depends(get_api_client(StorageApi))], - job_service: Annotated[JobService, Depends(get_job_service)], - wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], - async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)], stored_job_outputs: Annotated[FunctionOutputs, Depends(get_stored_job_outputs)], + async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)], ) -> FunctionOutputs: - if stored_job_outputs is not None: - return stored_job_outputs - - if ( - function.function_class == FunctionClass.PROJECT - and function_job.function_class == FunctionClass.PROJECT - ): - if function_job.project_job_id is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Function job outputs not found", - ) - new_outputs = dict( - ( - await studies_jobs.get_study_job_outputs( - study_id=function.project_id, - job_id=function_job.project_job_id, - user_id=user_id, - webserver_api=webserver_api, - storage_client=storage_client, - ) - ).results - ) - elif ( - function.function_class == FunctionClass.SOLVER - and function_job.function_class == FunctionClass.SOLVER - ): - if function_job.solver_job_id is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Function job outputs not found", - ) - new_outputs = dict( - ( - await solvers_jobs_read.get_job_outputs( - solver_key=function.solver_key, - version=function.solver_version, - job_id=function_job.solver_job_id, - user_id=user_id, - webserver_api=webserver_api, - storage_client=storage_client, - job_service=job_service, - async_pg_engine=async_pg_engine, - ) - ).results - ) - else: - raise UnsupportedFunctionClassError(function_class=function.function_class) - - return await wb_api_rpc.update_function_job_outputs( - function_job_id=function_job.uid, + return await function_job_service.function_job_outputs( + function_job=function_job, user_id=user_id, product_name=product_name, - outputs=new_outputs, + function=function, + stored_job_outputs=stored_job_outputs, + async_pg_engine=async_pg_engine, ) @@ -378,14 +386,16 @@ async def function_job_outputs( async def get_function_job_logs_task( function_job_id: FunctionJobID, app: Annotated[FastAPI, Depends(get_app)], + wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], job_service: Annotated[JobService, Depends(get_job_service)], + function_service: Annotated[FunctionService, Depends(get_function_service)], user_id: Annotated[UserID, Depends(get_current_user_id)], - wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], product_name: Annotated[ProductName, Depends(get_product_name)], ): function, function_job = await get_function_from_functionjobid( wb_api_rpc=wb_api_rpc, function_job_id=function_job_id, + function_service=function_service, user_id=user_id, product_name=product_name, ) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py index 097fa1fc5b09..026b7d100c48 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py @@ -46,10 +46,7 @@ get_product_name, ) from ..dependencies.celery import ASYNC_JOB_CLIENT_NAME, get_task_manager -from ..dependencies.services import ( - get_function_job_service, - get_function_service, -) +from ..dependencies.services import get_function_job_service, get_function_service from ..dependencies.webserver_rpc import get_wb_api_rpc_client from ._constants import ( FMSG_CHANGELOG_ADDED_IN_VERSION, @@ -138,12 +135,10 @@ async def register_function( ) async def get_function( function_id: FunctionID, - wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], - user_id: Annotated[UserID, Depends(get_current_user_id)], - product_name: Annotated[ProductName, Depends(get_product_name)], + function_service: Annotated[FunctionService, Depends(get_function_service)], ) -> RegisteredFunction: - return await wb_api_rpc.get_function( - function_id=function_id, user_id=user_id, product_name=product_name + return await function_service.get_function( + function_id=function_id, ) @@ -328,7 +323,7 @@ async def validate_function_inputs( changelog=CHANGE_LOGS["run_function"], ), ) -async def run_function( # noqa: PLR0913 +async def run_function( request: Request, user_identity: Annotated[Identity, Depends(get_current_identity)], to_run_function: Annotated[RegisteredFunction, Depends(get_function)], @@ -444,7 +439,7 @@ async def delete_function( changelog=CHANGE_LOGS["map_function"], ), ) -async def map_function( # noqa: PLR0913 +async def map_function( request: Request, user_identity: Annotated[Identity, Depends(get_current_identity)], to_run_function: Annotated[RegisteredFunction, Depends(get_function)], diff --git a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_read.py b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_read.py index 8aafc0aeb798..31c06ab52a02 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_read.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_read.py @@ -4,7 +4,7 @@ from collections import deque from collections.abc import Callable from functools import partial -from typing import Annotated, Any, Union +from typing import Annotated, Any from uuid import UUID from fastapi import APIRouter, Depends, Request, status @@ -12,9 +12,7 @@ from fastapi.responses import RedirectResponse from fastapi_pagination.api import create_page from models_library.api_schemas_webserver.projects import ProjectGet -from models_library.projects_nodes_io import BaseFileLink from models_library.users import UserID -from models_library.wallets import ZERO_CREDITS from pydantic import HttpUrl, NonNegativeInt from pydantic.types import PositiveInt from servicelib.logging_utils import log_context @@ -23,28 +21,19 @@ from ..._service_jobs import JobService, compose_solver_job_resource_name from ..._service_solvers import SolverService -from ...exceptions.custom_errors import InsufficientCreditsError, MissingWalletError +from ...exceptions.custom_errors import MissingWalletError from ...exceptions.service_errors_utils import DEFAULT_BACKEND_SERVICE_STATUS_CODES from ...models.api_resources import parse_resources_ids from ...models.basic_types import LogStreamingResponse, NameValueTuple, VersionStr -from ...models.domain.files import File as DomainFile from ...models.pagination import Page, PaginationParams from ...models.schemas.errors import ErrorGet -from ...models.schemas.files import File as SchemaFile -from ...models.schemas.jobs import ( - ArgumentTypes, - Job, - JobID, - JobLog, - JobMetadata, - JobOutputs, -) +from ...models.schemas.jobs import Job, JobID, JobLog, JobMetadata, JobOutputs from ...models.schemas.jobs_filters import JobMetadataFilter from ...models.schemas.model_adapter import ( PricingUnitGetLegacy, WalletGetWithAvailableCreditsLegacy, ) -from ...models.schemas.solvers import Solver, SolverKeyId +from ...models.schemas.solvers import SolverKeyId from ...services_http.director_v2 import DirectorV2Api from ...services_http.jobs import ( get_custom_metadata, @@ -55,8 +44,6 @@ create_job_from_project, get_solver_job_rest_interface_links, ) -from ...services_http.solver_job_outputs import ResultsTypes, get_solver_output_results -from ...services_http.storage import StorageApi, to_file_api_model from ..dependencies.application import get_reverse_url_mapper from ..dependencies.authentication import get_current_user_id from ..dependencies.database import get_db_asyncpg_engine @@ -69,10 +56,7 @@ FMSG_CHANGELOG_REMOVED_IN_VERSION_FORMAT, create_route_description, ) -from .solvers_jobs import ( - JOBS_STATUS_CODES, - METADATA_STATUS_CODES, -) +from .solvers_jobs import JOBS_STATUS_CODES, METADATA_STATUS_CODES from .wallets import WALLET_STATUS_CODES _logger = logging.getLogger(__name__) @@ -114,7 +98,7 @@ _LOGSTREAM_STATUS_CODES: dict[int | str, dict[str, Any]] = { status.HTTP_200_OK: { "description": "Returns a JobLog or an ErrorGet", - "model": Union[JobLog, ErrorGet], + "model": JobLog | ErrorGet, }, status.HTTP_409_CONFLICT: { "description": "Conflict: Logs are already being streamed", @@ -328,74 +312,16 @@ async def get_job_outputs( solver_key: SolverKeyId, version: VersionStr, job_id: JobID, - user_id: Annotated[PositiveInt, Depends(get_current_user_id)], - async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)], - webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], job_service: Annotated[JobService, Depends(get_job_service)], - storage_client: Annotated[StorageApi, Depends(get_api_client(StorageApi))], + async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)], ): - job_name = compose_solver_job_resource_name(solver_key, version, job_id) - _logger.debug("Get Job '%s' outputs", job_name) - - project_marked_as_job = await job_service.get_job( + return await job_service.get_solver_job_outputs( + solver_key=solver_key, + version=version, job_id=job_id, - job_parent_resource_name=Solver.compose_resource_name( - key=solver_key, version=version - ), - ) - node_ids = list(project_marked_as_job.workbench.keys()) - assert len(node_ids) == 1 # nosec - - if project_marked_as_job.storage_assets_deleted: - _logger.warning("Storage data for job '%s' has been deleted", job_name) - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, detail="Assets have been deleted" - ) - - product_price = await webserver_api.get_product_price() - if product_price.usd_per_credit is not None: - wallet = await webserver_api.get_project_wallet( - project_id=project_marked_as_job.uuid - ) - if wallet is None: - raise MissingWalletError(job_id=project_marked_as_job.uuid) - wallet_with_credits = await webserver_api.get_wallet(wallet_id=wallet.wallet_id) - if wallet_with_credits.available_credits <= ZERO_CREDITS: - raise InsufficientCreditsError( - wallet_name=wallet_with_credits.name, - wallet_credit_amount=wallet_with_credits.available_credits, - ) - - outputs: dict[str, ResultsTypes] = await get_solver_output_results( - user_id=user_id, - project_uuid=job_id, - node_uuid=UUID(node_ids[0]), - db_engine=async_pg_engine, + async_pg_engine=async_pg_engine, ) - results: dict[str, ArgumentTypes] = {} - for name, value in outputs.items(): - if isinstance(value, BaseFileLink): - file_id: UUID = DomainFile.create_id(*value.path.split("/")) - - found = await storage_client.search_owned_files( - user_id=user_id, file_id=file_id, limit=1 - ) - if found: - assert len(found) == 1 # nosec - results[name] = SchemaFile.from_domain_model( - to_file_api_model(found[0]) - ) - else: - api_file = await storage_client.create_soft_link( - user_id=user_id, target_s3_path=value.path, as_file_id=file_id - ) - results[name] = SchemaFile.from_domain_model(api_file) - else: - results[name] = value - - return JobOutputs(job_id=job_id, results=results) - @router.get( "/{solver_key:path}/releases/{version}/jobs/{job_id:uuid}/outputs/logfile", diff --git a/services/api-server/src/simcore_service_api_server/api/routes/studies_jobs.py b/services/api-server/src/simcore_service_api_server/api/routes/studies_jobs.py index 4eb72ed7788f..e8ad03e2342c 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/studies_jobs.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/studies_jobs.py @@ -34,9 +34,7 @@ replace_custom_metadata, stop_project, ) -from ...services_http.storage import StorageApi from ...services_http.study_job_models_converters import ( - create_job_outputs_from_project_outputs, get_study_job_rest_interface_links, ) from ...services_http.webserver import AuthSession @@ -279,20 +277,13 @@ async def inspect_study_job( async def get_study_job_outputs( study_id: StudyID, job_id: JobID, - user_id: Annotated[PositiveInt, Depends(get_current_user_id)], - webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], - storage_client: Annotated[StorageApi, Depends(get_api_client(StorageApi))], + job_service: Annotated[JobService, Depends(get_job_service)], ): - job_name = compose_study_job_resource_name(study_id, job_id) - _logger.debug("Getting Job Outputs for '%s'", job_name) - - project_outputs = await webserver_api.get_project_outputs(project_id=job_id) - job_outputs: JobOutputs = await create_job_outputs_from_project_outputs( - job_id, project_outputs, user_id, storage_client + return await job_service.get_study_job_outputs( + study_id=study_id, + job_id=job_id, ) - return job_outputs - @router.get( "/{study_id}/jobs/{job_id}/outputs/log-links", diff --git a/services/api-server/src/simcore_service_api_server/celery_worker/worker_tasks/functions_tasks.py b/services/api-server/src/simcore_service_api_server/celery_worker/worker_tasks/functions_tasks.py index 4c3697c1ca80..e8cde1c5dc55 100644 --- a/services/api-server/src/simcore_service_api_server/celery_worker/worker_tasks/functions_tasks.py +++ b/services/api-server/src/simcore_service_api_server/celery_worker/worker_tasks/functions_tasks.py @@ -15,6 +15,7 @@ get_catalog_service, get_directorv2_service, get_function_job_service, + get_function_service, get_job_service, get_solver_service, get_storage_service, @@ -29,7 +30,9 @@ async def _assemble_function_job_service( - *, app: FastAPI, user_identity: Identity + *, + app: FastAPI, + user_identity: Identity, ) -> FunctionJobService: # This should ideally be done by a dependency injection system (like it is done in the api-server). # However, for that we would need to introduce a dependency injection system which is not coupled to, @@ -69,6 +72,12 @@ async def _assemble_function_job_service( product_name=user_identity.product_name, ) + function_service = get_function_service( + web_rpc_api=web_api_rpc_client, + user_id=user_identity.user_id, + product_name=user_identity.product_name, + ) + job_service = get_job_service( web_rest_api=web_server_rest_client, director2_api=director2_api, @@ -86,6 +95,9 @@ async def _assemble_function_job_service( job_service=job_service, user_id=user_identity.user_id, product_name=user_identity.product_name, + function_service=function_service, + webserver_api=web_server_rest_client, + storage_service=storage_service, ) diff --git a/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py b/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py index 75f7c5a7d217..838406922d8d 100644 --- a/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py +++ b/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py @@ -29,6 +29,7 @@ FunctionUserAccessRights, FunctionUserApiAccessRights, RegisteredFunctionJobPatch, + RegisteredFunctionJobWithStatus, ) from models_library.licenses import LicensedItemID from models_library.products import ProductName @@ -250,7 +251,7 @@ async def mark_project_as_job( user_id: UserID, project_uuid: ProjectID, job_parent_resource_name: RelativeResourceName, - storage_assets_deleted: bool, + storage_assets_deleted: bool, # noqa: FBT001 ): await projects_rpc.mark_project_as_job( rpc_client=self._client, @@ -386,6 +387,31 @@ async def list_function_jobs( filter_by_function_job_collection_id=filter_by_function_job_collection_id, ) + async def list_function_jobs_with_status( + self, + *, + user_id: UserID, + product_name: ProductName, + pagination_offset: PageOffsetInt = 0, + pagination_limit: PageLimitInt = DEFAULT_NUMBER_OF_ITEMS_PER_PAGE, + filter_by_function_id: FunctionID | None = None, + filter_by_function_job_ids: list[FunctionJobID] | None = None, + filter_by_function_job_collection_id: FunctionJobCollectionID | None = None, + ) -> tuple[ + list[RegisteredFunctionJobWithStatus], + PageMetaInfoLimitOffset, + ]: + return await functions_rpc_interface.list_function_jobs_with_status( + self._client, + user_id=user_id, + product_name=product_name, + pagination_offset=pagination_offset, + pagination_limit=pagination_limit, + filter_by_function_id=filter_by_function_id, + filter_by_function_job_ids=filter_by_function_job_ids, + filter_by_function_job_collection_id=filter_by_function_job_collection_id, + ) + async def list_function_job_collections( self, *, diff --git a/services/api-server/tests/unit/api_functions/celery/test_functions.py b/services/api-server/tests/unit/api_functions/celery/test_functions_celery.py similarity index 98% rename from services/api-server/tests/unit/api_functions/celery/test_functions.py rename to services/api-server/tests/unit/api_functions/celery/test_functions_celery.py index 130a05bebaa9..9aa57bcabcc3 100644 --- a/services/api-server/tests/unit/api_functions/celery/test_functions.py +++ b/services/api-server/tests/unit/api_functions/celery/test_functions_celery.py @@ -15,19 +15,14 @@ import httpx import pytest import respx -from celery import Celery, Task # pylint: disable=no-name-in-module -from celery.contrib.testing.worker import ( - TestWorkController, # pylint: disable=no-name-in-module -) +from celery import Celery, Task # type: ignore # pylint: disable=no-name-in-module +from celery.contrib.testing.worker import TestWorkController # type: ignore from celery_library.task import register_task from celery_library.types import register_pydantic_types from faker import Faker from fastapi import FastAPI, status from httpx import AsyncClient, BasicAuth, HTTPStatusError -from models_library.api_schemas_long_running_tasks.tasks import ( - TaskResult, - TaskStatus, -) +from models_library.api_schemas_long_running_tasks.tasks import TaskResult, TaskStatus from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobFilter from models_library.functions import ( FunctionClass, @@ -87,7 +82,7 @@ async def wait_for_task_result( client: AsyncClient, auth: BasicAuth, task_id: str, - timeout: float = 30.0, + timeout: float = 30.0, # noqa: ASYNC109 ) -> TaskResult: async for attempt in AsyncRetrying( diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py b/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py index 534051212a7b..a980886415ea 100644 --- a/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py +++ b/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py @@ -22,6 +22,7 @@ from models_library.functions import ( FunctionJobStatus, RegisteredProjectFunction, + RegisteredProjectFunctionJobWithStatus, TaskID, ) from models_library.products import ProductName @@ -33,6 +34,7 @@ from pytest_mock import MockerFixture, MockType from servicelib.celery.models import TaskFilter, TaskState, TaskStatus, TaskUUID from simcore_service_api_server._meta import API_VTAG +from simcore_service_api_server._service_function_jobs import FunctionJobService from simcore_service_api_server.api.routes import function_jobs_routes from simcore_service_api_server.api.routes.function_jobs_routes import ( _JOB_CREATION_TASK_STATUS_PREFIX, @@ -132,6 +134,61 @@ async def test_list_function_jobs( ) +@pytest.mark.parametrize("status_str", ["SUCCESS", "FAILED"]) +async def test_list_function_jobs_with_status( + client: AsyncClient, + mock_rabbitmq_rpc_client: MockerFixture, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_project_function: RegisteredProjectFunction, + mock_registered_project_function_job: RegisteredProjectFunctionJob, + auth: httpx.BasicAuth, + mocker: MockerFixture, + status_str: str, +) -> None: + mock_status = FunctionJobStatus(status=status_str) + mock_outputs = {"X+Y": 42, "X-Y": 10} + mock_registered_project_function_job_with_status = ( + RegisteredProjectFunctionJobWithStatus( + **{ + **mock_registered_project_function_job.model_dump(), + "status": mock_status, + } + ) + ) + mock_handler_in_functions_rpc_interface( + "list_function_jobs_with_status", + ( + [mock_registered_project_function_job_with_status for _ in range(5)], + PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), + ), + ) + mock_handler_in_functions_rpc_interface( + "get_function", mock_registered_project_function + ) + + mock_function_job_outputs = mocker.patch.object( + FunctionJobService, "function_job_outputs", return_value=mock_outputs + ) + mock_handler_in_functions_rpc_interface("get_function_job_status", mock_status) + response = await client.get( + f"{API_VTAG}/function_jobs?include_status=true", auth=auth + ) + assert response.status_code == status.HTTP_200_OK + data = response.json()["items"] + assert len(data) == 5 + returned_function_job = RegisteredProjectFunctionJobWithStatus.model_validate( + data[0] + ) + if status_str == "SUCCESS": + mock_function_job_outputs.assert_called() + assert returned_function_job.outputs == mock_outputs + else: + mock_function_job_outputs.assert_not_called() + assert returned_function_job.outputs is None + + assert returned_function_job == mock_registered_project_function_job_with_status + + async def test_list_function_jobs_with_job_id_filter( client: AsyncClient, mock_rabbitmq_rpc_client: MockerFixture, @@ -203,10 +260,10 @@ def mocked_list_function_jobs(offset: int, limit: int): ( ProjectID(_faker.uuid4()), TaskID(_faker.uuid4()), - random.choice(list(TaskState)), + random.choice(list(TaskState)), # noqa: S311 ), - (None, None, random.choice(list(TaskState))), - (None, TaskID(_faker.uuid4()), random.choice(list(TaskState))), + (None, None, random.choice(list(TaskState))), # noqa: S311 + (None, TaskID(_faker.uuid4()), random.choice(list(TaskState))), # noqa: S311 ], ) async def test_get_function_job_status( @@ -227,8 +284,7 @@ async def test_get_function_job_status( _expected_return_status = ( status.HTTP_500_INTERNAL_SERVER_ERROR - if job_status != "SUCCESS" - and job_status != "FAILED" + if job_status not in ("SUCCESS", "FAILED") and (project_job_id is None and job_creation_task_id is None) else status.HTTP_200_OK ) @@ -299,11 +355,7 @@ async def _get_task_status( assert response.status_code == _expected_return_status if response.status_code == status.HTTP_200_OK: data = response.json() - if ( - project_job_id is not None - or job_status == "SUCCESS" - or job_status == "FAILED" - ): + if project_job_id is not None or job_status in ("SUCCESS", "FAILED"): assert data["status"] == job_status else: assert ( @@ -313,10 +365,23 @@ async def _get_task_status( @pytest.mark.parametrize( - "job_outputs, project_job_id", + "job_outputs, project_job_id, job_status, expected_output, use_db_cache", [ - (None, None), - ({"X+Y": 42, "X-Y": 10}, ProjectID(_faker.uuid4())), + (None, None, "created", None, True), + ( + {"X+Y": 42, "X-Y": 10}, + ProjectID(_faker.uuid4()), + RunningState.FAILED, + None, + False, + ), + ( + {"X+Y": 42, "X-Y": 10}, + ProjectID(_faker.uuid4()), + RunningState.SUCCESS, + {"X+Y": 42, "X-Y": 10}, + True, + ), ], ) async def test_get_function_job_outputs( @@ -328,14 +393,11 @@ async def test_get_function_job_outputs( auth: httpx.BasicAuth, job_outputs: dict[str, Any] | None, project_job_id: ProjectID | None, + job_status: str, + expected_output: dict[str, Any] | None, + use_db_cache: bool, ) -> None: - _expected_return_status = ( - status.HTTP_404_NOT_FOUND - if project_job_id is None and job_outputs is None - else status.HTTP_200_OK - ) - mock_handler_in_functions_rpc_interface( "get_function_job", mock_registered_project_function_job.model_copy( @@ -349,13 +411,21 @@ async def test_get_function_job_outputs( mock_handler_in_functions_rpc_interface( "get_function", mock_registered_project_function ) - mock_handler_in_functions_rpc_interface("get_function_job_outputs", job_outputs) + + if use_db_cache: + mock_handler_in_functions_rpc_interface("get_function_job_outputs", job_outputs) + else: + mock_handler_in_functions_rpc_interface("get_function_job_outputs", None) + + mock_handler_in_functions_rpc_interface( + "get_function_job_status", + FunctionJobStatus(status=job_status), + ) response = await client.get( f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}/outputs", auth=auth, ) - assert response.status_code == _expected_return_status - if response.status_code == status.HTTP_200_OK: - data = response.json() - assert data == job_outputs + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data == expected_output diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_functions.py b/services/api-server/tests/unit/api_functions/test_api_routers_functions.py index c473d876cf50..4f916da3bfad 100644 --- a/services/api-server/tests/unit/api_functions/test_api_routers_functions.py +++ b/services/api-server/tests/unit/api_functions/test_api_routers_functions.py @@ -433,10 +433,7 @@ def _get_rabbitmq_rpc_client(app: FastAPI) -> RabbitMQRPCClient: ) async def _get_wb_api_rpc_client(app: FastAPI) -> WbApiRpcClient: - wb_api_rpc_client = WbApiRpcClient( - _client=mocker.MagicMock(spec=RabbitMQRPCClient) - ) - return wb_api_rpc_client + return WbApiRpcClient(_client=mocker.MagicMock(spec=RabbitMQRPCClient)) mocker.patch.object( functions_tasks, "get_wb_api_rpc_client", _get_wb_api_rpc_client diff --git a/services/api-server/tests/unit/conftest.py b/services/api-server/tests/unit/conftest.py index 1865519bb1af..64c2adeac9e9 100644 --- a/services/api-server/tests/unit/conftest.py +++ b/services/api-server/tests/unit/conftest.py @@ -752,7 +752,7 @@ def mocked_solver_job_outputs(mocker) -> None: eTag=None, ) mocker.patch( - "simcore_service_api_server.api.routes.solvers_jobs_read.get_solver_output_results", + "simcore_service_api_server._service_jobs.get_solver_output_results", autospec=True, return_value=result, ) diff --git a/services/api-server/tests/unit/service/conftest.py b/services/api-server/tests/unit/service/conftest.py index f6b51d091c1c..fb04ff773d83 100644 --- a/services/api-server/tests/unit/service/conftest.py +++ b/services/api-server/tests/unit/service/conftest.py @@ -26,6 +26,7 @@ from simcore_service_api_server.services_rpc.director_v2 import DirectorV2Service from simcore_service_api_server.services_rpc.storage import StorageService from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient +from sqlalchemy.ext.asyncio import AsyncEngine async def catalog_rpc_side_effect(): @@ -118,8 +119,7 @@ def director2_api(mocker: MockerFixture) -> DirectorV2Api: def storage_rest_client( mocker: MockerFixture, ) -> StorageApi: - mock = mocker.AsyncMock(spec=StorageApi) - return mock + return mocker.AsyncMock(spec=StorageApi) @pytest.fixture @@ -153,6 +153,11 @@ def program_service( return ProgramService(catalog_service=catalog_service) +@pytest.fixture +def async_pg_engine(mocker: MockerFixture) -> AsyncEngine: + return mocker.MagicMock(spec=AsyncEngine) + + @pytest.fixture def job_service( auth_session: AuthSession, @@ -164,6 +169,7 @@ def job_service( product_name: ProductName, user_id: UserID, solver_service: SolverService, + async_pg_engine: AsyncEngine, ) -> JobService: return JobService( _web_rest_client=auth_session, diff --git a/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py b/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py index 3f85890e6a9e..5b2e98f60dc0 100644 --- a/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py +++ b/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py @@ -21,6 +21,7 @@ RegisteredFunctionJob, RegisteredFunctionJobCollection, RegisteredFunctionJobPatch, + RegisteredFunctionJobWithStatus, ) from models_library.functions_errors import ( FunctionIDNotFoundError, @@ -252,6 +253,38 @@ async def list_function_jobs( ) +@router.expose( + reraise_if_error_type=( + FunctionJobsReadApiAccessDeniedError, + FunctionsReadApiAccessDeniedError, + ) +) +async def list_function_jobs_with_status( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + pagination_limit: int, + pagination_offset: int, + filter_by_function_id: FunctionID | None = None, + filter_by_function_job_ids: list[FunctionJobID] | None = None, + filter_by_function_job_collection_id: FunctionJobCollectionID | None = None, +) -> tuple[ + list[RegisteredFunctionJobWithStatus], + PageMetaInfoLimitOffset, +]: + return await _functions_service.list_function_jobs_with_status( + app=app, + user_id=user_id, + product_name=product_name, + pagination_limit=pagination_limit, + pagination_offset=pagination_offset, + filter_by_function_id=filter_by_function_id, + filter_by_function_job_ids=filter_by_function_job_ids, + filter_by_function_job_collection_id=filter_by_function_job_collection_id, + ) + + @router.expose( reraise_if_error_type=( FunctionJobCollectionsReadApiAccessDeniedError, diff --git a/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py b/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py index 614359330565..620f73bd3e2d 100644 --- a/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py +++ b/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py @@ -29,6 +29,7 @@ RegisteredFunctionDB, RegisteredFunctionJobCollectionDB, RegisteredFunctionJobDB, + RegisteredFunctionJobWithStatusDB, ) from models_library.functions_errors import ( FunctionBaseError, @@ -234,7 +235,7 @@ async def create_function_job( # noqa: PLR0913 return registered_function_job -async def patch_function_job( # noqa: PLR0913 +async def patch_function_job( app: web.Application, connection: AsyncConnection | None = None, *, @@ -269,9 +270,7 @@ async def patch_function_job( # noqa: PLR0913 ) row = result.one() - registered_function_job = RegisteredFunctionJobDB.model_validate(row) - - return registered_function_job + return RegisteredFunctionJobDB.model_validate(row) async def create_function_job_collection( @@ -506,7 +505,7 @@ async def list_functions( ) -async def list_function_jobs( +async def list_function_jobs_with_status( app: web.Application, connection: AsyncConnection | None = None, *, @@ -517,7 +516,7 @@ async def list_function_jobs( filter_by_function_id: FunctionID | None = None, filter_by_function_job_ids: list[FunctionJobID] | None = None, filter_by_function_job_collection_id: FunctionJobCollectionID | None = None, -) -> tuple[list[RegisteredFunctionJobDB], PageMetaInfoLimitOffset]: +) -> tuple[list[RegisteredFunctionJobWithStatusDB], PageMetaInfoLimitOffset]: async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: await check_user_api_access_rights( app, @@ -579,7 +578,7 @@ async def list_function_jobs( total=0, offset=pagination_offset, limit=pagination_limit, count=0 ) results = [ - RegisteredFunctionJobDB.model_validate(row) + RegisteredFunctionJobWithStatusDB.model_validate(row) async for row in await conn.stream( function_jobs_table.select() .where(filter_conditions) diff --git a/services/web/server/src/simcore_service_webserver/functions/_functions_service.py b/services/web/server/src/simcore_service_webserver/functions/_functions_service.py index 5917e8172698..32920167cfe3 100644 --- a/services/web/server/src/simcore_service_webserver/functions/_functions_service.py +++ b/services/web/server/src/simcore_service_webserver/functions/_functions_service.py @@ -28,10 +28,14 @@ RegisteredFunctionJobCollection, RegisteredFunctionJobDB, RegisteredFunctionJobPatch, + RegisteredFunctionJobWithStatus, + RegisteredFunctionJobWithStatusDB, RegisteredProjectFunction, RegisteredProjectFunctionJob, + RegisteredProjectFunctionJobWithStatus, RegisteredSolverFunction, RegisteredSolverFunctionJob, + RegisteredSolverFunctionJobWithStatus, ) from models_library.functions_errors import ( FunctionJobPatchModelIncompatibleError, @@ -46,9 +50,7 @@ from servicelib.rabbitmq import RPCRouter from . import _functions_repository -from ._functions_exceptions import ( - FunctionGroupAccessRightsNotFoundError, -) +from ._functions_exceptions import FunctionGroupAccessRightsNotFoundError router = RPCRouter() @@ -262,15 +264,17 @@ async def list_function_jobs( filter_by_function_job_ids: list[FunctionJobID] | None = None, filter_by_function_job_collection_id: FunctionJobCollectionID | None = None, ) -> tuple[list[RegisteredFunctionJob], PageMetaInfoLimitOffset]: - returned_function_jobs, page = await _functions_repository.list_function_jobs( - app=app, - user_id=user_id, - product_name=product_name, - pagination_limit=pagination_limit, - pagination_offset=pagination_offset, - filter_by_function_id=filter_by_function_id, - filter_by_function_job_ids=filter_by_function_job_ids, - filter_by_function_job_collection_id=filter_by_function_job_collection_id, + returned_function_jobs, page = ( + await _functions_repository.list_function_jobs_with_status( + app=app, + user_id=user_id, + product_name=product_name, + pagination_limit=pagination_limit, + pagination_offset=pagination_offset, + filter_by_function_id=filter_by_function_id, + filter_by_function_job_ids=filter_by_function_job_ids, + filter_by_function_job_collection_id=filter_by_function_job_collection_id, + ) ) return [ _decode_functionjob(returned_function_job) @@ -278,6 +282,38 @@ async def list_function_jobs( ], page +async def list_function_jobs_with_status( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + pagination_limit: int, + pagination_offset: int, + filter_by_function_id: FunctionID | None = None, + filter_by_function_job_ids: list[FunctionJobID] | None = None, + filter_by_function_job_collection_id: FunctionJobCollectionID | None = None, +) -> tuple[ + list[RegisteredFunctionJobWithStatus], + PageMetaInfoLimitOffset, +]: + returned_function_jobs_wso, page = ( + await _functions_repository.list_function_jobs_with_status( + app=app, + user_id=user_id, + product_name=product_name, + pagination_limit=pagination_limit, + pagination_offset=pagination_offset, + filter_by_function_id=filter_by_function_id, + filter_by_function_job_ids=filter_by_function_job_ids, + filter_by_function_job_collection_id=filter_by_function_job_collection_id, + ) + ) + return [ + _decode_functionjob_wso(returned_function_job_wso) + for returned_function_job_wso in returned_function_jobs_wso + ], page + + async def list_function_job_collections( app: web.Application, *, @@ -759,7 +795,7 @@ def _encode_functionjob( def _decode_functionjob( - functionjob_db: RegisteredFunctionJobDB, + functionjob_db: RegisteredFunctionJobWithStatusDB | RegisteredFunctionJobDB, ) -> RegisteredFunctionJob: if functionjob_db.function_class == FunctionClass.PROJECT: return RegisteredProjectFunctionJob( @@ -796,6 +832,46 @@ def _decode_functionjob( ) +def _decode_functionjob_wso( + functionjob_db: RegisteredFunctionJobWithStatusDB, +) -> RegisteredFunctionJobWithStatus: + if functionjob_db.function_class == FunctionClass.PROJECT: + return RegisteredProjectFunctionJobWithStatus( + uid=functionjob_db.uuid, + title=functionjob_db.title, + description="", + function_uid=functionjob_db.function_uuid, + inputs=functionjob_db.inputs, + outputs=functionjob_db.outputs, + project_job_id=functionjob_db.class_specific_data["project_job_id"], + created_at=functionjob_db.created, + status=FunctionJobStatus(status=functionjob_db.status), + job_creation_task_id=functionjob_db.class_specific_data.get( + "job_creation_task_id" + ), + ) + + if functionjob_db.function_class == FunctionClass.SOLVER: + return RegisteredSolverFunctionJobWithStatus( + uid=functionjob_db.uuid, + title=functionjob_db.title, + description="", + function_uid=functionjob_db.function_uuid, + inputs=functionjob_db.inputs, + outputs=functionjob_db.outputs, + solver_job_id=functionjob_db.class_specific_data["solver_job_id"], + created_at=functionjob_db.created, + status=FunctionJobStatus(status=functionjob_db.status), + job_creation_task_id=functionjob_db.class_specific_data.get( + "job_creation_task_id" + ), + ) + + raise UnsupportedFunctionJobClassError( + function_job_class=functionjob_db.function_class + ) + + def _patch_functionjob( function_job_db: RegisteredFunctionJobDB, patch: RegisteredFunctionJobPatch, diff --git a/services/web/server/tests/unit/with_dbs/04/functions/test_function_jobs_controller_rpc.py b/services/web/server/tests/unit/with_dbs/04/functions/test_function_jobs_controller_rpc.py index 80a998b1db7d..09db242f505a 100644 --- a/services/web/server/tests/unit/with_dbs/04/functions/test_function_jobs_controller_rpc.py +++ b/services/web/server/tests/unit/with_dbs/04/functions/test_function_jobs_controller_rpc.py @@ -9,9 +9,7 @@ from aiohttp.test_utils import TestClient from common_library.users_enums import UserRole from faker import Faker -from models_library.api_schemas_webserver.functions import ( - ProjectFunctionJob, -) +from models_library.api_schemas_webserver.functions import ProjectFunctionJob from models_library.functions import ( Function, FunctionClass, @@ -226,6 +224,60 @@ async def test_list_function_jobs( assert any(j.uid == registered_job.uid for j in jobs) +@pytest.mark.parametrize( + "user_role", + [UserRole.USER], +) +async def test_list_function_jobs_with_status( + client: TestClient, + add_user_function_api_access_rights: None, + rpc_client: RabbitMQRPCClient, + mock_function_factory: Callable[[FunctionClass], Function], + logged_user: UserInfoDict, + osparc_product_name: ProductName, +): + # Register the function first + registered_function = await functions_rpc.register_function( + rabbitmq_rpc_client=rpc_client, + function=mock_function_factory(FunctionClass.PROJECT), + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + assert registered_function.uid is not None + + function_job = ProjectFunctionJob( + function_uid=registered_function.uid, + title="Test Function Job", + description="A test function job", + project_job_id=uuid4(), + inputs={"input1": "value1"}, + outputs={"output1": "result1"}, + job_creation_task_id=None, + ) + + # Register the function job + registered_job = await functions_rpc.register_function_job( + rabbitmq_rpc_client=rpc_client, + function_job=function_job, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + # List function jobs + jobs, _ = await functions_rpc.list_function_jobs_with_status( + rabbitmq_rpc_client=rpc_client, + pagination_limit=10, + pagination_offset=0, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + # Assert the list contains the registered job + assert len(jobs) > 0 + assert jobs[0].status.status == "created" + assert any(j.uid == registered_job.uid for j in jobs) + + @pytest.mark.parametrize( "user_role", [UserRole.USER],