Skip to content

Commit 44b5f4e

Browse files
authored
(⚠️ devops) ♻️✨Adding distributed locking to throttle concurrent saves on nodes (ITISFoundation#3160)
* adds distributed locking per resource per node
* saving states and outputs and pulling states and inputs are now limited per node
* all options are disabled by default

Co-authored-by: Andrei Neagu <[email protected]>
1 parent 75afd7f commit 44b5f4e

File tree

18 files changed

+829
-112
lines changed

18 files changed

+829
-112
lines changed

packages/pytest-simcore/src/pytest_simcore/redis_service.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,5 +93,8 @@ async def redis_locks_client(
9393
async def wait_till_redis_responsive(redis_url: Union[URL, str]) -> None:
9494
client = from_url(f"{redis_url}", encoding="utf-8", decode_responses=True)
9595

96-
if not await client.ping():
97-
raise ConnectionError(f"{redis_url=} not available")
96+
try:
97+
if not await client.ping():
98+
raise ConnectionError(f"{redis_url=} not available")
99+
finally:
100+
await client.close(close_connection_pool=True)

services/director-v2/requirements/_base.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,5 @@ httpx
2828
networkx
2929
orjson
3030
pydantic[dotenv]
31+
redis
3132
tenacity

services/director-v2/requirements/_base.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ async-timeout==4.0.2
6969
# aiohttp
7070
# aiopg
7171
# aioredis
72+
# redis
7273
attrs==20.3.0
7374
# via
7475
# -c requirements/../../../packages/service-library/requirements/././constraints.txt
@@ -109,6 +110,8 @@ dask-gateway==2022.6.1
109110
# via -r requirements/_base.in
110111
decorator==4.4.2
111112
# via networkx
113+
deprecated==1.2.13
114+
# via redis
112115
distributed==2022.6.0
113116
# via
114117
# -r requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
@@ -242,6 +245,7 @@ packaging==21.3
242245
# -r requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
243246
# dask
244247
# distributed
248+
# redis
245249
pamqp==2.3.0
246250
# via aiormq
247251
partd==1.2.0
@@ -329,6 +333,8 @@ pyyaml==5.4.1
329333
# distributed
330334
# fastapi
331335
# uvicorn
336+
redis==4.3.4
337+
# via -r requirements/_base.in
332338
requests==2.27.1
333339
# via
334340
# async-asgi-testclient
@@ -444,6 +450,8 @@ watchgod==0.8.2
444450
# via uvicorn
445451
websockets==10.1
446452
# via uvicorn
453+
wrapt==1.14.1
454+
# via deprecated
447455
yarl==1.7.2
448456
# via
449457
# -r requirements/../../../packages/postgres-database/requirements/_base.in

services/director-v2/requirements/_test.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,7 @@ websocket-client==0.59.0
355355
# docker-compose
356356
wrapt==1.14.1
357357
# via
358+
# -c requirements/_base.txt
358359
# aiobotocore
359360
# astroid
360361
yarl==1.7.2

services/director-v2/src/simcore_service_director_v2/core/application.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
director_v0,
2525
dynamic_services,
2626
dynamic_sidecar,
27+
node_rights,
2728
rabbitmq,
2829
remote_debug,
2930
storage,
@@ -138,6 +139,8 @@ def init_app(settings: Optional[AppSettings] = None) -> FastAPI:
138139
rabbitmq.setup(app)
139140
comp_scheduler.setup(app)
140141

142+
node_rights.setup(app)
143+
141144
if settings.DIRECTOR_V2_TRACING:
142145
setup_tracing(app, settings.DIRECTOR_V2_TRACING)
143146

services/director-v2/src/simcore_service_director_v2/core/errors.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
}
2020
"""
2121

22-
from typing import List, Optional
22+
from typing import Optional
2323

2424
from models_library.errors import ErrorDict
2525
from models_library.projects import ProjectID
@@ -104,6 +104,10 @@ class ComputationalRunNotFoundError(PydanticErrorMixin, DirectorException):
104104
msg_template = "Computational run not found"
105105

106106

107+
class NodeRightsAcquireError(PydanticErrorMixin, DirectorException):
108+
msg_template = "Could not acquire a lock for {docker_node_id} since all {slots} slots are used."
109+
110+
107111
#
108112
# SCHEDULER ERRORS
109113
#
@@ -133,7 +137,7 @@ def __init__(
133137
self.project_id = project_id
134138
self.node_id = node_id
135139

136-
def get_errors(self) -> List[ErrorDict]:
140+
def get_errors(self) -> list[ErrorDict]:
137141
# default implementation
138142
return [
139143
{
@@ -175,15 +179,15 @@ class PortsValidationError(TaskSchedulingError):
175179
ports in a project's node.
176180
"""
177181

178-
def __init__(self, project_id: ProjectID, node_id: NodeID, errors: List[ErrorDict]):
182+
def __init__(self, project_id: ProjectID, node_id: NodeID, errors: list[ErrorDict]):
179183
super().__init__(
180184
project_id,
181185
node_id,
182186
msg=f"Node with {len(errors)} ports having invalid values",
183187
)
184188
self.errors = errors
185189

186-
def get_errors(self) -> List[ErrorDict]:
190+
def get_errors(self) -> list[ErrorDict]:
187191
"""Returns 'public errors': filters only value_error.port_validation errors for the client.
188192
The rest only shown as number
189193
"""

services/director-v2/src/simcore_service_director_v2/core/settings.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from settings_library.postgres import PostgresSettings
3939
from settings_library.r_clone import RCloneSettings
4040
from settings_library.rabbit import RabbitSettings
41+
from settings_library.redis import RedisSettings
4142
from settings_library.tracing import TracingSettings
4243
from settings_library.utils_logging import MixinLoggingSettings
4344
from settings_library.utils_service import DEFAULT_FASTAPI_PORT
@@ -300,6 +301,24 @@ class DynamicSidecarSettings(BaseCustomSettings):
300301
description="exposes the proxy on localhost for debuging and testing",
301302
)
302303

