Commit 4058ff7

protect the acquisition of the client
1 parent 0abee9b commit 4058ff7

File tree

1 file changed: +21 -41 lines
  • services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler


services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 21 additions & 41 deletions
@@ -24,7 +24,6 @@
 from pydantic import PositiveInt
 from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
 from servicelib.logging_utils import log_catch, log_context
-from servicelib.redis._client import RedisClientSDK
 from servicelib.redis._semaphore_decorator import (
     with_limited_concurrency_cm,
 )
@@ -72,40 +71,6 @@
 _TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: Final[str] = "check_time"
 
 
-def _get_redis_client_from_scheduler(
-    _user_id: UserID,
-    scheduler: "DaskScheduler",
-    **kwargs,  # pylint: disable=unused-argument # noqa: ARG001
-) -> RedisClientSDK:
-    return scheduler.redis_client
-
-
-def _get_semaphore_cluster_redis_key(
-    user_id: UserID,
-    *args,  # pylint: disable=unused-argument # noqa: ARG001
-    run_metadata: RunMetadataDict,
-    **kwargs,  # pylint: disable=unused-argument # noqa: ARG001
-) -> str:
-    return f"{APP_NAME}-cluster-user_id_{user_id}-wallet_id_{run_metadata.get('wallet_id')}"
-
-
-def _get_semaphore_capacity_from_scheduler(
-    _user_id: UserID,
-    scheduler: "DaskScheduler",
-    **kwargs,  # pylint: disable=unused-argument # noqa: ARG001
-) -> int:
-    return (
-        scheduler.settings.COMPUTATIONAL_BACKEND_PER_CLUSTER_MAX_DISTRIBUTED_CONCURRENT_CONNECTIONS
-    )
-
-
-@with_limited_concurrency_cm(
-    _get_redis_client_from_scheduler,
-    key=_get_semaphore_cluster_redis_key,
-    capacity=_get_semaphore_capacity_from_scheduler,
-    blocking=True,
-    blocking_timeout=None,
-)
 @asynccontextmanager
 async def _cluster_dask_client(
     user_id: UserID,
@@ -123,12 +88,27 @@ async def _cluster_dask_client(
         user_id=user_id,
         wallet_id=run_metadata.get("wallet_id"),
     )
-    async with scheduler.dask_clients_pool.acquire(
-        cluster,
-        ref=_DASK_CLIENT_RUN_REF.format(
-            user_id=user_id, project_id=project_id, run_id=run_id
-        ),
-    ) as client:
+
+    @with_limited_concurrency_cm(
+        scheduler.redis_client,
+        key=f"{APP_NAME}-cluster-user_id_{user_id}-wallet_id_{run_metadata.get('wallet_id')}",
+        capacity=scheduler.settings.COMPUTATIONAL_BACKEND_PER_CLUSTER_MAX_DISTRIBUTED_CONCURRENT_CONNECTIONS,
+        blocking=True,
+        blocking_timeout=None,
+    )
+    @asynccontextmanager
+    async def _acquire_client(
+        user_id: UserID, scheduler: "DaskScheduler"
+    ) -> AsyncIterator[DaskClient]:
+        async with scheduler.dask_clients_pool.acquire(
+            cluster,
+            ref=_DASK_CLIENT_RUN_REF.format(
+                user_id=user_id, project_id=project_id, run_id=run_id
+            ),
+        ) as client:
+            yield client
+
+    async with _acquire_client(user_id, scheduler) as client:
         yield client
 
 
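For readers skimming the diff: the commit removes the module-level helper callables and instead builds the semaphore key and capacity inline, inside _cluster_dask_client, so the Redis-backed concurrency limit now guards exactly the block that acquires the dask client. Below is a minimal, self-contained sketch of that pattern; with_limited_concurrency_cm here is a simplified, hypothetical stand-in built on a process-local asyncio.Semaphore (the real servicelib decorator uses a Redis-backed distributed semaphore and takes a Redis client as its first argument), and _acquire_client plus the fake client string are illustrative only.

```python
# Simplified stand-in for servicelib's with_limited_concurrency_cm: it caps how
# many callers may hold the decorated async context manager at once, per key.
# All names in this sketch are illustrative, not the real servicelib API.
import asyncio
from collections.abc import AsyncIterator, Callable
from contextlib import AbstractAsyncContextManager, asynccontextmanager

_semaphores: dict[str, asyncio.Semaphore] = {}


def with_limited_concurrency_cm(*, key: str, capacity: int):
    """Allow at most `capacity` concurrent holders of the decorated context manager per `key`."""

    def _decorator(
        cm_factory: Callable[..., AbstractAsyncContextManager],
    ) -> Callable[..., AbstractAsyncContextManager]:
        @asynccontextmanager
        async def _wrapper(*args, **kwargs) -> AsyncIterator:
            semaphore = _semaphores.setdefault(key, asyncio.Semaphore(capacity))
            async with semaphore:  # held for the whole lifetime of the inner context manager
                async with cm_factory(*args, **kwargs) as value:
                    yield value

        return _wrapper

    return _decorator


async def _cluster_dask_client(user_id: int) -> None:
    # Mirrors the commit: the concurrency limit wraps only the client
    # acquisition, and key/capacity are plain values known at call time.
    @with_limited_concurrency_cm(key=f"cluster-user_id_{user_id}", capacity=2)
    @asynccontextmanager
    async def _acquire_client() -> AsyncIterator[str]:
        yield f"dask-client-for-user-{user_id}"  # stand-in for dask_clients_pool.acquire(...)

    async with _acquire_client() as client:
        print("using", client)
        await asyncio.sleep(0.1)


async def _main() -> None:
    # With capacity=2, at most two of these five calls hold a "client" at the same time.
    await asyncio.gather(*(_cluster_dask_client(42) for _ in range(5)))


asyncio.run(_main())
```

Because the decorator is now applied inside the function, the key and capacity are concrete values captured from the current call rather than callables the decorator must invoke, which is what lets the diff delete the three _get_* helpers.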