File tree Expand file tree Collapse file tree 2 files changed +17
-2
lines changed
services/director-v2/src/simcore_service_director_v2 Expand file tree Collapse file tree 2 files changed +17
-2
lines changed Original file line number Diff line number Diff line change @@ -59,6 +59,12 @@ class ComputationalBackendSettings(BaseCustomSettings):
5959 ),
6060 ] = 50
6161 COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED : bool = True
62+ COMPUTATIONAL_BACKEND_DASK_CLIENT_MAX_DISTRIBUTED_CONCURRENCY : Annotated [
63+ PositiveInt ,
64+ Field (
65+ description = "defines how many concurrent connections to each dask scheduler are allowed accross all director-v2 replicas"
66+ ),
67+ ] = 20
6268 COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL : Annotated [
6369 AnyUrl ,
6470 Field (
Original file line number Diff line number Diff line change 7070_DASK_CLIENT_RUN_REF : Final [str ] = "{user_id}:{project_id}:{run_id}"
7171_TASK_RETRIEVAL_ERROR_TYPE : Final [str ] = "task-result-retrieval-timeout"
7272_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY : Final [str ] = "check_time"
73- _DASK_CLUSTER_CLIENT_SEMAPHORE_CAPACITY : Final [int ] = 20
7473
7574
7675def _get_redis_client_from_scheduler (
@@ -90,10 +89,20 @@ def _get_semaphore_cluster_redis_key(
9089 return f"{ APP_NAME } -cluster-user_id_{ user_id } -wallet_id_{ run_metadata .get ('wallet_id' )} "
9190
9291
92+ def _get_semaphore_capacity_from_scheduler (
93+ _user_id : UserID ,
94+ scheduler : "DaskScheduler" ,
95+ ** kwargs , # pylint: disable=unused-argument # noqa: ARG001
96+ ) -> int :
97+ return (
98+ scheduler .settings .COMPUTATIONAL_BACKEND_DASK_CLIENT_MAX_DISTRIBUTED_CONCURRENCY
99+ )
100+
101+
93102@with_limited_concurrency_cm (
94103 _get_redis_client_from_scheduler ,
95104 key = _get_semaphore_cluster_redis_key ,
96- capacity = _DASK_CLUSTER_CLIENT_SEMAPHORE_CAPACITY ,
105+ capacity = _get_semaphore_capacity_from_scheduler ,
97106 blocking = True ,
98107 blocking_timeout = None ,
99108)
You can’t perform that action at this time.
0 commit comments