61 changes: 60 additions & 1 deletion packages/aws-library/tests/test_ec2_client.py
@@ -419,6 +419,63 @@ async def test_stop_start_instances(
assert getattr(s, f.name) == getattr(c, f.name)


async def test_start_instances_with_insufficient_instance_capacity(
simcore_ec2_api: SimcoreEC2API,
ec2_client: EC2Client,
faker: Faker,
ec2_instance_config: EC2InstanceConfig,
mocker: MockerFixture,
):
# we have nothing running now in ec2
await _assert_no_instances_in_ec2(ec2_client)
# create some instances
_NUM_INSTANCES = 10
num_instances = faker.pyint(min_value=1, max_value=_NUM_INSTANCES)
created_instances = await simcore_ec2_api.launch_instances(
ec2_instance_config,
min_number_of_instances=num_instances,
number_of_instances=num_instances,
)
await _assert_instances_in_ec2(
ec2_client,
expected_num_reservations=1,
expected_num_instances=num_instances,
expected_instance_type=ec2_instance_config.type,
expected_tags=ec2_instance_config.tags,
expected_state="running",
)
# stop the instances
await simcore_ec2_api.stop_instances(created_instances)
await _assert_instances_in_ec2(
ec2_client,
expected_num_reservations=1,
expected_num_instances=num_instances,
expected_instance_type=ec2_instance_config.type,
expected_tags=ec2_instance_config.tags,
expected_state="stopped",
)

# Mock the EC2 client to simulate InsufficientInstanceCapacity when starting the instances
async def mock_start_instances(*args, **kwargs) -> Any:
# no more machines, simulate insufficient capacity
error_response: dict[str, Any] = {
"Error": {
"Code": "InsufficientInstanceCapacity",
"Message": "An error occurred (InsufficientInstanceCapacity) when calling the StartInstances operation (reached max retries: 4): Insufficient capacity.",
},
}
raise botocore.exceptions.ClientError(error_response, "StartInstances") # type: ignore

# Apply the mock
mocker.patch.object(
simcore_ec2_api.client, "start_instances", side_effect=mock_start_instances
)

# start the instances now
with pytest.raises(EC2InsufficientCapacityError):
await simcore_ec2_api.start_instances(created_instances)
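
For context, here is a minimal, self-contained sketch of the error translation this test exercises. It assumes SimcoreEC2API inspects the botocore error code and re-raises a typed exception; the helper name and the simplified exception classes below are illustrative stand-ins, not the library's actual API.

```python
# A minimal sketch (not the real SimcoreEC2API code) of the translation the
# test above relies on: a botocore ClientError carrying the code
# "InsufficientInstanceCapacity" becomes a typed EC2InsufficientCapacityError.
# The exception classes are simplified stand-ins for aws_library.ec2._errors.
import botocore.exceptions


class EC2AccessError(Exception):
    ...


class EC2InsufficientCapacityError(EC2AccessError):
    ...


def translate_ec2_client_error(
    exc: botocore.exceptions.ClientError,
) -> EC2AccessError:
    """Map a raw botocore error onto a typed library exception."""
    error_code = exc.response.get("Error", {}).get("Code", "")
    if error_code == "InsufficientInstanceCapacity":
        return EC2InsufficientCapacityError(str(exc))
    return EC2AccessError(str(exc))
```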


async def test_terminate_instance(
simcore_ec2_api: SimcoreEC2API,
ec2_client: EC2Client,
@@ -717,7 +774,9 @@ async def mock_run_instances(*args, **kwargs) -> Any:
mocker.patch.object(
simcore_ec2_api.client, "run_instances", side_effect=mock_run_instances
)
with pytest.raises(EC2InsufficientCapacityError) as exc_info:
with pytest.raises(
EC2InsufficientCapacityError, match=fake_ec2_instance_type.name
) as exc_info:
await simcore_ec2_api.launch_instances(
ec2_instance_config,
min_number_of_instances=1,
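As an aside on the strengthened assertion above: pytest applies the match argument as a regular expression searched within the string representation of the raised exception, so passing the instance type name asserts that it appears somewhere in the error message. A small self-contained illustration (the instance type literal is made up):

```python
# Demonstrates pytest.raises(match=...): the pattern is regex-searched in
# str(exception), so "t2.micro" matches anywhere in the message. Note that "."
# is a regex wildcard here, which is harmless for a containment check like this.
import pytest


def test_match_searches_exception_message() -> None:
    with pytest.raises(ValueError, match="t2.micro"):
        raise ValueError("Insufficient capacity for instance type t2.micro")
```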
@@ -15,7 +15,11 @@
EC2Tags,
Resources,
)
from aws_library.ec2._errors import EC2AccessError, EC2TooManyInstancesError
from aws_library.ec2._errors import (
EC2AccessError,
EC2InsufficientCapacityError,
EC2TooManyInstancesError,
)
from fastapi import FastAPI
from models_library.generated_models.docker_rest_api import Node
from models_library.rabbitmq_messages import ProgressType
@@ -421,10 +425,46 @@ async def _activate_drained_nodes(
)


def _de_assign_tasks_from_warm_buffer_ec2s(
cluster: Cluster, instances_to_start: list[EC2InstanceData]
) -> tuple[Cluster, list]:
# de-assign tasks from the warm buffer instances that could not be started
deassigned_tasks = list(
itertools.chain.from_iterable(
i.assigned_tasks
for i in cluster.warm_buffer_ec2s
if i.ec2_instance in instances_to_start
)
)
# update the cluster
return (
dataclasses.replace(
cluster,
warm_buffer_ec2s=[
(
dataclasses.replace(i, assigned_tasks=[])
if i.ec2_instance in instances_to_start
else i
)
for i in cluster.warm_buffer_ec2s
],
),
deassigned_tasks,
)
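
The helper combines two standard-library patterns: itertools.chain.from_iterable flattens the assigned tasks of all matching warm buffers, and dataclasses.replace produces updated copies rather than mutating the cluster in place. A self-contained illustration with simplified stand-in types (BufferInstance and MiniCluster are not the real models):

```python
# Simplified stand-ins for Cluster / the warm buffer instances; only the
# fields the helper touches are modelled here.
import dataclasses
import itertools


@dataclasses.dataclass(frozen=True)
class BufferInstance:
    instance_id: str
    assigned_tasks: list[str]


@dataclasses.dataclass(frozen=True)
class MiniCluster:
    warm_buffer_ec2s: list[BufferInstance]


cluster = MiniCluster(
    warm_buffer_ec2s=[BufferInstance("i-123", ["task-a", "task-b"])]
)
# flatten the tasks of all warm buffers into a single list
deassigned_tasks = list(
    itertools.chain.from_iterable(i.assigned_tasks for i in cluster.warm_buffer_ec2s)
)
# produce an updated copy with the tasks cleared; the original is untouched
updated = dataclasses.replace(
    cluster,
    warm_buffer_ec2s=[
        dataclasses.replace(i, assigned_tasks=[]) for i in cluster.warm_buffer_ec2s
    ],
)
assert deassigned_tasks == ["task-a", "task-b"]
assert cluster.warm_buffer_ec2s[0].assigned_tasks == ["task-a", "task-b"]
assert updated.warm_buffer_ec2s[0].assigned_tasks == []
```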


async def _try_start_warm_buffer_instances(
app: FastAPI, cluster: Cluster, auto_scaling_mode: AutoscalingProvider
) -> Cluster:
"""starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed"""
) -> tuple[Cluster, list]:
"""
starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed

Returns:
A tuple containing:
- The updated cluster instance after attempting to start warm buffer instances.
- If the warm buffers could not be started, a list of de-assigned tasks (tasks whose resource requirements can no longer be fulfilled by warm buffers).

"""

app_settings = get_application_settings(app)
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
@@ -466,26 +506,34 @@ async def _try_start_warm_buffer_instances(
]

if not instances_to_start:
return cluster
return cluster, []

with log_context(
_logger, logging.INFO, f"start {len(instances_to_start)} warm buffer machines"
_logger,
logging.INFO,
f"start {len(instances_to_start)} warm buffer machines '{[i.id for i in instances_to_start]}'",
):
try:
started_instances = await get_ec2_client(app).start_instances(
instances_to_start
)
except EC2AccessError:
except EC2InsufficientCapacityError:
# NOTE: EC2InsufficientCapacityError is raised only if none of the instances could be started due to insufficient capacity
_logger.warning(
"Could not start warm buffer instances! "
"TIP: This can happen in case of Insufficient "
"Capacity on AWS AvailabilityZone(s) where the warm buffers were originally created. "
"Until https://github.com/ITISFoundation/osparc-simcore/issues/8273 is fixed this "
"will prevent fulfilling this instance type need.",
exc_info=True,
"Could not start warm buffer instances: %s due to Insufficient Capacity in the current AWS Availability Zone! "
"The warm buffer assigned tasks will be moved to new instances if possible.",
[i.id for i in instances_to_start],
)
# we need to re-assign the tasks assigned to the warm buffer instances
return cluster
return _de_assign_tasks_from_warm_buffer_ec2s(cluster, instances_to_start)

except EC2AccessError:
_logger.exception(
"Could not start warm buffer instances %s! TIP: This needs to be analysed!"
"The warm buffer assigned tasks will be moved to new instances if possible.",
[i.id for i in instances_to_start],
)
return _de_assign_tasks_from_warm_buffer_ec2s(cluster, instances_to_start)

# NOTE: first start the instance and then set the tags in case the instance cannot start (e.g. InsufficientInstanceCapacity)
await get_ec2_client(app).set_instances_tags(
started_instances,
@@ -495,15 +543,18 @@
)
started_instance_ids = [i.id for i in started_instances]

return dataclasses.replace(
cluster,
warm_buffer_ec2s=[
i
for i in cluster.warm_buffer_ec2s
if i.ec2_instance.id not in started_instance_ids
],
pending_ec2s=cluster.pending_ec2s
+ [NonAssociatedInstance(ec2_instance=i) for i in started_instances],
return (
dataclasses.replace(
cluster,
warm_buffer_ec2s=[
i
for i in cluster.warm_buffer_ec2s
if i.ec2_instance.id not in started_instance_ids
],
pending_ec2s=cluster.pending_ec2s
+ [NonAssociatedInstance(ec2_instance=i) for i in started_instances],
),
[],
)
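
A quick note on the two log calls in the except branches above: logger.exception() is shorthand for logger.error(..., exc_info=True) and attaches the active traceback, which suits the unexpected EC2AccessError branch, whereas the insufficient-capacity branch logs a plain warning since the cause is already understood. A minimal demonstration:

```python
# logger.warning(...) records the message only; logger.exception(...) logs at
# ERROR level and appends the active traceback, so it should be called from
# inside an except block.
import logging

logging.basicConfig(level=logging.INFO)
_log = logging.getLogger("sketch")

try:
    raise RuntimeError("boom")
except RuntimeError:
    _log.warning("known failure mode, no traceback needed")
    _log.exception("unexpected failure, traceback attached")
```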


@@ -1243,7 +1294,11 @@ async def _autoscale_cluster(
cluster = await _activate_drained_nodes(app, cluster)

# 3. start warm buffer instances to cover the remaining tasks
cluster = await _try_start_warm_buffer_instances(app, cluster, auto_scaling_mode)
cluster, de_assigned_tasks = await _try_start_warm_buffer_instances(
app, cluster, auto_scaling_mode
)
# 3.1 if some tasks were de-assigned, we need to add them to the still pending tasks
still_pending_tasks.extend(de_assigned_tasks)

# 4. scale down unused instances
cluster = await _scale_down_unused_cluster_instances(
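For orientation, here is a condensed, hypothetical sketch of how the de-assigned tasks feed back into the scaling loop; the real _autoscale_cluster has more steps and different helper signatures, so the names below are illustrative only.

```python
# Hypothetical, simplified flow: tasks de-assigned from warm buffers that
# could not start rejoin the pending pool, so a later scale-up step can
# launch fresh instances for them.
import asyncio


async def try_start_warm_buffer_instances(cluster: dict) -> tuple[dict, list[str]]:
    # pretend capacity ran out: nothing starts, the tasks are handed back
    de_assigned = cluster["warm_buffer_tasks"]
    return {**cluster, "warm_buffer_tasks": []}, de_assigned


async def autoscale_once(cluster: dict, still_pending_tasks: list[str]) -> dict:
    cluster, de_assigned_tasks = await try_start_warm_buffer_instances(cluster)
    # step 3.1: de-assigned tasks rejoin the pending pool for the scale-up step
    still_pending_tasks.extend(de_assigned_tasks)
    return cluster


pending: list[str] = ["task-c"]
cluster = asyncio.run(autoscale_once({"warm_buffer_tasks": ["task-a"]}, pending))
assert pending == ["task-c", "task-a"]
```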
@@ -2454,9 +2454,6 @@ async def _raise_insufficient_capacity_error(*args: Any, **kwargs: Any) -> None:
)


@pytest.mark.xfail(
reason="bug described in https://github.com/ITISFoundation/osparc-simcore/issues/8273"
)
@pytest.mark.parametrize(
# NOTE: only the main test test_cluster_scaling_up_and_down is run with all options
"with_docker_join_drained",
@@ -2495,7 +2492,7 @@ async def test_fresh_instance_is_launched_if_warm_buffers_cannot_start_due_to_in
InstanceTypeType,
next(iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES)),
)
await create_buffer_machines(1, warm_buffer_instance_type, "stopped", None)
await create_buffer_machines(3, warm_buffer_instance_type, "stopped", None)

# create several tasks that need more power
scale_up_params = _ScaleUpParams(