diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py
index b14e7e4f0cb..9d40b534db9 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py
@@ -590,9 +590,7 @@ async def _process_executing_tasks(
         """process executing tasks from the 3rd party backend"""
 
     @abstractmethod
-    async def _release_resources(
-        self, user_id: UserID, project_id: ProjectID, comp_run: CompRunsAtDB
-    ) -> None:
+    async def _release_resources(self, comp_run: CompRunsAtDB) -> None:
         """release resources used by the scheduler for a given user and project"""
 
     async def apply(
@@ -660,7 +658,7 @@ async def apply(
 
         # 7. Are we done scheduling that pipeline?
         if not dag.nodes() or pipeline_result in COMPLETED_STATES:
-            await self._release_resources(user_id, project_id, comp_run)
+            await self._release_resources(comp_run)
             # there is nothing left, the run is completed, we're done here
             _logger.info(
                 "pipeline %s scheduling completed with result %s",
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py
index 21316769343..1fb4564ab24 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py
@@ -56,7 +56,7 @@
 
 _logger = logging.getLogger(__name__)
 
-_DASK_CLIENT_RUN_REF: Final[str] = "{user_id}:{run_id}"
+_DASK_CLIENT_RUN_REF: Final[str] = "{user_id}:{project_id}:{run_id}"
 
 
 @asynccontextmanager
@@ -65,6 +65,7 @@ async def _cluster_dask_client(
     scheduler: "DaskScheduler",
     *,
     use_on_demand_clusters: bool,
+    project_id: ProjectID,
     run_id: PositiveInt,
     run_metadata: RunMetadataDict,
 ) -> AsyncIterator[DaskClient]:
@@ -76,7 +77,10 @@ async def _cluster_dask_client(
             wallet_id=run_metadata.get("wallet_id"),
         )
     async with scheduler.dask_clients_pool.acquire(
-        cluster, ref=_DASK_CLIENT_RUN_REF.format(user_id=user_id, run_id=run_id)
+        cluster,
+        ref=_DASK_CLIENT_RUN_REF.format(
+            user_id=user_id, project_id=project_id, run_id=run_id
+        ),
     ) as client:
         yield client
 
@@ -106,6 +110,7 @@ async def _start_tasks(
             user_id,
             self,
             use_on_demand_clusters=comp_run.use_on_demand_clusters,
+            project_id=comp_run.project_uuid,
             run_id=comp_run.run_id,
             run_metadata=comp_run.metadata,
         ) as client:
@@ -157,6 +162,7 @@ async def _get_tasks_status(
             user_id,
             self,
             use_on_demand_clusters=comp_run.use_on_demand_clusters,
+            project_id=comp_run.project_uuid,
             run_id=comp_run.run_id,
             run_metadata=comp_run.metadata,
         ) as client:
@@ -178,6 +184,7 @@ async def _process_executing_tasks(
             user_id,
             self,
             use_on_demand_clusters=comp_run.use_on_demand_clusters,
+            project_id=comp_run.project_uuid,
             run_id=comp_run.run_id,
             run_metadata=comp_run.metadata,
         ) as client:
@@ -225,20 +232,22 @@ async def _process_executing_tasks(
                 )
             )
 
-    async def _release_resources(
-        self, user_id: UserID, project_id: ProjectID, comp_run: CompRunsAtDB
-    ) -> None:
+    async def _release_resources(self, comp_run: CompRunsAtDB) -> None:
         """release resources used by the scheduler for a given user and project"""
         with (
             log_catch(_logger, reraise=False),
             log_context(
                 _logger,
                 logging.INFO,
-                msg=f"releasing resources for {user_id=}, {project_id=}, {comp_run.run_id=}",
+                msg=f"releasing resources for {comp_run.user_id=}, {comp_run.project_uuid=}, {comp_run.run_id=}",
             ),
         ):
             await self.dask_clients_pool.release_client_ref(
-                ref=_DASK_CLIENT_RUN_REF.format(user_id=user_id, run_id=comp_run.run_id)
+                ref=_DASK_CLIENT_RUN_REF.format(
+                    user_id=comp_run.user_id,
+                    project_id=comp_run.project_uuid,
+                    run_id=comp_run.run_id,
+                )
             )
 
     async def _stop_tasks(
@@ -250,6 +259,7 @@ async def _stop_tasks(
             user_id,
             self,
             use_on_demand_clusters=comp_run.use_on_demand_clusters,
+            project_id=comp_run.project_uuid,
             run_id=comp_run.run_id,
             run_metadata=comp_run.metadata,
         ) as client:
@@ -284,6 +294,7 @@ async def _process_completed_tasks(
             user_id,
             self,
             use_on_demand_clusters=comp_run.use_on_demand_clusters,
+            project_id=comp_run.project_uuid,
             run_id=comp_run.run_id,
             run_metadata=comp_run.metadata,
         ) as client:
@@ -304,6 +315,7 @@ async def _process_completed_tasks(
             user_id,
             self,
             use_on_demand_clusters=comp_run.use_on_demand_clusters,
+            project_id=comp_run.project_uuid,
             run_id=comp_run.run_id,
             run_metadata=comp_run.metadata,
         ) as client:
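
Note on the change: `_DASK_CLIENT_RUN_REF` now keys the pooled Dask client by user, project, and run instead of user and run only, and `_release_resources` drops its separate `user_id`/`project_id` parameters since `CompRunsAtDB` already carries `user_id` and `project_uuid`. The sketch below illustrates the kind of ref collision the extra `{project_id}` segment guards against, assuming `run_id` is not unique across projects (an assumption, not confirmed by this diff); all values are hypothetical.

```python
from typing import Final

# Templates as in the diff; everything below them is illustrative only.
_OLD_REF: Final[str] = "{user_id}:{run_id}"
_NEW_REF: Final[str] = "{user_id}:{project_id}:{run_id}"

# Hypothetical scenario: same user, two concurrent runs on different
# projects that happen to share the same run_id value.
old_a = _OLD_REF.format(user_id=42, run_id=7)
old_b = _OLD_REF.format(user_id=42, run_id=7)
assert old_a == old_b  # collision: releasing one run's ref frees the other's client too

new_a = _NEW_REF.format(user_id=42, project_id="prj-a", run_id=7)
new_b = _NEW_REF.format(user_id=42, project_id="prj-b", run_id=7)
assert new_a != new_b  # refs are distinct, so a release cannot cross runs
```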