|
30 | 30 | ) |
31 | 31 | from servicelib.utils import limited_as_completed, limited_gather |
32 | 32 |
|
| 33 | +from ..._meta import APP_NAME |
33 | 34 | from ...core.errors import ( |
34 | 35 | ComputationalBackendNotConnectedError, |
35 | 36 | ComputationalBackendOnDemandNotReadyError, |
|
57 | 58 | from ..db.repositories.comp_tasks import CompTasksRepository |
58 | 59 | from ._constants import ( |
59 | 60 | MAX_CONCURRENT_PIPELINE_SCHEDULING, |
60 | | - MODULE_NAME_WORKER, |
61 | 61 | ) |
62 | 62 | from ._models import TaskStateTracker |
63 | 63 | from ._scheduler_base import BaseCompScheduler |
64 | 64 | from ._utils import ( |
65 | 65 | WAITING_FOR_START_STATES, |
66 | | - get_redis_lock_key, |
67 | 66 | ) |
68 | 67 |
|
69 | 68 | _logger = logging.getLogger(__name__) |
70 | 69 |
|
71 | 70 | _DASK_CLIENT_RUN_REF: Final[str] = "{user_id}:{project_id}:{run_id}" |
72 | 71 | _TASK_RETRIEVAL_ERROR_TYPE: Final[str] = "task-result-retrieval-timeout" |
73 | 72 | _TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: Final[str] = "check_time" |
| 73 | +_DASK_CLUSTER_CLIENT_SEMAPHORE_CAPACITY: Final[int] = 20 |
74 | 74 |
|
75 | 75 |
|
76 | | -def _get_redis_client_from_scheduler(scheduler: "DaskScheduler") -> RedisClientSDK: |
| 76 | +def _get_redis_client_from_scheduler( |
| 77 | + user_id: UserID, # noqa: ARG001 |
| 78 | + scheduler: "DaskScheduler", |
| 79 | + **kwargs, # noqa: ARG001 |
| 80 | +) -> RedisClientSDK: |
77 | 81 | return scheduler.redis_client |
78 | 82 |
|
79 | 83 |
|
80 | | -def _unique_key_builder(_app, user_id: UserID, run_metadata: RunMetadataDict) -> str: |
81 | | - return f"user_id_{user_id}-wallet_id_{run_metadata.get('wallet_id')}" |
| 84 | +def _get_semaphore_cluster_redis_key( |
| 85 | + user_id: UserID, |
| 86 | + *args, # noqa: ARG001 |
| 87 | + run_metadata: RunMetadataDict, |
| 88 | + **kwargs, # noqa: ARG001 |
| 89 | +) -> str: |
| 90 | + return f"{APP_NAME}-cluster-user_id_{user_id}-wallet_id_{run_metadata.get('wallet_id')}" |
82 | 91 |
|
83 | 92 |
|
84 | 93 | @asynccontextmanager |
85 | 94 | @with_limited_concurrency_cm( |
86 | 95 | _get_redis_client_from_scheduler, |
87 | | - key=get_redis_lock_key( |
88 | | - MODULE_NAME_WORKER, |
89 | | - unique_lock_key_builder=_unique_key_builder, |
90 | | - ), |
91 | | - capacity=1, |
| 96 | + key=_get_semaphore_cluster_redis_key, |
| 97 | + capacity=_DASK_CLUSTER_CLIENT_SEMAPHORE_CAPACITY, |
92 | 98 | blocking=True, |
93 | 99 | blocking_timeout=None, |
94 | 100 | ) |
|
0 commit comments