2323from pydantic import PositiveInt
2424from servicelib .common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
2525from servicelib .logging_utils import log_catch , log_context
26- from servicelib .redis ._client import RedisClientSDK
2726from servicelib .redis ._semaphore_decorator import (
2827 with_limited_concurrency_cm ,
2928)
7170_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY : Final [str ] = "check_time"
7271
7372
74- def _get_redis_client_from_scheduler (
75- _user_id : UserID ,
76- scheduler : "DaskScheduler" ,
77- ** kwargs , # pylint: disable=unused-argument # noqa: ARG001
78- ) -> RedisClientSDK :
79- return scheduler .redis_client
80-
81-
82- def _get_semaphore_cluster_redis_key (
83- user_id : UserID ,
84- * args , # pylint: disable=unused-argument # noqa: ARG001
85- run_metadata : RunMetadataDict ,
86- ** kwargs , # pylint: disable=unused-argument # noqa: ARG001
87- ) -> str :
88- return f"{ APP_NAME } -cluster-user_id_{ user_id } -wallet_id_{ run_metadata .get ('wallet_id' )} "
89-
90-
91- def _get_semaphore_capacity_from_scheduler (
92- _user_id : UserID ,
93- scheduler : "DaskScheduler" ,
94- ** kwargs , # pylint: disable=unused-argument # noqa: ARG001
95- ) -> int :
96- return (
97- scheduler .settings .COMPUTATIONAL_BACKEND_PER_CLUSTER_MAX_DISTRIBUTED_CONCURRENT_CONNECTIONS
98- )
99-
100-
101- @with_limited_concurrency_cm (
102- _get_redis_client_from_scheduler ,
103- key = _get_semaphore_cluster_redis_key ,
104- capacity = _get_semaphore_capacity_from_scheduler ,
105- blocking = True ,
106- blocking_timeout = None ,
107- )
10873@asynccontextmanager
10974async def _cluster_dask_client (
11075 user_id : UserID ,
@@ -122,12 +87,27 @@ async def _cluster_dask_client(
12287 user_id = user_id ,
12388 wallet_id = run_metadata .get ("wallet_id" ),
12489 )
125- async with scheduler .dask_clients_pool .acquire (
126- cluster ,
127- ref = _DASK_CLIENT_RUN_REF .format (
128- user_id = user_id , project_id = project_id , run_id = run_id
129- ),
130- ) as client :
90+
91+ @with_limited_concurrency_cm (
92+ scheduler .redis_client ,
93+ key = f"{ APP_NAME } -cluster-user_id_{ user_id } -wallet_id_{ run_metadata .get ('wallet_id' )} " ,
94+ capacity = scheduler .settings .COMPUTATIONAL_BACKEND_PER_CLUSTER_MAX_DISTRIBUTED_CONCURRENT_CONNECTIONS ,
95+ blocking = True ,
96+ blocking_timeout = None ,
97+ )
98+ @asynccontextmanager
99+ async def _acquire_client (
100+ user_id : UserID , scheduler : "DaskScheduler"
101+ ) -> AsyncIterator [DaskClient ]:
102+ async with scheduler .dask_clients_pool .acquire (
103+ cluster ,
104+ ref = _DASK_CLIENT_RUN_REF .format (
105+ user_id = user_id , project_id = project_id , run_id = run_id
106+ ),
107+ ) as client :
108+ yield client
109+
110+ async with _acquire_client (user_id , scheduler ) as client :
131111 yield client
132112
133113
0 commit comments