
Commit 9457419

✨ Autoscaling: scale down nodes (ITISFoundation#3655)
1 parent: 332cbb4

File tree: 14 files changed, +1245 / -403 lines


services/autoscaling/src/simcore_service_autoscaling/core/errors.py

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ class Ec2NotConnectedError(AutoscalingRuntimeError):
 
 
 class Ec2InstanceNotFoundError(AutoscalingRuntimeError):
-    msg_template: str = "Needed instance was not found"
+    msg_template: str = "EC2 instance was not found"
 
 
 class Ec2TooManyInstancesError(AutoscalingRuntimeError):

services/autoscaling/src/simcore_service_autoscaling/core/settings.py

Lines changed: 15 additions & 1 deletion
@@ -40,7 +40,7 @@ class EC2InstancesSettings(BaseCustomSettings):
         description="Defines the AMI (Amazon Machine Image) ID used to start a new EC2 instance",
     )
     EC2_INSTANCES_MAX_INSTANCES: int = Field(
-        10,
+        default=10,
         description="Defines the maximum number of instances the autoscaling app may create",
     )
     EC2_INSTANCES_SECURITY_GROUP_IDS: list[str] = Field(
@@ -65,6 +65,20 @@ class EC2InstancesSettings(BaseCustomSettings):
         "this is required to start a new EC2 instance",
     )
 
+    EC2_INSTANCES_TIME_BEFORE_TERMINATION: datetime.timedelta = Field(
+        default=datetime.timedelta(minutes=55),
+        description="Time after which an EC2 instance may be terminated (repeat every hour, min 0, max 59 minutes)",
+    )
+
+    @validator("EC2_INSTANCES_TIME_BEFORE_TERMINATION")
+    @classmethod
+    def ensure_time_is_in_range(cls, value):
+        if value < datetime.timedelta(minutes=0):
+            value = datetime.timedelta(minutes=0)
+        elif value > datetime.timedelta(minutes=59):
+            value = datetime.timedelta(minutes=59)
+        return value
+
     @validator("EC2_INSTANCES_ALLOWED_TYPES")
     @classmethod
     def check_valid_intance_names(cls, value):
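
For context, the new EC2_INSTANCES_TIME_BEFORE_TERMINATION validator simply clamps whatever is configured into the [0, 59] minutes window, so the value can later be compared against the position inside the current EC2 billing hour. A minimal, runnable sketch of that behaviour (the model name here is made up; the field and validator mirror the diff, pydantic v1 style):

import datetime
from pydantic import BaseModel, validator

class TerminationSettingsSketch(BaseModel):  # hypothetical stand-in for EC2InstancesSettings
    EC2_INSTANCES_TIME_BEFORE_TERMINATION: datetime.timedelta = datetime.timedelta(minutes=55)

    @validator("EC2_INSTANCES_TIME_BEFORE_TERMINATION")
    @classmethod
    def ensure_time_is_in_range(cls, value):
        # clamp into [0 min, 59 min] so the comparison against the hourly cycle stays meaningful
        if value < datetime.timedelta(minutes=0):
            value = datetime.timedelta(minutes=0)
        elif value > datetime.timedelta(minutes=59):
            value = datetime.timedelta(minutes=59)
        return value

settings = TerminationSettingsSketch(EC2_INSTANCES_TIME_BEFORE_TERMINATION=datetime.timedelta(hours=2))
print(settings.EC2_INSTANCES_TIME_BEFORE_TERMINATION)  # 0:59:00 -> out-of-range values are clamped, not rejected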
Lines changed: 223 additions & 41 deletions
@@ -1,68 +1,203 @@
+import asyncio
 import json
 import logging
 import re
-from datetime import datetime
+from datetime import datetime, timedelta, timezone
 
 from fastapi import FastAPI
+from models_library.generated_models.docker_rest_api import Availability, Node, Task
 from pydantic import parse_obj_as
 from types_aiobotocore_ec2.literals import InstanceTypeType
 
 from ._meta import VERSION
 from .core.errors import Ec2InstanceNotFoundError
 from .core.settings import ApplicationSettings
+from .models import Resources
 from .modules.docker import get_docker_client
-from .modules.ec2 import get_ec2_client
+from .modules.ec2 import EC2InstanceData, get_ec2_client
 from .utils import ec2, rabbitmq, utils_docker
 
 logger = logging.getLogger(__name__)
 
 _EC2_INTERNAL_DNS_RE: re.Pattern = re.compile(r"^(?P<ip>ip-[0-9-]+).+$")
 
 
-async def check_dynamic_resources(app: FastAPI) -> None:
-    """Check that there are no pending tasks requiring additional resources in the cluster (docker swarm)
-    If there are such tasks, this method will allocate new machines in AWS to cope with
-    the additional load.
-    """
+async def _mark_empty_active_nodes_to_drain(
+    app: FastAPI,
+    monitored_nodes: list[Node],
+) -> None:
     app_settings: ApplicationSettings = app.state.settings
     assert app_settings.AUTOSCALING_NODES_MONITORING  # nosec
-
-    # 1. get monitored nodes information and resources
     docker_client = get_docker_client(app)
-    monitored_nodes = await utils_docker.get_monitored_nodes(
-        docker_client,
-        node_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS,
+    active_empty_nodes = [
+        node
+        for node in monitored_nodes
+        if (
+            await utils_docker.compute_node_used_resources(
+                docker_client,
+                node,
+                service_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS,
+            )
+            == Resources.create_as_empty()
+        )
+        and (node.Spec is not None)
+        and (node.Spec.Availability == Availability.active)
+    ]
+    await asyncio.gather(
+        *(
+            utils_docker.set_node_availability(
+                docker_client,
+                node,
+                available=False,
+            )
+            for node in active_empty_nodes
+            if (node.Spec) and (node.Spec.Labels is not None)
+        )
     )
+    if active_empty_nodes:
+        logger.info(
+            "The following nodes set to drain: '%s'",
+            f"{[node.Description.Hostname for node in active_empty_nodes if node.Description]}",
+        )
 
-    cluster_total_resources = await utils_docker.compute_cluster_total_resources(
-        monitored_nodes
-    )
-    logger.info("%s", f"{cluster_total_resources=}")
-    cluster_used_resources = await utils_docker.compute_cluster_used_resources(
-        docker_client, monitored_nodes
-    )
-    logger.info("%s", f"{cluster_used_resources=}")
 
-    # 2. Remove nodes that are gone
-    await utils_docker.remove_monitored_down_nodes(docker_client, monitored_nodes)
+async def _find_terminateable_nodes(
+    app: FastAPI, monitored_nodes: list[Node]
+) -> list[tuple[Node, EC2InstanceData]]:
+    app_settings: ApplicationSettings = app.state.settings
+    assert app_settings.AUTOSCALING_NODES_MONITORING  # nosec
+    docker_client = get_docker_client(app)
+
+    # NOTE: we want the drained nodes where no monitored service is running anymore
+    drained_empty_nodes = [
+        node
+        for node in monitored_nodes
+        if (
+            await utils_docker.compute_node_used_resources(
+                docker_client,
+                node,
+                service_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS,
+            )
+            == Resources.create_as_empty()
+        )
+        and (node.Spec is not None)
+        and (node.Spec.Availability == Availability.drain)
+    ]
+    assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
+    if not drained_empty_nodes:
+        # there is nothing to terminate here
+        return []
 
-    # 3. Scale up nodes if there are pending tasks
-    pending_tasks = await utils_docker.pending_service_tasks_with_insufficient_resources(
-        docker_client,
-        service_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS,
-    )
-    await rabbitmq.post_state_message(
-        app,
-        monitored_nodes,
-        cluster_total_resources,
-        cluster_used_resources,
-        pending_tasks,
+    # get the corresponding ec2 instance data
+    # NOTE: some might be in the process of terminating and will not be found
+    ec2_client = get_ec2_client(app)
+    drained_empty_ec2_instances = await asyncio.gather(
+        *(
+            ec2_client.get_running_instance(
+                app_settings.AUTOSCALING_EC2_INSTANCES,
+                tag_keys=[
+                    "io.simcore.autoscaling.version",
+                ],
+                instance_host_name=node.Description.Hostname,
+            )
+            for node in drained_empty_nodes
+            if node.Description and node.Description.Hostname
+        ),
+        return_exceptions=True,
     )
 
+    terminateable_nodes: list[tuple[Node, EC2InstanceData]] = []
+    for node, ec2_instance_data in zip(
+        drained_empty_nodes, drained_empty_ec2_instances
+    ):
+        if isinstance(ec2_instance_data, Ec2InstanceNotFoundError):
+            # skip if already terminating
+            continue
+        # NOTE: AWS price is hourly based (e.g. same price for a machine used 2 minutes or 1 hour, so we wait until 55 minutes)
+        elapsed_time_since_launched = (
+            datetime.utcnow().replace(tzinfo=timezone.utc)
+            - ec2_instance_data.launch_time
+        )
+        elapsed_time_since_full_hour = elapsed_time_since_launched % timedelta(hours=1)
+        if (
+            elapsed_time_since_full_hour
+            >= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
+        ):
+            # let's terminate that one
+            terminateable_nodes.append((node, ec2_instance_data))
+    if terminateable_nodes:
+        logger.info(
+            "the following nodes were found to be terminateable: '%s'",
+            f"{[node.Description.Hostname for node,_ in terminateable_nodes if node.Description]}",
+        )
+    return terminateable_nodes
+
+
+async def _try_scale_down_cluster(app: FastAPI, monitored_nodes: list[Node]) -> None:
+    # 2. once it is in draining mode and we are nearing a modulo of an hour we can start the termination procedure
+    # NOTE: the nodes that were just changed to drain above will be eventually terminated on the next iteration
+    if terminateable_nodes := await _find_terminateable_nodes(app, monitored_nodes):
+        await asyncio.gather(
+            *(
+                get_ec2_client(app).terminate_instance(ec2_instance_data)
+                for _, ec2_instance_data in terminateable_nodes
+            )
+        )
+        logger.info(
+            "terminated the following machines: '%s'",
+            f"{[node.Description.Hostname for node,_ in terminateable_nodes if node.Description]}",
+        )
+        # since these nodes are being terminated, remove them from the swarm
+        await utils_docker.remove_nodes(
+            get_docker_client(app),
+            [node for node, _ in terminateable_nodes],
+            force=True,
+        )
+
+    # 3. we could ask on rabbit whether someone would like to keep that machine for something (like the agent for example), if that is the case, we wait another hour and ask again?
+    # 4.
+
+
+async def _try_scale_up_with_drained_nodes(
+    app: FastAPI,
+    monitored_nodes: list[Node],
+    pending_tasks: list[Task],
+) -> bool:
+    docker_client = get_docker_client(app)
     if not pending_tasks:
-        logger.debug("no pending tasks with insufficient resources at the moment")
-        return
+        return True
+    for task in pending_tasks:
+        # NOTE: currently we go one by one and break, next iteration
+        # will take care of next tasks if there are any
+
+        # check if there is some node with enough resources
+        for node in monitored_nodes:
+            assert node.Spec  # nosec
+            assert node.Description  # nosec
+            if (node.Spec.Availability == Availability.drain) and (
+                utils_docker.get_node_total_resources(node)
+                >= utils_docker.get_max_resources_from_docker_task(task)
+            ):
+                # let's make that node available again
+                await utils_docker.set_node_availability(
+                    docker_client, node, available=True
+                )
+                logger.info(
+                    "Activated former drained node '%s'", node.Description.Hostname
+                )
+                await rabbitmq.post_log_message(
+                    app,
+                    task,
+                    "cluster was scaled up and is now ready to run service",
+                    logging.INFO,
+                )
+                return True
+    logger.info("There are no available drained node for the pending tasks")
+    return False
+
 
+async def _scale_up_cluster(app: FastAPI, pending_tasks: list[Task]) -> None:
+    app_settings: ApplicationSettings = app.state.settings
     assert app_settings.AUTOSCALING_EC2_ACCESS  # nosec
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
     ec2_client = get_ec2_client(app)
@@ -88,13 +223,12 @@ async def check_dynamic_resources(app: FastAPI) -> None:
             assert app_settings.AUTOSCALING_NODES_MONITORING  # nosec
 
             logger.debug("%s", f"{ec2_instances_needed[0]=}")
-            new_instance_dns_name = await ec2_client.start_aws_instance(
+            new_instance_data = await ec2_client.start_aws_instance(
                 app_settings.AUTOSCALING_EC2_INSTANCES,
                 instance_type=parse_obj_as(
                     InstanceTypeType, ec2_instances_needed[0].name
                 ),
                 tags={
-                    "io.simcore.autoscaling.created": f"{datetime.utcnow()}",
                     "io.simcore.autoscaling.version": f"{VERSION}",
                     "io.simcore.autoscaling.monitored_nodes_labels": json.dumps(
                         app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS
@@ -107,13 +241,15 @@ async def check_dynamic_resources(app: FastAPI) -> None:
             )
 
             # NOTE: new_instance_dns_name is of type ip-123-23-23-3.ec2.internal and we need only the first part
-            if match := re.match(_EC2_INTERNAL_DNS_RE, new_instance_dns_name):
+            if match := re.match(
+                _EC2_INTERNAL_DNS_RE, new_instance_data.aws_private_dns
+            ):
                 new_instance_dns_name = match.group(1)
                 new_node = await utils_docker.wait_for_node(
-                    docker_client, new_instance_dns_name
+                    get_docker_client(app), new_instance_dns_name
                 )
                 await utils_docker.tag_node(
-                    docker_client,
+                    get_docker_client(app),
                     new_node,
                     tags={
                         tag_key: "true"
@@ -138,5 +274,51 @@ async def check_dynamic_resources(app: FastAPI) -> None:
             logger.error(
                 "Task %s needs more resources than any EC2 instance "
                 "can provide with the current configuration. Please check.",
-                {f"{task.Name=}:{task.ServiceID=}"},
+                {
+                    f"{task.Name if task.Name else 'unknown task name'}:{task.ServiceID if task.ServiceID else 'unknown service ID'}"
+                },
             )
+
+
+async def check_dynamic_resources(app: FastAPI) -> None:
+    """Check that there are no pending tasks requiring additional resources in the cluster (docker swarm)
+    If there are such tasks, this method will allocate new machines in AWS to cope with
+    the additional load.
+    """
+    app_settings: ApplicationSettings = app.state.settings
+    assert app_settings.AUTOSCALING_NODES_MONITORING  # nosec
+
+    # 1. get monitored nodes information and resources
+    docker_client = get_docker_client(app)
+
+    monitored_nodes = await utils_docker.get_monitored_nodes(
+        docker_client,
+        node_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS,
+    )
+    cluster_total_resources = await utils_docker.compute_cluster_total_resources(
+        monitored_nodes
+    )
+    cluster_used_resources = await utils_docker.compute_cluster_used_resources(
+        docker_client, monitored_nodes
+    )
+    logger.info("Monitored nodes total resources: %s", f"{cluster_total_resources}")
+    logger.info(
+        "Monitored nodes current used resources: %s", f"{cluster_used_resources}"
+    )
+
+    # 2. Cleanup nodes that are gone
+    await utils_docker.remove_nodes(docker_client, monitored_nodes)
+
+    # 3. Scale up the cluster if there are pending tasks, else see if we shall scale down
+    if pending_tasks := await utils_docker.pending_service_tasks_with_insufficient_resources(
+        docker_client,
+        service_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS,
+    ):
+        if not await _try_scale_up_with_drained_nodes(
+            app, monitored_nodes, pending_tasks
+        ):
+            # no? then scale up
+            await _scale_up_cluster(app, pending_tasks)
+    else:
+        await _mark_empty_active_nodes_to_drain(app, monitored_nodes)
+        await _try_scale_down_cluster(app, monitored_nodes)
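
The core of the new scale-down logic in _find_terminateable_nodes is the observation that EC2 instances are billed by the started hour, so a drained and empty node is only worth terminating when it is close to completing its current billing hour. A small standalone sketch of that modulo check (function and variable names are illustrative, not taken from the codebase):

from datetime import datetime, timedelta, timezone

TIME_BEFORE_TERMINATION = timedelta(minutes=55)  # default of the new setting

def close_to_billing_hour(launch_time: datetime, now: datetime) -> bool:
    # how far we are into the *current* hourly billing period
    elapsed_since_full_hour = (now - launch_time) % timedelta(hours=1)
    return elapsed_since_full_hour >= TIME_BEFORE_TERMINATION

now = datetime.now(timezone.utc)
print(close_to_billing_hour(now - timedelta(minutes=20), now))           # False: 20 min into the hour, keep it
print(close_to_billing_hour(now - timedelta(hours=2, minutes=57), now))  # True: 57 min into the 3rd hour, terminate

With the default of 55 minutes, a drained node therefore gets a roughly 5-minute window at the end of every paid hour in which it can be reclaimed before the next hour is billed.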

services/autoscaling/src/simcore_service_autoscaling/models.py

Lines changed: 15 additions & 0 deletions
@@ -9,6 +9,21 @@ class Resources(BaseModel):
     cpus: NonNegativeFloat
     ram: ByteSize
 
+    @classmethod
+    def create_as_empty(cls) -> "Resources":
+        return cls(cpus=0, ram=ByteSize(0))
+
+    def __ge__(self, other: "Resources") -> bool:
+        return self.cpus >= other.cpus and self.ram >= other.ram
+
+    def __add__(self, other: "Resources") -> "Resources":
+        return Resources.construct(
+            **{
+                key: a + b
+                for (key, a), b in zip(self.dict().items(), other.dict().values())
+            }
+        )
+
 
 class EC2Instance(BaseModel):
     name: str
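
These Resources helpers are what the checks above rely on: create_as_empty() is the "nothing is running on this node" marker, __ge__ answers "can this node host that task", and __add__ aggregates usage. A short usage sketch (the class body is copied from the diff; the node/task figures are made up):

from pydantic import BaseModel, ByteSize, NonNegativeFloat, parse_obj_as

class Resources(BaseModel):
    cpus: NonNegativeFloat
    ram: ByteSize

    @classmethod
    def create_as_empty(cls) -> "Resources":
        return cls(cpus=0, ram=ByteSize(0))

    def __ge__(self, other: "Resources") -> bool:
        return self.cpus >= other.cpus and self.ram >= other.ram

    def __add__(self, other: "Resources") -> "Resources":
        return Resources.construct(
            **{
                key: a + b
                for (key, a), b in zip(self.dict().items(), other.dict().values())
            }
        )

node = Resources(cpus=4, ram=parse_obj_as(ByteSize, "16GiB"))
task = Resources(cpus=2, ram=parse_obj_as(ByteSize, "4GiB"))

print(node >= task)                          # True: the node can take the task
print((task + task).dict())                  # {'cpus': 4.0, 'ram': 8589934592}
print(Resources.create_as_empty() >= task)   # False: an empty node offers no resources

Defining only __ge__ (and not a full ordering) is enough here, because the autoscaler only ever asks whether one Resources fully covers another.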
