Skip to content

Commit 83c5f31

Browse files
authored
✨Autoscaling: add a delay before draining a node (#5843)
1 parent 4121401 commit 83c5f31

File tree

16 files changed

+355
-166
lines changed

16 files changed

+355
-166
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ htmlcov/
4747
.cache
4848
nosetests.xml
4949
coverage.xml
50+
cov.xml
5051
*.cover
5152
.hypothesis/
5253
.pytest_cache/

packages/settings-library/src/settings_library/docker_registry.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from functools import cached_property
2+
from typing import Any, ClassVar
23

34
from pydantic import Field, SecretStr, validator
45

@@ -24,7 +25,7 @@ class RegistrySettings(BaseCustomSettings):
2425

2526
@validator("REGISTRY_PATH", pre=True)
2627
@classmethod
27-
def escape_none_string(cls, v):
28+
def escape_none_string(cls, v) -> Any | None:
2829
return None if v == "None" else v
2930

3031
@cached_property
@@ -34,3 +35,16 @@ def resolved_registry_url(self) -> str:
3435
# Base URL of the registry API: REGISTRY_URL with the "/v2" path appended (cached after first access).
@cached_property
3536
def api_url(self) -> str:
3637
return f"{self.REGISTRY_URL}/v2"
38+
39+
# Settings configuration extending BaseCustomSettings.Config.
# schema_extra carries a single example environment (all values given as strings)
# with registry authentication and SSL enabled.
class Config(BaseCustomSettings.Config):
40+
schema_extra: ClassVar[dict[str, Any]] = { # type: ignore[misc]
41+
"examples": [
42+
{
43+
"REGISTRY_AUTH": "True",
44+
"REGISTRY_USER": "theregistryuser",
45+
"REGISTRY_PW": "some_secret_value",
46+
"REGISTRY_SSL": "True",
47+
"REGISTRY_URL": "registry.osparc-master.speag.com",
48+
}
49+
],
50+
}

services/autoscaling/src/simcore_service_autoscaling/core/settings.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ class EC2InstancesSettings(BaseCustomSettings):
100100
" (https://docs.aws.amazon.com/vpc/latest/userguide/configure-subnets.html), "
101101
"this is required to start a new EC2 instance",
102102
)
103+
EC2_INSTANCES_TIME_BEFORE_DRAINING: datetime.timedelta = Field(
104+
default=datetime.timedelta(seconds=20),
105+
description="Time after which an EC2 instance may be drained (10s<=T<=1 minutes, is automatically capped)"
106+
"(default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)",
107+
)
103108
EC2_INSTANCES_TIME_BEFORE_TERMINATION: datetime.timedelta = Field(
104109
default=datetime.timedelta(minutes=1),
105110
description="Time after which an EC2 instance may be terminated (0<=T<=59 minutes, is automatically capped)"
@@ -111,9 +116,22 @@ class EC2InstancesSettings(BaseCustomSettings):
111116
"a tag must have a key and an optional value. see [https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/Using_Tags.html]",
112117
)
113118

119+
@validator("EC2_INSTANCES_TIME_BEFORE_DRAINING")
@classmethod
def ensure_draining_delay_time_is_in_range(
    cls, value: datetime.timedelta
) -> datetime.timedelta:
    """Clamp the draining delay into the [10 seconds, 1 minute] window.

    Values below the window are raised to 10 seconds, values above it are
    lowered to 1 minute, anything in between is returned unchanged.
    """
    shortest_delay = datetime.timedelta(seconds=10)
    longest_delay = datetime.timedelta(minutes=1)
    return min(max(value, shortest_delay), longest_delay)
129+
114130
@validator("EC2_INSTANCES_TIME_BEFORE_TERMINATION")
115131
@classmethod
116-
def ensure_time_is_in_range(cls, value):
132+
def ensure_termination_delay_time_is_in_range(
133+
cls, value: datetime.timedelta
134+
) -> datetime.timedelta:
117135
if value < datetime.timedelta(minutes=0):
118136
value = datetime.timedelta(minutes=0)
119137
elif value > datetime.timedelta(minutes=59):

