Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ async def assert_autoscaled_dynamic_ec2_instances(
expected_instance_state: InstanceStateNameType,
expected_additional_tag_keys: list[str],
instance_filters: Sequence[FilterTypeDef] | None,
expected_user_data: list[str] | None = None,
) -> list[InstanceTypeDef]:
if expected_user_data is None:
expected_user_data = ["docker swarm join"]
return await assert_ec2_instances(
ec2_client,
expected_num_reservations=expected_num_reservations,
Expand All @@ -54,7 +57,7 @@ async def assert_autoscaled_dynamic_ec2_instances(
"io.simcore.autoscaling.monitored_services_labels",
*expected_additional_tag_keys,
],
expected_user_data=["docker swarm join"],
expected_user_data=expected_user_data,
instance_filters=instance_filters,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,15 +418,43 @@ async def _activate_drained_nodes(
)


async def _start_buffer_instances(
async def _start_warm_buffer_instances(
app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling
) -> Cluster:
"""starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed"""

app_settings = get_application_settings(app)
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec

instances_to_start = [
i.ec2_instance for i in cluster.buffer_ec2s if i.assigned_tasks
]

if (
len(cluster.buffer_drained_nodes)
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
):
# check if we can migrate warm buffers to hot buffers
hot_buffer_instance_type = cast(
InstanceTypeType,
next(
iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES)
),
)
free_starteable_warm_buffers_to_replace_hot_buffers = [
warm_buffer.ec2_instance
for warm_buffer in cluster.buffer_ec2s
if (warm_buffer.ec2_instance.type == hot_buffer_instance_type)
and not warm_buffer.assigned_tasks
]
instances_to_start += free_starteable_warm_buffers_to_replace_hot_buffers[
: app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
- len(cluster.buffer_drained_nodes)
]

if not instances_to_start:
return cluster
# change the buffer machine to an active one

with log_context(
_logger, logging.INFO, f"start {len(instances_to_start)} buffer machines"
):
Expand Down Expand Up @@ -1187,8 +1215,8 @@ async def _autoscale_cluster(
# 2. activate available drained nodes to cover some of the tasks
cluster = await _activate_drained_nodes(app, cluster, auto_scaling_mode)

# 3. start buffer instances to cover the remaining tasks
cluster = await _start_buffer_instances(app, cluster, auto_scaling_mode)
# 3. start warm buffer instances to cover the remaining tasks
cluster = await _start_warm_buffer_instances(app, cluster, auto_scaling_mode)

# 4. scale down unused instances
cluster = await _scale_down_unused_cluster_instances(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def _list_processing_tasks_on_worker(
async with _scheduler_client(scheduler_url, authentication) as client:
worker_url, _ = _dask_worker_from_ec2_instance(client, ec2_instance)

_logger.debug("looking for processing tasksfor %s", f"{worker_url=}")
_logger.debug("looking for processing tasks for %s", f"{worker_url=}")

# now get the used resources
worker_processing_tasks: list[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,14 @@ async def tag_node(
tags: dict[DockerLabelKey, str],
available: bool,
) -> Node:
assert node.spec # nosec
if (node.spec.labels == tags) and (
(node.spec.availability is Availability.active) == available
):
# nothing to do
return node
with log_context(
logger, logging.DEBUG, msg=f"tagging {node.id=} with {tags=} and {available=}"
logger, logging.DEBUG, msg=f"tag {node.id=} with {tags=} and {available=}"
):
assert node.id # nosec

Expand Down
202 changes: 195 additions & 7 deletions services/autoscaling/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@
EC2InstanceType,
Resources,
)
from common_library.json_serialization import json_dumps
from deepdiff import DeepDiff
from faker import Faker
from fakeredis.aioredis import FakeRedis
from fastapi import FastAPI
from models_library.docker import DockerLabelKey, StandardSimcoreDockerLabels
from models_library.docker import (
DockerGenericTag,
DockerLabelKey,
StandardSimcoreDockerLabels,
)
from models_library.generated_models.docker_rest_api import Availability
from models_library.generated_models.docker_rest_api import Node as DockerNode
from models_library.generated_models.docker_rest_api import (
Expand All @@ -45,7 +50,7 @@
Service,
TaskSpec,
)
from pydantic import ByteSize, PositiveInt, TypeAdapter
from pydantic import ByteSize, NonNegativeInt, PositiveInt, TypeAdapter
from pytest_mock import MockType
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.host import get_localhost_ip
Expand All @@ -57,6 +62,7 @@
)
from settings_library.rabbit import RabbitSettings
from settings_library.ssm import SSMSettings
from simcore_service_autoscaling.constants import PRE_PULLED_IMAGES_EC2_TAG_KEY
from simcore_service_autoscaling.core.application import create_app
from simcore_service_autoscaling.core.settings import (
AUTOSCALING_ENV_PREFIX,
Expand All @@ -71,8 +77,14 @@
DaskTaskResources,
)
from simcore_service_autoscaling.modules import auto_scaling_core
from simcore_service_autoscaling.modules.auto_scaling_mode_dynamic import (
DynamicAutoscaling,
)
from simcore_service_autoscaling.modules.docker import AutoscalingDocker
from simcore_service_autoscaling.modules.ec2 import SimcoreEC2API
from simcore_service_autoscaling.utils.buffer_machines_pool_core import (
get_deactivated_buffer_ec2_tags,
)
from simcore_service_autoscaling.utils.utils_docker import (
_OSPARC_SERVICE_READY_LABEL_KEY,
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY,
Expand All @@ -81,7 +93,9 @@
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed
from types_aiobotocore_ec2.literals import InstanceTypeType
from types_aiobotocore_ec2 import EC2Client
from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType
from types_aiobotocore_ec2.type_defs import TagTypeDef

pytest_plugins = [
"pytest_simcore.aws_server",
Expand Down Expand Up @@ -991,10 +1005,22 @@ def _creator(


@pytest.fixture
def mock_machines_buffer(monkeypatch: pytest.MonkeyPatch) -> int:
num_machines_in_buffer = 5
monkeypatch.setenv("EC2_INSTANCES_MACHINES_BUFFER", f"{num_machines_in_buffer}")
return num_machines_in_buffer
def num_hot_buffer() -> NonNegativeInt:
return 5


@pytest.fixture
def with_instances_machines_hot_buffer(
    num_hot_buffer: int,
    app_environment: EnvVarsDict,
    monkeypatch: pytest.MonkeyPatch,
) -> EnvVarsDict:
    """Patch the environment so the autoscaling settings keep a hot buffer
    of ``num_hot_buffer`` pre-started machines (EC2_INSTANCES_MACHINES_BUFFER).
    """
    hot_buffer_env = {"EC2_INSTANCES_MACHINES_BUFFER": f"{num_hot_buffer}"}
    return app_environment | setenvs_from_dict(monkeypatch, hot_buffer_env)


@pytest.fixture
Expand Down Expand Up @@ -1042,3 +1068,165 @@ async def _(
autospec=True,
side_effect=_,
)


@pytest.fixture
def fake_pre_pull_images() -> list[DockerGenericTag]:
    """An assortment of docker image tags (official, namespaced, simcore
    dynamic-service, and a bare name) used as pre-pull test data."""
    raw_image_names = [
        "nginx:latest",
        "itisfoundation/my-very-nice-service:latest",
        "simcore/services/dynamic/another-nice-one:2.4.5",
        "asd",
    ]
    return TypeAdapter(list[DockerGenericTag]).validate_python(raw_image_names)


@pytest.fixture
def ec2_instances_allowed_types_with_only_1_buffered(
    faker: Faker,
    fake_pre_pull_images: list[DockerGenericTag],
    external_ec2_instances_allowed_types: None | dict[str, EC2InstanceBootSpecific],
) -> dict[InstanceTypeType, EC2InstanceBootSpecific]:
    """Return an allowed-types mapping with exactly one instance type that has
    a non-zero warm-buffer count.

    When no external configuration is provided, a synthetic ``t2.micro`` entry
    is fabricated; otherwise the external types are narrowed down to the single
    buffered one (these tests require exactly one buffered type).
    """
    if not external_ec2_instances_allowed_types:
        # no external configuration: fabricate one buffered type
        return {
            "t2.micro": EC2InstanceBootSpecific(
                ami_id=faker.pystr(),
                pre_pull_images=fake_pre_pull_images,
                buffer_count=faker.pyint(min_value=1, max_value=10),
            )
        }

    buffered_types = {
        instance_type: boot_spec
        for instance_type, boot_spec in external_ec2_instances_allowed_types.items()
        if boot_spec.buffer_count > 0
    }
    assert buffered_types, "one type with buffer is needed for the tests!"
    assert (
        len(buffered_types) == 1
    ), "more than one type with buffer is disallowed in this test!"
    return {
        TypeAdapter(InstanceTypeType).validate_python(instance_type): boot_spec
        for instance_type, boot_spec in buffered_types.items()
    }


@pytest.fixture
def buffer_count(
    ec2_instances_allowed_types_with_only_1_buffered: dict[
        InstanceTypeType, EC2InstanceBootSpecific
    ],
) -> int:
    """The configured warm-buffer size of the single buffered instance type."""
    buffered_specs = [
        boot_spec
        for boot_spec in ec2_instances_allowed_types_with_only_1_buffered.values()
        if boot_spec.buffer_count > 0
    ]
    assert buffered_specs, "you need one type with buffer"
    assert (
        len(buffered_specs) == 1
    ), "more than one type with buffer is disallowed in this test!"
    return buffered_specs[0].buffer_count


@pytest.fixture
async def create_buffer_machines(
    ec2_client: EC2Client,
    aws_ami_id: str,
    app_settings: ApplicationSettings,
    initialized_app: FastAPI,
) -> Callable[
    [int, InstanceTypeType, InstanceStateNameType, list[DockerGenericTag] | None],
    Awaitable[list[str]],
]:
    """Factory fixture: launch `num` warm-buffer EC2 machines (against the
    mocked AWS backend) tagged as deactivated buffers, optionally stop them,
    and return their instance IDs.
    """

    async def _do(
        num: int,
        instance_type: InstanceTypeType,
        instance_state_name: InstanceStateNameType,
        pre_pull_images: list[DockerGenericTag] | None,
    ) -> list[str]:
        assert app_settings.AUTOSCALING_EC2_INSTANCES

        # only the two states the autoscaler's buffer pool deals with
        assert instance_state_name in [
            "running",
            "stopped",
        ], "only 'running' and 'stopped' are supported for testing"

        # tag the instances exactly like deactivated (warm) buffer machines
        # so the autoscaling code recognizes them as its own
        resource_tags: list[TagTypeDef] = [
            {"Key": tag_key, "Value": tag_value}
            for tag_key, tag_value in get_deactivated_buffer_ec2_tags(
                initialized_app, DynamicAutoscaling()
            ).items()
        ]
        # pre-pulled images are only advertised on *stopped* buffers; the tag
        # value is the JSON-serialized image list
        if pre_pull_images is not None and instance_state_name == "stopped":
            resource_tags.append(
                {
                    "Key": PRE_PULLED_IMAGES_EC2_TAG_KEY,
                    "Value": f"{json_dumps(pre_pull_images)}",
                }
            )
        # NOTE(review): first positional argument here is a log level, unlike
        # other call sites that pass a logger first — presumably this is the
        # pytest-simcore log_context helper, not servicelib's; confirm
        with log_context(
            logging.INFO, f"creating {num} buffer machines of {instance_type}"
        ):
            instances = await ec2_client.run_instances(
                ImageId=aws_ami_id,
                MaxCount=num,
                MinCount=num,
                InstanceType=instance_type,
                KeyName=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME,
                SecurityGroupIds=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SECURITY_GROUP_IDS,
                SubnetId=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SUBNET_ID,
                IamInstanceProfile={
                    "Arn": app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ATTACHED_IAM_PROFILE
                },
                TagSpecifications=[
                    {"ResourceType": "instance", "Tags": resource_tags},
                    {"ResourceType": "volume", "Tags": resource_tags},
                    {"ResourceType": "network-interface", "Tags": resource_tags},
                ],
                UserData="echo 'I am pytest'",
            )
            instance_ids = [
                i["InstanceId"] for i in instances["Instances"] if "InstanceId" in i
            ]

            # wait until the backend reports the instances, then verify they
            # all came up in the 'running' state
            waiter = ec2_client.get_waiter("instance_exists")
            await waiter.wait(InstanceIds=instance_ids)
            instances = await ec2_client.describe_instances(InstanceIds=instance_ids)
            assert "Reservations" in instances
            assert instances["Reservations"]
            assert "Instances" in instances["Reservations"][0]
            assert len(instances["Reservations"][0]["Instances"]) == num
            for instance in instances["Reservations"][0]["Instances"]:
                assert "State" in instance
                assert "Name" in instance["State"]
                assert instance["State"]["Name"] == "running"

            # when a stopped buffer is requested, stop the freshly started
            # machines and re-verify their state
            if instance_state_name == "stopped":
                await ec2_client.stop_instances(InstanceIds=instance_ids)
                instances = await ec2_client.describe_instances(InstanceIds=instance_ids)
                assert "Reservations" in instances
                assert instances["Reservations"]
                assert "Instances" in instances["Reservations"][0]
                assert len(instances["Reservations"][0]["Instances"]) == num
                for instance in instances["Reservations"][0]["Instances"]:
                    assert "State" in instance
                    assert "Name" in instance["State"]
                    assert instance["State"]["Name"] == "stopped"

            return instance_ids

    return _do
Loading
Loading