Skip to content

Commit 6ed663d

Browse files
authored
šŸ›Autoscaling: ensure unstarteable warm buffer are replaced by cold instances if available (#8277)
1 parent 5affe86 commit 6ed663d

File tree

3 files changed

+140
-29
lines changed

3 files changed

+140
-29
lines changed

packages/aws-library/tests/test_ec2_client.py

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,63 @@ async def test_stop_start_instances(
419419
assert getattr(s, f.name) == getattr(c, f.name)
420420

421421

422+
async def test_start_instances_with_insufficient_instance_capacity(
423+
simcore_ec2_api: SimcoreEC2API,
424+
ec2_client: EC2Client,
425+
faker: Faker,
426+
ec2_instance_config: EC2InstanceConfig,
427+
mocker: MockerFixture,
428+
):
429+
# we have nothing running now in ec2
430+
await _assert_no_instances_in_ec2(ec2_client)
431+
# create some instance
432+
_NUM_INSTANCES = 10
433+
num_instances = faker.pyint(min_value=1, max_value=_NUM_INSTANCES)
434+
created_instances = await simcore_ec2_api.launch_instances(
435+
ec2_instance_config,
436+
min_number_of_instances=num_instances,
437+
number_of_instances=num_instances,
438+
)
439+
await _assert_instances_in_ec2(
440+
ec2_client,
441+
expected_num_reservations=1,
442+
expected_num_instances=num_instances,
443+
expected_instance_type=ec2_instance_config.type,
444+
expected_tags=ec2_instance_config.tags,
445+
expected_state="running",
446+
)
447+
# stop the instances
448+
await simcore_ec2_api.stop_instances(created_instances)
449+
await _assert_instances_in_ec2(
450+
ec2_client,
451+
expected_num_reservations=1,
452+
expected_num_instances=num_instances,
453+
expected_instance_type=ec2_instance_config.type,
454+
expected_tags=ec2_instance_config.tags,
455+
expected_state="stopped",
456+
)
457+
458+
# Mock the EC2 client to simulate InsufficientInstanceCapacity on first subnet
459+
async def mock_start_instances(*args, **kwargs) -> Any:
460+
# no more machines, simulate insufficient capacity
461+
error_response: dict[str, Any] = {
462+
"Error": {
463+
"Code": "InsufficientInstanceCapacity",
464+
"Message": "An error occurred (InsufficientInstanceCapacity) when calling the StartInstances operation (reached max retries: 4): Insufficient capacity.",
465+
},
466+
}
467+
raise botocore.exceptions.ClientError(error_response, "StartInstances") # type: ignore
468+
469+
# Apply the mock
470+
mocker.patch.object(
471+
simcore_ec2_api.client, "start_instances", side_effect=mock_start_instances
472+
)
473+
474+
# start the instances now
475+
with pytest.raises(EC2InsufficientCapacityError):
476+
await simcore_ec2_api.start_instances(created_instances)
477+
478+
422479
async def test_terminate_instance(
423480
simcore_ec2_api: SimcoreEC2API,
424481
ec2_client: EC2Client,
@@ -717,7 +774,9 @@ async def mock_run_instances(*args, **kwargs) -> Any:
717774
mocker.patch.object(
718775
simcore_ec2_api.client, "run_instances", side_effect=mock_run_instances
719776
)
720-
with pytest.raises(EC2InsufficientCapacityError) as exc_info:
777+
with pytest.raises(
778+
EC2InsufficientCapacityError, match=fake_ec2_instance_type.name
779+
) as exc_info:
721780
await simcore_ec2_api.launch_instances(
722781
ec2_instance_config,
723782
min_number_of_instances=1,

services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py

Lines changed: 79 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
EC2Tags,
1616
Resources,
1717
)
18-
from aws_library.ec2._errors import EC2AccessError, EC2TooManyInstancesError
18+
from aws_library.ec2._errors import (
19+
EC2AccessError,
20+
EC2InsufficientCapacityError,
21+
EC2TooManyInstancesError,
22+
)
1923
from fastapi import FastAPI
2024
from models_library.generated_models.docker_rest_api import Node
2125
from models_library.rabbitmq_messages import ProgressType
@@ -421,10 +425,46 @@ async def _activate_drained_nodes(
421425
)
422426

423427

428+
def _de_assign_tasks_from_warm_buffer_ec2s(
429+
cluster: Cluster, instances_to_start: list[EC2InstanceData]
430+
) -> tuple[Cluster, list]:
431+
# de-assign tasks from the warm buffer instances that could not be started
432+
deassigned_tasks = list(
433+
itertools.chain.from_iterable(
434+
i.assigned_tasks
435+
for i in cluster.warm_buffer_ec2s
436+
if i.ec2_instance in instances_to_start
437+
)
438+
)
439+
# upgrade the cluster
440+
return (
441+
dataclasses.replace(
442+
cluster,
443+
warm_buffer_ec2s=[
444+
(
445+
dataclasses.replace(i, assigned_tasks=[])
446+
if i.ec2_instance in instances_to_start
447+
else i
448+
)
449+
for i in cluster.warm_buffer_ec2s
450+
],
451+
),
452+
deassigned_tasks,
453+
)
454+
455+
424456
async def _try_start_warm_buffer_instances(
425457
app: FastAPI, cluster: Cluster, auto_scaling_mode: AutoscalingProvider
426-
) -> Cluster:
427-
"""starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed"""
458+
) -> tuple[Cluster, list]:
459+
"""
460+
starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed
461+
462+
Returns:
463+
A tuple containing:
464+
- The updated cluster instance after attempting to start warm buffer instances.
465+
- In case warm buffer could not be started, a list of de-assigned tasks (tasks whose resource requirements cannot be fulfilled by warm buffers anymore).
466+
467+
"""
428468

429469
app_settings = get_application_settings(app)
430470
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
@@ -466,26 +506,34 @@ async def _try_start_warm_buffer_instances(
466506
]
467507

468508
if not instances_to_start:
469-
return cluster
509+
return cluster, []
470510

471511
with log_context(
472-
_logger, logging.INFO, f"start {len(instances_to_start)} warm buffer machines"
512+
_logger,
513+
logging.INFO,
514+
f"start {len(instances_to_start)} warm buffer machines '{[i.id for i in instances_to_start]}'",
473515
):
474516
try:
475517
started_instances = await get_ec2_client(app).start_instances(
476518
instances_to_start
477519
)
478-
except EC2AccessError:
520+
except EC2InsufficientCapacityError:
521+
# NOTE: this warning is only raised if none of the instances could be started due to InsufficientCapacity
479522
_logger.warning(
480-
"Could not start warm buffer instances! "
481-
"TIP: This can happen in case of Insufficient "
482-
"Capacity on AWS AvailabilityZone(s) where the warm buffers were originally created. "
483-
"Until https://github.com/ITISFoundation/osparc-simcore/issues/8273 is fixed this "
484-
"will prevent fulfilling this instance type need.",
485-
exc_info=True,
523+
"Could not start warm buffer instances: %s due to Insufficient Capacity in the current AWS Availability Zone! "
524+
"The warm buffer assigned tasks will be moved to new instances if possible.",
525+
[i.id for i in instances_to_start],
486526
)
487-
# we need to re-assign the tasks assigned to the warm buffer instances
488-
return cluster
527+
return _de_assign_tasks_from_warm_buffer_ec2s(cluster, instances_to_start)
528+
529+
except EC2AccessError:
530+
_logger.exception(
531+
"Could not start warm buffer instances %s! TIP: This needs to be analysed!"
532+
"The warm buffer assigned tasks will be moved to new instances if possible.",
533+
[i.id for i in instances_to_start],
534+
)
535+
return _de_assign_tasks_from_warm_buffer_ec2s(cluster, instances_to_start)
536+
489537
# NOTE: first start the instance and then set the tags in case the instance cannot start (e.g. InsufficientInstanceCapacity)
490538
await get_ec2_client(app).set_instances_tags(
491539
started_instances,
@@ -495,15 +543,18 @@ async def _try_start_warm_buffer_instances(
495543
)
496544
started_instance_ids = [i.id for i in started_instances]
497545

498-
return dataclasses.replace(
499-
cluster,
500-
warm_buffer_ec2s=[
501-
i
502-
for i in cluster.warm_buffer_ec2s
503-
if i.ec2_instance.id not in started_instance_ids
504-
],
505-
pending_ec2s=cluster.pending_ec2s
506-
+ [NonAssociatedInstance(ec2_instance=i) for i in started_instances],
546+
return (
547+
dataclasses.replace(
548+
cluster,
549+
warm_buffer_ec2s=[
550+
i
551+
for i in cluster.warm_buffer_ec2s
552+
if i.ec2_instance.id not in started_instance_ids
553+
],
554+
pending_ec2s=cluster.pending_ec2s
555+
+ [NonAssociatedInstance(ec2_instance=i) for i in started_instances],
556+
),
557+
[],
507558
)
508559

509560

@@ -1243,7 +1294,11 @@ async def _autoscale_cluster(
12431294
cluster = await _activate_drained_nodes(app, cluster)
12441295

12451296
# 3. start warm buffer instances to cover the remaining tasks
1246-
cluster = await _try_start_warm_buffer_instances(app, cluster, auto_scaling_mode)
1297+
cluster, de_assigned_tasks = await _try_start_warm_buffer_instances(
1298+
app, cluster, auto_scaling_mode
1299+
)
1300+
# 3.1 if some tasks were de-assigned, we need to add them to the still pending tasks
1301+
still_pending_tasks.extend(de_assigned_tasks)
12471302

12481303
# 4. scale down unused instances
12491304
cluster = await _scale_down_unused_cluster_instances(

services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2454,9 +2454,6 @@ async def _raise_insufficient_capacity_error(*args: Any, **kwargs: Any) -> None:
24542454
)
24552455

24562456

2457-
@pytest.mark.xfail(
2458-
reason="bug described in https://github.com/ITISFoundation/osparc-simcore/issues/8273"
2459-
)
24602457
@pytest.mark.parametrize(
24612458
# NOTE: only the main test test_cluster_scaling_up_and_down is run with all options
24622459
"with_docker_join_drained",
@@ -2495,7 +2492,7 @@ async def test_fresh_instance_is_launched_if_warm_buffers_cannot_start_due_to_in
24952492
InstanceTypeType,
24962493
next(iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES)),
24972494
)
2498-
await create_buffer_machines(1, warm_buffer_instance_type, "stopped", None)
2495+
await create_buffer_machines(3, warm_buffer_instance_type, "stopped", None)
24992496

25002497
# create several tasks that needs more power
25012498
scale_up_params = _ScaleUpParams(

0 commit comments

Comments (0)