services/autoscaling/src/simcore_service_autoscaling/models.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ def __post_init__(self) -> None:
3434
if self.available_resources == Resources.create_as_empty():
3535
object.__setattr__(self, "available_resources", self.ec2_instance.resources)
3636

37+
def has_assigned_tasks(self) -> bool:
    """Tell whether part of this instance's resources is currently taken.

    The instance counts as having assigned tasks when its available resources
    compare as lower than the full resources of the underlying EC2 instance.
    """
    still_available = self.available_resources
    return bool(still_available < self.ec2_instance.resources)
39+
3740

3841
@dataclass(frozen=True, kw_only=True, slots=True)
3942
class AssociatedInstance(_BaseInstance):

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -751,23 +751,61 @@ async def _scale_up_cluster(
751751
return cluster
752752

753753

754+
async def _find_drainable_nodes(
    app: FastAPI, cluster: Cluster
) -> list[AssociatedInstance]:
    """Return the active nodes that have been empty for longer than the draining delay.

    For every active node of the cluster:
    - a node with assigned tasks gets its "found empty" label cleared,
    - an empty node without that label gets it set (the waiting time starts now),
    - an empty node already labelled is drainable once the elapsed time exceeds
      EC2_INSTANCES_TIME_BEFORE_DRAINING, otherwise the remaining time is logged.
    """
    app_settings: ApplicationSettings = app.state.settings
    assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec

    if not cluster.active_nodes:
        # no active node, hence no candidate
        return []

    time_before_draining = (
        app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING
    )
    ready_to_drain: list[AssociatedInstance] = []

    # NOTE: the names `instance`, `node_last_empty` and `elapsed_time_since_empty`
    # show up verbatim in the log output through the `{...=}` f-string syntax
    for instance in cluster.active_nodes:
        if instance.has_assigned_tasks():
            # busy node: make sure it is not flagged as empty
            await utils_docker.set_node_found_empty(
                get_docker_client(app), instance.node, empty=False
            )
            continue

        node_last_empty = await utils_docker.get_node_empty_since(instance.node)
        if not node_last_empty:
            # first time this node is seen empty: flag it and wait for the next round
            await utils_docker.set_node_found_empty(
                get_docker_client(app), instance.node, empty=True
            )
            continue

        elapsed_time_since_empty = arrow.utcnow().datetime - node_last_empty
        _logger.debug("%s", f"{node_last_empty=}, {elapsed_time_since_empty=}")
        if elapsed_time_since_empty > time_before_draining:
            ready_to_drain.append(instance)
            continue

        time_left = time_before_draining - elapsed_time_since_empty
        _logger.info(
            "%s has still %ss before being drainable",
            f"{instance.ec2_instance.id=}",
            f"{time_left.total_seconds():.0f}",
        )

    if ready_to_drain:
        hostnames = [
            candidate.node.Description.Hostname
            for candidate in ready_to_drain
            if candidate.node.Description
        ]
        _logger.info(
            "the following nodes were found to be drainable: '%s'",
            f"{hostnames}",
        )
    return ready_to_drain
799+
800+
754801
async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster:
755802
app_settings = get_application_settings(app)
756803
docker_client = get_docker_client(app)
757-
active_empty_instances: list[AssociatedInstance] = []
758-
active_non_empty_instances: list[AssociatedInstance] = []
759-
for instance in cluster.active_nodes:
760-
if instance.available_resources == instance.ec2_instance.resources:
761-
active_empty_instances.append(instance)
762-
else:
763-
active_non_empty_instances.append(instance)
804+
active_empty_instances = await _find_drainable_nodes(app, cluster)
764805

765806
if not active_empty_instances:
766807
return cluster
767-
_logger.info(
768-
"following nodes will be drained: '%s'",
769-
f"{[instance.node.Description.Hostname for instance in active_empty_instances if instance.node.Description]}",
770-
)
808+
771809
# drain these empty nodes
772810
updated_nodes: list[Node] = await asyncio.gather(
773811
*(
@@ -782,7 +820,7 @@ async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster:
782820
)
783821
if updated_nodes:
784822
_logger.info(
785-
"following nodes set to drain: '%s'",
823+
"following nodes were set to drain: '%s'",
786824
f"{[node.Description.Hostname for node in updated_nodes if node.Description]}",
787825
)
788826
newly_drained_instances = [
@@ -791,7 +829,9 @@ async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster:
791829
]
792830
return dataclasses.replace(
793831
cluster,
794-
active_nodes=active_non_empty_instances,
832+
active_nodes=[
833+
n for n in cluster.active_nodes if n not in active_empty_instances
834+
],
795835
drained_nodes=cluster.drained_nodes + newly_drained_instances,
796836
)
797837

@@ -814,7 +854,7 @@ async def _find_terminateable_instances(
814854
elapsed_time_since_drained = (
815855
datetime.datetime.now(datetime.timezone.utc) - node_last_updated
816856
)
817-
_logger.warning("%s", f"{node_last_updated=}, {elapsed_time_since_drained=}")
857+
_logger.debug("%s", f"{node_last_updated=}, {elapsed_time_since_drained=}")
818858
if (
819859
elapsed_time_since_drained
820860
> app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION

services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@
7171
]
7272

7373

74+
_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as(
75+
DockerLabelKey, "io.simcore.osparc-node-found-empty"
76+
)
77+
78+
7479
async def get_monitored_nodes(
7580
docker_client: AutoscalingDocker, node_labels: list[DockerLabelKey]
7681
) -> list[Node]:
@@ -609,6 +614,38 @@ def get_node_last_readyness_update(node: Node) -> datetime.datetime:
609614
) # mypy
610615

611616

617+
async def set_node_found_empty(
    docker_client: AutoscalingDocker,
    node: Node,
    *,
    empty: bool,
) -> Node:
    """Set or clear the "found empty" timestamp label of a docker node.

    Arguments:
        docker_client -- client forwarded to ``tag_node``
        node -- the node whose labels are updated
        empty -- True stores the current UTC time (ISO formatted) under the
            label, False removes the label if it is present

    Returns:
        the node as returned by ``tag_node``
    """
    assert node.Spec  # nosec
    # work on a copy so the labels of the passed node are not mutated;
    # a missing (None) label mapping is handled like an empty one instead of
    # failing on the item assignment/pop below
    new_tags = cast(dict[DockerLabelKey, str], deepcopy(node.Spec.Labels or {}))
    if empty:
        new_tags[_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY] = arrow.utcnow().isoformat()
    else:
        new_tags.pop(_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY, None)
    return await tag_node(
        docker_client,
        node,
        tags=new_tags,
        # keep the current availability: this call only touches the labels
        available=node.Spec.Availability is Availability.active,
    )
635+
636+
637+
async def get_node_empty_since(node: Node) -> datetime.datetime | None:
    """Return when the node was flagged as empty, or None if it carries no such flag.

    The timestamp is read from the node's "found empty" label and parsed with arrow.
    A node without any label is reported as not flagged (None).
    """
    assert node.Spec  # nosec
    # do not assert on the labels: an empty mapping is falsy and would raise here
    # although it only means that the node was never flagged
    empty_since = (node.Spec.Labels or {}).get(_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY)
    if empty_since is None:
        return None
    return cast(
        datetime.datetime,
        arrow.get(empty_since).datetime,
    )  # mypy
647+
648+
612649
async def attach_node(
613650
app_settings: ApplicationSettings,
614651
docker_client: AutoscalingDocker,

services/autoscaling/tests/manual/.env-devel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ AUTOSCALING_EC2_REGION_NAME=us-east-1
88
AUTOSCALING_REMOTE_DEBUGGING_PORT=3000
99
EC2_INSTANCES_MACHINES_BUFFER=0
1010
EC2_INSTANCES_MAX_INSTANCES=20
11+
EC2_INSTANCES_TIME_BEFORE_DRAINING="00:00:10"
1112
EC2_INSTANCES_TIME_BEFORE_TERMINATION="00:03:00"
1213
EC2_INSTANCES_ALLOWED_TYPES='{"t2.micro": {"ami_id": "XXXXXXXX", "custom_boot_scripts": ["whoami"], "pre_pull_images": ["ubuntu:latest"]}}'
1314
EC2_INSTANCES_KEY_NAME=XXXXXXXXXX

services/autoscaling/tests/unit/test_core_settings.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,26 @@ def test_defining_both_computational_and_dynamic_modes_is_invalid_and_raises(
8888
ApplicationSettings.create_from_envs()
8989

9090

91+
def test_invalid_EC2_INSTANCES_TIME_BEFORE_DRAINING(  # noqa: N802
    app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch
):
    """Out-of-range draining delays are capped: too long -> 1 minute, negative -> 10 seconds."""
    capped_cases = (
        ("1:05:00", datetime.timedelta(minutes=1)),
        ("-1:05:00", datetime.timedelta(seconds=10)),
    )
    for env_value, expected_delay in capped_cases:
        setenvs_from_dict(
            monkeypatch, {"EC2_INSTANCES_TIME_BEFORE_DRAINING": env_value}
        )
        settings = ApplicationSettings.create_from_envs()
        assert settings.AUTOSCALING_EC2_INSTANCES
        assert (
            expected_delay
            == settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING
        )
109+
110+
91111
def test_invalid_EC2_INSTANCES_TIME_BEFORE_TERMINATION( # noqa: N802
92112
app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch
93113
):

services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from simcore_service_autoscaling.modules.docker import get_docker_client
4545
from simcore_service_autoscaling.modules.ec2 import SimcoreEC2API
4646
from simcore_service_autoscaling.utils.utils_docker import (
47+
_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY,
4748
_OSPARC_SERVICE_READY_LABEL_KEY,
4849
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY,
4950
)
@@ -581,7 +582,40 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
581582
# 4. now scaling down, as we deleted all the tasks
582583
#
583584
del dask_future
585+
await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode)
586+
mock_dask_is_worker_connected.assert_called_once()
587+
mock_dask_is_worker_connected.reset_mock()
588+
mock_dask_get_worker_has_results_in_memory.assert_called()
589+
assert mock_dask_get_worker_has_results_in_memory.call_count == 2
590+
mock_dask_get_worker_has_results_in_memory.reset_mock()
591+
mock_dask_get_worker_used_resources.assert_called()
592+
assert mock_dask_get_worker_used_resources.call_count == 2
593+
mock_dask_get_worker_used_resources.reset_mock()
594+
# the node shall be waiting before draining
595+
mock_docker_set_node_availability.assert_not_called()
596+
mock_docker_tag_node.assert_called_once_with(
597+
get_docker_client(initialized_app),
598+
fake_attached_node,
599+
tags=fake_attached_node.Spec.Labels
600+
| {
601+
_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY: mock.ANY,
602+
},
603+
available=True,
604+
)
605+
mock_docker_tag_node.reset_mock()
606+
607+
# now update the fake node to have the required label as expected
608+
assert app_settings.AUTOSCALING_EC2_INSTANCES
609+
fake_attached_node.Spec.Labels[_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY] = (
610+
arrow.utcnow()
611+
.shift(
612+
seconds=-app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING.total_seconds()
613+
- 1
614+
)
615+
.datetime.isoformat()
616+
)
584617

618+
# now it will drain
585619
await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode)
586620
mock_dask_is_worker_connected.assert_called_once()
587621
mock_dask_is_worker_connected.reset_mock()
@@ -598,6 +632,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
598632
fake_attached_node,
599633
tags=fake_attached_node.Spec.Labels
600634
| {
635+
_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY: mock.ANY,
601636
_OSPARC_SERVICE_READY_LABEL_KEY: "false",
602637
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY,
603638
},

0 commit comments

Comments
 (0)