Skip to content

Commit 513a157

Browse files
committed
some more
1 parent 71137e7 commit 513a157

File tree

2 files changed

+21
-23
lines changed

2 files changed

+21
-23
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
2626
from servicelib.logging_errors import create_troubleshootting_log_kwargs
2727
from servicelib.logging_utils import log_catch, log_context
28-
from servicelib.redis import with_limited_concurrency
2928
from servicelib.utils import limited_as_completed, limited_gather
3029

3130
from ...core.errors import (
@@ -55,14 +54,11 @@
5554
from ..db.repositories.comp_tasks import CompTasksRepository
5655
from ._constants import (
5756
MAX_CONCURRENT_PIPELINE_SCHEDULING,
58-
MODULE_NAME_WORKER,
5957
)
6058
from ._models import TaskStateTracker
6159
from ._scheduler_base import BaseCompScheduler
6260
from ._utils import (
6361
WAITING_FOR_START_STATES,
64-
get_redis_client_from_app,
65-
get_redis_lock_key,
6662
)
6763

6864
_logger = logging.getLogger(__name__)
@@ -97,26 +93,12 @@ async def _cluster_dask_client(
9793
wallet_id=run_metadata.get("wallet_id"),
9894
)
9995

100-
@asynccontextmanager
101-
@with_limited_concurrency(
102-
get_redis_client_from_app,
103-
key=get_redis_lock_key(
104-
MODULE_NAME_WORKER, unique_lock_key_builder=create_cluster_client_lock_key
96+
async with scheduler.dask_clients_pool.acquire(
97+
cluster,
98+
ref=_DASK_CLIENT_RUN_REF.format(
99+
user_id=user_id, project_id=project_id, run_id=run_id
105100
),
106-
capacity=_DASK_SCHEDULER_MAX_CONCURRENT_ACCESS,
107-
blocking=True,
108-
blocking_timeout=None,
109-
)
110-
async def _limited_client_pool() -> AsyncIterator[DaskClient]:
111-
async with scheduler.dask_clients_pool.acquire(
112-
cluster,
113-
ref=_DASK_CLIENT_RUN_REF.format(
114-
user_id=user_id, project_id=project_id, run_id=run_id
115-
),
116-
) as client:
117-
yield client
118-
119-
async with _limited_client_pool() as client:
101+
) as client:
120102
yield client
121103

122104

services/director-v2/src/simcore_service_director_v2/modules/dask_clients_pool.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@
1010
from models_library.clusters import BaseCluster, ClusterTypeInModel
1111
from pydantic import AnyUrl
1212
from servicelib.logging_utils import log_context
13+
from servicelib.redis._semaphore_decorator import with_limited_concurrency
14+
from settings_library.redis import RedisDatabase
15+
from simcore_service_director_v2.modules.comp_scheduler._utils import (
16+
get_redis_lock_key,
17+
)
18+
from simcore_service_director_v2.modules.redis import get_redis_client_manager
1319

1420
from ..core.errors import (
1521
ComputationalBackendNotConnectedError,
@@ -114,6 +120,16 @@ async def acquire(
114120
`release_client_ref` to release the client reference when done.
115121
"""
116122

123+
@with_limited_concurrency(
124+
get_redis_client_manager(self.app).client(RedisDatabase.LOCKS),
125+
key=get_redis_lock_key(
126+
"dask-clients-pool",
127+
unique_lock_key_builder=lambda: f"{cluster.name}-{cluster.endpoint}",
128+
),
129+
capacity=20,
130+
blocking=True,
131+
blocking_timeout=None,
132+
)
117133
async def _concurently_safe_acquire_client() -> DaskClient:
118134
async with self._client_acquisition_lock:
119135
with log_context(

0 commit comments

Comments
 (0)