diff --git a/packages/aws-library/tests/test_ec2_client.py b/packages/aws-library/tests/test_ec2_client.py
index 535421f65358..bfa54403248c 100644
--- a/packages/aws-library/tests/test_ec2_client.py
+++ b/packages/aws-library/tests/test_ec2_client.py
@@ -419,6 +419,63 @@ async def test_stop_start_instances(
         assert getattr(s, f.name) == getattr(c, f.name)
 
 
+async def test_start_instances_with_insufficient_instance_capacity(
+    simcore_ec2_api: SimcoreEC2API,
+    ec2_client: EC2Client,
+    faker: Faker,
+    ec2_instance_config: EC2InstanceConfig,
+    mocker: MockerFixture,
+):
+    # we have nothing running now in ec2
+    await _assert_no_instances_in_ec2(ec2_client)
+    # create some instances
+    _NUM_INSTANCES = 10
+    num_instances = faker.pyint(min_value=1, max_value=_NUM_INSTANCES)
+    created_instances = await simcore_ec2_api.launch_instances(
+        ec2_instance_config,
+        min_number_of_instances=num_instances,
+        number_of_instances=num_instances,
+    )
+    await _assert_instances_in_ec2(
+        ec2_client,
+        expected_num_reservations=1,
+        expected_num_instances=num_instances,
+        expected_instance_type=ec2_instance_config.type,
+        expected_tags=ec2_instance_config.tags,
+        expected_state="running",
+    )
+    # stop the instances
+    await simcore_ec2_api.stop_instances(created_instances)
+    await _assert_instances_in_ec2(
+        ec2_client,
+        expected_num_reservations=1,
+        expected_num_instances=num_instances,
+        expected_instance_type=ec2_instance_config.type,
+        expected_tags=ec2_instance_config.tags,
+        expected_state="stopped",
+    )
+
+    # Mock the EC2 client to simulate InsufficientInstanceCapacity on first subnet
+    async def mock_start_instances(*args, **kwargs) -> Any:
+        # no more machines, simulate insufficient capacity
+        error_response: dict[str, Any] = {
+            "Error": {
+                "Code": "InsufficientInstanceCapacity",
+                "Message": "An error occurred (InsufficientInstanceCapacity) when calling the StartInstances operation (reached max retries: 4): Insufficient capacity.",
+            },
+        }
+        raise botocore.exceptions.ClientError(error_response, "StartInstances")  # type: ignore
+
+    # Apply the mock
+    mocker.patch.object(
+        simcore_ec2_api.client, "start_instances", side_effect=mock_start_instances
+    )
+
+    # start the instances now
+    with pytest.raises(EC2InsufficientCapacityError):
+        await simcore_ec2_api.start_instances(created_instances)
+
+
 async def test_terminate_instance(
     simcore_ec2_api: SimcoreEC2API,
     ec2_client: EC2Client,
     faker: Faker,
@@ -717,7 +774,9 @@ async def mock_run_instances(*args, **kwargs) -> Any:
     mocker.patch.object(
         simcore_ec2_api.client, "run_instances", side_effect=mock_run_instances
     )
-    with pytest.raises(EC2InsufficientCapacityError) as exc_info:
+    with pytest.raises(
+        EC2InsufficientCapacityError, match=fake_ec2_instance_type.name
+    ) as exc_info:
         await simcore_ec2_api.launch_instances(
             ec2_instance_config,
             min_number_of_instances=1,
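Review note: the new test assumes that `SimcoreEC2API.start_instances` translates botocore's `InsufficientInstanceCapacity` client error into the library's `EC2InsufficientCapacityError`. A minimal, self-contained sketch of such a mapping is shown below; the exception classes are stand-ins for the real ones in `aws_library.ec2._errors`, and the actual library implementation may differ.

```python
# Sketch of the assumed error translation (stand-in classes, not the
# actual aws-library implementation).
import botocore.exceptions


class EC2AccessError(Exception):
    """Stand-in for aws_library.ec2._errors.EC2AccessError."""


class EC2InsufficientCapacityError(EC2AccessError):
    """Stand-in for aws_library.ec2._errors.EC2InsufficientCapacityError."""


async def start_instances_translating_errors(client, instance_ids: list[str]):
    """Start EC2 instances, mapping botocore errors to domain errors."""
    try:
        # boto3/aioboto3 EC2 API: StartInstances
        return await client.start_instances(InstanceIds=instance_ids)
    except botocore.exceptions.ClientError as exc:
        code = exc.response.get("Error", {}).get("Code")
        if code == "InsufficientInstanceCapacity":
            # callers can catch this and fall back to launching fresh instances
            raise EC2InsufficientCapacityError(str(exc)) from exc
        raise EC2AccessError(str(exc)) from exc
```

With a mapping of this shape, the `mocker.patch.object(..., side_effect=mock_start_instances)` above makes the patched client raise the `ClientError`, which the wrapper converts into the `EC2InsufficientCapacityError` that `pytest.raises` asserts.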
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py
index c2d714ebbaa2..2c12b625571e 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py
@@ -15,7 +15,11 @@
     EC2Tags,
     Resources,
 )
-from aws_library.ec2._errors import EC2AccessError, EC2TooManyInstancesError
+from aws_library.ec2._errors import (
+    EC2AccessError,
+    EC2InsufficientCapacityError,
+    EC2TooManyInstancesError,
+)
 from fastapi import FastAPI
 from models_library.generated_models.docker_rest_api import Node
 from models_library.rabbitmq_messages import ProgressType
@@ -421,10 +425,46 @@ async def _activate_drained_nodes(
     )
 
 
+def _de_assign_tasks_from_warm_buffer_ec2s(
+    cluster: Cluster, instances_to_start: list[EC2InstanceData]
+) -> tuple[Cluster, list]:
+    # de-assign tasks from the warm buffer instances that could not be started
+    deassigned_tasks = list(
+        itertools.chain.from_iterable(
+            i.assigned_tasks
+            for i in cluster.warm_buffer_ec2s
+            if i.ec2_instance in instances_to_start
+        )
+    )
+    # update the cluster
+    return (
+        dataclasses.replace(
+            cluster,
+            warm_buffer_ec2s=[
+                (
+                    dataclasses.replace(i, assigned_tasks=[])
+                    if i.ec2_instance in instances_to_start
+                    else i
+                )
+                for i in cluster.warm_buffer_ec2s
+            ],
+        ),
+        deassigned_tasks,
+    )
+
+
 async def _try_start_warm_buffer_instances(
     app: FastAPI, cluster: Cluster, auto_scaling_mode: AutoscalingProvider
-) -> Cluster:
-    """starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed"""
+) -> tuple[Cluster, list]:
+    """
+    starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed
+
+    Returns:
+        A tuple containing:
+        - The updated cluster instance after attempting to start warm buffer instances.
+        - In case warm buffers could not be started, a list of de-assigned tasks (tasks whose resource requirements cannot be fulfilled by warm buffers anymore).
+
+    """
     app_settings = get_application_settings(app)
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
 
@@ -466,26 +506,34 @@ async def _try_start_warm_buffer_instances(
     ]
 
     if not instances_to_start:
-        return cluster
+        return cluster, []
 
     with log_context(
-        _logger, logging.INFO, f"start {len(instances_to_start)} warm buffer machines"
+        _logger,
+        logging.INFO,
+        f"start {len(instances_to_start)} warm buffer machines '{[i.id for i in instances_to_start]}'",
     ):
         try:
             started_instances = await get_ec2_client(app).start_instances(
                 instances_to_start
             )
-        except EC2AccessError:
+        except EC2InsufficientCapacityError:
+            # NOTE: this warning is only raised if none of the instances could be started due to InsufficientCapacity
             _logger.warning(
-                "Could not start warm buffer instances! "
-                "TIP: This can happen in case of Insufficient "
-                "Capacity on AWS AvailabilityZone(s) where the warm buffers were originally created. "
-                "Until https://github.com/ITISFoundation/osparc-simcore/issues/8273 is fixed this "
-                "will prevent fulfilling this instance type need.",
-                exc_info=True,
+                "Could not start warm buffer instances: %s due to Insufficient Capacity in the current AWS Availability Zone! "
+                "The warm buffer assigned tasks will be moved to new instances if possible.",
+                [i.id for i in instances_to_start],
             )
-            # we need to re-assign the tasks assigned to the warm buffer instances
-            return cluster
+            return _de_assign_tasks_from_warm_buffer_ec2s(cluster, instances_to_start)
+
+        except EC2AccessError:
+            _logger.exception(
+                "Could not start warm buffer instances %s! TIP: This needs to be analysed! "
+                "The warm buffer assigned tasks will be moved to new instances if possible.",
+                [i.id for i in instances_to_start],
+            )
+            return _de_assign_tasks_from_warm_buffer_ec2s(cluster, instances_to_start)
+
+        # NOTE: first start the instance and then set the tags in case the instance cannot start (e.g. InsufficientInstanceCapacity)
         await get_ec2_client(app).set_instances_tags(
             started_instances,
@@ -495,15 +543,18 @@
         )
 
         started_instance_ids = [i.id for i in started_instances]
-    return dataclasses.replace(
-        cluster,
-        warm_buffer_ec2s=[
-            i
-            for i in cluster.warm_buffer_ec2s
-            if i.ec2_instance.id not in started_instance_ids
-        ],
-        pending_ec2s=cluster.pending_ec2s
-        + [NonAssociatedInstance(ec2_instance=i) for i in started_instances],
+    return (
+        dataclasses.replace(
+            cluster,
+            warm_buffer_ec2s=[
+                i
+                for i in cluster.warm_buffer_ec2s
+                if i.ec2_instance.id not in started_instance_ids
+            ],
+            pending_ec2s=cluster.pending_ec2s
+            + [NonAssociatedInstance(ec2_instance=i) for i in started_instances],
+        ),
+        [],
     )
@@ -1243,7 +1294,11 @@ async def _autoscale_cluster(
     cluster = await _activate_drained_nodes(app, cluster)
 
     # 3. start warm buffer instances to cover the remaining tasks
-    cluster = await _try_start_warm_buffer_instances(app, cluster, auto_scaling_mode)
+    cluster, de_assigned_tasks = await _try_start_warm_buffer_instances(
+        app, cluster, auto_scaling_mode
+    )
+    # 3.1 if some tasks were de-assigned, we need to add them to the still pending tasks
+    still_pending_tasks.extend(de_assigned_tasks)
 
     # 4. scale down unused instances
     cluster = await _scale_down_unused_cluster_instances(
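Review note: the de-assign-and-retry behaviour introduced by `_de_assign_tasks_from_warm_buffer_ec2s` can be illustrated with a toy, self-contained version; `WarmBuffer` and `ToyCluster` below are simplified stand-ins for the real `Cluster` and warm-buffer models, which carry many more fields.

```python
# Toy illustration of the de-assignment logic (simplified stand-in models).
import dataclasses
import itertools


@dataclasses.dataclass(frozen=True)
class WarmBuffer:
    ec2_instance: str  # stand-in for EC2InstanceData
    assigned_tasks: list


@dataclasses.dataclass(frozen=True)
class ToyCluster:
    warm_buffer_ec2s: list


def de_assign(cluster: ToyCluster, instances_to_start: list[str]) -> tuple[ToyCluster, list]:
    # collect the tasks of every warm buffer that could not be started ...
    deassigned = list(
        itertools.chain.from_iterable(
            i.assigned_tasks
            for i in cluster.warm_buffer_ec2s
            if i.ec2_instance in instances_to_start
        )
    )
    # ... and clear them on a cluster copy so they can be re-scheduled elsewhere
    updated = dataclasses.replace(
        cluster,
        warm_buffer_ec2s=[
            (
                dataclasses.replace(i, assigned_tasks=[])
                if i.ec2_instance in instances_to_start
                else i
            )
            for i in cluster.warm_buffer_ec2s
        ],
    )
    return updated, deassigned


cluster = ToyCluster(
    warm_buffer_ec2s=[WarmBuffer("i-1", ["task-a"]), WarmBuffer("i-2", ["task-b"])]
)
updated, tasks = de_assign(cluster, ["i-1"])
assert tasks == ["task-a"]
assert updated.warm_buffer_ec2s[0].assigned_tasks == []
```

Returning the de-assigned tasks alongside the updated cluster lets `_autoscale_cluster` feed them back into `still_pending_tasks`, so the normal scale-up path can launch fresh instances for them.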
diff --git a/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py
index 874ec6b733c7..40b3f6b3b90c 100644
--- a/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py
+++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py
@@ -2454,9 +2454,6 @@ async def _raise_insufficient_capacity_error(*args: Any, **kwargs: Any) -> None:
     )
 
 
-@pytest.mark.xfail(
-    reason="bug described in https://github.com/ITISFoundation/osparc-simcore/issues/8273"
-)
 @pytest.mark.parametrize(
     # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options
     "with_docker_join_drained",
@@ -2495,7 +2492,7 @@ async def test_fresh_instance_is_launched_if_warm_buffers_cannot_start_due_to_in
         InstanceTypeType,
         next(iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES)),
     )
-    await create_buffer_machines(1, warm_buffer_instance_type, "stopped", None)
+    await create_buffer_machines(3, warm_buffer_instance_type, "stopped", None)
 
     # create several tasks that needs more power
     scale_up_params = _ScaleUpParams(
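Review note: with the `xfail` marker removed, `test_fresh_instance_is_launched_if_warm_buffers_cannot_start_due_to_insufficient_capacity` now exercises the full fallback path: the three stopped warm buffers fail to start, their assigned tasks are de-assigned, and fresh instances are launched instead. The failure is forced by the file's `_raise_insufficient_capacity_error` helper; a hypothetical, minimal version of such a patch is sketched below (the fixture name and the `autoscaling_ec2` target are illustrative, not the real test's wiring).

```python
# Hypothetical sketch: force StartInstances to fail with InsufficientInstanceCapacity
# in a test (the real test file defines _raise_insufficient_capacity_error for this).
from typing import Any

import botocore.exceptions
import pytest
from pytest_mock import MockerFixture


@pytest.fixture
def start_instances_raises_insufficient_capacity(
    mocker: MockerFixture, autoscaling_ec2  # 'autoscaling_ec2' is an assumed fixture
):
    async def _raise(*args: Any, **kwargs: Any) -> None:
        raise botocore.exceptions.ClientError(
            {
                "Error": {
                    "Code": "InsufficientInstanceCapacity",
                    "Message": "Insufficient capacity.",
                }
            },
            "StartInstances",
        )

    # every StartInstances call now fails, driving the de-assign/fallback path
    return mocker.patch.object(
        autoscaling_ec2.client, "start_instances", side_effect=_raise
    )
```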