304+
DYNAMIC_SIDECAR_DOCKER_NODE_RESOURCE_LIMITS_ENABLED: bool = Field(
305+
False,
306+
description=(
307+
"Limits concurrent service saves for a docker node. Guarantees "
308+
"that no more than X services use a resource together."
309+
),
310+
)
311+
DYNAMIC_SIDECAR_DOCKER_NODE_CONCURRENT_RESOURCE_SLOTS: PositiveInt = Field(
312+
2, description="Amount of slots per resource on a node"
313+
)
314+
DYNAMIC_SIDECAR_DOCKER_NODE_SAVES_LOCK_TIMEOUT_S: PositiveFloat = Field(
315+
10,
316+
description=(
317+
"Lifetime of the lock. Allows the system to recover a lock "
318+
"in case of crash, the lock will expire and result as released."
319+
),
320+
)
321+
303322
@validator("DYNAMIC_SIDECAR_MOUNT_PATH_DEV", pre=True)
304323
@classmethod
305324
def auto_disable_if_production(cls, v, values):
@@ -469,6 +488,8 @@ class AppSettings(BaseCustomSettings, MixinLoggingSettings):
469488

470489
POSTGRES: PGSettings = Field(auto_default_from_env=True)
471490

491+
REDIS: RedisSettings = Field(auto_default_from_env=True)
492+
472493
DIRECTOR_V2_RABBITMQ: RabbitSettings = Field(auto_default_from_env=True)
473494

474495
TRAEFIK_SIMCORE_ZONE: str = Field("internal_simcore_stack")

services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/scheduler.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -352,9 +352,6 @@ class SchedulerData(CommonServiceDetails, DynamicSidecarServiceLabels):
352352
..., description="service resources used to enforce limits"
353353
)
354354

355-
# Below values are used only once and then are nto required, thus optional
356-
# after the service is picked up by the scheduler after a reboot these are not required
357-
# and can be set to None
358355
request_dns: Optional[str] = Field(
359356
None, description="used when configuring the CORS options on the proxy"
360357
)
@@ -364,6 +361,13 @@ class SchedulerData(CommonServiceDetails, DynamicSidecarServiceLabels):
364361
proxy_service_name: Optional[str] = Field(
365362
None, description="service name given to the proxy"
366363
)
364+
docker_node_id: Optional[str] = Field(
365+
None,
366+
description=(
367+
"contains node id of the docker node where all services "
368+
"and created containers are started"
369+
),
370+
)
367371

368372
@classmethod
369373
def from_http_request(

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/proxy.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ def get_dynamic_proxy_spec(
1818
dynamic_sidecar_network_id: str,
1919
swarm_network_id: str,
2020
swarm_network_name: str,
21-
dynamic_sidecar_node_id: str,
2221
entrypoint_container_name: str,
2322
service_port: PositiveInt,
2423
) -> Dict[str, Any]:
@@ -98,7 +97,7 @@ def get_dynamic_proxy_spec(
9897
"Placement": {
9998
"Constraints": [
10099
"node.platform.os == linux",
101-
f"node.id == {dynamic_sidecar_node_id}",
100+
f"node.id == {scheduler_data.docker_node_id}",
102101
]
103102
},
104103
"Resources": {

0 commit comments

Comments
 (0)