diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py index 00a982add87..89e387adffd 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py @@ -17,6 +17,8 @@ RegisteredFunctionJobCollection, ) from models_library.functions import ( + FunctionJobStatus, + FunctionOutputs, FunctionUserAccessRights, FunctionUserApiAccessRights, ) @@ -300,6 +302,82 @@ async def get_function_job( return TypeAdapter(RegisteredFunctionJob).validate_python(result) +@log_decorator(_logger, level=logging.DEBUG) +async def get_function_job_status( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + user_id: UserID, + function_job_id: FunctionJobID, + product_name: ProductName, +) -> FunctionJobStatus: + result = await rabbitmq_rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + TypeAdapter(RPCMethodName).validate_python("get_function_job_status"), + function_job_id=function_job_id, + user_id=user_id, + product_name=product_name, + ) + return TypeAdapter(FunctionJobStatus).validate_python(result) + + +@log_decorator(_logger, level=logging.DEBUG) +async def get_function_job_outputs( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + user_id: UserID, + function_job_id: FunctionJobID, + product_name: ProductName, +) -> FunctionOutputs: + result = await rabbitmq_rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + TypeAdapter(RPCMethodName).validate_python("get_function_job_outputs"), + function_job_id=function_job_id, + user_id=user_id, + product_name=product_name, + ) + return TypeAdapter(FunctionOutputs).validate_python(result) + + +@log_decorator(_logger, level=logging.DEBUG) +async def update_function_job_status( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + job_status: FunctionJobStatus, +) -> FunctionJobStatus: + result = await rabbitmq_rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + TypeAdapter(RPCMethodName).validate_python("update_function_job_status"), + function_job_id=function_job_id, + job_status=job_status, + user_id=user_id, + product_name=product_name, + ) + return TypeAdapter(FunctionJobStatus).validate_python(result) + + +@log_decorator(_logger, level=logging.DEBUG) +async def update_function_job_outputs( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + outputs: FunctionOutputs, +) -> FunctionOutputs: + result = await rabbitmq_rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + TypeAdapter(RPCMethodName).validate_python("update_function_job_outputs"), + function_job_id=function_job_id, + outputs=outputs, + user_id=user_id, + product_name=product_name, + ) + return TypeAdapter(FunctionOutputs).validate_python(result) + + @log_decorator(_logger, level=logging.DEBUG) async def delete_function_job( rabbitmq_rpc_client: RabbitMQRPCClient, diff --git a/services/api-server/src/simcore_service_api_server/api/dependencies/functions.py b/services/api-server/src/simcore_service_api_server/api/dependencies/functions.py new file mode 100644 index 00000000000..354a0541f48 --- /dev/null +++ b/services/api-server/src/simcore_service_api_server/api/dependencies/functions.py @@ -0,0 +1,87 @@ +from typing import Annotated + +from fastapi import Depends +from models_library.functions import ( + FunctionJob, + FunctionJobID, + FunctionJobStatus, + FunctionOutputs, + RegisteredFunction, +) +from models_library.products import ProductName +from models_library.users import UserID +from simcore_service_api_server.api.dependencies.authentication import ( + get_current_user_id, + get_product_name, +) +from simcore_service_api_server.api.dependencies.webserver_rpc import ( + get_wb_api_rpc_client, +) +from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient + + +async def get_stored_job_outputs( + function_job_id: FunctionJobID, + wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], +) -> FunctionOutputs: + + return await wb_api_rpc.get_function_job_outputs( + function_job_id=function_job_id, user_id=user_id, product_name=product_name + ) + + +async def get_function_job_dependency( + function_job_id: FunctionJobID, + wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], +) -> FunctionJob: + return await wb_api_rpc.get_function_job( + function_job_id=function_job_id, user_id=user_id, product_name=product_name + ) + + +async def get_function_from_functionjob( + function_job: Annotated[FunctionJob, Depends(get_function_job_dependency)], + wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], +) -> RegisteredFunction: + return await wb_api_rpc.get_function( + function_id=function_job.function_uid, + user_id=user_id, + product_name=product_name, + ) + + +async def get_function_from_functionjobid( + function_job_id: FunctionJobID, + wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], +) -> RegisteredFunction: + function_job = await get_function_job_dependency( + function_job_id=function_job_id, + wb_api_rpc=wb_api_rpc, + user_id=user_id, + product_name=product_name, + ) + return await get_function_from_functionjob( + function_job=function_job, + wb_api_rpc=wb_api_rpc, + user_id=user_id, + product_name=product_name, + ) + + +async def get_stored_job_status( + function_job_id: FunctionJobID, + wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], +) -> FunctionJobStatus: + return await wb_api_rpc.get_function_job_status( + function_job_id=function_job_id, user_id=user_id, product_name=product_name + ) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py index eb50f8bd0ef..b4eccee5af0 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py @@ -12,7 +12,13 @@ RegisteredFunctionJobCollection, ) from models_library.products import ProductName -from models_library.users import UserID # Import UserID +from models_library.users import UserID +from simcore_service_api_server.api.dependencies.functions import ( + get_stored_job_status, # Import UserID +) +from simcore_service_api_server.api.dependencies.functions import ( + get_function_from_functionjobid, +) from ...models.pagination import Page, PaginationParams from ...models.schemas.errors import ErrorGet @@ -221,13 +227,30 @@ async def function_job_collection_status( job_statuses = await asyncio.gather( *[ function_job_status( - job_id, + function_job=await get_function_job( + function_job_id=function_job_id, + wb_api_rpc=wb_api_rpc, + user_id=user_id, + product_name=product_name, + ), + function=await get_function_from_functionjobid( + function_job_id=function_job_id, + wb_api_rpc=wb_api_rpc, + user_id=user_id, + product_name=product_name, + ), + stored_job_status=await get_stored_job_status( + function_job_id=function_job_id, + user_id=user_id, + product_name=product_name, + wb_api_rpc=wb_api_rpc, + ), wb_api_rpc=wb_api_rpc, director2_api=director2_api, user_id=user_id, product_name=product_name, ) - for job_id in function_job_collection.job_ids + for function_job_id in function_job_collection.job_ids ] ) return FunctionJobCollectionStatus( diff --git a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py index 3c3f7cec2c3..38e78a021e4 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py @@ -4,7 +4,6 @@ from fastapi_pagination.api import create_page from models_library.api_schemas_long_running_tasks.tasks import TaskGet from models_library.api_schemas_webserver.functions import ( - Function, FunctionClass, FunctionJob, FunctionJobID, @@ -12,13 +11,21 @@ FunctionOutputs, RegisteredFunctionJob, ) +from models_library.functions import RegisteredFunction from models_library.functions_errors import ( UnsupportedFunctionClassError, UnsupportedFunctionFunctionJobClassCombinationError, ) from models_library.products import ProductName +from models_library.projects_state import RunningState from models_library.users import UserID from servicelib.fastapi.dependencies import get_app +from simcore_service_api_server.api.dependencies.functions import ( + get_function_from_functionjob, + get_function_job_dependency, + get_stored_job_outputs, + get_stored_job_status, +) from sqlalchemy.ext.asyncio import AsyncEngine from ..._service_jobs import JobService @@ -171,19 +178,19 @@ async def delete_function_job( ), ) async def function_job_status( - function_job_id: FunctionJobID, + function_job: Annotated[ + RegisteredFunctionJob, Depends(get_function_job_dependency) + ], + function: Annotated[RegisteredFunction, Depends(get_function_from_functionjob)], + stored_job_status: Annotated[FunctionJobStatus, Depends(get_stored_job_status)], director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], user_id: Annotated[UserID, Depends(get_current_user_id)], product_name: Annotated[ProductName, Depends(get_product_name)], wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], ) -> FunctionJobStatus: - function, function_job = await get_function_from_functionjobid( - wb_api_rpc=wb_api_rpc, - function_job_id=function_job_id, - user_id=user_id, - product_name=product_name, - ) + if stored_job_status.status in (RunningState.SUCCESS, RunningState.FAILED): + return stored_job_status if ( function.function_class == FunctionClass.PROJECT @@ -195,9 +202,7 @@ async def function_job_status( user_id=user_id, director2_api=director2_api, ) - return FunctionJobStatus(status=job_status.state) - - if (function.function_class == FunctionClass.SOLVER) and ( + elif (function.function_class == FunctionClass.SOLVER) and ( function_job.function_class == FunctionClass.SOLVER ): job_status = await solvers_jobs.inspect_job( @@ -207,11 +212,19 @@ async def function_job_status( user_id=user_id, director2_api=director2_api, ) - return FunctionJobStatus(status=job_status.state) + else: + raise UnsupportedFunctionFunctionJobClassCombinationError( + function_class=function.function_class, + function_job_class=function_job.function_class, + ) + + new_job_status = FunctionJobStatus(status=job_status.state) - raise UnsupportedFunctionFunctionJobClassCombinationError( - function_class=function.function_class, - function_job_class=function_job.function_class, + return await wb_api_rpc.update_function_job_status( + function_job_id=function_job.uid, + user_id=user_id, + product_name=product_name, + job_status=new_job_status, ) @@ -220,7 +233,7 @@ async def get_function_from_functionjobid( function_job_id: FunctionJobID, user_id: Annotated[UserID, Depends(get_current_user_id)], product_name: Annotated[ProductName, Depends(get_product_name)], -) -> tuple[Function, FunctionJob]: +) -> tuple[RegisteredFunction, RegisteredFunctionJob]: function_job = await get_function_job( wb_api_rpc=wb_api_rpc, function_job_id=function_job_id, @@ -251,26 +264,26 @@ async def get_function_from_functionjobid( ), ) async def function_job_outputs( - function_job_id: FunctionJobID, + function_job: Annotated[ + RegisteredFunctionJob, Depends(get_function_job_dependency) + ], + function: Annotated[RegisteredFunction, Depends(get_function_from_functionjob)], webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], user_id: Annotated[UserID, Depends(get_current_user_id)], product_name: Annotated[ProductName, Depends(get_product_name)], storage_client: Annotated[StorageApi, Depends(get_api_client(StorageApi))], wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)], + stored_job_outputs: Annotated[FunctionOutputs, Depends(get_stored_job_outputs)], ) -> FunctionOutputs: - function, function_job = await get_function_from_functionjobid( - wb_api_rpc=wb_api_rpc, - function_job_id=function_job_id, - user_id=user_id, - product_name=product_name, - ) + if stored_job_outputs is not None: + return stored_job_outputs if ( function.function_class == FunctionClass.PROJECT and function_job.function_class == FunctionClass.PROJECT ): - return dict( + new_outputs = dict( ( await studies_jobs.get_study_job_outputs( study_id=function.project_id, @@ -281,12 +294,11 @@ async def function_job_outputs( ) ).results ) - - if ( + elif ( function.function_class == FunctionClass.SOLVER and function_job.function_class == FunctionClass.SOLVER ): - return dict( + new_outputs = dict( ( await solvers_jobs_read.get_job_outputs( solver_key=function.solver_key, @@ -299,7 +311,15 @@ async def function_job_outputs( ) ).results ) - raise UnsupportedFunctionClassError(function_class=function.function_class) + else: + raise UnsupportedFunctionClassError(function_class=function.function_class) + + return await wb_api_rpc.update_function_job_outputs( + function_job_id=function_job.uid, + user_id=user_id, + product_name=product_name, + outputs=new_outputs, + ) @function_job_router.post( diff --git a/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py index bbfeefc4efc..c9ab2dda67c 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py @@ -36,6 +36,7 @@ from models_library.users import UserID from servicelib.fastapi.dependencies import get_reverse_url_mapper from simcore_service_api_server._service_jobs import JobService +from simcore_service_api_server.api.dependencies.functions import get_stored_job_status from ..._service_solvers import SolverService from ...models.pagination import Page, PaginationParams @@ -365,11 +366,12 @@ async def validate_function_inputs( ) async def run_function( # noqa: PLR0913 request: Request, + function_id: FunctionID, + to_run_function: Annotated[RegisteredFunction, Depends(get_function)], wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], - function_id: FunctionID, function_inputs: FunctionInputs, user_id: Annotated[UserID, Depends(get_current_user_id)], product_name: Annotated[str, Depends(get_product_name)], @@ -378,7 +380,6 @@ async def run_function( # noqa: PLR0913 x_simcore_parent_project_uuid: Annotated[ProjectID | Literal["null"], Header()], x_simcore_parent_node_id: Annotated[NodeID | Literal["null"], Header()], ) -> RegisteredFunctionJob: - parent_project_uuid = ( x_simcore_parent_project_uuid if isinstance(x_simcore_parent_project_uuid, ProjectID) @@ -390,8 +391,6 @@ async def run_function( # noqa: PLR0913 else None ) - # Make sure the user is allowed to execute any function - # (read/write right is checked in the other endpoint called in this method) user_api_access_rights = await wb_api_rpc.get_functions_user_api_access_rights( user_id=user_id, product_name=product_name ) @@ -400,8 +399,7 @@ async def run_function( # noqa: PLR0913 user_id=user_id, function_id=function_id, ) - # Make sure the user is allowed to execute this particular function - # (read/write right is checked in the other endpoint called in this method) + user_permissions: FunctionUserAccessRights = ( await wb_api_rpc.get_function_user_permissions( function_id=function_id, user_id=user_id, product_name=product_name @@ -415,10 +413,6 @@ async def run_function( # noqa: PLR0913 from .function_jobs_routes import function_job_status - to_run_function = await wb_api_rpc.get_function( - function_id=function_id, user_id=user_id, product_name=product_name - ) - joined_inputs = _join_inputs( to_run_function.default_inputs, function_inputs, @@ -443,10 +437,17 @@ async def run_function( # noqa: PLR0913 ): for cached_function_job in cached_function_jobs: job_status = await function_job_status( + function=to_run_function, + function_job=cached_function_job, + stored_job_status=await get_stored_job_status( + function_job_id=cached_function_job.uid, + user_id=user_id, + product_name=product_name, + wb_api_rpc=wb_api_rpc, + ), wb_api_rpc=wb_api_rpc, - director2_api=director2_api, - function_job_id=cached_function_job.uid, user_id=user_id, + director2_api=director2_api, product_name=product_name, ) if job_status.status == RunningState.SUCCESS: @@ -563,9 +564,10 @@ async def delete_function( ), ) async def map_function( # noqa: PLR0913 + request: Request, function_id: FunctionID, + to_run_function: Annotated[RegisteredFunction, Depends(get_function)], function_inputs_list: FunctionInputsList, - request: Request, wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], @@ -577,11 +579,11 @@ async def map_function( # noqa: PLR0913 x_simcore_parent_project_uuid: Annotated[ProjectID | Literal["null"], Header()], x_simcore_parent_node_id: Annotated[NodeID | Literal["null"], Header()], ) -> RegisteredFunctionJobCollection: - function_jobs = [] function_jobs = [ await run_function( wb_api_rpc=wb_api_rpc, function_id=function_id, + to_run_function=to_run_function, function_inputs=function_inputs, product_name=product_name, user_id=user_id, diff --git a/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py b/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py index 2272006cfb5..608a65ac47e 100644 --- a/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py +++ b/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py @@ -24,6 +24,8 @@ ) from models_library.api_schemas_webserver.licensed_items import LicensedItemRpcGetPage from models_library.functions import ( + FunctionJobStatus, + FunctionOutputs, FunctionUserAccessRights, FunctionUserApiAccessRights, ) @@ -472,6 +474,66 @@ async def get_function_output_schema( function_id=function_id, ) + async def get_function_job_status( + self, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + ) -> FunctionJobStatus: + return await functions_rpc_interface.get_function_job_status( + self._client, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + ) + + async def get_function_job_outputs( + self, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + ) -> FunctionOutputs: + return await functions_rpc_interface.get_function_job_outputs( + self._client, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + ) + + async def update_function_job_status( + self, + *, + function_job_id: FunctionJobID, + user_id: UserID, + product_name: ProductName, + job_status: FunctionJobStatus, + ) -> FunctionJobStatus: + return await functions_rpc_interface.update_function_job_status( + self._client, + function_job_id=function_job_id, + user_id=user_id, + product_name=product_name, + job_status=job_status, + ) + + async def update_function_job_outputs( + self, + *, + function_job_id: FunctionJobID, + user_id: UserID, + product_name: ProductName, + outputs: FunctionOutputs, + ) -> FunctionOutputs: + return await functions_rpc_interface.update_function_job_outputs( + self._client, + function_job_id=function_job_id, + user_id=user_id, + product_name=product_name, + outputs=outputs, + ) + async def find_cached_function_jobs( self, *, diff --git a/services/api-server/tests/unit/api_functions/conftest.py b/services/api-server/tests/unit/api_functions/conftest.py index f248fcfcf2d..cc6f57af1f2 100644 --- a/services/api-server/tests/unit/api_functions/conftest.py +++ b/services/api-server/tests/unit/api_functions/conftest.py @@ -142,19 +142,17 @@ def mock_registered_solver_function( sample_output_schema: JSONFunctionOutputSchema, ) -> RegisteredFunction: return RegisteredSolverFunction( - **{ - "title": "test_function", - "function_class": FunctionClass.SOLVER, - "description": "A test function", - "input_schema": sample_input_schema, - "output_schema": sample_output_schema, - "default_inputs": None, - "uid": f"{uuid4()}", - "created_at": datetime.datetime.now(datetime.UTC), - "modified_at": datetime.datetime.now(datetime.UTC), - "solver_key": "simcore/services/comp/ans-model", - "solver_version": "1.0.1", - } + title="test_function", + function_class=FunctionClass.SOLVER, + description="A test function", + input_schema=sample_input_schema, + output_schema=sample_output_schema, + default_inputs=None, + uid=uuid4(), + created_at=datetime.datetime.now(datetime.UTC), + modified_at=datetime.datetime.now(datetime.UTC), + solver_key="simcore/services/comp/ans-model", + solver_version="1.0.1", ) @@ -266,3 +264,24 @@ def _mock( ) return _mock + + +@pytest.fixture() +def mock_handler_in_study_jobs_rest_interface( + mock_wb_api_server_rpc: MockerFixture, +) -> Callable[[str, Any, Exception | None], None]: + def _mock( + handler_name: str = "", + return_value: Any = None, + exception: Exception | None = None, + ) -> None: + from simcore_service_api_server.api.routes.functions_routes import studies_jobs + + mock_wb_api_server_rpc.patch.object( + studies_jobs, + handler_name, + return_value=return_value, + side_effect=exception, + ) + + return _mock diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_function_job_collections.py b/services/api-server/tests/unit/api_functions/test_api_routers_function_job_collections.py new file mode 100644 index 00000000000..230103d69eb --- /dev/null +++ b/services/api-server/tests/unit/api_functions/test_api_routers_function_job_collections.py @@ -0,0 +1,153 @@ +import datetime +from collections.abc import Callable +from typing import Any +from uuid import uuid4 + +import httpx +from httpx import AsyncClient +from models_library.api_schemas_webserver.functions import ( + RegisteredFunctionJobCollection, + RegisteredProjectFunction, +) +from models_library.rest_pagination import PageMetaInfoLimitOffset +from servicelib.aiohttp import status +from simcore_service_api_server._meta import API_VTAG + + +async def test_get_function_job_collection( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + auth: httpx.BasicAuth, +) -> None: + mock_registered_function_job_collection = ( + RegisteredFunctionJobCollection.model_validate( + { + "uid": str(uuid4()), + "title": "Test Collection", + "description": "A test function job collection", + "job_ids": [str(uuid4()), str(uuid4())], + "created_at": datetime.datetime.now(datetime.UTC), + } + ) + ) + + mock_handler_in_functions_rpc_interface( + "get_function_job_collection", mock_registered_function_job_collection + ) + + response = await client.get( + f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}", + auth=auth, + ) + assert response.status_code == status.HTTP_200_OK + assert ( + RegisteredFunctionJobCollection.model_validate(response.json()) + == mock_registered_function_job_collection + ) + + +async def test_list_function_job_collections( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + auth: httpx.BasicAuth, +) -> None: + mock_registered_function_job_collection = ( + RegisteredFunctionJobCollection.model_validate( + { + "uid": str(uuid4()), + "title": "Test Collection", + "description": "A test function job collection", + "job_ids": [str(uuid4()), str(uuid4())], + "created_at": datetime.datetime.now(datetime.UTC), + } + ) + ) + + mock_handler_in_functions_rpc_interface( + "list_function_job_collections", + ( + [mock_registered_function_job_collection for _ in range(5)], + PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), + ), + ) + + response = await client.get(f"{API_VTAG}/function_job_collections", auth=auth) + assert response.status_code == status.HTTP_200_OK + data = response.json()["items"] + assert len(data) == 5 + assert ( + RegisteredFunctionJobCollection.model_validate(data[0]) + == mock_registered_function_job_collection + ) + + +async def test_delete_function_job_collection( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_function_job_collection: RegisteredFunctionJobCollection, + auth: httpx.BasicAuth, +) -> None: + + mock_handler_in_functions_rpc_interface("delete_function_job_collection", None) + + # Now, delete the function job collection + response = await client.delete( + f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}", + auth=auth, + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data is None + + +async def test_get_function_job_collection_jobs( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_function_job_collection: RegisteredFunctionJobCollection, + auth: httpx.BasicAuth, +) -> None: + + mock_handler_in_functions_rpc_interface( + "get_function_job_collection", mock_registered_function_job_collection + ) + + response = await client.get( + f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}/function_jobs", + auth=auth, + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert len(data) == len(mock_registered_function_job_collection.job_ids) + + +async def test_list_function_job_collections_with_function_filter( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_function_job_collection: RegisteredFunctionJobCollection, + mock_registered_project_function: RegisteredProjectFunction, + auth: httpx.BasicAuth, +) -> None: + + mock_handler_in_functions_rpc_interface( + "list_function_job_collections", + ( + [mock_registered_function_job_collection for _ in range(2)], + PageMetaInfoLimitOffset(total=5, count=2, limit=2, offset=1), + ), + ) + + response = await client.get( + f"{API_VTAG}/function_job_collections?function_id={mock_registered_project_function.uid}&limit=2&offset=1", + auth=auth, + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + + assert data["total"] == 5 + assert data["limit"] == 2 + assert data["offset"] == 1 + assert len(data["items"]) == 2 + assert ( + RegisteredFunctionJobCollection.model_validate(data["items"][0]) + == mock_registered_function_job_collection + ) diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py b/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py new file mode 100644 index 00000000000..96119039746 --- /dev/null +++ b/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py @@ -0,0 +1,184 @@ +# pylint: disable=unused-argument + +import uuid +from collections.abc import Callable +from datetime import datetime +from typing import Any + +import httpx +import pytest +from httpx import AsyncClient +from models_library.api_schemas_webserver.functions import ( + ProjectFunctionJob, + RegisteredProjectFunctionJob, +) +from models_library.functions import FunctionJobStatus, RegisteredProjectFunction +from models_library.projects_state import RunningState +from models_library.rest_pagination import PageMetaInfoLimitOffset +from servicelib.aiohttp import status +from simcore_service_api_server._meta import API_VTAG +from simcore_service_api_server.models.schemas.jobs import JobStatus + + +async def test_delete_function_job( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_project_function_job: RegisteredProjectFunctionJob, + auth: httpx.BasicAuth, +) -> None: + + mock_handler_in_functions_rpc_interface("delete_function_job", None) + + # Now, delete the function job + response = await client.delete( + f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}", + auth=auth, + ) + assert response.status_code == status.HTTP_200_OK + + +async def test_register_function_job( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_project_function_job: ProjectFunctionJob, + mock_registered_project_function_job: RegisteredProjectFunctionJob, + auth: httpx.BasicAuth, +) -> None: + """Test the register_function_job endpoint.""" + + mock_handler_in_functions_rpc_interface( + "register_function_job", mock_registered_project_function_job + ) + + response = await client.post( + f"{API_VTAG}/function_jobs", + json=mock_project_function_job.model_dump(mode="json"), + auth=auth, + ) + + assert response.status_code == status.HTTP_200_OK + assert ( + RegisteredProjectFunctionJob.model_validate(response.json()) + == mock_registered_project_function_job + ) + + +async def test_get_function_job( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_project_function_job: RegisteredProjectFunctionJob, + auth: httpx.BasicAuth, +) -> None: + + mock_handler_in_functions_rpc_interface( + "get_function_job", mock_registered_project_function_job + ) + + # Now, get the function job + response = await client.get( + f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}", + auth=auth, + ) + assert response.status_code == status.HTTP_200_OK + assert ( + RegisteredProjectFunctionJob.model_validate(response.json()) + == mock_registered_project_function_job + ) + + +async def test_list_function_jobs( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_project_function_job: RegisteredProjectFunctionJob, + auth: httpx.BasicAuth, +) -> None: + + mock_handler_in_functions_rpc_interface( + "list_function_jobs", + ( + [mock_registered_project_function_job for _ in range(5)], + PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), + ), + ) + + # Now, list function jobs + response = await client.get(f"{API_VTAG}/function_jobs", auth=auth) + assert response.status_code == status.HTTP_200_OK + data = response.json()["items"] + assert len(data) == 5 + assert ( + RegisteredProjectFunctionJob.model_validate(data[0]) + == mock_registered_project_function_job + ) + + +@pytest.mark.parametrize("job_status", ["SUCCESS", "FAILED", "STARTED"]) +async def test_get_function_job_status( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_project_function_job: RegisteredProjectFunctionJob, + mock_registered_project_function: RegisteredProjectFunction, + mock_handler_in_study_jobs_rest_interface: Callable[[str, Any], None], + auth: httpx.BasicAuth, + job_status: str, +) -> None: + + mock_handler_in_functions_rpc_interface( + "get_function_job", mock_registered_project_function_job + ) + mock_handler_in_functions_rpc_interface( + "get_function", mock_registered_project_function + ) + mock_handler_in_functions_rpc_interface( + "get_function_job_status", + FunctionJobStatus(status=job_status), + ) + mock_handler_in_study_jobs_rest_interface( + "inspect_study_job", + JobStatus( + job_id=uuid.uuid4(), + submitted_at=datetime.fromisoformat("2023-01-01T00:00:00"), + started_at=datetime.fromisoformat("2023-01-01T01:00:00"), + stopped_at=datetime.fromisoformat("2023-01-01T02:00:00"), + state=RunningState(value=job_status), + ), + ) + mock_handler_in_functions_rpc_interface( + "update_function_job_status", + FunctionJobStatus(status=job_status), + ) + + response = await client.get( + f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}/status", + auth=auth, + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["status"] == job_status + + +@pytest.mark.parametrize("job_outputs", [{"X+Y": 42, "X-Y": 10}]) +async def test_get_function_job_outputs( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_project_function_job: RegisteredProjectFunctionJob, + mock_registered_project_function: RegisteredProjectFunction, + auth: httpx.BasicAuth, + job_outputs: dict[str, Any], +) -> None: + + mock_handler_in_functions_rpc_interface( + "get_function_job", mock_registered_project_function_job + ) + mock_handler_in_functions_rpc_interface( + "get_function", mock_registered_project_function + ) + mock_handler_in_functions_rpc_interface("get_function_job_outputs", job_outputs) + + response = await client.get( + f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}/outputs", + auth=auth, + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data == job_outputs diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_functions.py b/services/api-server/tests/unit/api_functions/test_api_routers_functions.py index e9886170e5b..a3e61a3ec15 100644 --- a/services/api-server/tests/unit/api_functions/test_api_routers_functions.py +++ b/services/api-server/tests/unit/api_functions/test_api_routers_functions.py @@ -18,16 +18,13 @@ from httpx import AsyncClient from models_library.api_schemas_long_running_tasks.tasks import TaskGet from models_library.functions import ( - FunctionJobCollection, FunctionUserAccessRights, FunctionUserApiAccessRights, ProjectFunction, - ProjectFunctionJob, RegisteredFunction, RegisteredFunctionJob, RegisteredFunctionJobCollection, RegisteredProjectFunction, - RegisteredProjectFunctionJob, ) from models_library.functions_errors import ( FunctionIDNotFoundError, @@ -312,301 +309,6 @@ async def test_delete_function( assert response.status_code == status.HTTP_200_OK -async def test_register_function_job( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_project_function_job: ProjectFunctionJob, - mock_registered_project_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - """Test the register_function_job endpoint.""" - - mock_handler_in_functions_rpc_interface( - "register_function_job", mock_registered_project_function_job - ) - - response = await client.post( - f"{API_VTAG}/function_jobs", - json=mock_project_function_job.model_dump(mode="json"), - auth=auth, - ) - - assert response.status_code == status.HTTP_200_OK - assert ( - RegisteredProjectFunctionJob.model_validate(response.json()) - == mock_registered_project_function_job - ) - - -async def test_get_function_job( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_project_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - mock_handler_in_functions_rpc_interface( - "get_function_job", mock_registered_project_function_job - ) - - # Now, get the function job - response = await client.get( - f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - assert ( - RegisteredProjectFunctionJob.model_validate(response.json()) - == mock_registered_project_function_job - ) - - -async def test_list_function_jobs( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_project_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - mock_handler_in_functions_rpc_interface( - "list_function_jobs", - ( - [mock_registered_project_function_job for _ in range(5)], - PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), - ), - ) - - # Now, list function jobs - response = await client.get(f"{API_VTAG}/function_jobs", auth=auth) - assert response.status_code == status.HTTP_200_OK - data = response.json()["items"] - assert len(data) == 5 - assert ( - RegisteredProjectFunctionJob.model_validate(data[0]) - == mock_registered_project_function_job - ) - - -async def test_list_function_jobs_with_function_filter( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_project_function_job: RegisteredProjectFunctionJob, - mock_registered_project_function: RegisteredProjectFunction, - auth: httpx.BasicAuth, -) -> None: - mock_handler_in_functions_rpc_interface( - "list_function_jobs", - ( - [mock_registered_project_function_job for _ in range(5)], - PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), - ), - ) - - # Now, list function jobs with a filter - response = await client.get( - f"{API_VTAG}/functions/{mock_registered_project_function.uid}/jobs", auth=auth - ) - - assert response.status_code == status.HTTP_200_OK - data = response.json()["items"] - assert len(data) == 5 - assert ( - RegisteredProjectFunctionJob.model_validate(data[0]) - == mock_registered_project_function_job - ) - - -async def test_delete_function_job( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_project_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - mock_handler_in_functions_rpc_interface("delete_function_job", None) - - # Now, delete the function job - response = await client.delete( - f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - - -async def test_register_function_job_collection( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - auth: httpx.BasicAuth, -) -> None: - mock_function_job_collection = FunctionJobCollection.model_validate( - { - "title": "Test Collection", - "description": "A test function job collection", - "job_ids": [str(uuid4()), str(uuid4())], - } - ) - - mock_registered_function_job_collection = ( - RegisteredFunctionJobCollection.model_validate( - { - **mock_function_job_collection.model_dump(), - "uid": str(uuid4()), - "created_at": datetime.datetime.now(datetime.UTC), - } - ) - ) - - mock_handler_in_functions_rpc_interface( - "register_function_job_collection", mock_registered_function_job_collection - ) - - response = await client.post( - f"{API_VTAG}/function_job_collections", - json=mock_function_job_collection.model_dump(mode="json"), - auth=auth, - ) - - # Assert - assert response.status_code == status.HTTP_200_OK - assert ( - RegisteredFunctionJobCollection.model_validate(response.json()) - == mock_registered_function_job_collection - ) - - -async def test_get_function_job_collection( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - auth: httpx.BasicAuth, -) -> None: - mock_registered_function_job_collection = ( - RegisteredFunctionJobCollection.model_validate( - { - "uid": str(uuid4()), - "title": "Test Collection", - "description": "A test function job collection", - "job_ids": [str(uuid4()), str(uuid4())], - "created_at": datetime.datetime.now(datetime.UTC), - } - ) - ) - - mock_handler_in_functions_rpc_interface( - "get_function_job_collection", mock_registered_function_job_collection - ) - - response = await client.get( - f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - assert ( - RegisteredFunctionJobCollection.model_validate(response.json()) - == mock_registered_function_job_collection - ) - - -async def test_list_function_job_collections( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - auth: httpx.BasicAuth, -) -> None: - mock_registered_function_job_collection = ( - RegisteredFunctionJobCollection.model_validate( - { - "uid": str(uuid4()), - "title": "Test Collection", - "description": "A test function job collection", - "job_ids": [str(uuid4()), str(uuid4())], - "created_at": datetime.datetime.now(datetime.UTC), - } - ) - ) - - mock_handler_in_functions_rpc_interface( - "list_function_job_collections", - ( - [mock_registered_function_job_collection for _ in range(5)], - PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), - ), - ) - - response = await client.get(f"{API_VTAG}/function_job_collections", auth=auth) - assert response.status_code == status.HTTP_200_OK - data = response.json()["items"] - assert len(data) == 5 - assert ( - RegisteredFunctionJobCollection.model_validate(data[0]) - == mock_registered_function_job_collection - ) - - -async def test_delete_function_job_collection( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job_collection: RegisteredFunctionJobCollection, - auth: httpx.BasicAuth, -) -> None: - mock_handler_in_functions_rpc_interface("delete_function_job_collection", None) - - # Now, delete the function job collection - response = await client.delete( - f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - data = response.json() - assert data is None - - -async def test_get_function_job_collection_jobs( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job_collection: RegisteredFunctionJobCollection, - auth: httpx.BasicAuth, -) -> None: - mock_handler_in_functions_rpc_interface( - "get_function_job_collection", mock_registered_function_job_collection - ) - - response = await client.get( - f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}/function_jobs", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - data = response.json() - assert len(data) == len(mock_registered_function_job_collection.job_ids) - - -async def test_list_function_job_collections_with_function_filter( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job_collection: RegisteredFunctionJobCollection, - mock_registered_project_function: RegisteredProjectFunction, - auth: httpx.BasicAuth, -) -> None: - mock_handler_in_functions_rpc_interface( - "list_function_job_collections", - ( - [mock_registered_function_job_collection for _ in range(2)], - PageMetaInfoLimitOffset(total=5, count=2, limit=2, offset=1), - ), - ) - - response = await client.get( - f"{API_VTAG}/function_job_collections?function_id={mock_registered_project_function.uid}&limit=2&offset=1", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - data = response.json() - - assert data["total"] == 5 - assert data["limit"] == 2 - assert data["offset"] == 1 - assert len(data["items"]) == 2 - assert ( - RegisteredFunctionJobCollection.model_validate(data["items"][0]) - == mock_registered_function_job_collection - ) - - @pytest.mark.parametrize("user_has_execute_right", [False, True]) @pytest.mark.parametrize( "funcapi_endpoint,endpoint_inputs", [("run", {}), ("map", [{}, {}])] @@ -644,6 +346,11 @@ async def test_run_map_function_not_allowed( ), ) + mock_handler_in_functions_rpc_interface( + "get_function", + mock_registered_project_function, + ) + # Monkeypatching MagicMock because otherwise it refuse to be used in an await statement async def async_magic(): pass diff --git a/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py b/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py index 470c532d03e..1a59eef385e 100644 --- a/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py +++ b/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py @@ -10,6 +10,8 @@ FunctionJobCollection, FunctionJobCollectionsListFilters, FunctionJobID, + FunctionJobStatus, + FunctionOutputs, FunctionOutputSchema, FunctionUpdate, FunctionUserApiAccessRights, @@ -386,6 +388,74 @@ async def get_function_input_schema( ) +@router.expose(reraise_if_error_type=(FunctionJobIDNotFoundError,)) +async def get_function_job_status( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, +) -> FunctionJobStatus: + return await _functions_service.get_function_job_status( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + ) + + +@router.expose(reraise_if_error_type=(FunctionJobIDNotFoundError,)) +async def get_function_job_outputs( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, +) -> FunctionOutputs: + return await _functions_service.get_function_job_outputs( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + ) + + +@router.expose(reraise_if_error_type=(FunctionJobIDNotFoundError,)) +async def update_function_job_status( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + job_status: FunctionJobStatus, +) -> FunctionJobStatus: + return await _functions_service.update_function_job_status( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + job_status=job_status, + ) + + +@router.expose(reraise_if_error_type=(FunctionJobIDNotFoundError,)) +async def update_function_job_outputs( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + outputs: FunctionOutputs, +) -> FunctionOutputs: + return await _functions_service.update_function_job_outputs( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + outputs=outputs, + ) + + @router.expose(reraise_if_error_type=(FunctionIDNotFoundError,)) async def get_function_output_schema( app: web.Application, diff --git a/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py b/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py index 65fc448f1e3..cd7e997c3d8 100644 --- a/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py +++ b/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py @@ -17,6 +17,7 @@ FunctionJobCollectionAccessRightsDB, FunctionJobCollectionsListFilters, FunctionJobID, + FunctionJobStatus, FunctionOutputs, FunctionOutputSchema, FunctionsApiAccessRights, @@ -325,12 +326,14 @@ async def get_function( function_id: FunctionID, ) -> RegisteredFunctionDB: async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: - await check_user_api_access_rights( + await check_user_permissions( app, connection=conn, user_id=user_id, product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.READ_FUNCTIONS], + object_id=function_id, + object_type="function", + permissions=["read"], ) result = await conn.execute( @@ -340,19 +343,7 @@ async def get_function( if row is None: raise FunctionIDNotFoundError(function_id=function_id) - registered_function = RegisteredFunctionDB.model_validate(row) - - await check_user_permissions( - app, - connection=conn, - user_id=user_id, - product_name=product_name, - object_id=function_id, - object_type="function", - permissions=["read"], - ) - - return registered_function + return RegisteredFunctionDB.model_validate(row) async def list_functions( @@ -480,6 +471,138 @@ async def list_function_jobs( ) +async def get_function_job_status( + app: web.Application, + connection: AsyncConnection | None = None, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, +) -> FunctionJobStatus: + async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: + await check_user_permissions( + app, + connection=conn, + user_id=user_id, + product_name=product_name, + object_type="function_job", + object_id=function_job_id, + permissions=["read"], + ) + + result = await conn.execute( + function_jobs_table.select().where( + function_jobs_table.c.uuid == function_job_id + ) + ) + row = result.one_or_none() + + if row is None: + raise FunctionJobIDNotFoundError(function_job_id=function_job_id) + + return FunctionJobStatus(status=row.status) + + +async def get_function_job_outputs( + app: web.Application, + connection: AsyncConnection | None = None, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, +) -> FunctionOutputs: + async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: + await check_user_permissions( + app, + connection=conn, + user_id=user_id, + product_name=product_name, + object_type="function_job", + object_id=function_job_id, + permissions=["read"], + ) + + result = await conn.execute( + function_jobs_table.select().where( + function_jobs_table.c.uuid == function_job_id + ) + ) + row = result.one_or_none() + + if row is None: + raise FunctionJobIDNotFoundError(function_job_id=function_job_id) + + return TypeAdapter(FunctionOutputs).validate_python(row.outputs) + + +async def update_function_job_status( + app: web.Application, + connection: AsyncConnection | None = None, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + job_status: FunctionJobStatus, +) -> FunctionJobStatus: + async with transaction_context(get_asyncpg_engine(app), connection) as transaction: + await check_user_permissions( + app, + connection=transaction, + user_id=user_id, + product_name=product_name, + object_type="function_job", + object_id=function_job_id, + permissions=["write"], + ) + + result = await transaction.execute( + function_jobs_table.update() + .where(function_jobs_table.c.uuid == function_job_id) + .values(status=job_status.status) + .returning(function_jobs_table.c.status) + ) + row = result.one_or_none() + + if row is None: + raise FunctionJobIDNotFoundError(function_job_id=function_job_id) + + return FunctionJobStatus(status=row.status) + + +async def update_function_job_outputs( + app: web.Application, + connection: AsyncConnection | None = None, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + outputs: FunctionOutputs, +) -> FunctionOutputs: + async with transaction_context(get_asyncpg_engine(app), connection) as transaction: + await check_user_permissions( + app, + connection=transaction, + user_id=user_id, + product_name=product_name, + object_type="function_job", + object_id=function_job_id, + permissions=["write"], + ) + + result = await transaction.execute( + function_jobs_table.update() + .where(function_jobs_table.c.uuid == function_job_id) + .values(outputs=outputs) + .returning(function_jobs_table.c.outputs) + ) + row = result.one_or_none() + + if row is None: + raise FunctionJobIDNotFoundError(function_job_id=function_job_id) + + return TypeAdapter(FunctionOutputs).validate_python(row.outputs) + + async def list_function_job_collections( app: web.Application, connection: AsyncConnection | None = None, @@ -598,17 +721,6 @@ async def delete_function( function_id: FunctionID, ) -> None: async with transaction_context(get_asyncpg_engine(app), connection) as transaction: - await check_user_api_access_rights( - app, - connection=transaction, - user_id=user_id, - product_name=product_name, - api_access_rights=[ - FunctionsApiAccessRights.READ_FUNCTIONS, - FunctionsApiAccessRights.WRITE_FUNCTIONS, - ], - ) - await check_user_permissions( app, connection=transaction, @@ -644,17 +756,6 @@ async def update_function( function: FunctionUpdate, ) -> RegisteredFunctionDB: async with transaction_context(get_asyncpg_engine(app), connection) as transaction: - await check_user_api_access_rights( - app, - connection=transaction, - user_id=user_id, - product_name=product_name, - api_access_rights=[ - FunctionsApiAccessRights.READ_FUNCTIONS, - FunctionsApiAccessRights.WRITE_FUNCTIONS, - ], - ) - await check_user_permissions( app, transaction, @@ -662,7 +763,7 @@ async def update_function( product_name=product_name, object_id=function_id, object_type="function", - permissions=["write"], + permissions=["read", "write"], ) result = await transaction.execute( @@ -688,13 +789,6 @@ async def get_function_job( function_job_id: FunctionID, ) -> RegisteredFunctionJobDB: async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: - await check_user_api_access_rights( - app, - connection=conn, - user_id=user_id, - product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.READ_FUNCTION_JOBS], - ) await check_user_permissions( app, connection=conn, @@ -727,16 +821,6 @@ async def delete_function_job( function_job_id: FunctionID, ) -> None: async with transaction_context(get_asyncpg_engine(app), connection) as transaction: - await check_user_api_access_rights( - app, - connection=transaction, - user_id=user_id, - product_name=product_name, - api_access_rights=[ - FunctionsApiAccessRights.READ_FUNCTION_JOBS, - FunctionsApiAccessRights.WRITE_FUNCTION_JOBS, - ], - ) await check_user_permissions( app, connection=transaction, @@ -775,14 +859,6 @@ async def find_cached_function_jobs( inputs: FunctionInputs, ) -> list[RegisteredFunctionJobDB] | None: async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: - await check_user_api_access_rights( - app, - connection=conn, - user_id=user_id, - product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.READ_FUNCTION_JOBS], - ) - jobs: list[RegisteredFunctionJobDB] = [] async for row in await conn.stream( function_jobs_table.select().where( @@ -821,13 +897,6 @@ async def get_function_job_collection( function_job_collection_id: FunctionID, ) -> tuple[RegisteredFunctionJobCollectionDB, list[FunctionJobID]]: async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: - await check_user_api_access_rights( - app, - connection=conn, - user_id=user_id, - product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.READ_FUNCTION_JOB_COLLECTIONS], - ) await check_user_permissions( app, connection=conn, @@ -875,16 +944,6 @@ async def delete_function_job_collection( function_job_collection_id: FunctionID, ) -> None: async with transaction_context(get_asyncpg_engine(app), connection) as transaction: - await check_user_api_access_rights( - app, - connection=transaction, - user_id=user_id, - product_name=product_name, - api_access_rights=[ - FunctionsApiAccessRights.READ_FUNCTION_JOB_COLLECTIONS, - FunctionsApiAccessRights.WRITE_FUNCTION_JOB_COLLECTIONS, - ], - ) await check_user_permissions( app, connection=transaction, @@ -1141,6 +1200,21 @@ async def check_user_permissions( object_type: Literal["function", "function_job", "function_job_collection"], permissions: list[Literal["read", "write", "execute"]], ) -> bool: + + api_access_rights = [ + getattr( + FunctionsApiAccessRights, f"{permission.upper()}_{object_type.upper()}S" + ) + for permission in permissions + ] + await check_user_api_access_rights( + app, + connection=connection, + user_id=user_id, + product_name=product_name, + api_access_rights=api_access_rights, + ) + user_permissions = await get_user_permissions( app, connection=connection, @@ -1227,6 +1301,6 @@ async def check_user_api_access_rights( for api_access_right in api_access_rights: if not getattr(user_api_access_rights, api_access_right): - raise _ERRORS_MAP[api_access_right] + raise _ERRORS_MAP[api_access_right](user_id=user_id) return True diff --git a/services/web/server/src/simcore_service_webserver/functions/_functions_service.py b/services/web/server/src/simcore_service_webserver/functions/_functions_service.py index e23cf0317c3..df8ed7284ad 100644 --- a/services/web/server/src/simcore_service_webserver/functions/_functions_service.py +++ b/services/web/server/src/simcore_service_webserver/functions/_functions_service.py @@ -13,6 +13,8 @@ FunctionJobCollectionsListFilters, FunctionJobDB, FunctionJobID, + FunctionJobStatus, + FunctionOutputs, FunctionOutputSchema, FunctionUpdate, FunctionUserAccessRights, @@ -431,6 +433,70 @@ async def get_function_user_permissions( ) +async def get_function_job_status( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, +) -> FunctionJobStatus: + return await _functions_repository.get_function_job_status( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + ) + + +async def get_function_job_outputs( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, +) -> FunctionOutputs: + return await _functions_repository.get_function_job_outputs( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + ) + + +async def update_function_job_outputs( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + outputs: FunctionOutputs, +) -> FunctionOutputs: + return await _functions_repository.update_function_job_outputs( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + outputs=outputs, + ) + + +async def update_function_job_status( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + job_status: FunctionJobStatus, +) -> FunctionJobStatus: + return await _functions_repository.update_function_job_status( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + job_status=job_status, + ) + + async def get_functions_user_api_access_rights( app: web.Application, *, diff --git a/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_function_jobs_controller_rpc.py b/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_function_jobs_controller_rpc.py index ef0ec12205c..6101d09fe5c 100644 --- a/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_function_jobs_controller_rpc.py +++ b/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_function_jobs_controller_rpc.py @@ -11,6 +11,7 @@ ProjectFunction, ProjectFunctionJob, ) +from models_library.functions import FunctionJobStatus from models_library.functions_errors import ( FunctionJobIDNotFoundError, FunctionJobReadAccessDeniedError, @@ -357,3 +358,133 @@ async def test_find_cached_function_jobs( # Assert the cached jobs does not contain the registered job for the other user assert cached_jobs is None + + +@pytest.mark.parametrize( + "user_role", + [UserRole.USER], +) +async def test_update_function_job_status( + client: TestClient, + rpc_client: RabbitMQRPCClient, + add_user_function_api_access_rights: None, + logged_user: UserInfoDict, + mock_function: ProjectFunction, + osparc_product_name: ProductName, +): + # Register the function first + registered_function = await functions_rpc.register_function( + rabbitmq_rpc_client=rpc_client, + function=mock_function, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + function_job = ProjectFunctionJob( + function_uid=registered_function.uid, + title="Test Function Job", + description="A test function job", + project_job_id=uuid4(), + inputs={"input1": "value1"}, + outputs={"output1": "result1"}, + ) + + # Register the function job + registered_job = await functions_rpc.register_function_job( + rabbitmq_rpc_client=rpc_client, + function_job=function_job, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + old_job_status = await functions_rpc.get_function_job_status( + rabbitmq_rpc_client=rpc_client, + function_job_id=registered_job.uid, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + assert old_job_status.status == "created" + + # Update the function job status + new_status = FunctionJobStatus(status="COMPLETED") + updated_job_status = await functions_rpc.update_function_job_status( + rabbitmq_rpc_client=rpc_client, + function_job_id=registered_job.uid, + user_id=logged_user["id"], + product_name=osparc_product_name, + job_status=new_status, + ) + + # Assert the updated job status matches the new status + assert updated_job_status == new_status + + +@pytest.mark.parametrize( + "user_role", + [UserRole.USER], +) +async def test_update_function_job_outputs( + client: TestClient, + rpc_client: RabbitMQRPCClient, + add_user_function_api_access_rights: None, + logged_user: UserInfoDict, + mock_function: ProjectFunction, + osparc_product_name: ProductName, +): + # Register the function first + registered_function = await functions_rpc.register_function( + rabbitmq_rpc_client=rpc_client, + function=mock_function, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + function_job = ProjectFunctionJob( + function_uid=registered_function.uid, + title="Test Function Job", + description="A test function job", + project_job_id=uuid4(), + inputs={"input1": "value1"}, + outputs=None, + ) + + # Register the function job + registered_job = await functions_rpc.register_function_job( + rabbitmq_rpc_client=rpc_client, + function_job=function_job, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + received_outputs = await functions_rpc.get_function_job_outputs( + rabbitmq_rpc_client=rpc_client, + function_job_id=registered_job.uid, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + assert received_outputs is None + + new_outputs = {"output1": "new_result1", "output2": "new_result2"} + + # Update the function job outputs + updated_outputs = await functions_rpc.update_function_job_outputs( + rabbitmq_rpc_client=rpc_client, + function_job_id=registered_job.uid, + user_id=logged_user["id"], + product_name=osparc_product_name, + outputs=new_outputs, + ) + + # Assert the updated outputs match the new outputs + assert updated_outputs == new_outputs + + # Update the function job outputs + received_outputs = await functions_rpc.get_function_job_outputs( + rabbitmq_rpc_client=rpc_client, + function_job_id=registered_job.uid, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + assert received_outputs == new_outputs diff --git a/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_functions_controller_rpc.py b/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_functions_controller_rpc.py index 617c1b00b9a..f4974ad3336 100644 --- a/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_functions_controller_rpc.py +++ b/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_functions_controller_rpc.py @@ -18,7 +18,7 @@ from models_library.functions_errors import ( FunctionIDNotFoundError, FunctionReadAccessDeniedError, - FunctionsReadApiAccessDeniedError, + FunctionsWriteApiAccessDeniedError, FunctionWriteAccessDeniedError, ) from models_library.products import ProductName @@ -98,7 +98,7 @@ async def test_register_get_delete_function( product_name=osparc_product_name, ) - with pytest.raises(FunctionsReadApiAccessDeniedError): + with pytest.raises(FunctionsWriteApiAccessDeniedError): # Attempt to delete the function in another product await functions_rpc.delete_function( rabbitmq_rpc_client=rpc_client, @@ -375,7 +375,7 @@ async def test_update_function_title( # Update the function's title by other user updated_title = "Updated Function Title by Other User" registered_function.title = updated_title - with pytest.raises(FunctionWriteAccessDeniedError): + with pytest.raises(FunctionReadAccessDeniedError): updated_function = await functions_rpc.update_function_title( rabbitmq_rpc_client=rpc_client, function_id=registered_function.uid,