diff --git a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py
index 183f2920387a..eeb270c46cbb 100644
--- a/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py
+++ b/services/director-v2/src/simcore_service_director_v2/api/rpc/_computations.py
@@ -13,8 +13,10 @@
 from models_library.users import UserID
 from servicelib.rabbitmq import RPCRouter
 from servicelib.utils import limited_gather
 
-from simcore_service_director_v2.models.comp_tasks import ComputationTaskForRpcDBGet
+from ...core.errors import ComputationalRunNotFoundError
+from ...models.comp_runs import CompRunsAtDB
+from ...models.comp_tasks import ComputationTaskForRpcDBGet
 from ...modules.db.repositories.comp_runs import CompRunsRepository
 from ...modules.db.repositories.comp_tasks import CompTasksRepository
 from ...utils import dask as dask_utils
@@ -95,6 +97,19 @@ async def _fetch_task_log(
     return None
 
 
+async def _get_latest_run_or_none(
+    comp_runs_repo: CompRunsRepository,
+    user_id: UserID,
+    project_uuid: ProjectID,
+) -> CompRunsAtDB | None:
+    try:
+        return await comp_runs_repo.get(
+            user_id=user_id, project_id=project_uuid, iteration=None
+        )
+    except ComputationalRunNotFoundError:
+        return None
+
+
 @router.expose(reraise_if_error_type=())
 async def list_computations_latest_iteration_tasks_page(
     app: FastAPI,
@@ -127,13 +142,15 @@ async def list_computations_latest_iteration_tasks_page(
     # Fetch latest run for each project concurrently
     latest_runs = await limited_gather(
         *[
-            comp_runs_repo.get(user_id=user_id, project_id=project_uuid, iteration=None)
+            _get_latest_run_or_none(comp_runs_repo, user_id, project_uuid)
             for project_uuid in unique_project_uuids
         ],
         limit=20,
     )
     # Build a dict: project_uuid -> iteration
-    project_uuid_to_iteration = {run.project_uuid: run.iteration for run in latest_runs}
+    project_uuid_to_iteration = {
+        run.project_uuid: run.iteration for run in latest_runs if run is not None
+    }
 
     # Run all log fetches concurrently
     log_files = await limited_gather(
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py
index 511b53fe1c08..75e1639ad84d 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py
@@ -221,9 +221,7 @@ async def list_for_user__only_latest_iterations(
         if order_by is None:
             order_by = OrderBy(field=IDStr("run_id"))  # default ordering
 
-        base_select_query = sa.select(
-            *self._COMPUTATION_RUNS_RPC_GET_COLUMNS
-        ).select_from(
+        _latest_runs = (
             sa.select(
                 comp_runs.c.project_uuid,
                 sa.func.max(comp_runs.c.iteration).label(
@@ -235,26 +233,28 @@
                 & (
                     comp_runs.c.metadata["product_name"].astext == product_name
                 )  # <-- NOTE: We might create a separate column for this for fast retrieval
-                & (
-                    comp_runs.c.result.in_(
-                        [
-                            RUNNING_STATE_TO_DB[item]
-                            for item in RunningState.list_running_states()
-                        ]
-                    )
-                )
-                if filter_only_running
-                else True
             )
             .group_by(comp_runs.c.project_uuid)
-            .subquery("latest_runs")
-            .join(
+        )
+        if filter_only_running:
+            _latest_runs = _latest_runs.where(
+                comp_runs.c.result.in_(
+                    [
+                        RUNNING_STATE_TO_DB[item]
+                        for item in RunningState.list_running_states()
+                    ]
+                )
+            )
+        _latest_runs_subquery = _latest_runs.subquery().alias("latest_runs")
+
+        base_select_query = sa.select(
+            *self._COMPUTATION_RUNS_RPC_GET_COLUMNS
+        ).select_from(
+            _latest_runs_subquery.join(
                 comp_runs,
                 sa.and_(
-                    comp_runs.c.project_uuid
-                    == literal_column("latest_runs.project_uuid"),
-                    comp_runs.c.iteration
-                    == literal_column("latest_runs.latest_iteration"),
+                    comp_runs.c.project_uuid == _latest_runs_subquery.c.project_uuid,
+                    comp_runs.c.iteration == _latest_runs_subquery.c.latest_iteration,
                 ),
             )
         )
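
Note on the RPC-side change: wrapping `comp_runs_repo.get` in `_get_latest_run_or_none` turns a missing run into `None` instead of letting `ComputationalRunNotFoundError` abort the whole `limited_gather`, and the dict comprehension then skips those `None` entries. The sketch below reproduces only the shape of that pattern; the repository, exception, run model, and data are invented stand-ins, and plain `asyncio.gather` is used in place of servicelib's `limited_gather`.

```python
# Minimal, self-contained sketch of the "missing run -> None" pattern (stand-in names).
import asyncio
from dataclasses import dataclass


class ComputationalRunNotFoundError(Exception):
    """Stand-in for the repository's 'no run for this project' error."""


@dataclass
class FakeRun:
    project_uuid: str
    iteration: int


class FakeCompRunsRepository:
    def __init__(self, runs: dict[str, FakeRun]) -> None:
        self._runs = runs

    async def get(self, *, user_id: int, project_id: str, iteration: int | None) -> FakeRun:
        try:
            return self._runs[project_id]
        except KeyError as exc:
            raise ComputationalRunNotFoundError(project_id) from exc


async def _get_latest_run_or_none(
    repo: FakeCompRunsRepository, user_id: int, project_uuid: str
) -> FakeRun | None:
    # Swallow only the "not found" case so one project without runs
    # does not fail the whole concurrent gather.
    try:
        return await repo.get(user_id=user_id, project_id=project_uuid, iteration=None)
    except ComputationalRunNotFoundError:
        return None


async def main() -> None:
    repo = FakeCompRunsRepository({"p1": FakeRun("p1", 3)})
    latest_runs = await asyncio.gather(
        *(_get_latest_run_or_none(repo, 1, p) for p in ["p1", "p2"])
    )
    # Projects without a run are simply skipped when building the mapping.
    project_uuid_to_iteration = {
        run.project_uuid: run.iteration for run in latest_runs if run is not None
    }
    print(project_uuid_to_iteration)  # {'p1': 3}


asyncio.run(main())
```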
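
Note on the comp_runs query change: the running-state filter is now appended with an explicit `if filter_only_running:` block instead of a conditional expression embedded in `.where(...)`, and the final join reads the aggregate's columns through `subquery.c.<name>` rather than `literal_column(...)`. Below is a hedged sketch of the same SQLAlchemy Core pattern, assuming an invented `runs` table and hard-coded state values in place of `RUNNING_STATE_TO_DB`; it is not the repository's actual query.

```python
# Sketch of the "aggregate subquery + optional filter + typed join columns" pattern.
import sqlalchemy as sa

metadata = sa.MetaData()
runs = sa.Table(
    "runs",
    metadata,
    sa.Column("project_uuid", sa.String, nullable=False),
    sa.Column("iteration", sa.Integer, nullable=False),
    sa.Column("result", sa.String, nullable=False),
)


def build_latest_runs_query(*, filter_only_running: bool) -> sa.Select:
    # Aggregate: one row per project with its highest iteration.
    latest_runs = sa.select(
        runs.c.project_uuid,
        sa.func.max(runs.c.iteration).label("latest_iteration"),
    ).group_by(runs.c.project_uuid)

    # The optional filter is appended to the statement, so the base query
    # never collapses into a WHERE true when the flag is off.
    if filter_only_running:
        latest_runs = latest_runs.where(runs.c.result.in_(["STARTED", "PENDING"]))

    latest_runs_subquery = latest_runs.subquery().alias("latest_runs")

    # Joining on the subquery's columns keeps the names checked by SQLAlchemy
    # instead of relying on literal_column("latest_runs.<col>") strings.
    return sa.select(runs).select_from(
        latest_runs_subquery.join(
            runs,
            sa.and_(
                runs.c.project_uuid == latest_runs_subquery.c.project_uuid,
                runs.c.iteration == latest_runs_subquery.c.latest_iteration,
            ),
        )
    )


# Prints the compiled SQL without needing a database engine.
print(build_latest_runs_query(filter_only_running=True))
```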