Skip to content

Commit 537a1c8

Browse files
committed
refactor
1 parent 73a45bc commit 537a1c8

File tree

3 files changed

+84
-76
lines changed

3 files changed

+84
-76
lines changed

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/clusters_keeper/clusters.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,18 @@
66
from models_library.rabbitmq_basic_types import RPCMethodName
77
from models_library.users import UserID
88
from models_library.wallets import WalletID
9+
from pydantic import TypeAdapter
910

1011
from ....async_utils import run_sequentially_in_context
1112
from ..._client_rpc import RabbitMQRPCClient
1213
from ..._constants import RPC_REMOTE_METHOD_TIMEOUT_S
1314

1415
_TTL_CACHE_ON_CLUSTERS_S: Final[int] = 5
1516

17+
_GET_OR_CREATE_CLUSTER_METHOD_NAME: Final[RPCMethodName] = TypeAdapter(
18+
RPCMethodName
19+
).validate_python("get_or_create_cluster")
20+
1621

1722
@run_sequentially_in_context(target_args=["user_id", "wallet_id"])
1823
@cached(
@@ -32,7 +37,7 @@ async def get_or_create_cluster(
3237
# the 2nd decorator ensures that many calls in a short time will quickly return the same value
3338
on_demand_cluster: OnDemandCluster = await client.request(
3439
CLUSTERS_KEEPER_RPC_NAMESPACE,
35-
RPCMethodName("get_or_create_cluster"),
40+
_GET_OR_CREATE_CLUSTER_METHOD_NAME,
3641
timeout_s=RPC_REMOTE_METHOD_TIMEOUT_S,
3742
user_id=user_id,
3843
wallet_id=wallet_id,

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 35 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -120,43 +120,42 @@ async def create(
120120
tasks_file_link_type: FileLinkType,
121121
cluster_type: ClusterTypeInModel,
122122
) -> "DaskClient":
123-
_logger.info(
124-
"Initiating connection to %s with auth: %s, type: %s",
125-
f"dask-scheduler at {endpoint}",
126-
authentication,
127-
cluster_type,
128-
)
129-
async for attempt in AsyncRetrying(
130-
reraise=True,
131-
before_sleep=before_sleep_log(_logger, logging.INFO),
132-
wait=wait_fixed(0.3),
133-
stop=stop_after_attempt(3),
123+
with log_context(
124+
_logger,
125+
logging.INFO,
126+
msg=f"create dask client to dask-scheduler at {endpoint=} with {authentication=}, {cluster_type=}",
134127
):
135-
with attempt:
136-
_logger.debug(
137-
"Connecting to %s, attempt %s...",
138-
endpoint,
139-
attempt.retry_state.attempt_number,
140-
)
141-
backend = await connect_to_dask_scheduler(endpoint, authentication)
142-
dask_utils.check_scheduler_status(backend.client)
143-
instance = cls(
144-
app=app,
145-
backend=backend,
146-
settings=settings,
147-
tasks_file_link_type=tasks_file_link_type,
148-
cluster_type=cluster_type,
149-
)
150-
_logger.info(
151-
"Connection to %s succeeded [%s]",
152-
f"dask-scheduler at {endpoint}",
153-
json_dumps(attempt.retry_state.retry_object.statistics),
154-
)
155-
_logger.info(
156-
"Scheduler info:\n%s",
157-
json_dumps(backend.client.scheduler_info(), indent=2),
158-
)
159-
return instance
128+
async for attempt in AsyncRetrying(
129+
reraise=True,
130+
before_sleep=before_sleep_log(_logger, logging.INFO),
131+
wait=wait_fixed(0.3),
132+
stop=stop_after_attempt(3),
133+
):
134+
with attempt:
135+
_logger.debug(
136+
"Connecting to %s, attempt %s...",
137+
endpoint,
138+
attempt.retry_state.attempt_number,
139+
)
140+
backend = await connect_to_dask_scheduler(endpoint, authentication)
141+
dask_utils.check_scheduler_status(backend.client)
142+
instance = cls(
143+
app=app,
144+
backend=backend,
145+
settings=settings,
146+
tasks_file_link_type=tasks_file_link_type,
147+
cluster_type=cluster_type,
148+
)
149+
_logger.info(
150+
"Connection to %s succeeded [%s]",
151+
f"dask-scheduler at {endpoint}",
152+
json_dumps(attempt.retry_state.retry_object.statistics),
153+
)
154+
_logger.info(
155+
"Scheduler info:\n%s",
156+
json_dumps(backend.client.scheduler_info(), indent=2),
157+
)
158+
return instance
160159
# this is to satisfy pylance
161160
err_msg = "Could not create client"
162161
raise ValueError(err_msg)

services/director-v2/src/simcore_service_director_v2/modules/dask_clients_pool.py

Lines changed: 43 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from fastapi import FastAPI
99
from models_library.clusters import BaseCluster, ClusterTypeInModel
1010
from pydantic import AnyUrl
11+
from servicelib.logging_utils import log_context
1112

1213
from ..core.errors import (
1314
ComputationalBackendNotConnectedError,
@@ -19,7 +20,7 @@
1920
from ..utils.dask_client_utils import TaskHandlers
2021
from .dask_client import DaskClient
2122

22-
logger = logging.getLogger(__name__)
23+
_logger = logging.getLogger(__name__)
2324

2425

2526
_ClusterUrl: TypeAlias = AnyUrl
@@ -62,48 +63,51 @@ async def delete(self) -> None:
6263

6364
@asynccontextmanager
6465
async def acquire(self, cluster: BaseCluster) -> AsyncIterator[DaskClient]:
66+
"""returns a dask client for the given cluster
67+
This method can be called concurrently: client acquisition is serialized by an internal lock.
68+
If the cluster is not found in the pool, it will create a new dask client for it.
69+
70+
"""
71+
6572
async def _concurently_safe_acquire_client() -> DaskClient:
6673
async with self._client_acquisition_lock:
67-
dask_client = self._cluster_to_client_map.get(cluster.endpoint)
68-
69-
# we create a new client if that cluster was never used before
70-
logger.debug(
71-
"acquiring connection to cluster %s:%s",
72-
cluster.endpoint,
73-
cluster.name,
74-
)
75-
if not dask_client:
76-
tasks_file_link_type = (
77-
self.settings.COMPUTATIONAL_BACKEND_DEFAULT_FILE_LINK_TYPE
78-
)
79-
if cluster == self.settings.default_cluster:
74+
with log_context(
75+
_logger,
76+
logging.DEBUG,
77+
f"acquire dask client for {cluster.name=}:{cluster.endpoint}",
78+
):
79+
dask_client = self._cluster_to_client_map.get(cluster.endpoint)
80+
if not dask_client:
8081
tasks_file_link_type = (
81-
self.settings.COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_FILE_LINK_TYPE
82+
self.settings.COMPUTATIONAL_BACKEND_DEFAULT_FILE_LINK_TYPE
8283
)
83-
if cluster.type == ClusterTypeInModel.ON_DEMAND.value:
84-
tasks_file_link_type = (
85-
self.settings.COMPUTATIONAL_BACKEND_ON_DEMAND_CLUSTERS_FILE_LINK_TYPE
84+
if cluster == self.settings.default_cluster:
85+
tasks_file_link_type = (
86+
self.settings.COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_FILE_LINK_TYPE
87+
)
88+
if cluster.type == ClusterTypeInModel.ON_DEMAND.value:
89+
tasks_file_link_type = (
90+
self.settings.COMPUTATIONAL_BACKEND_ON_DEMAND_CLUSTERS_FILE_LINK_TYPE
91+
)
92+
self._cluster_to_client_map[cluster.endpoint] = dask_client = (
93+
await DaskClient.create(
94+
app=self.app,
95+
settings=self.settings,
96+
endpoint=cluster.endpoint,
97+
authentication=cluster.authentication,
98+
tasks_file_link_type=tasks_file_link_type,
99+
cluster_type=cluster.type,
100+
)
86101
)
87-
self._cluster_to_client_map[
88-
cluster.endpoint
89-
] = dask_client = await DaskClient.create(
90-
app=self.app,
91-
settings=self.settings,
92-
endpoint=cluster.endpoint,
93-
authentication=cluster.authentication,
94-
tasks_file_link_type=tasks_file_link_type,
95-
cluster_type=cluster.type,
96-
)
97-
if self._task_handlers:
98-
dask_client.register_handlers(self._task_handlers)
99-
100-
logger.debug("created new client to cluster %s", f"{cluster=}")
101-
logger.debug(
102-
"list of clients: %s", f"{self._cluster_to_client_map=}"
103-
)
104-
105-
assert dask_client # nosec
106-
return dask_client
102+
if self._task_handlers:
103+
dask_client.register_handlers(self._task_handlers)
104+
105+
_logger.debug(
106+
"list of clients: %s", f"{self._cluster_to_client_map=}"
107+
)
108+
109+
assert dask_client # nosec
110+
return dask_client
107111

108112
try:
109113
dask_client = await _concurently_safe_acquire_client()
@@ -129,7 +133,7 @@ async def on_startup() -> None:
129133
app=app, settings=settings
130134
)
131135

132-
logger.info(
136+
_logger.info(
133137
"Default cluster is set to %s",
134138
f"{settings.default_cluster!r}",
135139
)

0 commit comments

Comments
 (0)