Skip to content

Commit 02172c1

Browse files
committed
fix
1 parent 3198758 commit 02172c1

File tree

2 files changed

+80
-28
lines changed

2 files changed

+80
-28
lines changed

services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py

Lines changed: 79 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
EC2Tags,
1616
Resources,
1717
)
18-
from aws_library.ec2._errors import EC2AccessError, EC2TooManyInstancesError
18+
from aws_library.ec2._errors import (
19+
EC2AccessError,
20+
EC2InsufficientCapacityError,
21+
EC2TooManyInstancesError,
22+
)
1923
from fastapi import FastAPI
2024
from models_library.generated_models.docker_rest_api import Node
2125
from models_library.rabbitmq_messages import ProgressType
@@ -421,10 +425,46 @@ async def _activate_drained_nodes(
421425
)
422426

423427

428+
def _de_assign_tasks_from_warm_buffer_ec2s(
429+
cluster: Cluster, instances_to_start: list[EC2InstanceData]
430+
) -> tuple[Cluster, list]:
431+
# de-assign tasks from the warm buffer instances that could not be started
432+
deassigned_tasks = list(
433+
itertools.chain.from_iterable(
434+
i.assigned_tasks
435+
for i in cluster.warm_buffer_ec2s
436+
if i.ec2_instance in instances_to_start
437+
)
438+
)
439+
# upgrade the cluster
440+
return (
441+
dataclasses.replace(
442+
cluster,
443+
warm_buffer_ec2s=[
444+
(
445+
dataclasses.replace(i, assigned_tasks=[])
446+
if i.ec2_instance in instances_to_start
447+
else i
448+
)
449+
for i in cluster.warm_buffer_ec2s
450+
],
451+
),
452+
deassigned_tasks,
453+
)
454+
455+
424456
async def _try_start_warm_buffer_instances(
425457
app: FastAPI, cluster: Cluster, auto_scaling_mode: AutoscalingProvider
426-
) -> Cluster:
427-
"""starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed"""
458+
) -> tuple[Cluster, list]:
459+
"""
460+
starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed
461+
462+
Returns:
463+
A tuple containing:
464+
- The updated cluster instance after attempting to start warm buffer instances.
465+
- In case warm buffer could not be started, a list of de-assigned tasks (tasks whose resource requirements cannot be fulfilled by warm buffers anymore).
466+
467+
"""
428468

429469
app_settings = get_application_settings(app)
430470
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
@@ -466,26 +506,34 @@ async def _try_start_warm_buffer_instances(
466506
]
467507

468508
if not instances_to_start:
469-
return cluster
509+
return cluster, []
470510

471511
with log_context(
472-
_logger, logging.INFO, f"start {len(instances_to_start)} warm buffer machines"
512+
_logger,
513+
logging.INFO,
514+
f"start {len(instances_to_start)} warm buffer machines '{[i.id for i in instances_to_start]}'",
473515
):
474516
try:
475517
started_instances = await get_ec2_client(app).start_instances(
476518
instances_to_start
477519
)
478-
except EC2AccessError:
520+
except EC2InsufficientCapacityError:
521+
# NOTE: this warning is only raised if none of the instances could be started due to InsufficientCapacity
479522
_logger.warning(
480-
"Could not start warm buffer instances! "
481-
"TIP: This can happen in case of Insufficient "
482-
"Capacity on AWS AvailabilityZone(s) where the warm buffers were originally created. "
483-
"Until https://github.com/ITISFoundation/osparc-simcore/issues/8273 is fixed this "
484-
"will prevent fulfilling this instance type need.",
485-
exc_info=True,
523+
"Could not start warm buffer instances: %s due to Insufficient Capacity in the current AWS Availability Zone! "
524+
"The warm buffer assigned tasks will be moved to new instances if possible.",
525+
[i.id for i in instances_to_start],
486526
)
487-
# we need to re-assign the tasks assigned to the warm buffer instances
488-
return cluster
527+
return _de_assign_tasks_from_warm_buffer_ec2s(cluster, instances_to_start)
528+
529+
except EC2AccessError:
530+
_logger.exception(
531+
"Could not start warm buffer instances %s! TIP: This needs to be analysed!"
532+
"The warm buffer assigned tasks will be moved to new instances if possible.",
533+
[i.id for i in instances_to_start],
534+
)
535+
return _de_assign_tasks_from_warm_buffer_ec2s(cluster, instances_to_start)
536+
489537
# NOTE: first start the instance and then set the tags in case the instance cannot start (e.g. InsufficientInstanceCapacity)
490538
await get_ec2_client(app).set_instances_tags(
491539
started_instances,
@@ -495,15 +543,18 @@ async def _try_start_warm_buffer_instances(
495543
)
496544
started_instance_ids = [i.id for i in started_instances]
497545

498-
return dataclasses.replace(
499-
cluster,
500-
warm_buffer_ec2s=[
501-
i
502-
for i in cluster.warm_buffer_ec2s
503-
if i.ec2_instance.id not in started_instance_ids
504-
],
505-
pending_ec2s=cluster.pending_ec2s
506-
+ [NonAssociatedInstance(ec2_instance=i) for i in started_instances],
546+
return (
547+
dataclasses.replace(
548+
cluster,
549+
warm_buffer_ec2s=[
550+
i
551+
for i in cluster.warm_buffer_ec2s
552+
if i.ec2_instance.id not in started_instance_ids
553+
],
554+
pending_ec2s=cluster.pending_ec2s
555+
+ [NonAssociatedInstance(ec2_instance=i) for i in started_instances],
556+
),
557+
[],
507558
)
508559

509560

@@ -1243,7 +1294,11 @@ async def _autoscale_cluster(
12431294
cluster = await _activate_drained_nodes(app, cluster)
12441295

12451296
# 3. start warm buffer instances to cover the remaining tasks
1246-
cluster = await _try_start_warm_buffer_instances(app, cluster, auto_scaling_mode)
1297+
cluster, de_assigned_tasks = await _try_start_warm_buffer_instances(
1298+
app, cluster, auto_scaling_mode
1299+
)
1300+
# 3.1 if some tasks were de-assigned, we need to add them to the still pending tasks
1301+
still_pending_tasks.extend(de_assigned_tasks)
12471302

12481303
# 4. scale down unused instances
12491304
cluster = await _scale_down_unused_cluster_instances(

services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2454,9 +2454,6 @@ async def _raise_insufficient_capacity_error(*args: Any, **kwargs: Any) -> None:
24542454
)
24552455

24562456

2457-
@pytest.mark.xfail(
2458-
reason="bug described in https://github.com/ITISFoundation/osparc-simcore/issues/8273"
2459-
)
24602457
@pytest.mark.parametrize(
24612458
# NOTE: only the main test test_cluster_scaling_up_and_down is run with all options
24622459
"with_docker_join_drained",
@@ -2495,7 +2492,7 @@ async def test_fresh_instance_is_launched_if_warm_buffers_cannot_start_due_to_in
24952492
InstanceTypeType,
24962493
next(iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES)),
24972494
)
2498-
await create_buffer_machines(1, warm_buffer_instance_type, "stopped", None)
2495+
await create_buffer_machines(3, warm_buffer_instance_type, "stopped", None)
24992496

25002497
# create several tasks that needs more power
25012498
scale_up_params = _ScaleUpParams(

0 commit comments

Comments
 (0)