Commit ddc3e74

🐛 Director-v2: properly close dask client when use is completed (#7880)
1 parent 533c02e commit ddc3e74

8 files changed: +244 -84 lines

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/clusters_keeper/clusters.py

Lines changed: 6 additions & 1 deletion
```diff
@@ -6,13 +6,18 @@
 from models_library.rabbitmq_basic_types import RPCMethodName
 from models_library.users import UserID
 from models_library.wallets import WalletID
+from pydantic import TypeAdapter
 
 from ....async_utils import run_sequentially_in_context
 from ..._client_rpc import RabbitMQRPCClient
 from ..._constants import RPC_REMOTE_METHOD_TIMEOUT_S
 
 _TTL_CACHE_ON_CLUSTERS_S: Final[int] = 5
 
+_GET_OR_CREATE_CLUSTER_METHOD_NAME: Final[RPCMethodName] = TypeAdapter(
+    RPCMethodName
+).validate_python("get_or_create_cluster")
+
 
 @run_sequentially_in_context(target_args=["user_id", "wallet_id"])
 @cached(
@@ -32,7 +37,7 @@ async def get_or_create_cluster(
     # the 2nd decorator ensure that many calls in a short time will return quickly the same value
     on_demand_cluster: OnDemandCluster = await client.request(
         CLUSTERS_KEEPER_RPC_NAMESPACE,
-        RPCMethodName("get_or_create_cluster"),
+        _GET_OR_CREATE_CLUSTER_METHOD_NAME,
         timeout_s=RPC_REMOTE_METHOD_TIMEOUT_S,
         user_id=user_id,
         wallet_id=wallet_id,
```
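
Context for the change above: the RPC method name is now validated once at import time instead of being wrapped on every call, so a typo fails at startup rather than on the first request. A minimal sketch of the pattern, with a locally defined constrained type standing in for `models_library`'s `RPCMethodName` (the exact constraints here are an assumption):

```python
from typing import Annotated, Final

from pydantic import StringConstraints, TypeAdapter, ValidationError

# Stand-in for RPCMethodName: a constrained string type (assumed constraints).
MethodName = Annotated[str, StringConstraints(min_length=1, pattern=r"^[a-z][a-z0-9_]*$")]

# Validated once at import time; every call site then reuses the constant.
_GET_OR_CREATE: Final[str] = TypeAdapter(MethodName).validate_python(
    "get_or_create_cluster"
)

print(_GET_OR_CREATE)  # get_or_create_cluster

try:
    TypeAdapter(MethodName).validate_python("Not A Method!")
except ValidationError as err:
    print(f"rejected early: {err.error_count()} error(s)")
```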

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py

Lines changed: 7 additions & 0 deletions
```diff
@@ -589,6 +589,12 @@ async def _process_executing_tasks(
     ) -> None:
         """process executing tasks from the 3rd party backend"""
 
+    @abstractmethod
+    async def _release_resources(
+        self, user_id: UserID, project_id: ProjectID, comp_run: CompRunsAtDB
+    ) -> None:
+        """release resources used by the scheduler for a given user and project"""
+
     async def apply(
         self,
         *,
@@ -654,6 +660,7 @@ async def apply(
 
         # 7. Are we done scheduling that pipeline?
         if not dag.nodes() or pipeline_result in COMPLETED_STATES:
+            await self._release_resources(user_id, project_id, comp_run)
             # there is nothing left, the run is completed, we're done here
             _logger.info(
                 "pipeline %s scheduling completed with result %s",
```

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 30 additions & 3 deletions
```diff
@@ -4,7 +4,7 @@
 from collections.abc import AsyncIterator, Callable
 from contextlib import asynccontextmanager
 from dataclasses import dataclass
-from typing import Any
+from typing import Any, Final
 
 import arrow
 from dask_task_models_library.container_tasks.errors import TaskCancelledError
@@ -23,7 +23,7 @@
 from models_library.users import UserID
 from pydantic import PositiveInt
 from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
-from servicelib.logging_utils import log_catch
+from servicelib.logging_utils import log_catch, log_context
 
 from ...core.errors import (
     ComputationalBackendNotConnectedError,
@@ -56,13 +56,16 @@
 
 _logger = logging.getLogger(__name__)
 
+_DASK_CLIENT_RUN_REF: Final[str] = "{user_id}:{run_id}"
+
 
 @asynccontextmanager
 async def _cluster_dask_client(
     user_id: UserID,
     scheduler: "DaskScheduler",
     *,
     use_on_demand_clusters: bool,
+    run_id: PositiveInt,
     run_metadata: RunMetadataDict,
 ) -> AsyncIterator[DaskClient]:
     cluster: BaseCluster = scheduler.settings.default_cluster
@@ -72,7 +75,9 @@ async def _cluster_dask_client(
             user_id=user_id,
             wallet_id=run_metadata.get("wallet_id"),
         )
-    async with scheduler.dask_clients_pool.acquire(cluster) as client:
+    async with scheduler.dask_clients_pool.acquire(
+        cluster, ref=_DASK_CLIENT_RUN_REF.format(user_id=user_id, run_id=run_id)
+    ) as client:
         yield client
 
 
@@ -101,6 +106,7 @@ async def _start_tasks(
             user_id,
             self,
             use_on_demand_clusters=comp_run.use_on_demand_clusters,
+            run_id=comp_run.run_id,
             run_metadata=comp_run.metadata,
         ) as client:
             # Change the tasks state to PENDING
@@ -151,6 +157,7 @@ async def _get_tasks_status(
             user_id,
             self,
             use_on_demand_clusters=comp_run.use_on_demand_clusters,
+            run_id=comp_run.run_id,
             run_metadata=comp_run.metadata,
         ) as client:
             return await client.get_tasks_status([f"{t.job_id}" for t in tasks])
@@ -171,6 +178,7 @@ async def _process_executing_tasks(
             user_id,
             self,
             use_on_demand_clusters=comp_run.use_on_demand_clusters,
+            run_id=comp_run.run_id,
             run_metadata=comp_run.metadata,
         ) as client:
             task_progresses = await client.get_tasks_progress(
@@ -217,6 +225,22 @@ async def _process_executing_tasks(
                 )
             )
 
+    async def _release_resources(
+        self, user_id: UserID, project_id: ProjectID, comp_run: CompRunsAtDB
+    ) -> None:
+        """release resources used by the scheduler for a given user and project"""
+        with (
+            log_catch(_logger, reraise=False),
+            log_context(
+                _logger,
+                logging.INFO,
+                msg=f"releasing resources for {user_id=}, {project_id=}, {comp_run.run_id=}",
+            ),
+        ):
+            await self.dask_clients_pool.release_client_ref(
+                ref=_DASK_CLIENT_RUN_REF.format(user_id=user_id, run_id=comp_run.run_id)
+            )
+
     async def _stop_tasks(
         self, user_id: UserID, tasks: list[CompTaskAtDB], comp_run: CompRunsAtDB
     ) -> None:
@@ -226,6 +250,7 @@ async def _stop_tasks(
             user_id,
             self,
             use_on_demand_clusters=comp_run.use_on_demand_clusters,
+            run_id=comp_run.run_id,
             run_metadata=comp_run.metadata,
         ) as client:
             await asyncio.gather(
@@ -259,6 +284,7 @@ async def _process_completed_tasks(
             user_id,
             self,
             use_on_demand_clusters=comp_run.use_on_demand_clusters,
+            run_id=comp_run.run_id,
             run_metadata=comp_run.metadata,
         ) as client:
             tasks_results = await asyncio.gather(
@@ -278,6 +304,7 @@ async def _process_completed_tasks(
             user_id,
             self,
             use_on_demand_clusters=comp_run.use_on_demand_clusters,
+            run_id=comp_run.run_id,
             run_metadata=comp_run.metadata,
         ) as client:
             await asyncio.gather(
```
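
Taken together, these changes thread a stable reference string (`f"{user_id}:{run_id}"`) through every `acquire()` and drop it in `_release_resources`, so the pooled client can finally be closed once its run completes — which is the bug this commit fixes. The sketch below illustrates how such a ref-tracking pool could behave; the real `DaskClientsPool.acquire(..., ref=...)` / `release_client_ref(...)` implementation is not part of this diff, so treat this as an illustration under assumptions, not the actual code:

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeDaskClient:
    """Stands in for the real DaskClient; only knows how to close itself."""

    async def close(self) -> None:
        print("dask client closed")


class RefCountedClientsPool:
    """Illustrative only: keeps one shared client alive while any ref remains."""

    def __init__(self) -> None:
        self._client: FakeDaskClient | None = None
        self._refs: set[str] = set()

    @asynccontextmanager
    async def acquire(self, ref: str) -> AsyncIterator[FakeDaskClient]:
        if self._client is None:
            self._client = FakeDaskClient()
        self._refs.add(ref)  # the ref deliberately outlives this context
        yield self._client

    async def release_client_ref(self, ref: str) -> None:
        self._refs.discard(ref)
        if not self._refs and self._client is not None:
            await self._client.close()  # last ref gone -> actually close
            self._client = None


async def main() -> None:
    pool = RefCountedClientsPool()
    ref = "{user_id}:{run_id}".format(user_id=3, run_id=7)
    async with pool.acquire(ref=ref):
        pass  # client survives the block: the same run may acquire again later
    await pool.release_client_ref(ref=ref)  # run completed -> client closed


asyncio.run(main())
```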

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 35 additions & 36 deletions
```diff
@@ -120,43 +120,42 @@ async def create(
         tasks_file_link_type: FileLinkType,
         cluster_type: ClusterTypeInModel,
     ) -> "DaskClient":
-        _logger.info(
-            "Initiating connection to %s with auth: %s, type: %s",
-            f"dask-scheduler at {endpoint}",
-            authentication,
-            cluster_type,
-        )
-        async for attempt in AsyncRetrying(
-            reraise=True,
-            before_sleep=before_sleep_log(_logger, logging.INFO),
-            wait=wait_fixed(0.3),
-            stop=stop_after_attempt(3),
+        with log_context(
+            _logger,
+            logging.INFO,
+            msg=f"create dask client to dask-scheduler at {endpoint=} with {authentication=}, {cluster_type=}",
         ):
-            with attempt:
-                _logger.debug(
-                    "Connecting to %s, attempt %s...",
-                    endpoint,
-                    attempt.retry_state.attempt_number,
-                )
-                backend = await connect_to_dask_scheduler(endpoint, authentication)
-                dask_utils.check_scheduler_status(backend.client)
-                instance = cls(
-                    app=app,
-                    backend=backend,
-                    settings=settings,
-                    tasks_file_link_type=tasks_file_link_type,
-                    cluster_type=cluster_type,
-                )
-                _logger.info(
-                    "Connection to %s succeeded [%s]",
-                    f"dask-scheduler at {endpoint}",
-                    json_dumps(attempt.retry_state.retry_object.statistics),
-                )
-                _logger.info(
-                    "Scheduler info:\n%s",
-                    json_dumps(backend.client.scheduler_info(), indent=2),
-                )
-                return instance
+            async for attempt in AsyncRetrying(
+                reraise=True,
+                before_sleep=before_sleep_log(_logger, logging.INFO),
+                wait=wait_fixed(0.3),
+                stop=stop_after_attempt(3),
+            ):
+                with attempt:
+                    _logger.debug(
+                        "Connecting to %s, attempt %s...",
+                        endpoint,
+                        attempt.retry_state.attempt_number,
+                    )
+                    backend = await connect_to_dask_scheduler(endpoint, authentication)
+                    dask_utils.check_scheduler_status(backend.client)
+                    instance = cls(
+                        app=app,
+                        backend=backend,
+                        settings=settings,
+                        tasks_file_link_type=tasks_file_link_type,
+                        cluster_type=cluster_type,
+                    )
+                    _logger.info(
+                        "Connection to %s succeeded [%s]",
+                        f"dask-scheduler at {endpoint}",
+                        json_dumps(attempt.retry_state.retry_object.statistics),
+                    )
+                    _logger.info(
+                        "Scheduler info:\n%s",
+                        json_dumps(backend.client.scheduler_info(), indent=2),
+                    )
+                    return instance
         # this is to satisfy pylance
         err_msg = "Could not create client"
         raise ValueError(err_msg)
```
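
The rewritten `create()` replaces the hand-rolled "Initiating connection" log messages with a single `log_context` wrapper around the tenacity retry loop, so start and completion are logged as one pair. A self-contained approximation of that shape; the `log_context` below is a simplified stand-in for `servicelib.logging_utils.log_context`, and `connect` is a placeholder for `connect_to_dask_scheduler` (the tenacity usage itself mirrors the diff):

```python
import asyncio
import logging
from collections.abc import Iterator
from contextlib import contextmanager

from tenacity import AsyncRetrying, before_sleep_log, stop_after_attempt, wait_fixed

logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)


@contextmanager
def log_context(logger: logging.Logger, level: int, msg: str) -> Iterator[None]:
    # simplified stand-in: the real helper also measures time and logs errors
    logger.log(level, "%s [starting]", msg)
    yield
    logger.log(level, "%s [done]", msg)


async def connect(endpoint: str) -> str:
    # placeholder for connect_to_dask_scheduler; a real call could raise
    return f"client-connected-to-{endpoint}"


async def create_client(endpoint: str) -> str:
    with log_context(_logger, logging.INFO, msg=f"create dask client to {endpoint=}"):
        async for attempt in AsyncRetrying(
            reraise=True,
            before_sleep=before_sleep_log(_logger, logging.INFO),
            wait=wait_fixed(0.3),
            stop=stop_after_attempt(3),
        ):
            with attempt:
                _logger.debug("attempt %s...", attempt.retry_state.attempt_number)
                return await connect(endpoint)
    # unreachable, but keeps type checkers happy (mirrors the diff's pylance note)
    msg = "Could not create client"
    raise ValueError(msg)


print(asyncio.run(create_client("tcp://dask-scheduler:8786")))
```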
