From c72b338b592d029b38fa648d968c1d0363df3a74 Mon Sep 17 00:00:00 2001 From: Werner Van Geit Date: Thu, 17 Jul 2025 11:28:15 +0200 Subject: [PATCH 1/7] Adding code to store job output/status in main db --- .../functions/functions_rpc_interface.py | 19 ++ .../api/routes/function_jobs_routes.py | 26 +- .../services_rpc/wb_api_server.py | 15 ++ ...st_api_routers_function_job_collections.py | 153 +++++++++++ .../test_api_routers_function_jobs.py | 148 +++++++++++ .../test_api_routers_functions.py | 238 +----------------- .../functions/_controller/_functions_rpc.py | 20 ++ .../functions/_functions_repository.py | 81 ++++++ .../functions/_functions_service.py | 16 ++ 9 files changed, 473 insertions(+), 243 deletions(-) create mode 100644 services/api-server/tests/unit/api_functions/test_api_routers_function_job_collections.py create mode 100644 services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py index 00a982add875..118dcae28ceb 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py @@ -17,6 +17,7 @@ RegisteredFunctionJobCollection, ) from models_library.functions import ( + FunctionJobStatus, FunctionUserAccessRights, FunctionUserApiAccessRights, ) @@ -300,6 +301,24 @@ async def get_function_job( return TypeAdapter(RegisteredFunctionJob).validate_python(result) +@log_decorator(_logger, level=logging.DEBUG) +async def get_function_job_status( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + user_id: UserID, + function_job_id: FunctionJobID, + product_name: ProductName, +) -> FunctionJobStatus: + result = await rabbitmq_rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + TypeAdapter(RPCMethodName).validate_python("get_function_job_status"), + function_job_id=function_job_id, + user_id=user_id, + product_name=product_name, + ) + return TypeAdapter(FunctionJobStatus).validate_python(result) + + @log_decorator(_logger, level=logging.DEBUG) async def delete_function_job( rabbitmq_rpc_client: RabbitMQRPCClient, diff --git a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py index cd461b95fb3a..fe874fe64807 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py @@ -3,7 +3,6 @@ from fastapi import APIRouter, Depends, status from fastapi_pagination.api import create_page from models_library.api_schemas_webserver.functions import ( - Function, FunctionClass, FunctionJob, FunctionJobID, @@ -11,11 +10,13 @@ FunctionOutputs, RegisteredFunctionJob, ) +from models_library.functions import RegisteredFunction from models_library.functions_errors import ( UnsupportedFunctionClassError, UnsupportedFunctionFunctionJobClassCombinationError, ) from models_library.products import ProductName +from models_library.projects_state import RunningState from models_library.users import UserID from sqlalchemy.ext.asyncio import AsyncEngine @@ -181,6 +182,12 @@ async def function_job_status( user_id=user_id, product_name=product_name, ) + old_job_status = await wb_api_rpc.get_function_job_status( + function_job_id=function_job.uid, user_id=user_id, product_name=product_name + ) + + if old_job_status.status in (RunningState.SUCCESS, RunningState.FAILED): + return old_job_status if ( function.function_class == FunctionClass.PROJECT @@ -192,7 +199,7 @@ async def function_job_status( user_id=user_id, director2_api=director2_api, ) - return FunctionJobStatus(status=job_status.state) + job_status = FunctionJobStatus(status=job_status.state) if (function.function_class == FunctionClass.SOLVER) and ( function_job.function_class == FunctionClass.SOLVER @@ -204,12 +211,13 @@ async def function_job_status( user_id=user_id, director2_api=director2_api, ) - return FunctionJobStatus(status=job_status.state) - - raise UnsupportedFunctionFunctionJobClassCombinationError( - function_class=function.function_class, - function_job_class=function_job.function_class, - ) + job_status = FunctionJobStatus(status=job_status.state) + else: + raise UnsupportedFunctionFunctionJobClassCombinationError( + function_class=function.function_class, + function_job_class=function_job.function_class, + ) + return job_status async def get_function_from_functionjobid( @@ -217,7 +225,7 @@ async def get_function_from_functionjobid( function_job_id: FunctionJobID, user_id: Annotated[UserID, Depends(get_current_user_id)], product_name: Annotated[ProductName, Depends(get_product_name)], -) -> tuple[Function, FunctionJob]: +) -> tuple[RegisteredFunction, RegisteredFunctionJob]: function_job = await get_function_job( wb_api_rpc=wb_api_rpc, function_job_id=function_job_id, diff --git a/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py b/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py index 2272006cfb5d..b72609a13776 100644 --- a/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py +++ b/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py @@ -24,6 +24,7 @@ ) from models_library.api_schemas_webserver.licensed_items import LicensedItemRpcGetPage from models_library.functions import ( + FunctionJobStatus, FunctionUserAccessRights, FunctionUserApiAccessRights, ) @@ -472,6 +473,20 @@ async def get_function_output_schema( function_id=function_id, ) + async def get_function_job_status( + self, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + ) -> FunctionJobStatus: + return await functions_rpc_interface.get_function_job_status( + self._client, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + ) + async def find_cached_function_jobs( self, *, diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_function_job_collections.py b/services/api-server/tests/unit/api_functions/test_api_routers_function_job_collections.py new file mode 100644 index 000000000000..230103d69eb4 --- /dev/null +++ b/services/api-server/tests/unit/api_functions/test_api_routers_function_job_collections.py @@ -0,0 +1,153 @@ +import datetime +from collections.abc import Callable +from typing import Any +from uuid import uuid4 + +import httpx +from httpx import AsyncClient +from models_library.api_schemas_webserver.functions import ( + RegisteredFunctionJobCollection, + RegisteredProjectFunction, +) +from models_library.rest_pagination import PageMetaInfoLimitOffset +from servicelib.aiohttp import status +from simcore_service_api_server._meta import API_VTAG + + +async def test_get_function_job_collection( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + auth: httpx.BasicAuth, +) -> None: + mock_registered_function_job_collection = ( + RegisteredFunctionJobCollection.model_validate( + { + "uid": str(uuid4()), + "title": "Test Collection", + "description": "A test function job collection", + "job_ids": [str(uuid4()), str(uuid4())], + "created_at": datetime.datetime.now(datetime.UTC), + } + ) + ) + + mock_handler_in_functions_rpc_interface( + "get_function_job_collection", mock_registered_function_job_collection + ) + + response = await client.get( + f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}", + auth=auth, + ) + assert response.status_code == status.HTTP_200_OK + assert ( + RegisteredFunctionJobCollection.model_validate(response.json()) + == mock_registered_function_job_collection + ) + + +async def test_list_function_job_collections( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + auth: httpx.BasicAuth, +) -> None: + mock_registered_function_job_collection = ( + RegisteredFunctionJobCollection.model_validate( + { + "uid": str(uuid4()), + "title": "Test Collection", + "description": "A test function job collection", + "job_ids": [str(uuid4()), str(uuid4())], + "created_at": datetime.datetime.now(datetime.UTC), + } + ) + ) + + mock_handler_in_functions_rpc_interface( + "list_function_job_collections", + ( + [mock_registered_function_job_collection for _ in range(5)], + PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), + ), + ) + + response = await client.get(f"{API_VTAG}/function_job_collections", auth=auth) + assert response.status_code == status.HTTP_200_OK + data = response.json()["items"] + assert len(data) == 5 + assert ( + RegisteredFunctionJobCollection.model_validate(data[0]) + == mock_registered_function_job_collection + ) + + +async def test_delete_function_job_collection( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_function_job_collection: RegisteredFunctionJobCollection, + auth: httpx.BasicAuth, +) -> None: + + mock_handler_in_functions_rpc_interface("delete_function_job_collection", None) + + # Now, delete the function job collection + response = await client.delete( + f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}", + auth=auth, + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data is None + + +async def test_get_function_job_collection_jobs( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_function_job_collection: RegisteredFunctionJobCollection, + auth: httpx.BasicAuth, +) -> None: + + mock_handler_in_functions_rpc_interface( + "get_function_job_collection", mock_registered_function_job_collection + ) + + response = await client.get( + f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}/function_jobs", + auth=auth, + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert len(data) == len(mock_registered_function_job_collection.job_ids) + + +async def test_list_function_job_collections_with_function_filter( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_function_job_collection: RegisteredFunctionJobCollection, + mock_registered_project_function: RegisteredProjectFunction, + auth: httpx.BasicAuth, +) -> None: + + mock_handler_in_functions_rpc_interface( + "list_function_job_collections", + ( + [mock_registered_function_job_collection for _ in range(2)], + PageMetaInfoLimitOffset(total=5, count=2, limit=2, offset=1), + ), + ) + + response = await client.get( + f"{API_VTAG}/function_job_collections?function_id={mock_registered_project_function.uid}&limit=2&offset=1", + auth=auth, + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + + assert data["total"] == 5 + assert data["limit"] == 2 + assert data["offset"] == 1 + assert len(data["items"]) == 2 + assert ( + RegisteredFunctionJobCollection.model_validate(data["items"][0]) + == mock_registered_function_job_collection + ) diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py b/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py new file mode 100644 index 000000000000..c27eba58db33 --- /dev/null +++ b/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py @@ -0,0 +1,148 @@ +import uuid +from collections.abc import Callable +from datetime import datetime +from typing import Any + +import httpx +import pytest +from httpx import AsyncClient +from models_library.api_schemas_webserver.functions import ( + ProjectFunctionJob, + RegisteredProjectFunctionJob, +) +from models_library.functions import FunctionJobStatus, RegisteredProjectFunction +from models_library.projects_state import RunningState +from models_library.rest_pagination import PageMetaInfoLimitOffset +from servicelib.aiohttp import status +from simcore_service_api_server._meta import API_VTAG +from simcore_service_api_server.models.schemas.jobs import JobStatus + + +async def test_delete_function_job( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_function_job: RegisteredProjectFunctionJob, + auth: httpx.BasicAuth, +) -> None: + + mock_handler_in_functions_rpc_interface("delete_function_job", None) + + # Now, delete the function job + response = await client.delete( + f"{API_VTAG}/function_jobs/{mock_registered_function_job.uid}", auth=auth + ) + assert response.status_code == status.HTTP_200_OK + + +async def test_register_function_job( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_function_job: ProjectFunctionJob, + mock_registered_function_job: RegisteredProjectFunctionJob, + auth: httpx.BasicAuth, +) -> None: + """Test the register_function_job endpoint.""" + + mock_handler_in_functions_rpc_interface( + "register_function_job", mock_registered_function_job + ) + + response = await client.post( + f"{API_VTAG}/function_jobs", + json=mock_function_job.model_dump(mode="json"), + auth=auth, + ) + + assert response.status_code == status.HTTP_200_OK + assert ( + RegisteredProjectFunctionJob.model_validate(response.json()) + == mock_registered_function_job + ) + + +async def test_get_function_job( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_function_job: RegisteredProjectFunctionJob, + auth: httpx.BasicAuth, +) -> None: + + mock_handler_in_functions_rpc_interface( + "get_function_job", mock_registered_function_job + ) + + # Now, get the function job + response = await client.get( + f"{API_VTAG}/function_jobs/{mock_registered_function_job.uid}", auth=auth + ) + assert response.status_code == status.HTTP_200_OK + assert ( + RegisteredProjectFunctionJob.model_validate(response.json()) + == mock_registered_function_job + ) + + +async def test_list_function_jobs( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_function_job: RegisteredProjectFunctionJob, + auth: httpx.BasicAuth, +) -> None: + + mock_handler_in_functions_rpc_interface( + "list_function_jobs", + ( + [mock_registered_function_job for _ in range(5)], + PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), + ), + ) + + # Now, list function jobs + response = await client.get(f"{API_VTAG}/function_jobs", auth=auth) + assert response.status_code == status.HTTP_200_OK + data = response.json()["items"] + assert len(data) == 5 + assert ( + RegisteredProjectFunctionJob.model_validate(data[0]) + == mock_registered_function_job + ) + + +@pytest.mark.parametrize("job_status", ["SUCCESS", "FAILED", "RUNNING"]) +async def test_get_function_job_status( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_function_job: RegisteredProjectFunctionJob, + mock_registered_project_function: RegisteredProjectFunction, + auth: httpx.BasicAuth, + job_status: str, +) -> None: + + mock_handler_in_functions_rpc_interface( + "get_function_job", mock_registered_function_job + ) + mock_handler_in_functions_rpc_interface( + "get_function", mock_registered_project_function + ) + mock_handler_in_functions_rpc_interface( + "get_function_job_status", + FunctionJobStatus(status=job_status), + ) + mock_handler_in_functions_rpc_interface( + "inspect_study_job", + JobStatus( + job_id=uuid.uuid4(), + submitted_at=datetime.fromisoformat("2023-01-01T00:00:00"), + started_at=datetime.fromisoformat("2023-01-01T01:00:00"), + stopped_at=datetime.fromisoformat("2023-01-01T02:00:00"), + state=RunningState(job_status), + ), + ) + + response = await client.get( + f"{API_VTAG}/function_jobs/{mock_registered_function_job.uid}/status", + auth=auth, + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["status"] == job_status diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_functions.py b/services/api-server/tests/unit/api_functions/test_api_routers_functions.py index 98acc73352d1..1bb046bb14fd 100644 --- a/services/api-server/tests/unit/api_functions/test_api_routers_functions.py +++ b/services/api-server/tests/unit/api_functions/test_api_routers_functions.py @@ -19,7 +19,6 @@ from models_library.api_schemas_webserver.functions import ( FunctionJobCollection, ProjectFunction, - ProjectFunctionJob, RegisteredFunctionJobCollection, RegisteredProjectFunction, RegisteredProjectFunctionJob, @@ -317,80 +316,6 @@ async def test_delete_function( assert response.status_code == status.HTTP_200_OK -async def test_register_function_job( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_function_job: ProjectFunctionJob, - mock_registered_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - """Test the register_function_job endpoint.""" - - mock_handler_in_functions_rpc_interface( - "register_function_job", mock_registered_function_job - ) - - response = await client.post( - f"{API_VTAG}/function_jobs", - json=mock_function_job.model_dump(mode="json"), - auth=auth, - ) - - assert response.status_code == status.HTTP_200_OK - assert ( - RegisteredProjectFunctionJob.model_validate(response.json()) - == mock_registered_function_job - ) - - -async def test_get_function_job( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - - mock_handler_in_functions_rpc_interface( - "get_function_job", mock_registered_function_job - ) - - # Now, get the function job - response = await client.get( - f"{API_VTAG}/function_jobs/{mock_registered_function_job.uid}", auth=auth - ) - assert response.status_code == status.HTTP_200_OK - assert ( - RegisteredProjectFunctionJob.model_validate(response.json()) - == mock_registered_function_job - ) - - -async def test_list_function_jobs( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - - mock_handler_in_functions_rpc_interface( - "list_function_jobs", - ( - [mock_registered_function_job for _ in range(5)], - PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), - ), - ) - - # Now, list function jobs - response = await client.get(f"{API_VTAG}/function_jobs", auth=auth) - assert response.status_code == status.HTTP_200_OK - data = response.json()["items"] - assert len(data) == 5 - assert ( - RegisteredProjectFunctionJob.model_validate(data[0]) - == mock_registered_function_job - ) - - async def test_list_function_jobs_with_function_filter( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], @@ -421,22 +346,6 @@ async def test_list_function_jobs_with_function_filter( ) -async def test_delete_function_job( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - - mock_handler_in_functions_rpc_interface("delete_function_job", None) - - # Now, delete the function job - response = await client.delete( - f"{API_VTAG}/function_jobs/{mock_registered_function_job.uid}", auth=auth - ) - assert response.status_code == status.HTTP_200_OK - - async def test_register_function_job_collection( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], @@ -478,145 +387,6 @@ async def test_register_function_job_collection( ) -async def test_get_function_job_collection( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - auth: httpx.BasicAuth, -) -> None: - mock_registered_function_job_collection = ( - RegisteredFunctionJobCollection.model_validate( - { - "uid": str(uuid4()), - "title": "Test Collection", - "description": "A test function job collection", - "job_ids": [str(uuid4()), str(uuid4())], - "created_at": datetime.datetime.now(datetime.UTC), - } - ) - ) - - mock_handler_in_functions_rpc_interface( - "get_function_job_collection", mock_registered_function_job_collection - ) - - response = await client.get( - f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - assert ( - RegisteredFunctionJobCollection.model_validate(response.json()) - == mock_registered_function_job_collection - ) - - -async def test_list_function_job_collections( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - auth: httpx.BasicAuth, -) -> None: - mock_registered_function_job_collection = ( - RegisteredFunctionJobCollection.model_validate( - { - "uid": str(uuid4()), - "title": "Test Collection", - "description": "A test function job collection", - "job_ids": [str(uuid4()), str(uuid4())], - "created_at": datetime.datetime.now(datetime.UTC), - } - ) - ) - - mock_handler_in_functions_rpc_interface( - "list_function_job_collections", - ( - [mock_registered_function_job_collection for _ in range(5)], - PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), - ), - ) - - response = await client.get(f"{API_VTAG}/function_job_collections", auth=auth) - assert response.status_code == status.HTTP_200_OK - data = response.json()["items"] - assert len(data) == 5 - assert ( - RegisteredFunctionJobCollection.model_validate(data[0]) - == mock_registered_function_job_collection - ) - - -async def test_delete_function_job_collection( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job_collection: RegisteredFunctionJobCollection, - auth: httpx.BasicAuth, -) -> None: - - mock_handler_in_functions_rpc_interface("delete_function_job_collection", None) - - # Now, delete the function job collection - response = await client.delete( - f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - data = response.json() - assert data is None - - -async def test_get_function_job_collection_jobs( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job_collection: RegisteredFunctionJobCollection, - auth: httpx.BasicAuth, -) -> None: - - mock_handler_in_functions_rpc_interface( - "get_function_job_collection", mock_registered_function_job_collection - ) - - response = await client.get( - f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}/function_jobs", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - data = response.json() - assert len(data) == len(mock_registered_function_job_collection.job_ids) - - -async def test_list_function_job_collections_with_function_filter( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job_collection: RegisteredFunctionJobCollection, - mock_registered_project_function: RegisteredProjectFunction, - auth: httpx.BasicAuth, -) -> None: - - mock_handler_in_functions_rpc_interface( - "list_function_job_collections", - ( - [mock_registered_function_job_collection for _ in range(2)], - PageMetaInfoLimitOffset(total=5, count=2, limit=2, offset=1), - ), - ) - - response = await client.get( - f"{API_VTAG}/function_job_collections?function_id={mock_registered_project_function.uid}&limit=2&offset=1", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - data = response.json() - - assert data["total"] == 5 - assert data["limit"] == 2 - assert data["offset"] == 1 - assert len(data["items"]) == 2 - assert ( - RegisteredFunctionJobCollection.model_validate(data["items"][0]) - == mock_registered_function_job_collection - ) - - @pytest.mark.parametrize("user_has_execute_right", [False, True]) @pytest.mark.parametrize( "funcapi_endpoint,endpoint_inputs", [("run", {}), ("map", [{}, {}])] @@ -758,7 +528,7 @@ def _default_side_effect( ), ) - headers = dict() + headers = {} if parent_project_uuid: headers[X_SIMCORE_PARENT_PROJECT_UUID] = parent_project_uuid if parent_node_uuid: @@ -802,7 +572,7 @@ async def test_map_function_parent_info( capture: str, ) -> None: - side_effect_checks = dict() + side_effect_checks = {} def _default_side_effect( side_effect_checks: dict, @@ -864,7 +634,7 @@ def _default_side_effect( ), ) - headers = dict() + headers = {} if parent_project_uuid: headers[X_SIMCORE_PARENT_PROJECT_UUID] = parent_project_uuid if parent_node_uuid: @@ -877,5 +647,5 @@ def _default_side_effect( headers=headers, ) if expected_status_code == status.HTTP_200_OK: - assert side_effect_checks["headers_checked"] == True + assert side_effect_checks["headers_checked"] is True assert response.status_code == expected_status_code diff --git a/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py b/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py index f77b07011150..99fefface395 100644 --- a/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py +++ b/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py @@ -10,6 +10,7 @@ FunctionJobCollection, FunctionJobCollectionsListFilters, FunctionJobID, + FunctionJobStatus, FunctionOutputSchema, FunctionUserApiAccessRights, RegisteredFunction, @@ -385,6 +386,25 @@ async def get_function_input_schema( ) +@router.expose(reraise_if_error_type=(FunctionJobIDNotFoundError)) +async def get_function_job_status( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, +) -> FunctionJobStatus: + """ + Returns the status of a function job. + """ + return await _functions_service.get_function_job_status( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + ) + + @router.expose(reraise_if_error_type=(FunctionIDNotFoundError,)) async def get_function_output_schema( app: web.Application, diff --git a/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py b/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py index 7f9daf295d1f..09bc2f360df5 100644 --- a/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py +++ b/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py @@ -17,6 +17,7 @@ FunctionJobCollectionAccessRightsDB, FunctionJobCollectionsListFilters, FunctionJobID, + FunctionJobStatus, FunctionOutputs, FunctionOutputSchema, FunctionsApiAccessRights, @@ -473,6 +474,86 @@ async def list_function_jobs( ) +async def get_function_job_status( + app: web.Application, + connection: AsyncConnection | None = None, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, +) -> FunctionJobStatus: + async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: + await check_user_api_access_rights( + app, + connection=conn, + user_id=user_id, + product_name=product_name, + api_access_rights=[FunctionsApiAccessRights.READ_FUNCTION_JOBS], + ) + await check_user_permissions( + app, + connection=conn, + user_id=user_id, + product_name=product_name, + object_type="function_job", + object_id=function_job_id, + permissions=["read"], + ) + + result = await conn.execute( + function_jobs_table.select().where( + function_jobs_table.c.uuid == function_job_id + ) + ) + row = result.one_or_none() + + if row is None: + raise FunctionJobIDNotFoundError(function_job_id=function_job_id) + + return FunctionJobStatus(status=row.status) # type: ignore[no-any-return] + + +async def update_function_job_status( + app: web.Application, + connection: AsyncConnection | None = None, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + job_status: FunctionJobStatus, +) -> FunctionJobStatus: + async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: + await check_user_api_access_rights( + app, + connection=conn, + user_id=user_id, + product_name=product_name, + api_access_rights=[FunctionsApiAccessRights.WRITE_FUNCTION_JOBS], + ) + await check_user_permissions( + app, + connection=conn, + user_id=user_id, + product_name=product_name, + object_type="function_job", + object_id=function_job_id, + permissions=["write"], + ) + + result = await conn.execute( + function_jobs_table.update() + .where(function_jobs_table.c.uuid == function_job_id) + .values(status=job_status.status) + .returning(*_FUNCTION_JOBS_TABLE_COLS) + ) + row = result.one_or_none() + + if row is None: + raise FunctionJobIDNotFoundError(function_job_id=function_job_id) + + return FunctionJobStatus(status=row.status) # type: ignore[no-any-return] + + async def list_function_job_collections( app: web.Application, connection: AsyncConnection | None = None, diff --git a/services/web/server/src/simcore_service_webserver/functions/_functions_service.py b/services/web/server/src/simcore_service_webserver/functions/_functions_service.py index d74dbe522174..49fec046fe01 100644 --- a/services/web/server/src/simcore_service_webserver/functions/_functions_service.py +++ b/services/web/server/src/simcore_service_webserver/functions/_functions_service.py @@ -13,6 +13,7 @@ FunctionJobCollectionsListFilters, FunctionJobDB, FunctionJobID, + FunctionJobStatus, FunctionOutputSchema, FunctionUserAccessRights, FunctionUserApiAccessRights, @@ -448,6 +449,21 @@ async def get_function_user_permissions( ) +async def get_function_job_status( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, +) -> FunctionJobStatus: + return await _functions_repository.get_function_job_status( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + ) + + async def get_functions_user_api_access_rights( app: web.Application, *, From dc3938053079c12fc0bba484e98042dfe4d0bc4a Mon Sep 17 00:00:00 2001 From: Werner Van Geit Date: Tue, 22 Jul 2025 16:03:22 +0200 Subject: [PATCH 2/7] Add tests to status/output fetching function jobs --- .../functions/functions_rpc_interface.py | 59 ++++++++ .../api/routes/function_jobs_routes.py | 41 ++++-- .../services_rpc/wb_api_server.py | 47 +++++++ .../tests/unit/api_functions/conftest.py | 21 +++ .../test_api_routers_function_jobs.py | 40 +++++- .../functions/_controller/_functions_rpc.py | 58 +++++++- .../functions/_functions_repository.py | 94 ++++++++++++- .../functions/_functions_service.py | 50 +++++++ .../test_function_jobs_controller_rpc.py | 131 ++++++++++++++++++ 9 files changed, 516 insertions(+), 25 deletions(-) diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py index 118dcae28ceb..89e387adffd5 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py @@ -18,6 +18,7 @@ ) from models_library.functions import ( FunctionJobStatus, + FunctionOutputs, FunctionUserAccessRights, FunctionUserApiAccessRights, ) @@ -319,6 +320,64 @@ async def get_function_job_status( return TypeAdapter(FunctionJobStatus).validate_python(result) +@log_decorator(_logger, level=logging.DEBUG) +async def get_function_job_outputs( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + user_id: UserID, + function_job_id: FunctionJobID, + product_name: ProductName, +) -> FunctionOutputs: + result = await rabbitmq_rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + TypeAdapter(RPCMethodName).validate_python("get_function_job_outputs"), + function_job_id=function_job_id, + user_id=user_id, + product_name=product_name, + ) + return TypeAdapter(FunctionOutputs).validate_python(result) + + +@log_decorator(_logger, level=logging.DEBUG) +async def update_function_job_status( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + job_status: FunctionJobStatus, +) -> FunctionJobStatus: + result = await rabbitmq_rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + TypeAdapter(RPCMethodName).validate_python("update_function_job_status"), + function_job_id=function_job_id, + job_status=job_status, + user_id=user_id, + product_name=product_name, + ) + return TypeAdapter(FunctionJobStatus).validate_python(result) + + +@log_decorator(_logger, level=logging.DEBUG) +async def update_function_job_outputs( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + outputs: FunctionOutputs, +) -> FunctionOutputs: + result = await rabbitmq_rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + TypeAdapter(RPCMethodName).validate_python("update_function_job_outputs"), + function_job_id=function_job_id, + outputs=outputs, + user_id=user_id, + product_name=product_name, + ) + return TypeAdapter(FunctionOutputs).validate_python(result) + + @log_decorator(_logger, level=logging.DEBUG) async def delete_function_job( rabbitmq_rpc_client: RabbitMQRPCClient, diff --git a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py index fe874fe64807..465f40e39a6f 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py @@ -199,9 +199,7 @@ async def function_job_status( user_id=user_id, director2_api=director2_api, ) - job_status = FunctionJobStatus(status=job_status.state) - - if (function.function_class == FunctionClass.SOLVER) and ( + elif (function.function_class == FunctionClass.SOLVER) and ( function_job.function_class == FunctionClass.SOLVER ): job_status = await solvers_jobs.inspect_job( @@ -211,13 +209,20 @@ async def function_job_status( user_id=user_id, director2_api=director2_api, ) - job_status = FunctionJobStatus(status=job_status.state) else: raise UnsupportedFunctionFunctionJobClassCombinationError( function_class=function.function_class, function_job_class=function_job.function_class, ) - return job_status + + new_job_status = FunctionJobStatus(status=job_status.state) + + return await wb_api_rpc.update_function_job_status( + function_job_id=function_job.uid, + user_id=user_id, + product_name=product_name, + job_status=new_job_status, + ) async def get_function_from_functionjobid( @@ -255,7 +260,7 @@ async def get_function_from_functionjobid( changelog=CHANGE_LOGS["function_job_outputs"], ), ) -async def function_job_outputs( +async def get_function_job_outputs( function_job_id: FunctionJobID, webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], user_id: Annotated[UserID, Depends(get_current_user_id)], @@ -271,11 +276,18 @@ async def function_job_outputs( product_name=product_name, ) + old_job_outputs = await wb_api_rpc.get_function_job_outputs( + function_job_id=function_job.uid, user_id=user_id, product_name=product_name + ) + + if old_job_outputs is not None: + return old_job_outputs + if ( function.function_class == FunctionClass.PROJECT and function_job.function_class == FunctionClass.PROJECT ): - return dict( + new_outputs = dict( ( await studies_jobs.get_study_job_outputs( study_id=function.project_id, @@ -286,12 +298,11 @@ async def function_job_outputs( ) ).results ) - - if ( + elif ( function.function_class == FunctionClass.SOLVER and function_job.function_class == FunctionClass.SOLVER ): - return dict( + new_outputs = dict( ( await solvers_jobs_read.get_job_outputs( solver_key=function.solver_key, @@ -304,4 +315,12 @@ async def function_job_outputs( ) ).results ) - raise UnsupportedFunctionClassError(function_class=function.function_class) + else: + raise UnsupportedFunctionClassError(function_class=function.function_class) + + return await wb_api_rpc.update_function_job_outputs( + function_job_id=function_job.uid, + user_id=user_id, + product_name=product_name, + outputs=new_outputs, + ) diff --git a/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py b/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py index b72609a13776..608a65ac47e7 100644 --- a/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py +++ b/services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py @@ -25,6 +25,7 @@ from models_library.api_schemas_webserver.licensed_items import LicensedItemRpcGetPage from models_library.functions import ( FunctionJobStatus, + FunctionOutputs, FunctionUserAccessRights, FunctionUserApiAccessRights, ) @@ -487,6 +488,52 @@ async def get_function_job_status( function_job_id=function_job_id, ) + async def get_function_job_outputs( + self, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + ) -> FunctionOutputs: + return await functions_rpc_interface.get_function_job_outputs( + self._client, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + ) + + async def update_function_job_status( + self, + *, + function_job_id: FunctionJobID, + user_id: UserID, + product_name: ProductName, + job_status: FunctionJobStatus, + ) -> FunctionJobStatus: + return await functions_rpc_interface.update_function_job_status( + self._client, + function_job_id=function_job_id, + user_id=user_id, + product_name=product_name, + job_status=job_status, + ) + + async def update_function_job_outputs( + self, + *, + function_job_id: FunctionJobID, + user_id: UserID, + product_name: ProductName, + outputs: FunctionOutputs, + ) -> FunctionOutputs: + return await functions_rpc_interface.update_function_job_outputs( + self._client, + function_job_id=function_job_id, + user_id=user_id, + product_name=product_name, + outputs=outputs, + ) + async def find_cached_function_jobs( self, *, diff --git a/services/api-server/tests/unit/api_functions/conftest.py b/services/api-server/tests/unit/api_functions/conftest.py index 1df264f0e397..72df189570f5 100644 --- a/services/api-server/tests/unit/api_functions/conftest.py +++ b/services/api-server/tests/unit/api_functions/conftest.py @@ -232,3 +232,24 @@ def _mock( ) return _mock + + +@pytest.fixture() +def mock_handler_in_study_jobs_rest_interface( + mock_wb_api_server_rpc: MockerFixture, +) -> Callable[[str, Any, Exception | None], None]: + def _mock( + handler_name: str = "", + return_value: Any = None, + exception: Exception | None = None, + ) -> None: + from simcore_service_api_server.api.routes.functions_routes import studies_jobs + + mock_wb_api_server_rpc.patch.object( + studies_jobs, + handler_name, + return_value=return_value, + side_effect=exception, + ) + + return _mock diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py b/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py index c27eba58db33..26023e11a969 100644 --- a/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py +++ b/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py @@ -1,3 +1,5 @@ +# pylint: disable=unused-argument + import uuid from collections.abc import Callable from datetime import datetime @@ -108,12 +110,13 @@ async def test_list_function_jobs( ) -@pytest.mark.parametrize("job_status", ["SUCCESS", "FAILED", "RUNNING"]) +@pytest.mark.parametrize("job_status", ["SUCCESS", "FAILED", "STARTED"]) async def test_get_function_job_status( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], mock_registered_function_job: RegisteredProjectFunctionJob, mock_registered_project_function: RegisteredProjectFunction, + mock_handler_in_study_jobs_rest_interface: Callable[[str, Any], None], auth: httpx.BasicAuth, job_status: str, ) -> None: @@ -128,16 +131,20 @@ async def test_get_function_job_status( "get_function_job_status", FunctionJobStatus(status=job_status), ) - mock_handler_in_functions_rpc_interface( + mock_handler_in_study_jobs_rest_interface( "inspect_study_job", JobStatus( job_id=uuid.uuid4(), submitted_at=datetime.fromisoformat("2023-01-01T00:00:00"), started_at=datetime.fromisoformat("2023-01-01T01:00:00"), stopped_at=datetime.fromisoformat("2023-01-01T02:00:00"), - state=RunningState(job_status), + state=RunningState(value=job_status), ), ) + mock_handler_in_functions_rpc_interface( + "update_function_job_status", + FunctionJobStatus(status=job_status), + ) response = await client.get( f"{API_VTAG}/function_jobs/{mock_registered_function_job.uid}/status", @@ -146,3 +153,30 @@ async def test_get_function_job_status( assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == job_status + + +@pytest.mark.parametrize("job_outputs", [{"X+Y": 42, "X-Y": 10}]) +async def test_get_function_job_outputs( + client: AsyncClient, + mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], + mock_registered_function_job: RegisteredProjectFunctionJob, + mock_registered_project_function: RegisteredProjectFunction, + auth: httpx.BasicAuth, + job_outputs: dict[str, Any], +) -> None: + + mock_handler_in_functions_rpc_interface( + "get_function_job", mock_registered_function_job + ) + mock_handler_in_functions_rpc_interface( + "get_function", mock_registered_project_function + ) + mock_handler_in_functions_rpc_interface("get_function_job_outputs", job_outputs) + + response = await client.get( + f"{API_VTAG}/function_jobs/{mock_registered_function_job.uid}/outputs", + auth=auth, + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data == job_outputs diff --git a/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py b/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py index 99fefface395..0d0ee041356e 100644 --- a/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py +++ b/services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py @@ -11,6 +11,7 @@ FunctionJobCollectionsListFilters, FunctionJobID, FunctionJobStatus, + FunctionOutputs, FunctionOutputSchema, FunctionUserApiAccessRights, RegisteredFunction, @@ -386,7 +387,7 @@ async def get_function_input_schema( ) -@router.expose(reraise_if_error_type=(FunctionJobIDNotFoundError)) +@router.expose(reraise_if_error_type=(FunctionJobIDNotFoundError,)) async def get_function_job_status( app: web.Application, *, @@ -394,9 +395,6 @@ async def get_function_job_status( product_name: ProductName, function_job_id: FunctionJobID, ) -> FunctionJobStatus: - """ - Returns the status of a function job. - """ return await _functions_service.get_function_job_status( app=app, user_id=user_id, @@ -405,6 +403,58 @@ async def get_function_job_status( ) +@router.expose(reraise_if_error_type=(FunctionJobIDNotFoundError,)) +async def get_function_job_outputs( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, +) -> FunctionOutputs: + return await _functions_service.get_function_job_outputs( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + ) + + +@router.expose(reraise_if_error_type=(FunctionJobIDNotFoundError,)) +async def update_function_job_status( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + job_status: FunctionJobStatus, +) -> FunctionJobStatus: + return await _functions_service.update_function_job_status( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + job_status=job_status, + ) + + +@router.expose(reraise_if_error_type=(FunctionJobIDNotFoundError,)) +async def update_function_job_outputs( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + outputs: FunctionOutputs, +) -> FunctionOutputs: + return await _functions_service.update_function_job_outputs( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + outputs=outputs, + ) + + @router.expose(reraise_if_error_type=(FunctionIDNotFoundError,)) async def get_function_output_schema( app: web.Application, diff --git a/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py b/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py index dee02c4f49d4..808386d0165c 100644 --- a/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py +++ b/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py @@ -516,7 +516,46 @@ async def get_function_job_status( if row is None: raise FunctionJobIDNotFoundError(function_job_id=function_job_id) - return FunctionJobStatus(status=row.status) # type: ignore[no-any-return] + return FunctionJobStatus(status=row.status) + + +async def get_function_job_outputs( + app: web.Application, + connection: AsyncConnection | None = None, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, +) -> FunctionOutputs: + async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: + await check_user_api_access_rights( + app, + connection=conn, + user_id=user_id, + product_name=product_name, + api_access_rights=[FunctionsApiAccessRights.READ_FUNCTION_JOBS], + ) + await check_user_permissions( + app, + connection=conn, + user_id=user_id, + product_name=product_name, + object_type="function_job", + object_id=function_job_id, + permissions=["read"], + ) + + result = await conn.execute( + function_jobs_table.select().where( + function_jobs_table.c.uuid == function_job_id + ) + ) + row = result.one_or_none() + + if row is None: + raise FunctionJobIDNotFoundError(function_job_id=function_job_id) + + return TypeAdapter(FunctionOutputs).validate_python(row.outputs) async def update_function_job_status( @@ -528,17 +567,17 @@ async def update_function_job_status( function_job_id: FunctionJobID, job_status: FunctionJobStatus, ) -> FunctionJobStatus: - async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: + async with transaction_context(get_asyncpg_engine(app), connection) as transaction: await check_user_api_access_rights( app, - connection=conn, + connection=transaction, user_id=user_id, product_name=product_name, api_access_rights=[FunctionsApiAccessRights.WRITE_FUNCTION_JOBS], ) await check_user_permissions( app, - connection=conn, + connection=transaction, user_id=user_id, product_name=product_name, object_type="function_job", @@ -546,18 +585,59 @@ async def update_function_job_status( permissions=["write"], ) - result = await conn.execute( + result = await transaction.execute( function_jobs_table.update() .where(function_jobs_table.c.uuid == function_job_id) .values(status=job_status.status) - .returning(*_FUNCTION_JOBS_TABLE_COLS) + .returning(function_jobs_table.c.status) + ) + row = result.one_or_none() + + if row is None: + raise FunctionJobIDNotFoundError(function_job_id=function_job_id) + + return FunctionJobStatus(status=row.status) + + +async def update_function_job_outputs( + app: web.Application, + connection: AsyncConnection | None = None, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + outputs: FunctionOutputs, +) -> FunctionOutputs: + async with transaction_context(get_asyncpg_engine(app), connection) as transaction: + await check_user_api_access_rights( + app, + connection=transaction, + user_id=user_id, + product_name=product_name, + api_access_rights=[FunctionsApiAccessRights.WRITE_FUNCTION_JOBS], + ) + await check_user_permissions( + app, + connection=transaction, + user_id=user_id, + product_name=product_name, + object_type="function_job", + object_id=function_job_id, + permissions=["write"], + ) + + result = await transaction.execute( + function_jobs_table.update() + .where(function_jobs_table.c.uuid == function_job_id) + .values(outputs=outputs) + .returning(function_jobs_table.c.outputs) ) row = result.one_or_none() if row is None: raise FunctionJobIDNotFoundError(function_job_id=function_job_id) - return FunctionJobStatus(status=row.status) # type: ignore[no-any-return] + return TypeAdapter(FunctionOutputs).validate_python(row.outputs) async def list_function_job_collections( diff --git a/services/web/server/src/simcore_service_webserver/functions/_functions_service.py b/services/web/server/src/simcore_service_webserver/functions/_functions_service.py index 49fec046fe01..ff680cba4060 100644 --- a/services/web/server/src/simcore_service_webserver/functions/_functions_service.py +++ b/services/web/server/src/simcore_service_webserver/functions/_functions_service.py @@ -14,6 +14,7 @@ FunctionJobDB, FunctionJobID, FunctionJobStatus, + FunctionOutputs, FunctionOutputSchema, FunctionUserAccessRights, FunctionUserApiAccessRights, @@ -464,6 +465,55 @@ async def get_function_job_status( ) +async def get_function_job_outputs( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, +) -> FunctionOutputs: + return await _functions_repository.get_function_job_outputs( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + ) + + +async def update_function_job_outputs( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + outputs: FunctionOutputs, +) -> FunctionOutputs: + return await _functions_repository.update_function_job_outputs( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + outputs=outputs, + ) + + +async def update_function_job_status( + app: web.Application, + *, + user_id: UserID, + product_name: ProductName, + function_job_id: FunctionJobID, + job_status: FunctionJobStatus, +) -> FunctionJobStatus: + return await _functions_repository.update_function_job_status( + app=app, + user_id=user_id, + product_name=product_name, + function_job_id=function_job_id, + job_status=job_status, + ) + + async def get_functions_user_api_access_rights( app: web.Application, *, diff --git a/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_function_jobs_controller_rpc.py b/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_function_jobs_controller_rpc.py index ef0ec12205cc..6101d09fe5c0 100644 --- a/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_function_jobs_controller_rpc.py +++ b/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_function_jobs_controller_rpc.py @@ -11,6 +11,7 @@ ProjectFunction, ProjectFunctionJob, ) +from models_library.functions import FunctionJobStatus from models_library.functions_errors import ( FunctionJobIDNotFoundError, FunctionJobReadAccessDeniedError, @@ -357,3 +358,133 @@ async def test_find_cached_function_jobs( # Assert the cached jobs does not contain the registered job for the other user assert cached_jobs is None + + +@pytest.mark.parametrize( + "user_role", + [UserRole.USER], +) +async def test_update_function_job_status( + client: TestClient, + rpc_client: RabbitMQRPCClient, + add_user_function_api_access_rights: None, + logged_user: UserInfoDict, + mock_function: ProjectFunction, + osparc_product_name: ProductName, +): + # Register the function first + registered_function = await functions_rpc.register_function( + rabbitmq_rpc_client=rpc_client, + function=mock_function, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + function_job = ProjectFunctionJob( + function_uid=registered_function.uid, + title="Test Function Job", + description="A test function job", + project_job_id=uuid4(), + inputs={"input1": "value1"}, + outputs={"output1": "result1"}, + ) + + # Register the function job + registered_job = await functions_rpc.register_function_job( + rabbitmq_rpc_client=rpc_client, + function_job=function_job, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + old_job_status = await functions_rpc.get_function_job_status( + rabbitmq_rpc_client=rpc_client, + function_job_id=registered_job.uid, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + assert old_job_status.status == "created" + + # Update the function job status + new_status = FunctionJobStatus(status="COMPLETED") + updated_job_status = await functions_rpc.update_function_job_status( + rabbitmq_rpc_client=rpc_client, + function_job_id=registered_job.uid, + user_id=logged_user["id"], + product_name=osparc_product_name, + job_status=new_status, + ) + + # Assert the updated job status matches the new status + assert updated_job_status == new_status + + +@pytest.mark.parametrize( + "user_role", + [UserRole.USER], +) +async def test_update_function_job_outputs( + client: TestClient, + rpc_client: RabbitMQRPCClient, + add_user_function_api_access_rights: None, + logged_user: UserInfoDict, + mock_function: ProjectFunction, + osparc_product_name: ProductName, +): + # Register the function first + registered_function = await functions_rpc.register_function( + rabbitmq_rpc_client=rpc_client, + function=mock_function, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + function_job = ProjectFunctionJob( + function_uid=registered_function.uid, + title="Test Function Job", + description="A test function job", + project_job_id=uuid4(), + inputs={"input1": "value1"}, + outputs=None, + ) + + # Register the function job + registered_job = await functions_rpc.register_function_job( + rabbitmq_rpc_client=rpc_client, + function_job=function_job, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + received_outputs = await functions_rpc.get_function_job_outputs( + rabbitmq_rpc_client=rpc_client, + function_job_id=registered_job.uid, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + assert received_outputs is None + + new_outputs = {"output1": "new_result1", "output2": "new_result2"} + + # Update the function job outputs + updated_outputs = await functions_rpc.update_function_job_outputs( + rabbitmq_rpc_client=rpc_client, + function_job_id=registered_job.uid, + user_id=logged_user["id"], + product_name=osparc_product_name, + outputs=new_outputs, + ) + + # Assert the updated outputs match the new outputs + assert updated_outputs == new_outputs + + # Update the function job outputs + received_outputs = await functions_rpc.get_function_job_outputs( + rabbitmq_rpc_client=rpc_client, + function_job_id=registered_job.uid, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + assert received_outputs == new_outputs From 521808af3808a34557e03b50b06dd27c593a6b2d Mon Sep 17 00:00:00 2001 From: Werner Van Geit Date: Wed, 23 Jul 2025 16:27:45 +0200 Subject: [PATCH 3/7] Move some code in dependencies --- .../api/dependencies/functions.py | 67 +++++++ .../api/routes/function_jobs_routes.py | 47 ++--- .../api/routes/functions_routes.py | 30 +-- .../tests/unit/api_functions/conftest.py | 24 ++- .../test_api_routers_function_jobs.py | 42 +++-- .../test_api_routers_functions.py | 171 +----------------- .../functions/_functions_repository.py | 50 ++--- 7 files changed, 167 insertions(+), 264 deletions(-) create mode 100644 services/api-server/src/simcore_service_api_server/api/dependencies/functions.py diff --git a/services/api-server/src/simcore_service_api_server/api/dependencies/functions.py b/services/api-server/src/simcore_service_api_server/api/dependencies/functions.py new file mode 100644 index 000000000000..06a2913ef004 --- /dev/null +++ b/services/api-server/src/simcore_service_api_server/api/dependencies/functions.py @@ -0,0 +1,67 @@ +from typing import Annotated + +from fastapi import Depends +from models_library.functions import ( + FunctionJob, + FunctionJobID, + FunctionJobStatus, + FunctionOutputs, + RegisteredFunction, +) +from models_library.products import ProductName +from models_library.users import UserID +from simcore_service_api_server.api.dependencies.authentication import ( + get_current_user_id, + get_product_name, +) +from simcore_service_api_server.api.dependencies.webserver_rpc import ( + get_wb_api_rpc_client, +) +from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient + + +async def get_stored_job_outputs( + function_job_id: FunctionJobID, + wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], +) -> FunctionOutputs: + + return await wb_api_rpc.get_function_job_outputs( + function_job_id=function_job_id, user_id=user_id, product_name=product_name + ) + + +async def get_function_job_dependency( + function_job_id: FunctionJobID, + wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], +) -> FunctionJob: + return await wb_api_rpc.get_function_job( + function_job_id=function_job_id, user_id=user_id, product_name=product_name + ) + + +async def get_function_from_functionjob( + function_job: Annotated[FunctionJob, Depends(get_function_job_dependency)], + wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], +) -> RegisteredFunction: + return await wb_api_rpc.get_function( + function_id=function_job.function_uid, + user_id=user_id, + product_name=product_name, + ) + + +async def get_stored_job_status( + function_job_id: FunctionJobID, + wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], +) -> FunctionJobStatus: + return await wb_api_rpc.get_function_job_status( + function_job_id=function_job_id, user_id=user_id, product_name=product_name + ) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py index 83e487d73b37..21377d8935e6 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py @@ -20,6 +20,12 @@ from models_library.projects_state import RunningState from models_library.users import UserID from servicelib.fastapi.dependencies import get_app +from simcore_service_api_server.api.dependencies.functions import ( + get_function_from_functionjob, + get_function_job_dependency, + get_stored_job_outputs, + get_stored_job_status, +) from sqlalchemy.ext.asyncio import AsyncEngine from ..._service_jobs import JobService @@ -172,25 +178,19 @@ async def delete_function_job( ), ) async def function_job_status( - function_job_id: FunctionJobID, + function_job: Annotated[ + RegisteredFunctionJob, Depends(get_function_job_dependency) + ], + function: Annotated[RegisteredFunction, Depends(get_function_from_functionjob)], + stored_job_status: Annotated[FunctionJobStatus, Depends(get_stored_job_status)], director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], user_id: Annotated[UserID, Depends(get_current_user_id)], product_name: Annotated[ProductName, Depends(get_product_name)], wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], ) -> FunctionJobStatus: - function, function_job = await get_function_from_functionjobid( - wb_api_rpc=wb_api_rpc, - function_job_id=function_job_id, - user_id=user_id, - product_name=product_name, - ) - old_job_status = await wb_api_rpc.get_function_job_status( - function_job_id=function_job.uid, user_id=user_id, product_name=product_name - ) - - if old_job_status.status in (RunningState.SUCCESS, RunningState.FAILED): - return old_job_status + if stored_job_status.status in (RunningState.SUCCESS, RunningState.FAILED): + return stored_job_status if ( function.function_class == FunctionClass.PROJECT @@ -264,27 +264,20 @@ async def get_function_from_functionjobid( ), ) async def get_function_job_outputs( - function_job_id: FunctionJobID, + function_job: Annotated[ + RegisteredFunctionJob, Depends(get_function_job_dependency) + ], + function: Annotated[RegisteredFunction, Depends(get_function_from_functionjob)], webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], user_id: Annotated[UserID, Depends(get_current_user_id)], product_name: Annotated[ProductName, Depends(get_product_name)], storage_client: Annotated[StorageApi, Depends(get_api_client(StorageApi))], wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)], + stored_job_outputs: Annotated[FunctionOutputs, Depends(get_stored_job_outputs)], ) -> FunctionOutputs: - function, function_job = await get_function_from_functionjobid( - wb_api_rpc=wb_api_rpc, - function_job_id=function_job_id, - user_id=user_id, - product_name=product_name, - ) - - old_job_outputs = await wb_api_rpc.get_function_job_outputs( - function_job_id=function_job.uid, user_id=user_id, product_name=product_name - ) - - if old_job_outputs is not None: - return old_job_outputs + if stored_job_outputs is not None: + return stored_job_outputs if ( function.function_class == FunctionClass.PROJECT diff --git a/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py index bbfeefc4efc0..c9ab2dda67cc 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py @@ -36,6 +36,7 @@ from models_library.users import UserID from servicelib.fastapi.dependencies import get_reverse_url_mapper from simcore_service_api_server._service_jobs import JobService +from simcore_service_api_server.api.dependencies.functions import get_stored_job_status from ..._service_solvers import SolverService from ...models.pagination import Page, PaginationParams @@ -365,11 +366,12 @@ async def validate_function_inputs( ) async def run_function( # noqa: PLR0913 request: Request, + function_id: FunctionID, + to_run_function: Annotated[RegisteredFunction, Depends(get_function)], wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], - function_id: FunctionID, function_inputs: FunctionInputs, user_id: Annotated[UserID, Depends(get_current_user_id)], product_name: Annotated[str, Depends(get_product_name)], @@ -378,7 +380,6 @@ async def run_function( # noqa: PLR0913 x_simcore_parent_project_uuid: Annotated[ProjectID | Literal["null"], Header()], x_simcore_parent_node_id: Annotated[NodeID | Literal["null"], Header()], ) -> RegisteredFunctionJob: - parent_project_uuid = ( x_simcore_parent_project_uuid if isinstance(x_simcore_parent_project_uuid, ProjectID) @@ -390,8 +391,6 @@ async def run_function( # noqa: PLR0913 else None ) - # Make sure the user is allowed to execute any function - # (read/write right is checked in the other endpoint called in this method) user_api_access_rights = await wb_api_rpc.get_functions_user_api_access_rights( user_id=user_id, product_name=product_name ) @@ -400,8 +399,7 @@ async def run_function( # noqa: PLR0913 user_id=user_id, function_id=function_id, ) - # Make sure the user is allowed to execute this particular function - # (read/write right is checked in the other endpoint called in this method) + user_permissions: FunctionUserAccessRights = ( await wb_api_rpc.get_function_user_permissions( function_id=function_id, user_id=user_id, product_name=product_name @@ -415,10 +413,6 @@ async def run_function( # noqa: PLR0913 from .function_jobs_routes import function_job_status - to_run_function = await wb_api_rpc.get_function( - function_id=function_id, user_id=user_id, product_name=product_name - ) - joined_inputs = _join_inputs( to_run_function.default_inputs, function_inputs, @@ -443,10 +437,17 @@ async def run_function( # noqa: PLR0913 ): for cached_function_job in cached_function_jobs: job_status = await function_job_status( + function=to_run_function, + function_job=cached_function_job, + stored_job_status=await get_stored_job_status( + function_job_id=cached_function_job.uid, + user_id=user_id, + product_name=product_name, + wb_api_rpc=wb_api_rpc, + ), wb_api_rpc=wb_api_rpc, - director2_api=director2_api, - function_job_id=cached_function_job.uid, user_id=user_id, + director2_api=director2_api, product_name=product_name, ) if job_status.status == RunningState.SUCCESS: @@ -563,9 +564,10 @@ async def delete_function( ), ) async def map_function( # noqa: PLR0913 + request: Request, function_id: FunctionID, + to_run_function: Annotated[RegisteredFunction, Depends(get_function)], function_inputs_list: FunctionInputsList, - request: Request, wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], @@ -577,11 +579,11 @@ async def map_function( # noqa: PLR0913 x_simcore_parent_project_uuid: Annotated[ProjectID | Literal["null"], Header()], x_simcore_parent_node_id: Annotated[NodeID | Literal["null"], Header()], ) -> RegisteredFunctionJobCollection: - function_jobs = [] function_jobs = [ await run_function( wb_api_rpc=wb_api_rpc, function_id=function_id, + to_run_function=to_run_function, function_inputs=function_inputs, product_name=product_name, user_id=user_id, diff --git a/services/api-server/tests/unit/api_functions/conftest.py b/services/api-server/tests/unit/api_functions/conftest.py index d401677953b7..cc6f57af1f20 100644 --- a/services/api-server/tests/unit/api_functions/conftest.py +++ b/services/api-server/tests/unit/api_functions/conftest.py @@ -142,19 +142,17 @@ def mock_registered_solver_function( sample_output_schema: JSONFunctionOutputSchema, ) -> RegisteredFunction: return RegisteredSolverFunction( - **{ - "title": "test_function", - "function_class": FunctionClass.SOLVER, - "description": "A test function", - "input_schema": sample_input_schema, - "output_schema": sample_output_schema, - "default_inputs": None, - "uid": f"{uuid4()}", - "created_at": datetime.datetime.now(datetime.UTC), - "modified_at": datetime.datetime.now(datetime.UTC), - "solver_key": "simcore/services/comp/ans-model", - "solver_version": "1.0.1", - } + title="test_function", + function_class=FunctionClass.SOLVER, + description="A test function", + input_schema=sample_input_schema, + output_schema=sample_output_schema, + default_inputs=None, + uid=uuid4(), + created_at=datetime.datetime.now(datetime.UTC), + modified_at=datetime.datetime.now(datetime.UTC), + solver_key="simcore/services/comp/ans-model", + solver_version="1.0.1", ) diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py b/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py index 26023e11a969..961190397466 100644 --- a/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py +++ b/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py @@ -23,7 +23,7 @@ async def test_delete_function_job( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job: RegisteredProjectFunctionJob, + mock_registered_project_function_job: RegisteredProjectFunctionJob, auth: httpx.BasicAuth, ) -> None: @@ -31,7 +31,8 @@ async def test_delete_function_job( # Now, delete the function job response = await client.delete( - f"{API_VTAG}/function_jobs/{mock_registered_function_job.uid}", auth=auth + f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}", + auth=auth, ) assert response.status_code == status.HTTP_200_OK @@ -39,62 +40,63 @@ async def test_delete_function_job( async def test_register_function_job( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_function_job: ProjectFunctionJob, - mock_registered_function_job: RegisteredProjectFunctionJob, + mock_project_function_job: ProjectFunctionJob, + mock_registered_project_function_job: RegisteredProjectFunctionJob, auth: httpx.BasicAuth, ) -> None: """Test the register_function_job endpoint.""" mock_handler_in_functions_rpc_interface( - "register_function_job", mock_registered_function_job + "register_function_job", mock_registered_project_function_job ) response = await client.post( f"{API_VTAG}/function_jobs", - json=mock_function_job.model_dump(mode="json"), + json=mock_project_function_job.model_dump(mode="json"), auth=auth, ) assert response.status_code == status.HTTP_200_OK assert ( RegisteredProjectFunctionJob.model_validate(response.json()) - == mock_registered_function_job + == mock_registered_project_function_job ) async def test_get_function_job( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job: RegisteredProjectFunctionJob, + mock_registered_project_function_job: RegisteredProjectFunctionJob, auth: httpx.BasicAuth, ) -> None: mock_handler_in_functions_rpc_interface( - "get_function_job", mock_registered_function_job + "get_function_job", mock_registered_project_function_job ) # Now, get the function job response = await client.get( - f"{API_VTAG}/function_jobs/{mock_registered_function_job.uid}", auth=auth + f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}", + auth=auth, ) assert response.status_code == status.HTTP_200_OK assert ( RegisteredProjectFunctionJob.model_validate(response.json()) - == mock_registered_function_job + == mock_registered_project_function_job ) async def test_list_function_jobs( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job: RegisteredProjectFunctionJob, + mock_registered_project_function_job: RegisteredProjectFunctionJob, auth: httpx.BasicAuth, ) -> None: mock_handler_in_functions_rpc_interface( "list_function_jobs", ( - [mock_registered_function_job for _ in range(5)], + [mock_registered_project_function_job for _ in range(5)], PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), ), ) @@ -106,7 +108,7 @@ async def test_list_function_jobs( assert len(data) == 5 assert ( RegisteredProjectFunctionJob.model_validate(data[0]) - == mock_registered_function_job + == mock_registered_project_function_job ) @@ -114,7 +116,7 @@ async def test_list_function_jobs( async def test_get_function_job_status( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job: RegisteredProjectFunctionJob, + mock_registered_project_function_job: RegisteredProjectFunctionJob, mock_registered_project_function: RegisteredProjectFunction, mock_handler_in_study_jobs_rest_interface: Callable[[str, Any], None], auth: httpx.BasicAuth, @@ -122,7 +124,7 @@ async def test_get_function_job_status( ) -> None: mock_handler_in_functions_rpc_interface( - "get_function_job", mock_registered_function_job + "get_function_job", mock_registered_project_function_job ) mock_handler_in_functions_rpc_interface( "get_function", mock_registered_project_function @@ -147,7 +149,7 @@ async def test_get_function_job_status( ) response = await client.get( - f"{API_VTAG}/function_jobs/{mock_registered_function_job.uid}/status", + f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}/status", auth=auth, ) assert response.status_code == status.HTTP_200_OK @@ -159,14 +161,14 @@ async def test_get_function_job_status( async def test_get_function_job_outputs( client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job: RegisteredProjectFunctionJob, + mock_registered_project_function_job: RegisteredProjectFunctionJob, mock_registered_project_function: RegisteredProjectFunction, auth: httpx.BasicAuth, job_outputs: dict[str, Any], ) -> None: mock_handler_in_functions_rpc_interface( - "get_function_job", mock_registered_function_job + "get_function_job", mock_registered_project_function_job ) mock_handler_in_functions_rpc_interface( "get_function", mock_registered_project_function @@ -174,7 +176,7 @@ async def test_get_function_job_outputs( mock_handler_in_functions_rpc_interface("get_function_job_outputs", job_outputs) response = await client.get( - f"{API_VTAG}/function_jobs/{mock_registered_function_job.uid}/outputs", + f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}/outputs", auth=auth, ) assert response.status_code == status.HTTP_200_OK diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_functions.py b/services/api-server/tests/unit/api_functions/test_api_routers_functions.py index 58c2047005db..149c38ff3b29 100644 --- a/services/api-server/tests/unit/api_functions/test_api_routers_functions.py +++ b/services/api-server/tests/unit/api_functions/test_api_routers_functions.py @@ -18,16 +18,13 @@ from httpx import AsyncClient from models_library.api_schemas_long_running_tasks.tasks import TaskGet from models_library.functions import ( - FunctionJobCollection, FunctionUserAccessRights, FunctionUserApiAccessRights, ProjectFunction, - ProjectFunctionJob, RegisteredFunction, RegisteredFunctionJob, RegisteredFunctionJobCollection, RegisteredProjectFunction, - RegisteredProjectFunctionJob, ) from models_library.functions_errors import ( FunctionIDNotFoundError, @@ -317,169 +314,6 @@ async def test_delete_function( assert response.status_code == status.HTTP_200_OK -async def test_register_function_job( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_project_function_job: ProjectFunctionJob, - mock_registered_project_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - """Test the register_function_job endpoint.""" - - mock_handler_in_functions_rpc_interface( - "register_function_job", mock_registered_project_function_job - ) - - response = await client.post( - f"{API_VTAG}/function_jobs", - json=mock_project_function_job.model_dump(mode="json"), - auth=auth, - ) - - assert response.status_code == status.HTTP_200_OK - assert ( - RegisteredProjectFunctionJob.model_validate(response.json()) - == mock_registered_project_function_job - ) - - -async def test_get_function_job( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_project_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - - mock_handler_in_functions_rpc_interface( - "get_function_job", mock_registered_project_function_job - ) - - # Now, get the function job - response = await client.get( - f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - assert ( - RegisteredProjectFunctionJob.model_validate(response.json()) - == mock_registered_project_function_job - ) - - -async def test_list_function_jobs( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_project_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - - mock_handler_in_functions_rpc_interface( - "list_function_jobs", - ( - [mock_registered_project_function_job for _ in range(5)], - PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), - ), - ) - - # Now, list function jobs - response = await client.get(f"{API_VTAG}/function_jobs", auth=auth) - assert response.status_code == status.HTTP_200_OK - data = response.json()["items"] - assert len(data) == 5 - assert ( - RegisteredProjectFunctionJob.model_validate(data[0]) - == mock_registered_project_function_job - ) - - -async def test_list_function_jobs_with_function_filter( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_project_function_job: RegisteredProjectFunctionJob, - mock_registered_project_function: RegisteredProjectFunction, - auth: httpx.BasicAuth, -) -> None: - - mock_handler_in_functions_rpc_interface( - "list_function_jobs", - ( - [mock_registered_project_function_job for _ in range(5)], - PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), - ), - ) - - # Now, list function jobs with a filter - response = await client.get( - f"{API_VTAG}/functions/{mock_registered_project_function.uid}/jobs", auth=auth - ) - - assert response.status_code == status.HTTP_200_OK - data = response.json()["items"] - assert len(data) == 5 - assert ( - RegisteredProjectFunctionJob.model_validate(data[0]) - == mock_registered_project_function_job - ) - - -async def test_delete_function_job( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_project_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - - mock_handler_in_functions_rpc_interface("delete_function_job", None) - - # Now, delete the function job - response = await client.delete( - f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - - -async def test_register_function_job_collection( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - auth: httpx.BasicAuth, -) -> None: - mock_function_job_collection = FunctionJobCollection.model_validate( - { - "title": "Test Collection", - "description": "A test function job collection", - "job_ids": [str(uuid4()), str(uuid4())], - } - ) - - mock_registered_function_job_collection = ( - RegisteredFunctionJobCollection.model_validate( - { - **mock_function_job_collection.model_dump(), - "uid": str(uuid4()), - "created_at": datetime.datetime.now(datetime.UTC), - } - ) - ) - - mock_handler_in_functions_rpc_interface( - "register_function_job_collection", mock_registered_function_job_collection - ) - - response = await client.post( - f"{API_VTAG}/function_job_collections", - json=mock_function_job_collection.model_dump(mode="json"), - auth=auth, - ) - - # Assert - assert response.status_code == status.HTTP_200_OK - assert ( - RegisteredFunctionJobCollection.model_validate(response.json()) - == mock_registered_function_job_collection - ) - - @pytest.mark.parametrize("user_has_execute_right", [False, True]) @pytest.mark.parametrize( "funcapi_endpoint,endpoint_inputs", [("run", {}), ("map", [{}, {}])] @@ -517,6 +351,11 @@ async def test_run_map_function_not_allowed( ), ) + mock_handler_in_functions_rpc_interface( + "get_function", + mock_registered_project_function, + ) + # Monkeypatching MagicMock because otherwise it refuse to be used in an await statement async def async_magic(): pass diff --git a/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py b/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py index dfcf426e9375..f33843d5b037 100644 --- a/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py +++ b/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py @@ -490,21 +490,21 @@ async def get_function_job_status( function_job_id: FunctionJobID, ) -> FunctionJobStatus: async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: - await check_user_api_access_rights( + await check_user_permissions( app, connection=conn, user_id=user_id, product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.READ_FUNCTION_JOBS], + object_type="function_job", + object_id=function_job_id, + permissions=["read"], ) - await check_user_permissions( + await check_user_api_access_rights( app, connection=conn, user_id=user_id, product_name=product_name, - object_type="function_job", - object_id=function_job_id, - permissions=["read"], + api_access_rights=[FunctionsApiAccessRights.READ_FUNCTION_JOBS], ) result = await conn.execute( @@ -529,21 +529,21 @@ async def get_function_job_outputs( function_job_id: FunctionJobID, ) -> FunctionOutputs: async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: - await check_user_api_access_rights( + await check_user_permissions( app, connection=conn, user_id=user_id, product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.READ_FUNCTION_JOBS], + object_type="function_job", + object_id=function_job_id, + permissions=["read"], ) - await check_user_permissions( + await check_user_api_access_rights( app, connection=conn, user_id=user_id, product_name=product_name, - object_type="function_job", - object_id=function_job_id, - permissions=["read"], + api_access_rights=[FunctionsApiAccessRights.READ_FUNCTION_JOBS], ) result = await conn.execute( @@ -569,21 +569,22 @@ async def update_function_job_status( job_status: FunctionJobStatus, ) -> FunctionJobStatus: async with transaction_context(get_asyncpg_engine(app), connection) as transaction: - await check_user_api_access_rights( + await check_user_permissions( app, connection=transaction, user_id=user_id, product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.WRITE_FUNCTION_JOBS], + object_type="function_job", + object_id=function_job_id, + permissions=["write"], ) - await check_user_permissions( + + await check_user_api_access_rights( app, connection=transaction, user_id=user_id, product_name=product_name, - object_type="function_job", - object_id=function_job_id, - permissions=["write"], + api_access_rights=[FunctionsApiAccessRights.WRITE_FUNCTION_JOBS], ) result = await transaction.execute( @@ -610,21 +611,22 @@ async def update_function_job_outputs( outputs: FunctionOutputs, ) -> FunctionOutputs: async with transaction_context(get_asyncpg_engine(app), connection) as transaction: - await check_user_api_access_rights( + await check_user_permissions( app, connection=transaction, user_id=user_id, product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.WRITE_FUNCTION_JOBS], + object_type="function_job", + object_id=function_job_id, + permissions=["write"], ) - await check_user_permissions( + + await check_user_api_access_rights( app, connection=transaction, user_id=user_id, product_name=product_name, - object_type="function_job", - object_id=function_job_id, - permissions=["write"], + api_access_rights=[FunctionsApiAccessRights.WRITE_FUNCTION_JOBS], ) result = await transaction.execute( From 592a347c1828ed0df8238d825b439683e1b0f780 Mon Sep 17 00:00:00 2001 From: Werner Van Geit Date: Thu, 24 Jul 2025 11:09:13 +0200 Subject: [PATCH 4/7] Fix some pylint errors --- .../api/dependencies/functions.py | 20 ++ .../routes/function_job_collections_routes.py | 29 +- .../test_api_routers_functions.py | 295 ------------------ 3 files changed, 46 insertions(+), 298 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/api/dependencies/functions.py b/services/api-server/src/simcore_service_api_server/api/dependencies/functions.py index 06a2913ef004..354a0541f488 100644 --- a/services/api-server/src/simcore_service_api_server/api/dependencies/functions.py +++ b/services/api-server/src/simcore_service_api_server/api/dependencies/functions.py @@ -56,6 +56,26 @@ async def get_function_from_functionjob( ) +async def get_function_from_functionjobid( + function_job_id: FunctionJobID, + wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], +) -> RegisteredFunction: + function_job = await get_function_job_dependency( + function_job_id=function_job_id, + wb_api_rpc=wb_api_rpc, + user_id=user_id, + product_name=product_name, + ) + return await get_function_from_functionjob( + function_job=function_job, + wb_api_rpc=wb_api_rpc, + user_id=user_id, + product_name=product_name, + ) + + async def get_stored_job_status( function_job_id: FunctionJobID, wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], diff --git a/services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py index eb50f8bd0efc..b4eccee5af0c 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py @@ -12,7 +12,13 @@ RegisteredFunctionJobCollection, ) from models_library.products import ProductName -from models_library.users import UserID # Import UserID +from models_library.users import UserID +from simcore_service_api_server.api.dependencies.functions import ( + get_stored_job_status, # Import UserID +) +from simcore_service_api_server.api.dependencies.functions import ( + get_function_from_functionjobid, +) from ...models.pagination import Page, PaginationParams from ...models.schemas.errors import ErrorGet @@ -221,13 +227,30 @@ async def function_job_collection_status( job_statuses = await asyncio.gather( *[ function_job_status( - job_id, + function_job=await get_function_job( + function_job_id=function_job_id, + wb_api_rpc=wb_api_rpc, + user_id=user_id, + product_name=product_name, + ), + function=await get_function_from_functionjobid( + function_job_id=function_job_id, + wb_api_rpc=wb_api_rpc, + user_id=user_id, + product_name=product_name, + ), + stored_job_status=await get_stored_job_status( + function_job_id=function_job_id, + user_id=user_id, + product_name=product_name, + wb_api_rpc=wb_api_rpc, + ), wb_api_rpc=wb_api_rpc, director2_api=director2_api, user_id=user_id, product_name=product_name, ) - for job_id in function_job_collection.job_ids + for function_job_id in function_job_collection.job_ids ] ) return FunctionJobCollectionStatus( diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_functions.py b/services/api-server/tests/unit/api_functions/test_api_routers_functions.py index 06a313df580e..a3e61a3ec155 100644 --- a/services/api-server/tests/unit/api_functions/test_api_routers_functions.py +++ b/services/api-server/tests/unit/api_functions/test_api_routers_functions.py @@ -309,301 +309,6 @@ async def test_delete_function( assert response.status_code == status.HTTP_200_OK -async def test_register_function_job( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_project_function_job: ProjectFunctionJob, - mock_registered_project_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - """Test the register_function_job endpoint.""" - - mock_handler_in_functions_rpc_interface( - "register_function_job", mock_registered_project_function_job - ) - - response = await client.post( - f"{API_VTAG}/function_jobs", - json=mock_project_function_job.model_dump(mode="json"), - auth=auth, - ) - - assert response.status_code == status.HTTP_200_OK - assert ( - RegisteredProjectFunctionJob.model_validate(response.json()) - == mock_registered_project_function_job - ) - - -async def test_get_function_job( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_project_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - mock_handler_in_functions_rpc_interface( - "get_function_job", mock_registered_project_function_job - ) - - # Now, get the function job - response = await client.get( - f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - assert ( - RegisteredProjectFunctionJob.model_validate(response.json()) - == mock_registered_project_function_job - ) - - -async def test_list_function_jobs( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_project_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - mock_handler_in_functions_rpc_interface( - "list_function_jobs", - ( - [mock_registered_project_function_job for _ in range(5)], - PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), - ), - ) - - # Now, list function jobs - response = await client.get(f"{API_VTAG}/function_jobs", auth=auth) - assert response.status_code == status.HTTP_200_OK - data = response.json()["items"] - assert len(data) == 5 - assert ( - RegisteredProjectFunctionJob.model_validate(data[0]) - == mock_registered_project_function_job - ) - - -async def test_list_function_jobs_with_function_filter( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_project_function_job: RegisteredProjectFunctionJob, - mock_registered_project_function: RegisteredProjectFunction, - auth: httpx.BasicAuth, -) -> None: - mock_handler_in_functions_rpc_interface( - "list_function_jobs", - ( - [mock_registered_project_function_job for _ in range(5)], - PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), - ), - ) - - # Now, list function jobs with a filter - response = await client.get( - f"{API_VTAG}/functions/{mock_registered_project_function.uid}/jobs", auth=auth - ) - - assert response.status_code == status.HTTP_200_OK - data = response.json()["items"] - assert len(data) == 5 - assert ( - RegisteredProjectFunctionJob.model_validate(data[0]) - == mock_registered_project_function_job - ) - - -async def test_delete_function_job( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_project_function_job: RegisteredProjectFunctionJob, - auth: httpx.BasicAuth, -) -> None: - mock_handler_in_functions_rpc_interface("delete_function_job", None) - - # Now, delete the function job - response = await client.delete( - f"{API_VTAG}/function_jobs/{mock_registered_project_function_job.uid}", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - - -async def test_register_function_job_collection( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - auth: httpx.BasicAuth, -) -> None: - mock_function_job_collection = FunctionJobCollection.model_validate( - { - "title": "Test Collection", - "description": "A test function job collection", - "job_ids": [str(uuid4()), str(uuid4())], - } - ) - - mock_registered_function_job_collection = ( - RegisteredFunctionJobCollection.model_validate( - { - **mock_function_job_collection.model_dump(), - "uid": str(uuid4()), - "created_at": datetime.datetime.now(datetime.UTC), - } - ) - ) - - mock_handler_in_functions_rpc_interface( - "register_function_job_collection", mock_registered_function_job_collection - ) - - response = await client.post( - f"{API_VTAG}/function_job_collections", - json=mock_function_job_collection.model_dump(mode="json"), - auth=auth, - ) - - # Assert - assert response.status_code == status.HTTP_200_OK - assert ( - RegisteredFunctionJobCollection.model_validate(response.json()) - == mock_registered_function_job_collection - ) - - -async def test_get_function_job_collection( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - auth: httpx.BasicAuth, -) -> None: - mock_registered_function_job_collection = ( - RegisteredFunctionJobCollection.model_validate( - { - "uid": str(uuid4()), - "title": "Test Collection", - "description": "A test function job collection", - "job_ids": [str(uuid4()), str(uuid4())], - "created_at": datetime.datetime.now(datetime.UTC), - } - ) - ) - - mock_handler_in_functions_rpc_interface( - "get_function_job_collection", mock_registered_function_job_collection - ) - - response = await client.get( - f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - assert ( - RegisteredFunctionJobCollection.model_validate(response.json()) - == mock_registered_function_job_collection - ) - - -async def test_list_function_job_collections( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - auth: httpx.BasicAuth, -) -> None: - mock_registered_function_job_collection = ( - RegisteredFunctionJobCollection.model_validate( - { - "uid": str(uuid4()), - "title": "Test Collection", - "description": "A test function job collection", - "job_ids": [str(uuid4()), str(uuid4())], - "created_at": datetime.datetime.now(datetime.UTC), - } - ) - ) - - mock_handler_in_functions_rpc_interface( - "list_function_job_collections", - ( - [mock_registered_function_job_collection for _ in range(5)], - PageMetaInfoLimitOffset(total=5, count=5, limit=10, offset=0), - ), - ) - - response = await client.get(f"{API_VTAG}/function_job_collections", auth=auth) - assert response.status_code == status.HTTP_200_OK - data = response.json()["items"] - assert len(data) == 5 - assert ( - RegisteredFunctionJobCollection.model_validate(data[0]) - == mock_registered_function_job_collection - ) - - -async def test_delete_function_job_collection( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job_collection: RegisteredFunctionJobCollection, - auth: httpx.BasicAuth, -) -> None: - mock_handler_in_functions_rpc_interface("delete_function_job_collection", None) - - # Now, delete the function job collection - response = await client.delete( - f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - data = response.json() - assert data is None - - -async def test_get_function_job_collection_jobs( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job_collection: RegisteredFunctionJobCollection, - auth: httpx.BasicAuth, -) -> None: - mock_handler_in_functions_rpc_interface( - "get_function_job_collection", mock_registered_function_job_collection - ) - - response = await client.get( - f"{API_VTAG}/function_job_collections/{mock_registered_function_job_collection.uid}/function_jobs", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - data = response.json() - assert len(data) == len(mock_registered_function_job_collection.job_ids) - - -async def test_list_function_job_collections_with_function_filter( - client: AsyncClient, - mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], - mock_registered_function_job_collection: RegisteredFunctionJobCollection, - mock_registered_project_function: RegisteredProjectFunction, - auth: httpx.BasicAuth, -) -> None: - mock_handler_in_functions_rpc_interface( - "list_function_job_collections", - ( - [mock_registered_function_job_collection for _ in range(2)], - PageMetaInfoLimitOffset(total=5, count=2, limit=2, offset=1), - ), - ) - - response = await client.get( - f"{API_VTAG}/function_job_collections?function_id={mock_registered_project_function.uid}&limit=2&offset=1", - auth=auth, - ) - assert response.status_code == status.HTTP_200_OK - data = response.json() - - assert data["total"] == 5 - assert data["limit"] == 2 - assert data["offset"] == 1 - assert len(data["items"]) == 2 - assert ( - RegisteredFunctionJobCollection.model_validate(data["items"][0]) - == mock_registered_function_job_collection - ) - - @pytest.mark.parametrize("user_has_execute_right", [False, True]) @pytest.mark.parametrize( "funcapi_endpoint,endpoint_inputs", [("run", {}), ("map", [{}, {}])] From 85a4c31745357795ded0bb6c1b457d2b9e270fdc Mon Sep 17 00:00:00 2001 From: Werner Van Geit Date: Thu, 24 Jul 2025 11:20:54 +0200 Subject: [PATCH 5/7] Update openapi-specs --- services/api-server/openapi.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/api-server/openapi.json b/services/api-server/openapi.json index a4832fe0b6b7..cd39d515bdb5 100644 --- a/services/api-server/openapi.json +++ b/services/api-server/openapi.json @@ -5641,9 +5641,9 @@ "tags": [ "function_jobs" ], - "summary": "Function Job Outputs", + "summary": "Get Function Job Outputs", "description": "Get function job outputs\n\nNew in *version 0.8.0*", - "operationId": "function_job_outputs", + "operationId": "get_function_job_outputs", "security": [ { "HTTPBasic": [] @@ -5676,7 +5676,7 @@ "type": "null" } ], - "title": "Response Function Job Outputs V0 Function Jobs Function Job Id Outputs Get" + "title": "Response Get Function Job Outputs V0 Function Jobs Function Job Id Outputs Get" } } } From b4d3cfa3f11bb1abb8d915d6fadf8dfe57b077a3 Mon Sep 17 00:00:00 2001 From: Werner Van Geit Date: Thu, 24 Jul 2025 11:26:16 +0200 Subject: [PATCH 6/7] Revert api route method rename to keep same openapi.json --- services/api-server/openapi.json | 6 +++--- .../api/routes/function_jobs_routes.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/services/api-server/openapi.json b/services/api-server/openapi.json index cd39d515bdb5..a4832fe0b6b7 100644 --- a/services/api-server/openapi.json +++ b/services/api-server/openapi.json @@ -5641,9 +5641,9 @@ "tags": [ "function_jobs" ], - "summary": "Get Function Job Outputs", + "summary": "Function Job Outputs", "description": "Get function job outputs\n\nNew in *version 0.8.0*", - "operationId": "get_function_job_outputs", + "operationId": "function_job_outputs", "security": [ { "HTTPBasic": [] @@ -5676,7 +5676,7 @@ "type": "null" } ], - "title": "Response Get Function Job Outputs V0 Function Jobs Function Job Id Outputs Get" + "title": "Response Function Job Outputs V0 Function Jobs Function Job Id Outputs Get" } } } diff --git a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py index 21377d8935e6..38e78a021e4a 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py @@ -263,7 +263,7 @@ async def get_function_from_functionjobid( changelog=CHANGE_LOGS["function_job_outputs"], ), ) -async def get_function_job_outputs( +async def function_job_outputs( function_job: Annotated[ RegisteredFunctionJob, Depends(get_function_job_dependency) ], From 5eb4e9aac526c4ae1ecc86427fc806b718c76c80 Mon Sep 17 00:00:00 2001 From: Werner Van Geit Date: Thu, 24 Jul 2025 16:14:53 +0200 Subject: [PATCH 7/7] Fix some duplicate code --- .../functions/_functions_repository.py | 133 +++--------------- .../test_functions_controller_rpc.py | 6 +- 2 files changed, 25 insertions(+), 114 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py b/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py index f33843d5b037..cd7e997c3d89 100644 --- a/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py +++ b/services/web/server/src/simcore_service_webserver/functions/_functions_repository.py @@ -326,12 +326,14 @@ async def get_function( function_id: FunctionID, ) -> RegisteredFunctionDB: async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: - await check_user_api_access_rights( + await check_user_permissions( app, connection=conn, user_id=user_id, product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.READ_FUNCTIONS], + object_id=function_id, + object_type="function", + permissions=["read"], ) result = await conn.execute( @@ -341,19 +343,7 @@ async def get_function( if row is None: raise FunctionIDNotFoundError(function_id=function_id) - registered_function = RegisteredFunctionDB.model_validate(row) - - await check_user_permissions( - app, - connection=conn, - user_id=user_id, - product_name=product_name, - object_id=function_id, - object_type="function", - permissions=["read"], - ) - - return registered_function + return RegisteredFunctionDB.model_validate(row) async def list_functions( @@ -499,13 +489,6 @@ async def get_function_job_status( object_id=function_job_id, permissions=["read"], ) - await check_user_api_access_rights( - app, - connection=conn, - user_id=user_id, - product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.READ_FUNCTION_JOBS], - ) result = await conn.execute( function_jobs_table.select().where( @@ -538,13 +521,6 @@ async def get_function_job_outputs( object_id=function_job_id, permissions=["read"], ) - await check_user_api_access_rights( - app, - connection=conn, - user_id=user_id, - product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.READ_FUNCTION_JOBS], - ) result = await conn.execute( function_jobs_table.select().where( @@ -579,14 +555,6 @@ async def update_function_job_status( permissions=["write"], ) - await check_user_api_access_rights( - app, - connection=transaction, - user_id=user_id, - product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.WRITE_FUNCTION_JOBS], - ) - result = await transaction.execute( function_jobs_table.update() .where(function_jobs_table.c.uuid == function_job_id) @@ -621,14 +589,6 @@ async def update_function_job_outputs( permissions=["write"], ) - await check_user_api_access_rights( - app, - connection=transaction, - user_id=user_id, - product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.WRITE_FUNCTION_JOBS], - ) - result = await transaction.execute( function_jobs_table.update() .where(function_jobs_table.c.uuid == function_job_id) @@ -761,17 +721,6 @@ async def delete_function( function_id: FunctionID, ) -> None: async with transaction_context(get_asyncpg_engine(app), connection) as transaction: - await check_user_api_access_rights( - app, - connection=transaction, - user_id=user_id, - product_name=product_name, - api_access_rights=[ - FunctionsApiAccessRights.READ_FUNCTIONS, - FunctionsApiAccessRights.WRITE_FUNCTIONS, - ], - ) - await check_user_permissions( app, connection=transaction, @@ -807,17 +756,6 @@ async def update_function( function: FunctionUpdate, ) -> RegisteredFunctionDB: async with transaction_context(get_asyncpg_engine(app), connection) as transaction: - await check_user_api_access_rights( - app, - connection=transaction, - user_id=user_id, - product_name=product_name, - api_access_rights=[ - FunctionsApiAccessRights.READ_FUNCTIONS, - FunctionsApiAccessRights.WRITE_FUNCTIONS, - ], - ) - await check_user_permissions( app, transaction, @@ -825,7 +763,7 @@ async def update_function( product_name=product_name, object_id=function_id, object_type="function", - permissions=["write"], + permissions=["read", "write"], ) result = await transaction.execute( @@ -851,13 +789,6 @@ async def get_function_job( function_job_id: FunctionID, ) -> RegisteredFunctionJobDB: async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: - await check_user_api_access_rights( - app, - connection=conn, - user_id=user_id, - product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.READ_FUNCTION_JOBS], - ) await check_user_permissions( app, connection=conn, @@ -890,16 +821,6 @@ async def delete_function_job( function_job_id: FunctionID, ) -> None: async with transaction_context(get_asyncpg_engine(app), connection) as transaction: - await check_user_api_access_rights( - app, - connection=transaction, - user_id=user_id, - product_name=product_name, - api_access_rights=[ - FunctionsApiAccessRights.READ_FUNCTION_JOBS, - FunctionsApiAccessRights.WRITE_FUNCTION_JOBS, - ], - ) await check_user_permissions( app, connection=transaction, @@ -938,14 +859,6 @@ async def find_cached_function_jobs( inputs: FunctionInputs, ) -> list[RegisteredFunctionJobDB] | None: async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: - await check_user_api_access_rights( - app, - connection=conn, - user_id=user_id, - product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.READ_FUNCTION_JOBS], - ) - jobs: list[RegisteredFunctionJobDB] = [] async for row in await conn.stream( function_jobs_table.select().where( @@ -984,13 +897,6 @@ async def get_function_job_collection( function_job_collection_id: FunctionID, ) -> tuple[RegisteredFunctionJobCollectionDB, list[FunctionJobID]]: async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: - await check_user_api_access_rights( - app, - connection=conn, - user_id=user_id, - product_name=product_name, - api_access_rights=[FunctionsApiAccessRights.READ_FUNCTION_JOB_COLLECTIONS], - ) await check_user_permissions( app, connection=conn, @@ -1038,16 +944,6 @@ async def delete_function_job_collection( function_job_collection_id: FunctionID, ) -> None: async with transaction_context(get_asyncpg_engine(app), connection) as transaction: - await check_user_api_access_rights( - app, - connection=transaction, - user_id=user_id, - product_name=product_name, - api_access_rights=[ - FunctionsApiAccessRights.READ_FUNCTION_JOB_COLLECTIONS, - FunctionsApiAccessRights.WRITE_FUNCTION_JOB_COLLECTIONS, - ], - ) await check_user_permissions( app, connection=transaction, @@ -1304,6 +1200,21 @@ async def check_user_permissions( object_type: Literal["function", "function_job", "function_job_collection"], permissions: list[Literal["read", "write", "execute"]], ) -> bool: + + api_access_rights = [ + getattr( + FunctionsApiAccessRights, f"{permission.upper()}_{object_type.upper()}S" + ) + for permission in permissions + ] + await check_user_api_access_rights( + app, + connection=connection, + user_id=user_id, + product_name=product_name, + api_access_rights=api_access_rights, + ) + user_permissions = await get_user_permissions( app, connection=connection, @@ -1390,6 +1301,6 @@ async def check_user_api_access_rights( for api_access_right in api_access_rights: if not getattr(user_api_access_rights, api_access_right): - raise _ERRORS_MAP[api_access_right] + raise _ERRORS_MAP[api_access_right](user_id=user_id) return True diff --git a/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_functions_controller_rpc.py b/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_functions_controller_rpc.py index 617c1b00b9a8..f4974ad33364 100644 --- a/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_functions_controller_rpc.py +++ b/services/web/server/tests/unit/with_dbs/04/functions_rpc/test_functions_controller_rpc.py @@ -18,7 +18,7 @@ from models_library.functions_errors import ( FunctionIDNotFoundError, FunctionReadAccessDeniedError, - FunctionsReadApiAccessDeniedError, + FunctionsWriteApiAccessDeniedError, FunctionWriteAccessDeniedError, ) from models_library.products import ProductName @@ -98,7 +98,7 @@ async def test_register_get_delete_function( product_name=osparc_product_name, ) - with pytest.raises(FunctionsReadApiAccessDeniedError): + with pytest.raises(FunctionsWriteApiAccessDeniedError): # Attempt to delete the function in another product await functions_rpc.delete_function( rabbitmq_rpc_client=rpc_client, @@ -375,7 +375,7 @@ async def test_update_function_title( # Update the function's title by other user updated_title = "Updated Function Title by Other User" registered_function.title = updated_title - with pytest.raises(FunctionWriteAccessDeniedError): + with pytest.raises(FunctionReadAccessDeniedError): updated_function = await functions_rpc.update_function_title( rabbitmq_rpc_client=rpc_client, function_id=registered_function.uid,