From 551822d9b7c47d549b1ed047f7b4c25bf6dc1ca4 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 13 May 2025 16:22:52 +0200 Subject: [PATCH 1/7] work --- api/specs/web-server/_computations.py | 12 ++ .../api_schemas_webserver/computations.py | 10 +- .../director_v2/computations.py | 28 ++++ .../api/rpc/_computations.py | 30 +++++ .../modules/db/repositories/comp_runs.py | 74 +++++++++- .../api/v0/openapi.yaml | 45 +++++++ .../director_v2/_computations_service.py | 127 ++++++++++++++---- .../_controller/computations_rest.py | 54 +++++++- 8 files changed, 345 insertions(+), 35 deletions(-) diff --git a/api/specs/web-server/_computations.py b/api/specs/web-server/_computations.py index a1858091106a..6b81b2133214 100644 --- a/api/specs/web-server/_computations.py +++ b/api/specs/web-server/_computations.py @@ -5,7 +5,9 @@ from models_library.api_schemas_webserver.computations import ( ComputationGet, ComputationPathParams, + ComputationRunPathParams, ComputationRunRestGet, + ComputationRunWithFiltersListQueryParams, ComputationStart, ComputationStarted, ComputationTaskRestGet, @@ -68,7 +70,17 @@ async def stop_computation(_path: Annotated[ComputationPathParams, Depends()]): response_model=Envelope[list[ComputationRunRestGet]], ) async def list_computations_latest_iteration( + _query: Annotated[as_query(ComputationRunWithFiltersListQueryParams), Depends()], +): ... + + +@router.get( + "/computations/{project_id}/iterations", + response_model=Envelope[list[ComputationRunRestGet]], +) +async def list_computation_iterations( _query: Annotated[as_query(ComputationRunListQueryParams), Depends()], + _path: Annotated[ComputationRunPathParams, Depends()], ): ... 
diff --git a/packages/models-library/src/models_library/api_schemas_webserver/computations.py b/packages/models-library/src/models_library/api_schemas_webserver/computations.py index b98865f66299..a1f955b20fe9 100644 --- a/packages/models-library/src/models_library/api_schemas_webserver/computations.py +++ b/packages/models-library/src/models_library/api_schemas_webserver/computations.py @@ -81,7 +81,10 @@ class ComputationStarted(OutputSchemaWithoutCamelCase): class ComputationRunListQueryParams( PageQueryParameters, ComputationRunListOrderParams, # type: ignore[misc, valid-type] -): +): ... + + +class ComputationRunWithFiltersListQueryParams(ComputationRunListQueryParams): filter_only_running: bool = Field( default=False, description="If true, only running computations are returned", @@ -100,6 +103,11 @@ class ComputationRunRestGet(OutputSchema): project_custom_metadata: dict[str, Any] +class ComputationRunPathParams(BaseModel): + project_id: ProjectID + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + ### Computation Task diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py index 3bc9f3e4059c..24720ecd5129 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/director_v2/computations.py @@ -58,6 +58,34 @@ async def list_computations_latest_iteration_page( return result +@log_decorator(_logger, level=logging.DEBUG) +async def list_computations_iterations_page( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + product_name: ProductName, + user_id: UserID, + project_id: ProjectID, + # pagination + offset: int = 0, + limit: int = 20, + # ordering + order_by: OrderBy | None = None, +) -> ComputationRunRpcGetPage: + result = await rabbitmq_rpc_client.request( + DIRECTOR_V2_RPC_NAMESPACE, 
+ _RPC_METHOD_NAME_ADAPTER.validate_python("list_computations_iterations_page"), + product_name=product_name, + user_id=user_id, + project_id=project_id, + offset=offset, + limit=limit, + order_by=order_by, + timeout_s=_DEFAULT_TIMEOUT_S, + ) + assert isinstance(result, ComputationRunRpcGetPage) # nosec + return result + + @log_decorator(_logger, level=logging.DEBUG) async def list_computations_latest_iteration_tasks_page( rabbitmq_rpc_client: RabbitMQRPCClient, diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py index 960ea9245f4a..d0caa15a8690 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py @@ -53,6 +53,36 @@ async def list_computations_latest_iteration_page( ) +@router.expose(reraise_if_error_type=()) +async def list_computations_iterations_page( + app: FastAPI, + *, + product_name: ProductName, + user_id: UserID, + project_id: ProjectID, + # pagination + offset: int = 0, + limit: int = 20, + # ordering + order_by: OrderBy | None = None, +) -> ComputationRunRpcGetPage: + comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine) + total, comp_runs_output = ( + await comp_runs_repo.list_for_user_and_project_all_iterations( + product_name=product_name, + user_id=user_id, + project_id=project_id, + offset=offset, + limit=limit, + order_by=order_by, + ) + ) + return ComputationRunRpcGetPage( + items=comp_runs_output, + total=total, + ) + + async def _fetch_task_log( user_id: UserID, project_id: ProjectID, task: ComputationTaskForRpcDBGet ) -> TaskLogFileGet | None: diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py index b34324308bbf..eb706af4b832 100644 --- 
a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py @@ -190,6 +190,16 @@ async def list_( ) ] + _COMPUTATION_RUNS_RPC_GET_COLUMNS = [ # noqa: RUF012 + comp_runs.c.project_uuid, + comp_runs.c.iteration, + comp_runs.c.result.label("state"), + comp_runs.c.metadata.label("info"), + comp_runs.c.created.label("submitted_at"), + comp_runs.c.started.label("started_at"), + comp_runs.c.ended.label("ended_at"), + ] + async def list_for_user__only_latest_iterations( self, *, @@ -212,13 +222,7 @@ async def list_for_user__only_latest_iterations( order_by = OrderBy(field=IDStr("run_id")) # default ordering base_select_query = sa.select( - comp_runs.c.project_uuid, - comp_runs.c.iteration, - comp_runs.c.result.label("state"), - comp_runs.c.metadata.label("info"), - comp_runs.c.created.label("submitted_at"), - comp_runs.c.started.label("started_at"), - comp_runs.c.ended.label("ended_at"), + *self._COMPUTATION_RUNS_RPC_GET_COLUMNS ).select_from( sa.select( comp_runs.c.project_uuid, @@ -286,6 +290,62 @@ async def list_for_user__only_latest_iterations( return cast(int, total_count), items + async def list_for_user_and_project_all_iterations( + self, + *, + product_name: str, + user_id: UserID, + project_id: ProjectID, + # pagination + offset: int, + limit: int, + # ordering + order_by: OrderBy | None = None, + ) -> tuple[int, list[ComputationRunRpcGet]]: + if order_by is None: + order_by = OrderBy(field=IDStr("run_id")) # default ordering + + base_select_query = sa.select( + *self._COMPUTATION_RUNS_RPC_GET_COLUMNS, + ).where( + (comp_runs.c.user_id == user_id) + & (comp_runs.c.project_uuid == f"{project_id}") + & ( + comp_runs.c.metadata["product_name"].astext == product_name + ) # <-- NOTE: We might create a separate column for this for fast retrieval + ) + + # Select total count from base_query + count_query = sa.select(sa.func.count()).select_from( 
+ base_select_query.subquery() + ) + + # Ordering and pagination + if order_by.direction == OrderDirection.ASC: + list_query = base_select_query.order_by( + sa.asc(getattr(comp_runs.c, order_by.field)), comp_runs.c.run_id + ) + else: + list_query = base_select_query.order_by( + desc(getattr(comp_runs.c, order_by.field)), comp_runs.c.run_id + ) + list_query = list_query.offset(offset).limit(limit) + + async with pass_or_acquire_connection(self.db_engine) as conn: + total_count = await conn.scalar(count_query) + + items = [ + ComputationRunRpcGet.model_validate( + { + **row, + "state": DB_TO_RUNNING_STATE[row["state"]], + } + ) + async for row in await conn.stream(list_query) + ] + + return cast(int, total_count), items + async def create( self, *, diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index 6979ff2e9571..db0da6a500ab 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -2581,6 +2581,51 @@ paths: application/json: schema: $ref: '#/components/schemas/Envelope_list_ComputationRunRestGet__' + /v0/computations/{project_id}/iterations: + get: + tags: + - computations + - projects + summary: List Computation Iterations + operationId: list_computation_iterations + parameters: + - name: project_id + in: path + required: true + schema: + type: string + format: uuid + title: Project Id + - name: order_by + in: query + required: false + schema: + type: string + contentMediaType: application/json + contentSchema: {} + default: '{"field":"created","direction":"asc"}' + title: Order By + - name: limit + in: query + required: false + schema: + type: integer + default: 20 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + default: 0 + title: Offset + responses: + '200': + description: Successful Response + content: + 
application/json: + schema: + $ref: '#/components/schemas/Envelope_list_ComputationRunRestGet__' /v0/computations/{project_id}/iterations/latest/tasks: get: tags: diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py index e863f11a3d0a..a2e5045e4e12 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py @@ -1,6 +1,7 @@ from decimal import Decimal from aiohttp import web +from models_library.api_schemas_directorv2.comp_runs import ComputationRunRpcGet from models_library.computations import ( ComputationRunWithAttributes, ComputationTaskWithAttributes, @@ -8,12 +9,16 @@ from models_library.products import ProductName from models_library.projects import ProjectID from models_library.rest_ordering import OrderBy +from models_library.services_types import ServiceRunID from models_library.users import UserID from pydantic import NonNegativeInt from servicelib.rabbitmq.rpc_interfaces.director_v2 import computations from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker import ( credit_transactions, ) +from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker.errors import ( + CreditTransactionNotFoundError, +) from servicelib.utils import limited_gather from ..products.products_service import is_product_billable @@ -28,6 +33,37 @@ from ..rabbitmq import get_rabbitmq_rpc_client +async def _fetch_projects_metadata( + app: web.Application, + project_uuids: list[ProjectID], +) -> list[dict]: + """Batch fetch project metadata with concurrency control""" + # NOTE: MD: can be improved with a single batch call + return await limited_gather( + *[ + get_project_custom_metadata_or_empty_dict(app, project_uuid=uuid) + for uuid in project_uuids + ], + limit=20, + ) + + +async def 
_fetch_root_project_names( + app: web.Application, items: list[ComputationRunRpcGet] +) -> list[str]: + """Resolve root project names from computation items""" + root_uuids: list[ProjectID] = [] + for item in items: + if root_id := item.info.get("project_metadata", {}).get( + "root_parent_project_id" + ): + root_uuids.append(ProjectID(root_id)) + else: + root_uuids.append(item.project_uuid) + + return await batch_get_project_name(app, projects_uuids=root_uuids) + + async def list_computations_latest_iteration( app: web.Application, product_name: ProductName, @@ -52,32 +88,12 @@ async def list_computations_latest_iteration( order_by=order_by, ) - # Get projects metadata (NOTE: MD: can be improved with a single batch call) - _projects_metadata = await limited_gather( - *[ - get_project_custom_metadata_or_empty_dict( - app, project_uuid=item.project_uuid - ) - for item in _runs_get.items - ], - limit=20, + # Get projects metadata + _projects_metadata = await _fetch_projects_metadata( + app, project_uuids=[item.project_uuid for item in _runs_get.items] ) - # Get Root project names - _projects_root_uuids: list[ProjectID] = [] - for item in _runs_get.items: - if ( - prj_root_id := item.info.get("project_metadata", {}).get( - "root_parent_project_id", None - ) - ) is not None: - _projects_root_uuids.append(ProjectID(prj_root_id)) - else: - _projects_root_uuids.append(item.project_uuid) - - _projects_root_names = await batch_get_project_name( - app, projects_uuids=_projects_root_uuids - ) + _projects_root_names = await _fetch_root_project_names(app, _runs_get.items) _computational_runs_output = [ ComputationRunWithAttributes( @@ -99,6 +115,67 @@ async def list_computations_latest_iteration( return _runs_get.total, _computational_runs_output +async def list_computation_iterations( + app: web.Application, + product_name: ProductName, + user_id: UserID, + project_id: ProjectID, + # pagination + offset: int, + limit: NonNegativeInt, + # ordering + order_by: OrderBy, +) -> 
tuple[int, list[ComputationRunWithAttributes]]: + """Returns the list of computations for a specific project (all iterations)""" + rpc_client = get_rabbitmq_rpc_client(app) + _runs_get = await computations.list_computations_iterations_page( + rpc_client, + product_name=product_name, + user_id=user_id, + project_id=project_id, + offset=offset, + limit=limit, + order_by=order_by, + ) + + # Get projects metadata + _projects_metadata = await _fetch_projects_metadata(app, project_uuids=[project_id]) + assert len(_projects_metadata) == 1 # nosec + # Get Root project names + _projects_root_names = await _fetch_root_project_names(app, [_runs_get.items[0]]) + assert len(_projects_root_names) == 1 # nosec + + _computational_runs_output = [ + ComputationRunWithAttributes( + project_uuid=item.project_uuid, + iteration=item.iteration, + state=item.state, + info=item.info, + submitted_at=item.submitted_at, + started_at=item.started_at, + ended_at=item.ended_at, + root_project_name=_projects_root_names[0], + project_custom_metadata=_projects_metadata[0], + ) + for item in _runs_get.items + ] + + return _runs_get.total, _computational_runs_output + + +async def _get_credits_or_zero_by_service_run_id( + rpc_client, service_run_id: ServiceRunID +) -> Decimal: + try: + return ( + await credit_transactions.get_transaction_current_credits_by_service_run_id( + rpc_client, service_run_id=service_run_id + ) + ) + except CreditTransactionNotFoundError: + return Decimal(0) + + async def list_computations_latest_iteration_tasks( app: web.Application, product_name: ProductName, @@ -138,7 +215,7 @@ async def list_computations_latest_iteration_tasks( # NOTE: MD: can be improved with a single batch call _service_run_osparc_credits = await limited_gather( *[ - credit_transactions.get_transaction_current_credits_by_service_run_id( + _get_credits_or_zero_by_service_run_id( rpc_client, service_run_id=_run_id ) for _run_id in _service_run_ids diff --git 
a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py index 57ccb08e5694..711e9bcf4ab2 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py @@ -3,7 +3,9 @@ from aiohttp import web from models_library.api_schemas_webserver.computations import ( ComputationRunListQueryParams, + ComputationRunPathParams, ComputationRunRestGet, + ComputationRunWithFiltersListQueryParams, ComputationTaskListQueryParams, ComputationTaskPathParams, ComputationTaskRestGet, @@ -49,8 +51,10 @@ class ComputationsRequestContext(RequestParameters): async def list_computations_latest_iteration(request: web.Request) -> web.Response: req_ctx = ComputationsRequestContext.model_validate(request) - query_params: ComputationRunListQueryParams = parse_request_query_parameters_as( - ComputationRunListQueryParams, request + query_params: ComputationRunWithFiltersListQueryParams = ( + parse_request_query_parameters_as( + ComputationRunWithFiltersListQueryParams, request + ) ) total, items = await _computations_service.list_computations_latest_iteration( @@ -85,6 +89,52 @@ async def list_computations_latest_iteration(request: web.Request) -> web.Respon ) +@routes.get( + f"/{VTAG}/computations/{{project_id}}/iterations", + name="list_computations_latest_iteration", +) +@login_required +@permission_required("services.pipeline.*") +@permission_required("project.read") +async def list_computation_iterations(request: web.Request) -> web.Response: + + req_ctx = ComputationsRequestContext.model_validate(request) + query_params: ComputationRunListQueryParams = parse_request_query_parameters_as( + ComputationRunListQueryParams, request + ) + path_params = parse_request_path_parameters_as(ComputationRunPathParams, request) + 
+ total, items = await _computations_service.list_computation_iterations( + request.app, + product_name=req_ctx.product_name, + user_id=req_ctx.user_id, + project_id=path_params.project_id, + # pagination + offset=query_params.offset, + limit=query_params.limit, + # ordering + order_by=OrderBy.model_construct(**query_params.order_by.model_dump()), + ) + + page = Page[ComputationRunRestGet].model_validate( + paginate_data( + chunk=[ + ComputationRunRestGet.model_validate(run, from_attributes=True) + for run in items + ], + total=total, + limit=query_params.limit, + offset=query_params.offset, + request_url=request.url, + ) + ) + + return web.Response( + text=page.model_dump_json(**RESPONSE_MODEL_POLICY), + content_type=MIMETYPE_APPLICATION_JSON, + ) + + @routes.get( f"/{VTAG}/computations/{{project_id}}/iterations/latest/tasks", name="list_computations_latest_iteration_tasks", From 2b7026df3742042e671056bdb6cb913e5950c0a9 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 13 May 2025 16:44:03 +0200 Subject: [PATCH 2/7] adding unit tests --- .../test_api_rpc_computations.py | 110 +++++++++++++++++- 1 file changed, 106 insertions(+), 4 deletions(-) diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py index c2180f22e0bb..3b179dc54e2d 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py @@ -7,7 +7,7 @@ # pylint: disable=too-many-positional-arguments from collections.abc import Awaitable, Callable -from datetime import datetime, timezone +from datetime import UTC, datetime, timedelta from typing import Any from models_library.api_schemas_directorv2.comp_runs import ( @@ -66,7 +66,7 @@ async def test_rpc_list_computation_runs_and_tasks( user=user, project=proj, result=RunningState.PENDING, - 
started=datetime.now(tz=timezone.utc), + started=datetime.now(tz=UTC), iteration=2, ) output = await rpc_computations.list_computations_latest_iteration_page( @@ -82,8 +82,8 @@ async def test_rpc_list_computation_runs_and_tasks( user=user, project=proj, result=RunningState.SUCCESS, - started=datetime.now(tz=timezone.utc), - ended=datetime.now(tz=timezone.utc), + started=datetime.now(tz=UTC), + ended=datetime.now(tz=UTC), iteration=3, ) output = await rpc_computations.list_computations_latest_iteration_page( @@ -103,3 +103,105 @@ async def test_rpc_list_computation_runs_and_tasks( assert output.total == 4 assert isinstance(output, ComputationTaskRpcGetPage) assert len(output.items) == 4 + + +async def test_rpc_list_computation_runs_with_filtering( + fake_workbench_without_outputs: dict[str, Any], + fake_workbench_adjacency: dict[str, Any], + registered_user: Callable[..., dict[str, Any]], + project: Callable[..., Awaitable[ProjectAtDB]], + create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], + create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], + create_comp_run: Callable[..., Awaitable[CompRunsAtDB]], + rpc_client: RabbitMQRPCClient, +): + user = registered_user() + + proj_1 = await project(user, workbench=fake_workbench_without_outputs) + await create_pipeline( + project_id=f"{proj_1.uuid}", + dag_adjacency_list=fake_workbench_adjacency, + ) + comp_tasks = await create_tasks( + user=user, project=proj_1, state=StateType.PUBLISHED, progress=None + ) + comp_runs = await create_comp_run( + user=user, project=proj_1, result=RunningState.PUBLISHED + ) + + proj_2 = await project(user, workbench=fake_workbench_without_outputs) + await create_pipeline( + project_id=f"{proj_2.uuid}", + dag_adjacency_list=fake_workbench_adjacency, + ) + comp_tasks = await create_tasks( + user=user, project=proj_2, state=StateType.SUCCESS, progress=None + ) + comp_runs = await create_comp_run( + user=user, project=proj_2, result=RunningState.SUCCESS + ) + + # Test default 
behaviour `filter_only_running=False` + output = await rpc_computations.list_computations_latest_iteration_page( + rpc_client, product_name="osparc", user_id=user["id"] + ) + assert output.total == 2 + + # Test filtering + output = await rpc_computations.list_computations_latest_iteration_page( + rpc_client, product_name="osparc", user_id=user["id"], filter_only_running=True + ) + assert output.total == 1 + assert output.items[0].project_uuid == proj_1.uuid + + +async def test_rpc_list_computation_runs_history( + fake_workbench_without_outputs: dict[str, Any], + fake_workbench_adjacency: dict[str, Any], + registered_user: Callable[..., dict[str, Any]], + project: Callable[..., Awaitable[ProjectAtDB]], + create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], + create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], + create_comp_run: Callable[..., Awaitable[CompRunsAtDB]], + rpc_client: RabbitMQRPCClient, +): + user = registered_user() + + proj = await project(user, workbench=fake_workbench_without_outputs) + await create_pipeline( + project_id=f"{proj.uuid}", + dag_adjacency_list=fake_workbench_adjacency, + ) + comp_tasks = await create_tasks( + user=user, project=proj, state=StateType.PUBLISHED, progress=None + ) + comp_runs_1 = await create_comp_run( + user=user, + project=proj, + result=RunningState.SUCCESS, + started=datetime.now(tz=UTC) - timedelta(minutes=120), + ended=datetime.now(tz=UTC) - timedelta(minutes=100), + iteration=1, + ) + comp_runs_2 = await create_comp_run( + user=user, + project=proj, + result=RunningState.SUCCESS, + started=datetime.now(tz=UTC) - timedelta(minutes=90), + ended=datetime.now(tz=UTC) - timedelta(minutes=60), + iteration=2, + ) + comp_runs_3 = await create_comp_run( + user=user, + project=proj, + result=RunningState.FAILED, + started=datetime.now(tz=UTC) - timedelta(minutes=50), + ended=datetime.now(tz=UTC), + iteration=3, + ) + + output = await rpc_computations.list_computations_iterations_page( + rpc_client, 
product_name="osparc", user_id=user["id"], project_id=proj.uuid + ) + assert output.total == 3 + assert isinstance(output, ComputationRunRpcGetPage) From f8e76f79ac5cdda7b459d3d1be232d60f07c48c2 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 13 May 2025 16:59:59 +0200 Subject: [PATCH 3/7] fix --- .../director_v2/_controller/computations_rest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py index 711e9bcf4ab2..a67d7f83e5e9 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py @@ -91,7 +91,7 @@ async def list_computations_latest_iteration(request: web.Request) -> web.Respon @routes.get( f"/{VTAG}/computations/{{project_id}}/iterations", - name="list_computations_latest_iteration", + name="list_computation_iterations", ) @login_required @permission_required("services.pipeline.*") From 73a094f31d493e734eecfa23f3f77fb04a6a1649 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Wed, 14 May 2025 09:23:27 +0200 Subject: [PATCH 4/7] add unit test --- .../with_dbs/01/test_director_v2_handlers.py | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py b/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py index 9446c8262711..363c36cf9958 100644 --- a/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py +++ b/services/web/server/tests/unit/with_dbs/01/test_director_v2_handlers.py @@ -151,6 +151,31 @@ def mock_rpc_list_computations_latest_iteration_tasks( ) +@pytest.fixture +def mock_rpc_list_computation_iterations( + mocker: MockerFixture, + user_project: ProjectDict, +) -> 
ComputationRunRpcGetPage: + project_uuid = user_project["uuid"] + example_1 = ComputationRunRpcGet.model_config["json_schema_extra"]["examples"][0] + example_1["project_uuid"] = project_uuid + example_2 = ComputationRunRpcGet.model_config["json_schema_extra"]["examples"][0] + example_2["project_uuid"] = project_uuid + example_2["iteration"] = 2 + + return mocker.patch( + "simcore_service_webserver.director_v2._computations_service.computations.list_computations_iterations_page", + spec=True, + return_value=ComputationRunRpcGetPage( + items=[ + ComputationRunRpcGet.model_validate(example_1), + ComputationRunRpcGet.model_validate(example_2), + ], + total=2, + ), + ) + + @pytest.fixture def mock_rpc_list_computations_latest_iteration_tasks_page( mocker: MockerFixture, @@ -180,6 +205,7 @@ async def test_list_computations_latest_iteration( expected: ExpectedResponse, mock_rpc_list_computations_latest_iteration_tasks: None, mock_rpc_list_computations_latest_iteration_tasks_page: None, + mock_rpc_list_computation_iterations: None, ): assert client.app url = client.app.router["list_computations_latest_iteration"].url_for() @@ -191,6 +217,19 @@ async def test_list_computations_latest_iteration( assert ComputationRunRestGet.model_validate(data[0]) assert data[0]["rootProjectName"] == user_project["name"] + url = client.app.router["list_computation_iterations"].url_for( + project_id=f"{user_project['uuid']}" + ) + resp = await client.get(f"{url}") + data, _ = await assert_status( + resp, status.HTTP_200_OK if user_role == UserRole.GUEST else expected.ok + ) + if user_role != UserRole.ANONYMOUS: + assert ComputationRunRestGet.model_validate(data[0]) + assert len(data) == 2 + assert data[0]["rootProjectName"] == user_project["name"] + assert data[1]["rootProjectName"] == user_project["name"] + url = client.app.router["list_computations_latest_iteration_tasks"].url_for( project_id=f"{user_project['uuid']}" ) From e9ff1306fb3f3690796d1117c18cd217cadd364d Mon Sep 17 00:00:00 2001 
From: matusdrobuliak66 Date: Wed, 14 May 2025 12:24:39 +0200 Subject: [PATCH 5/7] review @pcrespov --- api/specs/web-server/_computations.py | 7 +- .../src/pytest_simcore/db_entries_mocks.py | 2 +- .../integration/01/test_computation_api.py | 36 +-- .../02/test_dynamic_services_routes.py | 10 +- ...t_dynamic_sidecar_nodeports_integration.py | 4 +- ...ixed_dynamic_sidecar_and_legacy_project.py | 7 +- .../test_api_route_computations.py | 48 ++-- .../test_api_route_computations_tasks.py | 4 +- .../test_api_rpc_computations.py | 12 +- .../with_dbs/comp_scheduler/test_manager.py | 4 +- .../comp_scheduler/test_scheduler_dask.py | 4 +- .../tests/unit/with_dbs/conftest.py | 12 +- .../tests/unit/with_dbs/test_cli.py | 4 +- ...db_repositories_groups_extra_properties.py | 7 +- .../test_modules_db_repositories_projects.py | 4 +- .../unit/with_dbs/test_utils_rabbitmq.py | 4 +- .../api/v0/openapi.yaml | 114 +++++--- .../director_v2/_computations_service.py | 16 +- .../_computations_service_utils.py | 247 ++++++++++++++++++ 19 files changed, 418 insertions(+), 128 deletions(-) create mode 100644 services/web/server/src/simcore_service_webserver/director_v2/_computations_service_utils.py diff --git a/api/specs/web-server/_computations.py b/api/specs/web-server/_computations.py index 6b81b2133214..b40c2550a168 100644 --- a/api/specs/web-server/_computations.py +++ b/api/specs/web-server/_computations.py @@ -2,6 +2,7 @@ from _common import as_query from fastapi import APIRouter, Depends, status +from fastapi_pagination import Page from models_library.api_schemas_webserver.computations import ( ComputationGet, ComputationPathParams, @@ -67,7 +68,7 @@ async def stop_computation(_path: Annotated[ComputationPathParams, Depends()]): @router.get( "/computations/-/iterations/latest", - response_model=Envelope[list[ComputationRunRestGet]], + response_model=Page[ComputationRunRestGet], ) async def list_computations_latest_iteration( _query: 
Annotated[as_query(ComputationRunWithFiltersListQueryParams), Depends()], @@ -76,7 +77,7 @@ async def list_computations_latest_iteration( @router.get( "/computations/{project_id}/iterations", - response_model=Envelope[list[ComputationRunRestGet]], + response_model=Page[ComputationRunRestGet], ) async def list_computation_iterations( _query: Annotated[as_query(ComputationRunListQueryParams), Depends()], @@ -86,7 +87,7 @@ async def list_computation_iterations( @router.get( "/computations/{project_id}/iterations/latest/tasks", - response_model=Envelope[list[ComputationTaskRestGet]], + response_model=Page[ComputationTaskRestGet], ) async def list_computations_latest_iteration_tasks( _query: Annotated[as_query(ComputationTaskListQueryParams), Depends()], diff --git a/packages/pytest-simcore/src/pytest_simcore/db_entries_mocks.py b/packages/pytest-simcore/src/pytest_simcore/db_entries_mocks.py index 20038c9a58ad..6159e3d72202 100644 --- a/packages/pytest-simcore/src/pytest_simcore/db_entries_mocks.py +++ b/packages/pytest-simcore/src/pytest_simcore/db_entries_mocks.py @@ -23,7 +23,7 @@ @pytest.fixture() -def registered_user( +def create_registered_user( postgres_db: sa.engine.Engine, faker: Faker ) -> Iterator[Callable[..., dict]]: created_user_ids = [] diff --git a/services/director-v2/tests/integration/01/test_computation_api.py b/services/director-v2/tests/integration/01/test_computation_api.py index e3dd12061d89..1c1afe2f5fbe 100644 --- a/services/director-v2/tests/integration/01/test_computation_api.py +++ b/services/director-v2/tests/integration/01/test_computation_api.py @@ -188,12 +188,12 @@ def test_invalid_computation( async def test_start_empty_computation_is_refused( async_client: httpx.AsyncClient, - registered_user: Callable, + create_registered_user: Callable, project: Callable[..., Awaitable[ProjectAtDB]], osparc_product_name: str, create_pipeline: Callable[..., Awaitable[ComputationGet]], ): - user = registered_user() + user = create_registered_user() 
empty_project = await project(user) with pytest.raises( httpx.HTTPStatusError, match=f"{status.HTTP_422_UNPROCESSABLE_ENTITY}" @@ -391,7 +391,7 @@ class PartialComputationParams: async def test_run_partial_computation( wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]], async_client: httpx.AsyncClient, - registered_user: Callable, + create_registered_user: Callable, project: Callable[..., Awaitable[ProjectAtDB]], update_project_workbench_with_comp_tasks: Callable, fake_workbench_without_outputs: dict[str, Any], @@ -399,7 +399,7 @@ async def test_run_partial_computation( osparc_product_name: str, create_pipeline: Callable[..., Awaitable[ComputationGet]], ): - user = registered_user() + user = create_registered_user() await wait_for_catalog_service(user["id"], osparc_product_name) sleepers_project: ProjectAtDB = await project( user, workbench=fake_workbench_without_outputs @@ -539,7 +539,7 @@ def _convert_to_pipeline_details( async def test_run_computation( wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]], async_client: httpx.AsyncClient, - registered_user: Callable, + create_registered_user: Callable, project: Callable[..., Awaitable[ProjectAtDB]], fake_workbench_without_outputs: dict[str, Any], update_project_workbench_with_comp_tasks: Callable, @@ -548,7 +548,7 @@ async def test_run_computation( osparc_product_name: str, create_pipeline: Callable[..., Awaitable[ComputationGet]], ): - user = registered_user() + user = create_registered_user() await wait_for_catalog_service(user["id"], osparc_product_name) sleepers_project = await project(user, workbench=fake_workbench_without_outputs) # send a valid project with sleepers @@ -653,14 +653,14 @@ async def test_run_computation( async def test_abort_computation( async_client: httpx.AsyncClient, - registered_user: Callable, + create_registered_user: Callable, project: Callable[..., Awaitable[ProjectAtDB]], fake_workbench_without_outputs: dict[str, Any], 
fake_workbench_computational_pipeline_details: PipelineDetails, osparc_product_name: str, create_pipeline: Callable[..., Awaitable[ComputationGet]], ): - user = registered_user() + user = create_registered_user() # we need long running tasks to ensure cancellation is done properly for node in fake_workbench_without_outputs.values(): if "sleeper" in node["key"]: @@ -730,7 +730,7 @@ async def test_abort_computation( async def test_update_and_delete_computation( async_client: httpx.AsyncClient, - registered_user: Callable, + create_registered_user: Callable, project: Callable[..., Awaitable[ProjectAtDB]], fake_workbench_without_outputs: dict[str, Any], fake_workbench_computational_pipeline_details_not_started: PipelineDetails, @@ -738,7 +738,7 @@ async def test_update_and_delete_computation( osparc_product_name: str, create_pipeline: Callable[..., Awaitable[ComputationGet]], ): - user = registered_user() + user = create_registered_user() sleepers_project = await project(user, workbench=fake_workbench_without_outputs) # send a valid project with sleepers task_out = await create_pipeline( @@ -852,13 +852,13 @@ async def test_update_and_delete_computation( async def test_pipeline_with_no_computational_services_still_create_correct_comp_tasks_in_db( async_client: httpx.AsyncClient, - registered_user: Callable, + create_registered_user: Callable, project: Callable[..., Awaitable[ProjectAtDB]], jupyter_service: dict[str, Any], osparc_product_name: str, create_pipeline: Callable[..., Awaitable[ComputationGet]], ): - user = registered_user() + user = create_registered_user() # create a workbench with just a dynamic service project_with_dynamic_node = await project( user, @@ -895,12 +895,12 @@ async def test_pipeline_with_no_computational_services_still_create_correct_comp async def test_pipeline_with_control_loop_made_of_dynamic_services_is_allowed( client: TestClient, - registered_user: Callable, + create_registered_user: Callable, project: Callable[..., 
Awaitable[ProjectAtDB]], jupyter_service: dict[str, Any], osparc_product_name: str, ): - user = registered_user() + user = create_registered_user() # create a workbench with just 2 dynamic service in a cycle project_with_dynamic_node = await project( user, @@ -963,13 +963,13 @@ async def test_pipeline_with_control_loop_made_of_dynamic_services_is_allowed( async def test_pipeline_with_cycle_containing_a_computational_service_is_forbidden( client: TestClient, - registered_user: Callable, + create_registered_user: Callable, project: Callable[..., Awaitable[ProjectAtDB]], sleeper_service: dict[str, Any], jupyter_service: dict[str, Any], osparc_product_name: str, ): - user = registered_user() + user = create_registered_user() # create a workbench with just 2 dynamic service in a cycle project_with_cycly_and_comp_service = await project( user, @@ -1044,7 +1044,7 @@ async def test_pipeline_with_cycle_containing_a_computational_service_is_forbidd async def test_burst_create_computations( async_client: httpx.AsyncClient, - registered_user: Callable, + create_registered_user: Callable, project: Callable[..., Awaitable[ProjectAtDB]], fake_workbench_without_outputs: dict[str, Any], update_project_workbench_with_comp_tasks: Callable, @@ -1053,7 +1053,7 @@ async def test_burst_create_computations( osparc_product_name: str, create_pipeline: Callable[..., Awaitable[ComputationGet]], ): - user = registered_user() + user = create_registered_user() sleepers_project = await project(user, workbench=fake_workbench_without_outputs) sleepers_project2 = await project(user, workbench=fake_workbench_without_outputs) diff --git a/services/director-v2/tests/integration/02/test_dynamic_services_routes.py b/services/director-v2/tests/integration/02/test_dynamic_services_routes.py index 4a4340338649..25aaf0de8ed3 100644 --- a/services/director-v2/tests/integration/02/test_dynamic_services_routes.py +++ b/services/director-v2/tests/integration/02/test_dynamic_services_routes.py @@ -77,18 +77,16 @@ 
def minimal_configuration( simcore_services_ready: None, rabbit_service: RabbitSettings, storage_service: URL, -): - ... +): ... @pytest.fixture -def mock_env(mock_env: EnvVarsDict, minimal_configuration) -> None: - ... +def mock_env(mock_env: EnvVarsDict, minimal_configuration) -> None: ... @pytest.fixture -def user_db(registered_user: Callable[..., dict[str, Any]]) -> dict[str, Any]: - user = registered_user() +def user_db(create_registered_user: Callable[..., dict[str, Any]]) -> dict[str, Any]: + user = create_registered_user() return user diff --git a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py index 09cb6a75faae..85ecce3211e1 100644 --- a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py +++ b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py @@ -254,8 +254,8 @@ def _get_node_uuid(registry_service_data: dict) -> str: @pytest.fixture -def current_user(registered_user: Callable) -> dict[str, Any]: - return registered_user() +def current_user(create_registered_user: Callable) -> dict[str, Any]: + return create_registered_user() @pytest.fixture diff --git a/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py b/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py index 4bfe998ad599..0f6a0ec31656 100644 --- a/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py +++ b/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py @@ -112,8 +112,7 @@ def minimal_configuration( dy_static_file_server_dynamic_sidecar_compose_spec_service: dict, simcore_services_ready: None, ensure_swarm_and_networks: None, -): - ... +): ... 
@pytest.fixture @@ -132,8 +131,8 @@ def uuid_dynamic_sidecar_compose(faker: Faker) -> str: @pytest.fixture -def user_dict(registered_user: Callable) -> dict[str, Any]: - return registered_user() +def user_dict(create_registered_user: Callable) -> dict[str, Any]: + return create_registered_user() @pytest.fixture diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations.py index 0d5060bc4667..5d64fcd87650 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations.py @@ -369,12 +369,12 @@ def product_name(faker: Faker) -> str: async def test_computation_create_validators( - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], fake_workbench_without_outputs: dict[str, Any], faker: Faker, ): - user = registered_user() + user = create_registered_user() proj = await project(user, workbench=fake_workbench_without_outputs) ComputationCreate( user_id=user["id"], @@ -396,11 +396,11 @@ async def test_create_computation( mocked_catalog_service_fcts: respx.MockRouter, product_name: str, fake_workbench_without_outputs: dict[str, Any], - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], async_client: httpx.AsyncClient, ): - user = registered_user() + user = create_registered_user() proj = await project(user, workbench=fake_workbench_without_outputs) create_computation_url = httpx.URL("/v2/computations") response = await async_client.post( @@ -493,7 +493,7 @@ async def test_create_computation_with_wallet( mocked_clusters_keeper_service_get_instance_type_details: mock.Mock, product_name: str, fake_workbench_without_outputs: 
dict[str, Any], - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], async_client: httpx.AsyncClient, wallet_info: WalletInfo, @@ -506,7 +506,7 @@ async def test_create_computation_with_wallet( # In billable product a wallet is passed, with a selected pricing plan # the pricing plan contains information about the hardware that should be used # this will then override the original service resources - user = registered_user() + user = create_registered_user() proj = await project( user, @@ -603,12 +603,12 @@ async def test_create_computation_with_wallet_with_invalid_pricing_unit_name_rai mocked_clusters_keeper_service_get_instance_type_details_with_invalid_name: mock.Mock, product_name: str, fake_workbench_without_outputs: dict[str, Any], - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], async_client: httpx.AsyncClient, wallet_info: WalletInfo, ): - user = registered_user() + user = create_registered_user() proj = await project( user, workbench=fake_workbench_without_outputs, @@ -644,12 +644,12 @@ async def test_create_computation_with_wallet_with_no_clusters_keeper_raises_503 mocked_resource_usage_tracker_service_fcts: respx.MockRouter, product_name: str, fake_workbench_without_outputs: dict[str, Any], - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], async_client: httpx.AsyncClient, wallet_info: WalletInfo, ): - user = registered_user() + user = create_registered_user() proj = await project(user, workbench=fake_workbench_without_outputs) create_computation_url = httpx.URL("/v2/computations") response = await async_client.post( @@ -672,11 +672,11 @@ async def test_start_computation_without_product_fails( mocked_catalog_service_fcts: respx.MockRouter, 
product_name: str, fake_workbench_without_outputs: dict[str, Any], - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], async_client: httpx.AsyncClient, ): - user = registered_user() + user = create_registered_user() proj = await project(user, workbench=fake_workbench_without_outputs) create_computation_url = httpx.URL("/v2/computations") response = await async_client.post( @@ -696,11 +696,11 @@ async def test_start_computation( mocked_catalog_service_fcts: respx.MockRouter, product_name: str, fake_workbench_without_outputs: dict[str, Any], - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], async_client: httpx.AsyncClient, ): - user = registered_user() + user = create_registered_user() proj = await project(user, workbench=fake_workbench_without_outputs) create_computation_url = httpx.URL("/v2/computations") response = await async_client.post( @@ -728,11 +728,11 @@ async def test_start_computation_with_project_node_resources_defined( mocked_catalog_service_fcts: respx.MockRouter, product_name: str, fake_workbench_without_outputs: dict[str, Any], - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], async_client: httpx.AsyncClient, ): - user = registered_user() + user = create_registered_user() assert "json_schema_extra" in ServiceResourcesDictHelpers.model_config assert isinstance( ServiceResourcesDictHelpers.model_config["json_schema_extra"], dict @@ -774,11 +774,11 @@ async def test_start_computation_with_deprecated_services_raises_406( product_name: str, fake_workbench_without_outputs: dict[str, Any], fake_workbench_adjacency: dict[str, Any], - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: 
Callable[..., Awaitable[ProjectAtDB]], async_client: httpx.AsyncClient, ): - user = registered_user() + user = create_registered_user() proj = await project(user, workbench=fake_workbench_without_outputs) create_computation_url = httpx.URL("/v2/computations") response = await async_client.post( @@ -799,13 +799,13 @@ async def test_get_computation_from_empty_project( minimal_configuration: None, fake_workbench_without_outputs: dict[str, Any], fake_workbench_adjacency: dict[str, Any], - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], faker: Faker, async_client: httpx.AsyncClient, ): - user = registered_user() + user = create_registered_user() get_computation_url = httpx.URL( f"/v2/computations/{faker.uuid4()}?user_id={user['id']}" ) @@ -850,13 +850,13 @@ async def test_get_computation_from_not_started_computation_task( minimal_configuration: None, fake_workbench_without_outputs: dict[str, Any], fake_workbench_adjacency: dict[str, Any], - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], async_client: httpx.AsyncClient, ): - user = registered_user() + user = create_registered_user() proj = await project(user, workbench=fake_workbench_without_outputs) get_computation_url = httpx.URL( f"/v2/computations/{proj.uuid}?user_id={user['id']}" @@ -915,14 +915,14 @@ async def test_get_computation_from_published_computation_task( minimal_configuration: None, fake_workbench_without_outputs: dict[str, Any], fake_workbench_adjacency: dict[str, Any], - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., 
Awaitable[ProjectAtDB]], create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], create_comp_run: Callable[..., Awaitable[CompRunsAtDB]], async_client: httpx.AsyncClient, ): - user = registered_user() + user = create_registered_user() proj = await project(user, workbench=fake_workbench_without_outputs) await create_pipeline( project_id=f"{proj.uuid}", diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations_tasks.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations_tasks.py index f433b86b666a..2c539a7c2b6c 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations_tasks.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations_tasks.py @@ -103,8 +103,8 @@ class Loc(NamedTuple): @pytest.fixture -def user(registered_user: Callable[..., dict[str, Any]]) -> dict[str, Any]: - return registered_user() +def user(create_registered_user: Callable[..., dict[str, Any]]) -> dict[str, Any]: + return create_registered_user() @pytest.fixture diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py index 3b179dc54e2d..54c91a752ebe 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py @@ -34,14 +34,14 @@ async def test_rpc_list_computation_runs_and_tasks( fake_workbench_without_outputs: dict[str, Any], fake_workbench_adjacency: dict[str, Any], - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], 
create_comp_run: Callable[..., Awaitable[CompRunsAtDB]], rpc_client: RabbitMQRPCClient, ): - user = registered_user() + user = create_registered_user() proj = await project(user, workbench=fake_workbench_without_outputs) await create_pipeline( project_id=f"{proj.uuid}", @@ -108,14 +108,14 @@ async def test_rpc_list_computation_runs_and_tasks( async def test_rpc_list_computation_runs_with_filtering( fake_workbench_without_outputs: dict[str, Any], fake_workbench_adjacency: dict[str, Any], - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], create_comp_run: Callable[..., Awaitable[CompRunsAtDB]], rpc_client: RabbitMQRPCClient, ): - user = registered_user() + user = create_registered_user() proj_1 = await project(user, workbench=fake_workbench_without_outputs) await create_pipeline( @@ -158,14 +158,14 @@ async def test_rpc_list_computation_runs_with_filtering( async def test_rpc_list_computation_runs_history( fake_workbench_without_outputs: dict[str, Any], fake_workbench_adjacency: dict[str, Any], - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], create_comp_run: Callable[..., Awaitable[CompRunsAtDB]], rpc_client: RabbitMQRPCClient, ): - user = registered_user() + user = create_registered_user() proj = await project(user, workbench=fake_workbench_without_outputs) await create_pipeline( diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py index 90ca0953fce8..f38b33024318 100644 --- 
a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py @@ -320,7 +320,7 @@ async def test_empty_pipeline_is_not_scheduled( with_disabled_auto_scheduling: mock.Mock, with_disabled_scheduler_worker: mock.Mock, initialized_app: FastAPI, - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], run_metadata: RunMetadataDict, @@ -329,7 +329,7 @@ async def test_empty_pipeline_is_not_scheduled( caplog: pytest.LogCaptureFixture, ): await assert_comp_runs_empty(sqlalchemy_async_engine) - user = registered_user() + user = create_registered_user() empty_project = await project(user) # the project is not in the comp_pipeline, therefore scheduling it should fail diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py index 5b1cbf64aa3f..f84ff1121861 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py @@ -1087,7 +1087,7 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted( with_disabled_scheduler_publisher: mock.Mock, initialized_app: FastAPI, scheduler_api: BaseCompScheduler, - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], fake_workbench_without_outputs: dict[str, Any], @@ -1097,7 +1097,7 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted( ): """A pipeline which comp_tasks are missing should not be scheduled. 
It shall be aborted and shown as such in the comp_runs db""" - user = registered_user() + user = create_registered_user() sleepers_project = await project(user, workbench=fake_workbench_without_outputs) await create_pipeline( project_id=f"{sleepers_project.uuid}", diff --git a/services/director-v2/tests/unit/with_dbs/conftest.py b/services/director-v2/tests/unit/with_dbs/conftest.py index 39fda4979de4..24cc05464149 100644 --- a/services/director-v2/tests/unit/with_dbs/conftest.py +++ b/services/director-v2/tests/unit/with_dbs/conftest.py @@ -219,14 +219,14 @@ async def _( @pytest.fixture async def publish_project( - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], fake_workbench_without_outputs: dict[str, Any], fake_workbench_adjacency: dict[str, Any], ) -> Callable[[], Awaitable[PublishedProject]]: - user = registered_user() + user = create_registered_user() async def _() -> PublishedProject: created_project = await project(user, workbench=fake_workbench_without_outputs) @@ -254,7 +254,7 @@ async def published_project( @pytest.fixture async def running_project( - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], @@ -262,7 +262,7 @@ async def running_project( fake_workbench_without_outputs: dict[str, Any], fake_workbench_adjacency: dict[str, Any], ) -> RunningProject: - user = registered_user() + user = create_registered_user() created_project = await project(user, workbench=fake_workbench_without_outputs) now_time = arrow.utcnow().datetime return RunningProject( @@ -291,7 +291,7 @@ async def running_project( @pytest.fixture 
async def running_project_mark_for_cancellation( - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], @@ -299,7 +299,7 @@ async def running_project_mark_for_cancellation( fake_workbench_without_outputs: dict[str, Any], fake_workbench_adjacency: dict[str, Any], ) -> RunningProject: - user = registered_user() + user = create_registered_user() created_project = await project(user, workbench=fake_workbench_without_outputs) now_time = arrow.utcnow().datetime return RunningProject( diff --git a/services/director-v2/tests/unit/with_dbs/test_cli.py b/services/director-v2/tests/unit/with_dbs/test_cli.py index 91d8f8a773e0..813bd93aa078 100644 --- a/services/director-v2/tests/unit/with_dbs/test_cli.py +++ b/services/director-v2/tests/unit/with_dbs/test_cli.py @@ -61,11 +61,11 @@ def cli_runner(minimal_configuration: None) -> CliRunner: @pytest.fixture async def project_at_db( - registered_user: Callable[..., dict[str, Any]], + create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], fake_workbench_without_outputs: dict[str, Any], ) -> ProjectAtDB: - user = registered_user() + user = create_registered_user() return await project(user, workbench=fake_workbench_without_outputs) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_db_repositories_groups_extra_properties.py b/services/director-v2/tests/unit/with_dbs/test_modules_db_repositories_groups_extra_properties.py index 31baae6de579..2ac7d6cd0dfb 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_db_repositories_groups_extra_properties.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_db_repositories_groups_extra_properties.py @@ -1,7 +1,8 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument 
-from typing import Any, Callable, Iterator, cast +from collections.abc import Callable, Iterator +from typing import Any, cast import pytest import sqlalchemy as sa @@ -115,11 +116,11 @@ def with_internet_access(request: pytest.FixtureRequest) -> bool: @pytest.fixture() async def user( mock_env: EnvVarsDict, - registered_user: Callable[..., dict], + create_registered_user: Callable[..., dict], give_internet_to_group: Callable[..., dict], with_internet_access: bool, ) -> dict[str, Any]: - user = registered_user() + user = create_registered_user() group_info = give_internet_to_group( group_id=user["primary_gid"], has_internet_access=with_internet_access ) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_db_repositories_projects.py b/services/director-v2/tests/unit/with_dbs/test_modules_db_repositories_projects.py index 3cdc76f4150f..14ff015d790c 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_db_repositories_projects.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_db_repositories_projects.py @@ -71,11 +71,11 @@ def workbench() -> dict[str, Any]: @pytest.fixture() async def project( mock_env: EnvVarsDict, - registered_user: Callable[..., dict], + create_registered_user: Callable[..., dict], project: Callable[..., Awaitable[ProjectAtDB]], workbench: dict[str, Any], ) -> ProjectAtDB: - return await project(registered_user(), workbench=workbench) + return await project(create_registered_user(), workbench=workbench) async def test_is_node_present_in_workbench( diff --git a/services/director-v2/tests/unit/with_dbs/test_utils_rabbitmq.py b/services/director-v2/tests/unit/with_dbs/test_utils_rabbitmq.py index 8778d17245ed..cb3d81a910d1 100644 --- a/services/director-v2/tests/unit/with_dbs/test_utils_rabbitmq.py +++ b/services/director-v2/tests/unit/with_dbs/test_utils_rabbitmq.py @@ -79,8 +79,8 @@ async def _assert_message_received( @pytest.fixture -def user(registered_user: Callable[..., dict]) -> dict: - return 
registered_user() +def user(create_registered_user: Callable[..., dict]) -> dict: + return create_registered_user() @pytest.fixture diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index db0da6a500ab..6f96b9a22f5d 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -2580,7 +2580,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/Envelope_list_ComputationRunRestGet__' + $ref: '#/components/schemas/Page_ComputationRunRestGet_' /v0/computations/{project_id}/iterations: get: tags: @@ -2625,7 +2625,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/Envelope_list_ComputationRunRestGet__' + $ref: '#/components/schemas/Page_ComputationRunRestGet_' /v0/computations/{project_id}/iterations/latest/tasks: get: tags: @@ -2670,7 +2670,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/Envelope_list_ComputationTaskRestGet__' + $ref: '#/components/schemas/Page_ComputationTaskRestGet_' /v0/projects/{project_id}:xport: post: tags: @@ -10735,38 +10735,6 @@ components: title: Error type: object title: Envelope[list[ApiKeyGet]] - Envelope_list_ComputationRunRestGet__: - properties: - data: - anyOf: - - items: - $ref: '#/components/schemas/ComputationRunRestGet' - type: array - - type: 'null' - title: Data - error: - anyOf: - - {} - - type: 'null' - title: Error - type: object - title: Envelope[list[ComputationRunRestGet]] - Envelope_list_ComputationTaskRestGet__: - properties: - data: - anyOf: - - items: - $ref: '#/components/schemas/ComputationTaskRestGet' - type: array - - type: 'null' - title: Data - error: - anyOf: - - {} - - type: 'null' - title: Error - type: object - title: Envelope[list[ComputationTaskRestGet]] Envelope_list_DatasetMetaData__: properties: data: @@ -13413,6 +13381,82 @@ 
components: - _links - data title: Page[CatalogLatestServiceGet] + Page_ComputationRunRestGet_: + properties: + items: + items: + $ref: '#/components/schemas/ComputationRunRestGet' + type: array + title: Items + total: + anyOf: + - type: integer + minimum: 0 + - type: 'null' + title: Total + page: + anyOf: + - type: integer + minimum: 1 + - type: 'null' + title: Page + size: + anyOf: + - type: integer + minimum: 1 + - type: 'null' + title: Size + pages: + anyOf: + - type: integer + minimum: 0 + - type: 'null' + title: Pages + type: object + required: + - items + - total + - page + - size + title: Page[ComputationRunRestGet] + Page_ComputationTaskRestGet_: + properties: + items: + items: + $ref: '#/components/schemas/ComputationTaskRestGet' + type: array + title: Items + total: + anyOf: + - type: integer + minimum: 0 + - type: 'null' + title: Total + page: + anyOf: + - type: integer + minimum: 1 + - type: 'null' + title: Page + size: + anyOf: + - type: integer + minimum: 1 + - type: 'null' + title: Size + pages: + anyOf: + - type: integer + minimum: 0 + - type: 'null' + title: Pages + type: object + required: + - items + - total + - page + - size + title: Page[ComputationTaskRestGet] Page_ConversationMessageRestGet_: properties: _meta: diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py index a2e5045e4e12..5f0b6bb49b91 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py @@ -30,10 +30,10 @@ from ..projects.projects_metadata_service import ( get_project_custom_metadata_or_empty_dict, ) -from ..rabbitmq import get_rabbitmq_rpc_client +from ..rabbitmq import RabbitMQRPCClient, get_rabbitmq_rpc_client -async def _fetch_projects_metadata( +async def _get_projects_metadata( app: web.Application, 
project_uuids: list[ProjectID], ) -> list[dict]: @@ -48,7 +48,7 @@ async def _fetch_projects_metadata( ) -async def _fetch_root_project_names( +async def _get_root_project_names( app: web.Application, items: list[ComputationRunRpcGet] ) -> list[str]: """Resolve root project names from computation items""" @@ -89,11 +89,11 @@ async def list_computations_latest_iteration( ) # Get projects metadata - _projects_metadata = await _fetch_projects_metadata( + _projects_metadata = await _get_projects_metadata( app, project_uuids=[item.project_uuid for item in _runs_get.items] ) # Get Root project names - _projects_root_names = await _fetch_root_project_names(app, _runs_get.items) + _projects_root_names = await _get_root_project_names(app, _runs_get.items) _computational_runs_output = [ ComputationRunWithAttributes( @@ -139,10 +139,10 @@ async def list_computation_iterations( ) # Get projects metadata - _projects_metadata = await _fetch_projects_metadata(app, project_uuids=[project_id]) + _projects_metadata = await _get_projects_metadata(app, project_uuids=[project_id]) assert len(_projects_metadata) == 1 # nosec # Get Root project names - _projects_root_names = await _fetch_root_project_names(app, [_runs_get.items[0]]) + _projects_root_names = await _get_root_project_names(app, [_runs_get.items[0]]) assert len(_projects_root_names) == 1 # nosec _computational_runs_output = [ @@ -164,7 +164,7 @@ async def list_computation_iterations( async def _get_credits_or_zero_by_service_run_id( - rpc_client, service_run_id: ServiceRunID + rpc_client: RabbitMQRPCClient, service_run_id: ServiceRunID ) -> Decimal: try: return ( diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_computations_service_utils.py b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service_utils.py new file mode 100644 index 000000000000..f05430a42a86 --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service_utils.py 
@@ -0,0 +1,247 @@
+from decimal import Decimal
+
+from aiohttp import web
+from models_library.computations import (
+    ComputationRunWithAttributes,
+    ComputationTaskWithAttributes,
+)
+from models_library.products import ProductName
+from models_library.projects import ProjectID
+from models_library.rest_ordering import OrderBy
+from models_library.users import UserID
+from pydantic import NonNegativeInt
+from servicelib.rabbitmq.rpc_interfaces.director_v2 import computations
+from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker import (
+    credit_transactions,
+)
+from servicelib.utils import limited_gather
+
+from ..products.products_service import is_product_billable
+from ..projects.api import (
+    batch_get_project_name,
+    check_user_project_permission,
+    get_project_dict_legacy,
+)
+from ..projects.projects_metadata_service import (
+    get_project_custom_metadata_or_empty_dict,
+)
+from ..rabbitmq import get_rabbitmq_rpc_client
+
+
+async def _fetch_projects_metadata(
+    app: web.Application,
+    project_uuids: list[ProjectID],
+) -> list[dict]:
+    """Batch fetch project metadata with concurrency control"""
+    return await limited_gather(
+        *[
+            get_project_custom_metadata_or_empty_dict(app, project_uuid=uuid)
+            for uuid in project_uuids
+        ],
+        limit=20,
+    )
+
+
+async def list_computations_latest_iteration(
+    app: web.Application,
+    product_name: ProductName,
+    user_id: UserID,
+    # filters
+    filter_only_running: bool,  # noqa: FBT001
+    # pagination
+    offset: int,
+    limit: NonNegativeInt,
+    # ordering
+    order_by: OrderBy,
+) -> tuple[int, list[ComputationRunWithAttributes]]:
+    """Returns the list of computations (only latest iterations)"""
+    rpc_client = get_rabbitmq_rpc_client(app)
+    _runs_get = await computations.list_computations_latest_iteration_page(
+        rpc_client,
+        product_name=product_name,
+        user_id=user_id,
+        filter_only_running=filter_only_running,
+        offset=offset,
+        limit=limit,
+        order_by=order_by,
+    )
+
+    # Get projects metadata (NOTE: MD: can be improved with a single batch call)
+    _projects_metadata = await _fetch_projects_metadata(
+        app, project_uuids=[item.project_uuid for item in _runs_get.items]
+    )
+
+    # Get Root project names
+    _projects_root_uuids: list[ProjectID] = []
+    for item in _runs_get.items:
+        if (
+            prj_root_id := item.info.get("project_metadata", {}).get(
+                "root_parent_project_id", None
+            )
+        ) is not None:
+            _projects_root_uuids.append(ProjectID(prj_root_id))
+        else:
+            _projects_root_uuids.append(item.project_uuid)
+
+    _projects_root_names = await batch_get_project_name(
+        app, projects_uuids=_projects_root_uuids
+    )
+
+    _computational_runs_output = [
+        ComputationRunWithAttributes(
+            project_uuid=item.project_uuid,
+            iteration=item.iteration,
+            state=item.state,
+            info=item.info,
+            submitted_at=item.submitted_at,
+            started_at=item.started_at,
+            ended_at=item.ended_at,
+            root_project_name=project_name,
+            project_custom_metadata=project_metadata,
+        )
+        for item, project_metadata, project_name in zip(
+            _runs_get.items, _projects_metadata, _projects_root_names, strict=True
+        )
+    ]
+
+    return _runs_get.total, _computational_runs_output
+
+
+async def list_computation_iterations(
+    app: web.Application,
+    product_name: ProductName,
+    user_id: UserID,
+    project_id: ProjectID,
+    # pagination
+    offset: int,
+    limit: NonNegativeInt,
+    # ordering
+    order_by: OrderBy,
+) -> tuple[int, list[ComputationRunWithAttributes]]:
+    """Returns the list of computation iterations for a given project"""
+    rpc_client = get_rabbitmq_rpc_client(app)
+    _runs_get = await computations.list_computations_iterations_page(
+        rpc_client,
+        product_name=product_name,
+        user_id=user_id,
+        project_id=project_id,
+        offset=offset,
+        limit=limit,
+        order_by=order_by,
+    )
+
+    # Get projects metadata (NOTE: MD: can be improved with a single batch call)
+    _projects_metadata = await limited_gather(
+        *[
+            get_project_custom_metadata_or_empty_dict(
+                app,
project_uuid=item.project_uuid + ) + for item in _runs_get.items + ], + limit=20, + ) + + # Get Root project names + _projects_root_uuids: list[ProjectID] = [] + for item in _runs_get.items: + if ( + prj_root_id := item.info.get("project_metadata", {}).get( + "root_parent_project_id", None + ) + ) is not None: + _projects_root_uuids.append(ProjectID(prj_root_id)) + else: + _projects_root_uuids.append(item.project_uuid) + + _projects_root_names = await batch_get_project_name( + app, projects_uuids=_projects_root_uuids + ) + + _computational_runs_output = [ + ComputationRunWithAttributes( + project_uuid=item.project_uuid, + iteration=item.iteration, + state=item.state, + info=item.info, + submitted_at=item.submitted_at, + started_at=item.started_at, + ended_at=item.ended_at, + root_project_name=project_name, + project_custom_metadata=project_metadata, + ) + for item, project_metadata, project_name in zip( + _runs_get.items, _projects_metadata, _projects_root_names, strict=True + ) + ] + + return _runs_get.total, _computational_runs_output + + +async def list_computations_latest_iteration_tasks( + app: web.Application, + product_name: ProductName, + user_id: UserID, + project_id: ProjectID, + # pagination + offset: int, + limit: NonNegativeInt, + # ordering + order_by: OrderBy, +) -> tuple[int, list[ComputationTaskWithAttributes]]: + """Returns the list of tasks for the latest iteration of a computation""" + + await check_user_project_permission( + app, project_id=project_id, user_id=user_id, product_name=product_name + ) + + rpc_client = get_rabbitmq_rpc_client(app) + _tasks_get = await computations.list_computations_latest_iteration_tasks_page( + rpc_client, + product_name=product_name, + user_id=user_id, + project_id=project_id, + offset=offset, + limit=limit, + order_by=order_by, + ) + + # Get node names (for all project nodes) + project_dict = await get_project_dict_legacy(app, project_uuid=project_id) + workbench = project_dict["workbench"] + + _service_run_ids 
= [item.service_run_id for item in _tasks_get.items] + _is_product_billable = await is_product_billable(app, product_name=product_name) + _service_run_osparc_credits: list[Decimal | None] + if _is_product_billable: + # NOTE: MD: can be improved with a single batch call + _service_run_osparc_credits = await limited_gather( + *[ + credit_transactions.get_transaction_current_credits_by_service_run_id( + rpc_client, service_run_id=_run_id + ) + for _run_id in _service_run_ids + ], + limit=20, + ) + else: + _service_run_osparc_credits = [None for _ in _service_run_ids] + + # Final output + _tasks_get_output = [ + ComputationTaskWithAttributes( + project_uuid=item.project_uuid, + node_id=item.node_id, + state=item.state, + progress=item.progress, + image=item.image, + started_at=item.started_at, + ended_at=item.ended_at, + log_download_link=item.log_download_link, + node_name=workbench[f"{item.node_id}"].get("label", ""), + osparc_credits=credits_or_none, + ) + for item, credits_or_none in zip( + _tasks_get.items, _service_run_osparc_credits, strict=True + ) + ] + return _tasks_get.total, _tasks_get_output From 84171d5fee472b55a48490e3c61d6672cd2f54e0 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Wed, 14 May 2025 12:35:17 +0200 Subject: [PATCH 6/7] rerun ci --- .../director_v2/_controller/computations_rest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py index a67d7f83e5e9..3f13721b34a5 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_controller/computations_rest.py @@ -115,7 +115,6 @@ async def list_computation_iterations(request: web.Request) -> web.Response: # ordering order_by=OrderBy.model_construct(**query_params.order_by.model_dump()), ) - 
page = Page[ComputationRunRestGet].model_validate( paginate_data( chunk=[ From 5098290b7c4d95226d4877a558fda49a95bb4ea6 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Wed, 14 May 2025 12:52:16 +0200 Subject: [PATCH 7/7] fix --- .../director_v2/_computations_service.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py index 5f0b6bb49b91..fac04998d1dc 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_computations_service.py @@ -12,6 +12,9 @@ from models_library.services_types import ServiceRunID from models_library.users import UserID from pydantic import NonNegativeInt +from servicelib.rabbitmq import ( + RabbitMQRPCClient, +) from servicelib.rabbitmq.rpc_interfaces.director_v2 import computations from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker import ( credit_transactions, @@ -30,7 +33,7 @@ from ..projects.projects_metadata_service import ( get_project_custom_metadata_or_empty_dict, ) -from ..rabbitmq import RabbitMQRPCClient, get_rabbitmq_rpc_client +from ..rabbitmq import get_rabbitmq_rpc_client async def _get_projects_metadata(