Skip to content

Commit 11846e3

Browse files
authored
🐛Ensure Dask client reference is uniquely defined for reference counting (#7937)
1 parent a21f916 commit 11846e3

File tree

2 files changed

+21
-11
lines changed

2 files changed

+21
-11
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -590,9 +590,7 @@ async def _process_executing_tasks(
590590
"""process executing tasks from the 3rd party backend"""
591591

592592
@abstractmethod
593-
async def _release_resources(
594-
self, user_id: UserID, project_id: ProjectID, comp_run: CompRunsAtDB
595-
) -> None:
593+
async def _release_resources(self, comp_run: CompRunsAtDB) -> None:
596594
"""release resources used by the scheduler for a given user and project"""
597595

598596
async def apply(
@@ -660,7 +658,7 @@ async def apply(
660658

661659
# 7. Are we done scheduling that pipeline?
662660
if not dag.nodes() or pipeline_result in COMPLETED_STATES:
663-
await self._release_resources(user_id, project_id, comp_run)
661+
await self._release_resources(comp_run)
664662
# there is nothing left, the run is completed, we're done here
665663
_logger.info(
666664
"pipeline %s scheduling completed with result %s",

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656

5757
_logger = logging.getLogger(__name__)
5858

59-
_DASK_CLIENT_RUN_REF: Final[str] = "{user_id}:{run_id}"
59+
_DASK_CLIENT_RUN_REF: Final[str] = "{user_id}:{project_id}:{run_id}"
6060

6161

6262
@asynccontextmanager
@@ -65,6 +65,7 @@ async def _cluster_dask_client(
6565
scheduler: "DaskScheduler",
6666
*,
6767
use_on_demand_clusters: bool,
68+
project_id: ProjectID,
6869
run_id: PositiveInt,
6970
run_metadata: RunMetadataDict,
7071
) -> AsyncIterator[DaskClient]:
@@ -76,7 +77,10 @@ async def _cluster_dask_client(
7677
wallet_id=run_metadata.get("wallet_id"),
7778
)
7879
async with scheduler.dask_clients_pool.acquire(
79-
cluster, ref=_DASK_CLIENT_RUN_REF.format(user_id=user_id, run_id=run_id)
80+
cluster,
81+
ref=_DASK_CLIENT_RUN_REF.format(
82+
user_id=user_id, project_id=project_id, run_id=run_id
83+
),
8084
) as client:
8185
yield client
8286

@@ -106,6 +110,7 @@ async def _start_tasks(
106110
user_id,
107111
self,
108112
use_on_demand_clusters=comp_run.use_on_demand_clusters,
113+
project_id=comp_run.project_uuid,
109114
run_id=comp_run.run_id,
110115
run_metadata=comp_run.metadata,
111116
) as client:
@@ -157,6 +162,7 @@ async def _get_tasks_status(
157162
user_id,
158163
self,
159164
use_on_demand_clusters=comp_run.use_on_demand_clusters,
165+
project_id=comp_run.project_uuid,
160166
run_id=comp_run.run_id,
161167
run_metadata=comp_run.metadata,
162168
) as client:
@@ -178,6 +184,7 @@ async def _process_executing_tasks(
178184
user_id,
179185
self,
180186
use_on_demand_clusters=comp_run.use_on_demand_clusters,
187+
project_id=comp_run.project_uuid,
181188
run_id=comp_run.run_id,
182189
run_metadata=comp_run.metadata,
183190
) as client:
@@ -225,20 +232,22 @@ async def _process_executing_tasks(
225232
)
226233
)
227234

228-
async def _release_resources(
229-
self, user_id: UserID, project_id: ProjectID, comp_run: CompRunsAtDB
230-
) -> None:
235+
async def _release_resources(self, comp_run: CompRunsAtDB) -> None:
231236
"""release resources used by the scheduler for a given user and project"""
232237
with (
233238
log_catch(_logger, reraise=False),
234239
log_context(
235240
_logger,
236241
logging.INFO,
237-
msg=f"releasing resources for {user_id=}, {project_id=}, {comp_run.run_id=}",
242+
msg=f"releasing resources for {comp_run.user_id=}, {comp_run.project_uuid=}, {comp_run.run_id=}",
238243
),
239244
):
240245
await self.dask_clients_pool.release_client_ref(
241-
ref=_DASK_CLIENT_RUN_REF.format(user_id=user_id, run_id=comp_run.run_id)
246+
ref=_DASK_CLIENT_RUN_REF.format(
247+
user_id=comp_run.user_id,
248+
project_id=comp_run.project_uuid,
249+
run_id=comp_run.run_id,
250+
)
242251
)
243252

244253
async def _stop_tasks(
@@ -250,6 +259,7 @@ async def _stop_tasks(
250259
user_id,
251260
self,
252261
use_on_demand_clusters=comp_run.use_on_demand_clusters,
262+
project_id=comp_run.project_uuid,
253263
run_id=comp_run.run_id,
254264
run_metadata=comp_run.metadata,
255265
) as client:
@@ -284,6 +294,7 @@ async def _process_completed_tasks(
284294
user_id,
285295
self,
286296
use_on_demand_clusters=comp_run.use_on_demand_clusters,
297+
project_id=comp_run.project_uuid,
287298
run_id=comp_run.run_id,
288299
run_metadata=comp_run.metadata,
289300
) as client:
@@ -304,6 +315,7 @@ async def _process_completed_tasks(
304315
user_id,
305316
self,
306317
use_on_demand_clusters=comp_run.use_on_demand_clusters,
318+
project_id=comp_run.project_uuid,
307319
run_id=comp_run.run_id,
308320
run_metadata=comp_run.metadata,
309321
) as client:

0 commit comments

Comments
 (0)