2323from  pydantic  import  PositiveInt 
2424from  servicelib .common_headers  import  UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE 
2525from  servicelib .logging_utils  import  log_catch , log_context 
26- from  servicelib .redis ._client  import  RedisClientSDK 
2726from  servicelib .redis ._semaphore_decorator  import  (
2827    with_limited_concurrency_cm ,
2928)
# Key under which the time of the task-retrieval error check is recorded
# in a task's error-context mapping (value "check_time").
_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: Final[str] = "check_time"
7271
7372
def _get_redis_client_from_scheduler(
    _user_id: UserID,
    scheduler: "DaskScheduler",
    **kwargs,  # pylint: disable=unused-argument # noqa: ARG001
) -> RedisClientSDK:
    """Adapter for the semaphore decorator: hand back the scheduler's Redis client.

    Only *scheduler* is consulted; the other arguments mirror the decorated
    callable's signature and are intentionally ignored.
    """
    redis_client = scheduler.redis_client
    return redis_client
80- 
81- 
def _get_semaphore_cluster_redis_key(
    user_id: UserID,
    *args,  # pylint: disable=unused-argument # noqa: ARG001
    run_metadata: RunMetadataDict,
    **kwargs,  # pylint: disable=unused-argument # noqa: ARG001
) -> str:
    """Build the Redis key scoping the cluster semaphore per user and wallet."""
    wallet_id = run_metadata.get("wallet_id")
    return f"{APP_NAME}-cluster-user_id_{user_id}-wallet_id_{wallet_id}"
89- 
90- 
def _get_semaphore_capacity_from_scheduler(
    _user_id: UserID,
    scheduler: "DaskScheduler",
    **kwargs,  # pylint: disable=unused-argument # noqa: ARG001
) -> int:
    """Adapter for the semaphore decorator: read the configured cluster capacity.

    Returns the maximum number of distributed concurrent connections allowed
    per cluster, as set on the scheduler's settings.
    """
    settings = scheduler.settings
    return settings.COMPUTATIONAL_BACKEND_PER_CLUSTER_MAX_DISTRIBUTED_CONCURRENT_CONNECTIONS
99- 
100- 
101- @with_limited_concurrency_cm ( 
102-     _get_redis_client_from_scheduler , 
103-     key = _get_semaphore_cluster_redis_key , 
104-     capacity = _get_semaphore_capacity_from_scheduler , 
105-     blocking = True , 
106-     blocking_timeout = None , 
107- ) 
10873@asynccontextmanager  
10974async  def  _cluster_dask_client (
11075    user_id : UserID ,
@@ -122,12 +87,27 @@ async def _cluster_dask_client(
12287            user_id = user_id ,
12388            wallet_id = run_metadata .get ("wallet_id" ),
12489        )
125-     async  with  scheduler .dask_clients_pool .acquire (
126-         cluster ,
127-         ref = _DASK_CLIENT_RUN_REF .format (
128-             user_id = user_id , project_id = project_id , run_id = run_id 
129-         ),
130-     ) as  client :
90+ 
91+     @with_limited_concurrency_cm ( 
92+         scheduler .redis_client , 
93+         key = f"{ APP_NAME }  -cluster-user_id_{ user_id }  -wallet_id_{ run_metadata .get ('wallet_id' )}  " , 
94+         capacity = scheduler .settings .COMPUTATIONAL_BACKEND_PER_CLUSTER_MAX_DISTRIBUTED_CONCURRENT_CONNECTIONS , 
95+         blocking = True , 
96+         blocking_timeout = None , 
97+     ) 
98+     @asynccontextmanager  
99+     async  def  _acquire_client (
100+         user_id : UserID , scheduler : "DaskScheduler" 
101+     ) ->  AsyncIterator [DaskClient ]:
102+         async  with  scheduler .dask_clients_pool .acquire (
103+             cluster ,
104+             ref = _DASK_CLIENT_RUN_REF .format (
105+                 user_id = user_id , project_id = project_id , run_id = run_id 
106+             ),
107+         ) as  client :
108+             yield  client 
109+ 
110+     async  with  _acquire_client (user_id , scheduler ) as  client :
131111        yield  client 
132112
133113
0 commit comments