File tree Expand file tree Collapse file tree 1 file changed +6
-3
lines changed
services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler Expand file tree Collapse file tree 1 file changed +6
-3
lines changed Original file line number Diff line number Diff line change 7070_DASK_CLIENT_RUN_REF : Final [str ] = "{user_id}:{project_id}:{run_id}"
7171_TASK_RETRIEVAL_ERROR_TYPE : Final [str ] = "task-result-retrieval-timeout"
7272_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY : Final [str ] = "check_time"
73+ _DASK_SCHEDULER_MAX_CONCURRENT_ACCESS : Final [int ] = 50
7374
7475
7576def create_cluster_client_lock_key (
7677 _app , user_id : UserID , wallet_id : WalletID | None
7778) -> str :
78- return f"cluster-client-{ user_id } -{ wallet_id or 'None' } "
79+ return f"cluster-client-user_id { user_id } -wallet_id { wallet_id or 'None' } "
7980
8081
8182@asynccontextmanager
@@ -96,12 +97,13 @@ async def _cluster_dask_client(
9697 wallet_id = run_metadata .get ("wallet_id" ),
9798 )
9899
100+ @asynccontextmanager
99101 @with_limited_concurrency (
100102 get_redis_client_from_app ,
101103 key = get_redis_lock_key (
102104 MODULE_NAME_WORKER , unique_lock_key_builder = create_cluster_client_lock_key
103105 ),
104- capacity = 20 ,
106+ capacity = _DASK_SCHEDULER_MAX_CONCURRENT_ACCESS ,
105107 blocking = True ,
106108 blocking_timeout = None ,
107109 )
@@ -114,7 +116,8 @@ async def _limited_client_pool() -> AsyncIterator[DaskClient]:
114116 ) as client :
115117 yield client
116118
117- return await _limited_client_pool ()
119+ async with _limited_client_pool () as client :
120+ yield client
118121
119122
120123@dataclass
You can’t perform that action at this time.
0 commit comments