Skip to content

Commit e4cc8a6

Browse files
committed
fix
1 parent a21f916 commit e4cc8a6

File tree

1 file changed

+15
-3
lines changed
  • services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler

1 file changed

+15
-3
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656

5757
_logger = logging.getLogger(__name__)
5858

59-
_DASK_CLIENT_RUN_REF: Final[str] = "{user_id}:{run_id}"
59+
_DASK_CLIENT_RUN_REF: Final[str] = "{user_id}:{project_id}:{run_id}"
6060

6161

6262
@asynccontextmanager
@@ -65,6 +65,7 @@ async def _cluster_dask_client(
6565
scheduler: "DaskScheduler",
6666
*,
6767
use_on_demand_clusters: bool,
68+
project_id: ProjectID,
6869
run_id: PositiveInt,
6970
run_metadata: RunMetadataDict,
7071
) -> AsyncIterator[DaskClient]:
@@ -76,7 +77,10 @@ async def _cluster_dask_client(
7677
wallet_id=run_metadata.get("wallet_id"),
7778
)
7879
async with scheduler.dask_clients_pool.acquire(
79-
cluster, ref=_DASK_CLIENT_RUN_REF.format(user_id=user_id, run_id=run_id)
80+
cluster,
81+
ref=_DASK_CLIENT_RUN_REF.format(
82+
user_id=user_id, project_id=project_id, run_id=run_id
83+
),
8084
) as client:
8185
yield client
8286

@@ -106,6 +110,7 @@ async def _start_tasks(
106110
user_id,
107111
self,
108112
use_on_demand_clusters=comp_run.use_on_demand_clusters,
113+
project_id=comp_run.project_uuid,
109114
run_id=comp_run.run_id,
110115
run_metadata=comp_run.metadata,
111116
) as client:
@@ -157,6 +162,7 @@ async def _get_tasks_status(
157162
user_id,
158163
self,
159164
use_on_demand_clusters=comp_run.use_on_demand_clusters,
165+
project_id=comp_run.project_uuid,
160166
run_id=comp_run.run_id,
161167
run_metadata=comp_run.metadata,
162168
) as client:
@@ -178,6 +184,7 @@ async def _process_executing_tasks(
178184
user_id,
179185
self,
180186
use_on_demand_clusters=comp_run.use_on_demand_clusters,
187+
project_id=comp_run.project_uuid,
181188
run_id=comp_run.run_id,
182189
run_metadata=comp_run.metadata,
183190
) as client:
@@ -238,7 +245,9 @@ async def _release_resources(
238245
),
239246
):
240247
await self.dask_clients_pool.release_client_ref(
241-
ref=_DASK_CLIENT_RUN_REF.format(user_id=user_id, run_id=comp_run.run_id)
248+
ref=_DASK_CLIENT_RUN_REF.format(
249+
user_id=user_id, project_id=project_id, run_id=comp_run.run_id
250+
)
242251
)
243252

244253
async def _stop_tasks(
@@ -250,6 +259,7 @@ async def _stop_tasks(
250259
user_id,
251260
self,
252261
use_on_demand_clusters=comp_run.use_on_demand_clusters,
262+
project_id=comp_run.project_uuid,
253263
run_id=comp_run.run_id,
254264
run_metadata=comp_run.metadata,
255265
) as client:
@@ -284,6 +294,7 @@ async def _process_completed_tasks(
284294
user_id,
285295
self,
286296
use_on_demand_clusters=comp_run.use_on_demand_clusters,
297+
project_id=comp_run.project_uuid,
287298
run_id=comp_run.run_id,
288299
run_metadata=comp_run.metadata,
289300
) as client:
@@ -304,6 +315,7 @@ async def _process_completed_tasks(
304315
user_id,
305316
self,
306317
use_on_demand_clusters=comp_run.use_on_demand_clusters,
318+
project_id=comp_run.project_uuid,
307319
run_id=comp_run.run_id,
308320
run_metadata=comp_run.metadata,
309321
) as client:

0 commit comments

Comments
 (0)