diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py index dda8598ccf9..0b4a71301e0 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py @@ -97,8 +97,8 @@ async def _check_pipeline_not_running_or_raise_409( computation: ComputationCreate, ) -> None: with contextlib.suppress(ComputationalRunNotFoundError): - last_run = await comp_runs_repo.get( - user_id=computation.user_id, project_id=computation.project_id + last_run = await comp_runs_repo.get_latest_run_by_project( + project_id=computation.project_id ) pipeline_state = last_run.result @@ -367,8 +367,8 @@ async def create_or_update_or_start_computation( # noqa: PLR0913 # pylint: disa last_run: CompRunsAtDB | None = None pipeline_state = RunningState.NOT_STARTED with contextlib.suppress(ComputationalRunNotFoundError): - last_run = await comp_runs_repo.get( - user_id=computation.user_id, project_id=computation.project_id + last_run = await comp_runs_repo.get_latest_run_by_project( + project_id=computation.project_id ) pipeline_state = last_run.result @@ -467,7 +467,7 @@ async def get_computation( last_run: CompRunsAtDB | None = None pipeline_state = RunningState.NOT_STARTED with contextlib.suppress(ComputationalRunNotFoundError): - last_run = await comp_runs_repo.get(user_id=user_id, project_id=project_id) + last_run = await comp_runs_repo.get_latest_run_by_project(project_id=project_id) pipeline_state = last_run.result _logger.debug( @@ -542,8 +542,8 @@ async def stop_computation( last_run: CompRunsAtDB | None = None pipeline_state = RunningState.UNKNOWN with contextlib.suppress(ComputationalRunNotFoundError): - last_run = await comp_runs_repo.get( - user_id=computation_stop.user_id, project_id=project_id + last_run = await comp_runs_repo.get_latest_run_by_project( + project_id=project_id ) pipeline_state = last_run.result if utils.is_pipeline_running(last_run.result): @@ -601,8 +601,8 @@ async def delete_computation( # check if current state allow to stop the computation pipeline_state = RunningState.UNKNOWN with contextlib.suppress(ComputationalRunNotFoundError): - last_run = await comp_runs_repo.get( - user_id=computation_stop.user_id, project_id=project_id + last_run = await comp_runs_repo.get_latest_run_by_project( + project_id=project_id ) pipeline_state = last_run.result if utils.is_pipeline_running(pipeline_state): @@ -636,8 +636,8 @@ def return_last_value(retry_state: Any) -> Any: before_sleep=before_sleep_log(_logger, logging.INFO), ) async def check_pipeline_stopped() -> bool: - last_run = await comp_runs_repo.get( - user_id=computation_stop.user_id, project_id=project_id + last_run = await comp_runs_repo.get_latest_run_by_project( + project_id=project_id ) pipeline_state = last_run.result return utils.is_pipeline_stopped(pipeline_state) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py index 3da15d494f3..cbc38c983f3 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py @@ -145,6 +145,22 @@ async def get( raise ComputationalRunNotFoundError return CompRunsAtDB.model_validate(row) + async def get_latest_run_by_project( + self, + project_id: ProjectID, + ) -> CompRunsAtDB: + async with pass_or_acquire_connection(self.db_engine) as conn: + result = await conn.execute( + sa.select(comp_runs) + .where(comp_runs.c.project_uuid == f"{project_id}") + .order_by(desc(comp_runs.c.run_id)) + .limit(1) + ) + row = result.one_or_none() + if not row: + raise ComputationalRunNotFoundError + return CompRunsAtDB.model_validate(row) + async def list_( self, *, diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py index 9e521c54a3a..da24d32eb86 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py @@ -1245,3 +1245,78 @@ async def test_list_group_by_collection_run_id_state_priority_precedence( assert len(items) == 1 collection_item = items[0] assert collection_item.state == RunningState.FAILED + + +async def test_get_latest_run_by_project( + sqlalchemy_async_engine: AsyncEngine, + run_metadata: RunMetadataDict, + faker: Faker, + publish_project: Callable[[], Awaitable[PublishedProject]], + create_registered_user: Callable[..., dict[str, Any]], + with_product: dict[str, Any], +): + """Test that get() with user_id=None retrieves the latest run regardless of user""" + published_project = await publish_project() + + # Create a second user + second_user = create_registered_user() + + # Create comp runs for the original user + comp_run_user1_iter1 = await CompRunsRepository(sqlalchemy_async_engine).create( + user_id=published_project.user["id"], + project_id=published_project.project.uuid, + iteration=None, + metadata=run_metadata, + use_on_demand_clusters=faker.pybool(), + dag_adjacency_list=published_project.pipeline.dag_adjacency_list, + collection_run_id=CollectionRunID(faker.uuid4()), + ) + + # Create comp runs for the second user (this should increment iteration) + comp_run_user2_iter2 = await CompRunsRepository(sqlalchemy_async_engine).create( + user_id=second_user["id"], + project_id=published_project.project.uuid, + iteration=None, + metadata=run_metadata, + use_on_demand_clusters=faker.pybool(), + dag_adjacency_list=published_project.pipeline.dag_adjacency_list, + collection_run_id=CollectionRunID(faker.uuid4()), + ) + + # Create another run for the first user (should be iteration 3) + comp_run_user1_iter3 = await CompRunsRepository(sqlalchemy_async_engine).create( + user_id=published_project.user["id"], + project_id=published_project.project.uuid, + iteration=None, + metadata=run_metadata, + use_on_demand_clusters=faker.pybool(), + dag_adjacency_list=published_project.pipeline.dag_adjacency_list, + collection_run_id=CollectionRunID(faker.uuid4()), + ) + + # Verify iterations are correct + assert comp_run_user1_iter1.iteration == 1 + assert comp_run_user2_iter2.iteration == 1 + assert comp_run_user1_iter3.iteration == 2 + + # Test get with user_id=None should return the latest run (highest iteration) + latest_run = await CompRunsRepository( + sqlalchemy_async_engine + ).get_latest_run_by_project( + project_id=published_project.project.uuid, + ) + assert latest_run == comp_run_user1_iter3 + assert latest_run.iteration == 2 + + # Test get with specific user_id still works + user1_latest = await CompRunsRepository(sqlalchemy_async_engine).get( + user_id=published_project.user["id"], + project_id=published_project.project.uuid, + ) + assert user1_latest == comp_run_user1_iter3 + + user2_latest = await CompRunsRepository(sqlalchemy_async_engine).get( + user_id=second_user["id"], + project_id=published_project.project.uuid, + ) + assert user2_latest == comp_run_user2_iter2