Commit 7c58618

♻️ Autoscaling: one docker client (ITISFoundation#3654)
1 parent d4236d9 commit 7c58618

10 files changed, +353 -173 lines changed

services/autoscaling/src/simcore_service_autoscaling/api/health.py

Lines changed: 6 additions & 0 deletions

@@ -10,6 +10,7 @@
 from fastapi.responses import PlainTextResponse
 from pydantic import BaseModel
 
+from ..modules.docker import get_docker_client
 from ..modules.rabbitmq import get_rabbitmq_client
 from .dependencies.application import get_app
 
@@ -30,6 +31,7 @@ class _ComponentStatus(BaseModel):
 class _StatusGet(BaseModel):
     rabbitmq: _ComponentStatus
     ec2: _ComponentStatus
+    docker: _ComponentStatus
 
 
 @router.get("/status", include_in_schema=True, response_model=_StatusGet)
@@ -48,4 +50,8 @@ async def get_status(app: FastAPI = Depends(get_app)) -> _StatusGet:
             if app.state.ec2_client
             else False,
         ),
+        docker=_ComponentStatus(
+            is_enabled=bool(app.state.docker_client),
+            is_responsive=await get_docker_client(app).ping(),
+        ),
     )
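
The new docker entry surfaces in the /status response next to rabbitmq and ec2. A minimal sketch of how it could be probed with an httpx client such as the async_client test fixture, assuming the router is mounted without an extra prefix (the helper below is hypothetical):

import httpx


async def check_docker_component(client: httpx.AsyncClient) -> None:
    # probe the extended status endpoint added by this commit
    response = await client.get("/status")
    response.raise_for_status()
    status = response.json()
    # "docker" has the same _ComponentStatus shape as "rabbitmq" and "ec2"
    assert status["docker"]["is_enabled"] is True
    assert status["docker"]["is_responsive"] is True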

services/autoscaling/src/simcore_service_autoscaling/core/application.py

Lines changed: 2 additions & 0 deletions

@@ -11,6 +11,7 @@
 )
 from ..api.routes import setup_api_routes
 from ..dynamic_scaling import setup as setup_background_task
+from ..modules.docker import setup as setup_docker
 from ..modules.ec2 import setup as setup_ec2
 from ..modules.rabbitmq import setup as setup_rabbitmq
 from .settings import ApplicationSettings
@@ -37,6 +38,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
 
     # PLUGINS SETUP
     setup_api_routes(app)
+    setup_docker(app)
     setup_rabbitmq(app)
     setup_ec2(app)
     # autoscaler background task
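
With setup_docker registered among the plugins, a single docker client is created at startup and stored on app.state. A sketch of the wiring, where create_app, ApplicationSettings and get_docker_client come from this repository and the driver function itself is hypothetical:

from fastapi import FastAPI

from simcore_service_autoscaling.core.application import create_app
from simcore_service_autoscaling.core.settings import ApplicationSettings
from simcore_service_autoscaling.modules.docker import get_docker_client


def build_app() -> FastAPI:
    settings = ApplicationSettings()  # assumption: values are resolved from environment variables
    app = create_app(settings)
    # once the "startup" event has run, setup_docker has stored one shared client
    # on app.state.docker_client, reachable anywhere via get_docker_client(app)
    return app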

services/autoscaling/src/simcore_service_autoscaling/dynamic_scaling_core.py

Lines changed: 13 additions & 7 deletions

@@ -7,12 +7,12 @@
 from pydantic import parse_obj_as
 from types_aiobotocore_ec2.literals import InstanceTypeType
 
-from . import utils_docker
 from ._meta import VERSION
 from .core.errors import Ec2InstanceNotFoundError
 from .core.settings import ApplicationSettings
+from .modules.docker import get_docker_client
 from .modules.ec2 import get_ec2_client
-from .utils import ec2, rabbitmq
+from .utils import ec2, rabbitmq, utils_docker
 
 logger = logging.getLogger(__name__)
 
@@ -28,25 +28,28 @@ async def check_dynamic_resources(app: FastAPI) -> None:
     assert app_settings.AUTOSCALING_NODES_MONITORING  # nosec
 
     # 1. get monitored nodes information and resources
+    docker_client = get_docker_client(app)
     monitored_nodes = await utils_docker.get_monitored_nodes(
-        node_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS
+        docker_client,
+        node_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS,
     )
 
     cluster_total_resources = await utils_docker.compute_cluster_total_resources(
         monitored_nodes
     )
     logger.info("%s", f"{cluster_total_resources=}")
     cluster_used_resources = await utils_docker.compute_cluster_used_resources(
-        monitored_nodes
+        docker_client, monitored_nodes
     )
     logger.info("%s", f"{cluster_used_resources=}")
 
     # 2. Remove nodes that are gone
-    await utils_docker.remove_monitored_down_nodes(monitored_nodes)
+    await utils_docker.remove_monitored_down_nodes(docker_client, monitored_nodes)
 
     # 3. Scale up nodes if there are pending tasks
     pending_tasks = await utils_docker.pending_service_tasks_with_insufficient_resources(
-        service_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS
+        docker_client,
+        service_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS,
     )
     await rabbitmq.post_state_message(
         app,
@@ -106,8 +109,11 @@ async def check_dynamic_resources(app: FastAPI) -> None:
             # NOTE: new_instance_dns_name is of type ip-123-23-23-3.ec2.internal and we need only the first part
             if match := re.match(_EC2_INTERNAL_DNS_RE, new_instance_dns_name):
                 new_instance_dns_name = match.group(1)
-            new_node = await utils_docker.wait_for_node(new_instance_dns_name)
+            new_node = await utils_docker.wait_for_node(
+                docker_client, new_instance_dns_name
+            )
             await utils_docker.tag_node(
+                docker_client,
                 new_node,
                 tags={
                     tag_key: "true"
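
The refactor swaps the module-level `from . import utils_docker` for the relocated `utils.utils_docker` and threads one shared client through every helper call. A sketch of the resulting call pattern (the driver function is hypothetical, the imported names are taken from this diff):

import logging

from fastapi import FastAPI
from models_library.docker import DockerLabelKey

from simcore_service_autoscaling.modules.docker import get_docker_client
from simcore_service_autoscaling.utils import utils_docker


async def one_autoscaling_pass(app: FastAPI, node_labels: list[DockerLabelKey]) -> None:
    # one client per application: no per-call aiodocker.Docker() connections anymore
    docker_client = get_docker_client(app)
    nodes = await utils_docker.get_monitored_nodes(docker_client, node_labels=node_labels)
    cluster_used = await utils_docker.compute_cluster_used_resources(docker_client, nodes)
    logging.getLogger(__name__).info("cluster used resources: %s", cluster_used)
    # scale-up / scale-down decisions would follow here, as in check_dynamic_resources above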

services/autoscaling/src/simcore_service_autoscaling/modules/docker.py

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@
+import logging
+from typing import cast
+
+import aiodocker
+from fastapi import FastAPI
+from tenacity._asyncio import AsyncRetrying
+from tenacity.before_sleep import before_sleep_log
+from tenacity.stop import stop_after_delay
+from tenacity.wait import wait_random_exponential
+
+logger = logging.getLogger(__name__)
+
+
+class AutoscalingDocker(aiodocker.Docker):
+    async def ping(self) -> bool:
+        try:
+            await self.version()
+            return True
+        except Exception:  # pylint: disable=broad-except
+            return False
+
+
+def setup(app: FastAPI) -> None:
+    async def on_startup() -> None:
+        app.state.docker_client = client = AutoscalingDocker()
+
+        async for attempt in AsyncRetrying(
+            reraise=True,
+            stop=stop_after_delay(120),
+            wait=wait_random_exponential(max=30),
+            before_sleep=before_sleep_log(logger, logging.WARNING),
+        ):
+            with attempt:
+                # this will raise if the connection is not working
+                await client.version()
+
+    async def on_shutdown() -> None:
+        if app.state.docker_client:
+            await cast(AutoscalingDocker, app.state.docker_client).close()
+
+    app.add_event_handler("startup", on_startup)
+    app.add_event_handler("shutdown", on_shutdown)
+
+
+def get_docker_client(app: FastAPI) -> AutoscalingDocker:
+    return cast(AutoscalingDocker, app.state.docker_client)
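
Using the new module boils down to two calls; a brief sketch (the two wrapper functions below are hypothetical, the imported names are defined in the file above):

from fastapi import FastAPI

from simcore_service_autoscaling.modules.docker import get_docker_client, setup


def wire_docker(app: FastAPI) -> None:
    # registers the startup/shutdown handlers that create and close the shared client
    setup(app)


async def is_docker_responsive(app: FastAPI) -> bool:
    # after startup the same AutoscalingDocker instance is reused for every call
    return await get_docker_client(app).ping()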

services/autoscaling/src/simcore_service_autoscaling/utils_docker.py renamed to services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py

Lines changed: 78 additions & 69 deletions

@@ -8,7 +8,6 @@
 import re
 from typing import Final
 
-import aiodocker
 from models_library.docker import DockerLabelKey
 from models_library.generated_models.docker_rest_api import (
     Node,
@@ -24,7 +23,8 @@
 from tenacity.stop import stop_after_delay
 from tenacity.wait import wait_fixed
 
-from .models import Resources
+from ..models import Resources
+from ..modules.docker import AutoscalingDocker
 
 logger = logging.getLogger(__name__)
 _NANO_CPU: Final[float] = 10**9
@@ -40,18 +40,21 @@
 _TIMEOUT_WAITING_FOR_NODES_S: Final[int] = 5 * _MINUTE
 
 
-async def get_monitored_nodes(node_labels: list[DockerLabelKey]) -> list[Node]:
-    async with aiodocker.Docker() as docker:
-        nodes = parse_obj_as(
-            list[Node],
-            await docker.nodes.list(
-                filters={"node.label": [f"{label}=true" for label in node_labels]}
-            ),
-        )
+async def get_monitored_nodes(
+    docker_client: AutoscalingDocker, node_labels: list[DockerLabelKey]
+) -> list[Node]:
+    nodes = parse_obj_as(
+        list[Node],
+        await docker_client.nodes.list(
+            filters={"node.label": [f"{label}=true" for label in node_labels]}
+        ),
+    )
     return nodes
 
 
-async def remove_monitored_down_nodes(nodes: list[Node]) -> list[Node]:
+async def remove_monitored_down_nodes(
+    docker_client: AutoscalingDocker, nodes: list[Node]
+) -> list[Node]:
     """removes docker nodes that are in the down state"""
 
     def _check_if_node_is_removable(node: Node) -> bool:
@@ -69,15 +72,15 @@ def _check_if_node_is_removable(node: Node) -> bool:
         return False
 
     nodes_that_need_removal = [n for n in nodes if _check_if_node_is_removable(n)]
-    async with aiodocker.Docker() as docker:
-        for node in nodes_that_need_removal:
-            assert node.ID  # nosec
-            with log_context(logger, logging.INFO, msg=f"remove {node.ID=}"):
-                await docker.nodes.remove(node_id=node.ID)
+    for node in nodes_that_need_removal:
+        assert node.ID  # nosec
+        with log_context(logger, logging.INFO, msg=f"remove {node.ID=}"):
+            await docker_client.nodes.remove(node_id=node.ID)
     return nodes_that_need_removal
 
 
 async def pending_service_tasks_with_insufficient_resources(
+    docker_client: AutoscalingDocker,
     service_labels: list[DockerLabelKey],
 ) -> list[Task]:
     """
@@ -88,16 +91,15 @@ async def pending_service_tasks_with_insufficient_resources(
     - have an error message with "insufficient resources"
     - are not scheduled on any node
     """
-    async with aiodocker.Docker() as docker:
-        tasks = parse_obj_as(
-            list[Task],
-            await docker.tasks.list(
-                filters={
-                    "desired-state": "running",
-                    "label": service_labels,
-                }
-            ),
-        )
+    tasks = parse_obj_as(
+        list[Task],
+        await docker_client.tasks.list(
+            filters={
+                "desired-state": "running",
+                "label": service_labels,
+            }
+        ),
+    )
 
     def _is_task_waiting_for_resources(task: Task) -> bool:
         # NOTE: https://docs.docker.com/engine/swarm/how-swarm-mode-works/swarm-task-states/
@@ -171,36 +173,38 @@ def get_max_resources_from_docker_task(task: Task) -> Resources:
     return Resources(cpus=0, ram=ByteSize(0))
 
 
-async def compute_node_used_resources(node: Node) -> Resources:
+async def compute_node_used_resources(
+    docker_client: AutoscalingDocker,
+    node: Node,
+) -> Resources:
     cluster_resources_counter = collections.Counter({"ram": 0, "cpus": 0})
-    async with aiodocker.Docker() as docker:
-        all_tasks_on_node = parse_obj_as(
-            list[Task], await docker.tasks.list(filters={"node": node.ID})
-        )
-        for task in all_tasks_on_node:
-            assert task.Status  # nosec
-            if (
-                task.Status.State in _TASK_STATUS_WITH_ASSIGNED_RESOURCES
-                and task.Spec
-                and task.Spec.Resources
-                and task.Spec.Resources.Reservations
-            ):
-                task_reservations = task.Spec.Resources.Reservations.dict(
-                    exclude_none=True
-                )
-                cluster_resources_counter.update(
-                    {
-                        "ram": task_reservations.get("MemoryBytes", 0),
-                        "cpus": task_reservations.get("NanoCPUs", 0) / _NANO_CPU,
-                    }
-                )
+    all_tasks_on_node = parse_obj_as(
+        list[Task], await docker_client.tasks.list(filters={"node": node.ID})
+    )
+    for task in all_tasks_on_node:
+        assert task.Status  # nosec
+        if (
+            task.Status.State in _TASK_STATUS_WITH_ASSIGNED_RESOURCES
+            and task.Spec
+            and task.Spec.Resources
+            and task.Spec.Resources.Reservations
+        ):
+            task_reservations = task.Spec.Resources.Reservations.dict(exclude_none=True)
+            cluster_resources_counter.update(
+                {
+                    "ram": task_reservations.get("MemoryBytes", 0),
+                    "cpus": task_reservations.get("NanoCPUs", 0) / _NANO_CPU,
+                }
+            )
     return Resources.parse_obj(dict(cluster_resources_counter))
 
 
-async def compute_cluster_used_resources(nodes: list[Node]) -> Resources:
+async def compute_cluster_used_resources(
+    docker_client: AutoscalingDocker, nodes: list[Node]
+) -> Resources:
     """Returns the total amount of resources (reservations) used on each of the given nodes"""
     list_of_used_resources = await logged_gather(
-        *(compute_node_used_resources(node) for node in nodes)
+        *(compute_node_used_resources(docker_client, node) for node in nodes)
     )
     counter = collections.Counter({k: 0 for k in Resources.__fields__.keys()})
     for result in list_of_used_resources:
@@ -243,32 +247,37 @@ async def get_docker_swarm_join_bash_command() -> str:
     before_sleep=before_sleep_log(logger, logging.WARNING),
     wait=wait_fixed(5),
 )
-async def wait_for_node(node_name: str) -> Node:
-    async with aiodocker.Docker() as docker:
-        list_of_nodes = await docker.nodes.list(filters={"name": node_name})
+async def wait_for_node(
+    docker_client: AutoscalingDocker,
+    node_name: str,
+) -> Node:
+    list_of_nodes = await docker_client.nodes.list(filters={"name": node_name})
     if not list_of_nodes:
         raise TryAgain
     return parse_obj_as(Node, list_of_nodes[0])
 
 
 async def tag_node(
-    node: Node, *, tags: dict[DockerLabelKey, str], available: bool
+    docker_client: AutoscalingDocker,
+    node: Node,
+    *,
+    tags: dict[DockerLabelKey, str],
+    available: bool,
 ) -> None:
     with log_context(
         logger, logging.DEBUG, msg=f"tagging {node.ID=} with {tags=} and {available=}"
     ):
-        async with aiodocker.Docker() as docker:
-            assert node.ID  # nosec
-            assert node.Version  # nosec
-            assert node.Version.Index  # nosec
-            assert node.Spec  # nosec
-            assert node.Spec.Role  # nosec
-            await docker.nodes.update(
-                node_id=node.ID,
-                version=node.Version.Index,
-                spec={
-                    "Availability": "active" if available else "drain",
-                    "Labels": tags,
-                    "Role": node.Spec.Role.value,
-                },
-            )
+        assert node.ID  # nosec
+        assert node.Version  # nosec
+        assert node.Version.Index  # nosec
+        assert node.Spec  # nosec
+        assert node.Spec.Role  # nosec
+        await docker_client.nodes.update(
+            node_id=node.ID,
+            version=node.Version.Index,
+            spec={
+                "Availability": "active" if available else "drain",
+                "Labels": tags,
+                "Role": node.Spec.Role.value,
+            },
+        )
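
Every helper now takes the docker client as its first argument, and tag_node keeps its keyword-only options. A standalone sketch driving the refactored helpers against a local swarm (illustrative only; the label and tag names are made up):

import asyncio

from simcore_service_autoscaling.modules.docker import AutoscalingDocker
from simcore_service_autoscaling.utils import utils_docker


async def main() -> None:
    async with AutoscalingDocker() as docker_client:
        nodes = await utils_docker.get_monitored_nodes(
            docker_client, node_labels=["io.simcore.autoscaled-node"]  # hypothetical label
        )
        for node in nodes:
            await utils_docker.tag_node(
                docker_client,
                node,
                tags={"io.simcore.autoscaled-node": "true"},  # hypothetical tag
                available=True,
            )


if __name__ == "__main__":
    asyncio.run(main())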

services/autoscaling/tests/unit/conftest.py

Lines changed: 8 additions & 0 deletions

@@ -16,6 +16,7 @@
     Mapping,
     Optional,
     Union,
+    cast,
 )
 
 import aiodocker
@@ -39,6 +40,7 @@
 from simcore_service_autoscaling.core.application import create_app
 from simcore_service_autoscaling.core.settings import ApplicationSettings, EC2Settings
 from simcore_service_autoscaling.models import SimcoreServiceDockerLabelKeys
+from simcore_service_autoscaling.modules.docker import AutoscalingDocker
 from simcore_service_autoscaling.modules.ec2 import AutoscalingEC2
 from tenacity import retry
 from tenacity._asyncio import AsyncRetrying
@@ -165,6 +167,12 @@ async def async_client(initialized_app: FastAPI) -> AsyncIterator[httpx.AsyncCli
         yield client
 
 
+@pytest.fixture
+async def autoscaling_docker() -> AsyncIterator[AutoscalingDocker]:
+    async with AutoscalingDocker() as docker_client:
+        yield cast(AutoscalingDocker, docker_client)
+
+
 @pytest.fixture
 async def async_docker_client() -> AsyncIterator[aiodocker.Docker]:
     async with aiodocker.Docker() as docker_client:
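
A hypothetical test (not part of the commit) showing how the new autoscaling_docker fixture would be consumed:

from simcore_service_autoscaling.modules.docker import AutoscalingDocker


async def test_ping_returns_true_when_docker_is_up(autoscaling_docker: AutoscalingDocker):
    # the fixture yields a connected client; ping() swallows errors and returns a bool
    assert await autoscaling_docker.ping() is True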
