| 
30 | 30 | )  | 
31 | 31 | from servicelib.utils import limited_as_completed, limited_gather  | 
32 | 32 | 
 
  | 
 | 33 | +from ..._meta import APP_NAME  | 
33 | 34 | from ...core.errors import (  | 
34 | 35 |     ComputationalBackendNotConnectedError,  | 
35 | 36 |     ComputationalBackendOnDemandNotReadyError,  | 
 | 
57 | 58 | from ..db.repositories.comp_tasks import CompTasksRepository  | 
58 | 59 | from ._constants import (  | 
59 | 60 |     MAX_CONCURRENT_PIPELINE_SCHEDULING,  | 
60 |  | -    MODULE_NAME_WORKER,  | 
61 | 61 | )  | 
62 | 62 | from ._models import TaskStateTracker  | 
63 | 63 | from ._scheduler_base import BaseCompScheduler  | 
64 | 64 | from ._utils import (  | 
65 | 65 |     WAITING_FOR_START_STATES,  | 
66 |  | -    get_redis_lock_key,  | 
67 | 66 | )  | 
68 | 67 | 
 
  | 
69 | 68 | _logger = logging.getLogger(__name__)  | 
70 | 69 | 
 
  | 
71 | 70 | _DASK_CLIENT_RUN_REF: Final[str] = "{user_id}:{project_id}:{run_id}"  | 
72 | 71 | _TASK_RETRIEVAL_ERROR_TYPE: Final[str] = "task-result-retrieval-timeout"  | 
73 | 72 | _TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: Final[str] = "check_time"  | 
 | 73 | +_DASK_CLUSTER_CLIENT_SEMAPHORE_CAPACITY: Final[int] = 20  | 
74 | 74 | 
 
  | 
75 | 75 | 
 
  | 
76 |  | -def _get_redis_client_from_scheduler(scheduler: "DaskScheduler") -> RedisClientSDK:  | 
 | 76 | +def _get_redis_client_from_scheduler(  | 
 | 77 | +    user_id: UserID,  # noqa: ARG001  | 
 | 78 | +    scheduler: "DaskScheduler",  | 
 | 79 | +    **kwargs,  # noqa: ARG001  | 
 | 80 | +) -> RedisClientSDK:  | 
77 | 81 |     return scheduler.redis_client  | 
78 | 82 | 
 
  | 
79 | 83 | 
 
  | 
80 |  | -def _unique_key_builder(_app, user_id: UserID, run_metadata: RunMetadataDict) -> str:  | 
81 |  | -    return f"user_id_{user_id}-wallet_id_{run_metadata.get('wallet_id')}"  | 
 | 84 | +def _get_semaphore_cluster_redis_key(  | 
 | 85 | +    user_id: UserID,  | 
 | 86 | +    *args,  # noqa: ARG001  | 
 | 87 | +    run_metadata: RunMetadataDict,  | 
 | 88 | +    **kwargs,  # noqa: ARG001  | 
 | 89 | +) -> str:  | 
 | 90 | +    return f"{APP_NAME}-cluster-user_id_{user_id}-wallet_id_{run_metadata.get('wallet_id')}"  | 
82 | 91 | 
 
  | 
83 | 92 | 
 
  | 
84 | 93 | @asynccontextmanager  | 
85 | 94 | @with_limited_concurrency_cm(  | 
86 | 95 |     _get_redis_client_from_scheduler,  | 
87 |  | -    key=get_redis_lock_key(  | 
88 |  | -        MODULE_NAME_WORKER,  | 
89 |  | -        unique_lock_key_builder=_unique_key_builder,  | 
90 |  | -    ),  | 
91 |  | -    capacity=1,  | 
 | 96 | +    key=_get_semaphore_cluster_redis_key,  | 
 | 97 | +    capacity=_DASK_CLUSTER_CLIENT_SEMAPHORE_CAPACITY,  | 
92 | 98 |     blocking=True,  | 
93 | 99 |     blocking_timeout=None,  | 
94 | 100 | )  | 
 | 
0 commit comments