From 5edcc8b1522f2626c6ee2e5f7049d244e60cdb01 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Thu, 22 May 2025 16:38:06 +0200 Subject: [PATCH 1/3] fix --- .../api/rpc/_computations.py | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py index 183f2920387a..eeb270c46cbb 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py @@ -13,8 +13,10 @@ from models_library.users import UserID from servicelib.rabbitmq import RPCRouter from servicelib.utils import limited_gather -from simcore_service_director_v2.models.comp_tasks import ComputationTaskForRpcDBGet +from ...core.errors import ComputationalRunNotFoundError +from ...models.comp_runs import CompRunsAtDB +from ...models.comp_tasks import ComputationTaskForRpcDBGet from ...modules.db.repositories.comp_runs import CompRunsRepository from ...modules.db.repositories.comp_tasks import CompTasksRepository from ...utils import dask as dask_utils @@ -95,6 +97,19 @@ async def _fetch_task_log( return None +async def _get_latest_run_or_none( + comp_runs_repo: CompRunsRepository, + user_id: UserID, + project_uuid: ProjectID, +) -> CompRunsAtDB | None: + try: + return await comp_runs_repo.get( + user_id=user_id, project_id=project_uuid, iteration=None + ) + except ComputationalRunNotFoundError: + return None + + @router.expose(reraise_if_error_type=()) async def list_computations_latest_iteration_tasks_page( app: FastAPI, @@ -127,13 +142,15 @@ async def list_computations_latest_iteration_tasks_page( # Fetch latest run for each project concurrently latest_runs = await limited_gather( *[ - comp_runs_repo.get(user_id=user_id, project_id=project_uuid, iteration=None) + _get_latest_run_or_none(comp_runs_repo, user_id, project_uuid) for project_uuid in unique_project_uuids ], limit=20, ) # Build a dict: project_uuid -> iteration - project_uuid_to_iteration = {run.project_uuid: run.iteration for run in latest_runs} + project_uuid_to_iteration = { + run.project_uuid: run.iteration for run in latest_runs if run is not None + } # Run all log fetches concurrently log_files = await limited_gather( From c9de24a2a883024c60166b2f12921e8e6d3d720d Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Fri, 23 May 2025 08:57:25 +0200 Subject: [PATCH 2/3] bug fix listing --- .../api/rpc/_computations.py | 2 ++ .../modules/db/repositories/comp_runs.py | 31 ++++++++++--------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py index eeb270c46cbb..445f5dbd6ddb 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py @@ -129,6 +129,8 @@ async def list_computations_latest_iteration_tasks_page( comp_tasks_repo = CompTasksRepository.instance(db_engine=app.state.engine) comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine) + # Get latest + total, comp_tasks = await comp_tasks_repo.list_computational_tasks_rpc_domain( project_ids=project_ids, offset=offset, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py index 511b53fe1c08..3aee20e6db1e 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py @@ -221,9 +221,7 @@ async def list_for_user__only_latest_iterations( if order_by is None: order_by = OrderBy(field=IDStr("run_id")) # default ordering - base_select_query = sa.select( - *self._COMPUTATION_RUNS_RPC_GET_COLUMNS - ).select_from( + _latest_runs = ( sa.select( comp_runs.c.project_uuid, sa.func.max(comp_runs.c.iteration).label( @@ -235,20 +233,23 @@ async def list_for_user__only_latest_iterations( & ( comp_runs.c.metadata["product_name"].astext == product_name ) # <-- NOTE: We might create a separate column for this for fast retrieval - & ( - comp_runs.c.result.in_( - [ - RUNNING_STATE_TO_DB[item] - for item in RunningState.list_running_states() - ] - ) - ) - if filter_only_running - else True ) .group_by(comp_runs.c.project_uuid) - .subquery("latest_runs") - .join( + ) + if filter_only_running: + _latest_runs = _latest_runs.where( + comp_runs.c.result.in_( + [ + RUNNING_STATE_TO_DB[item] + for item in RunningState.list_running_states() + ] + ) + ) + + base_select_query = sa.select( + *self._COMPUTATION_RUNS_RPC_GET_COLUMNS + ).select_from( + _latest_runs.subquery().join( comp_runs, sa.and_( comp_runs.c.project_uuid From 71d2515aa62757a4089ef127bccfddfb4d2b8fb9 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Fri, 23 May 2025 11:00:50 +0200 Subject: [PATCH 3/3] bug fix listing --- .../simcore_service_director_v2/api/rpc/_computations.py | 2 -- .../modules/db/repositories/comp_runs.py | 9 ++++----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py index 445f5dbd6ddb..eeb270c46cbb 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py @@ -129,8 +129,6 @@ async def list_computations_latest_iteration_tasks_page( comp_tasks_repo = CompTasksRepository.instance(db_engine=app.state.engine) comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine) - # Get latest - total, comp_tasks = await comp_tasks_repo.list_computational_tasks_rpc_domain( project_ids=project_ids, offset=offset, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py index 3aee20e6db1e..75e1639ad84d 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py @@ -245,17 +245,16 @@ async def list_for_user__only_latest_iterations( ] ) ) + _latest_runs_subquery = _latest_runs.subquery().alias("latest_runs") base_select_query = sa.select( *self._COMPUTATION_RUNS_RPC_GET_COLUMNS ).select_from( - _latest_runs.subquery().join( + _latest_runs_subquery.join( comp_runs, sa.and_( - comp_runs.c.project_uuid - == literal_column("latest_runs.project_uuid"), - comp_runs.c.iteration - == literal_column("latest_runs.latest_iteration"), + comp_runs.c.project_uuid == _latest_runs_subquery.c.project_uuid, + comp_runs.c.iteration == _latest_runs_subquery.c.latest_iteration, ), ) )