diff --git a/packages/aws-library/src/aws_library/ec2/__init__.py b/packages/aws-library/src/aws_library/ec2/__init__.py index 112c70861b29..0acff01ff0d6 100644 --- a/packages/aws-library/src/aws_library/ec2/__init__.py +++ b/packages/aws-library/src/aws_library/ec2/__init__.py @@ -1,5 +1,10 @@ from ._client import SimcoreEC2API -from ._errors import EC2AccessError, EC2NotConnectedError, EC2RuntimeError +from ._errors import ( + EC2AccessError, + EC2InsufficientCapacityError, + EC2NotConnectedError, + EC2RuntimeError, +) from ._models import ( AWS_TAG_KEY_MAX_LENGTH, AWS_TAG_KEY_MIN_LENGTH, @@ -16,22 +21,22 @@ ) __all__: tuple[str, ...] = ( - "AWSTagKey", - "AWSTagValue", - "AWS_TAG_KEY_MIN_LENGTH", "AWS_TAG_KEY_MAX_LENGTH", - "AWS_TAG_VALUE_MIN_LENGTH", + "AWS_TAG_KEY_MIN_LENGTH", "AWS_TAG_VALUE_MAX_LENGTH", + "AWS_TAG_VALUE_MIN_LENGTH", + "AWSTagKey", + "AWSTagValue", "EC2AccessError", "EC2InstanceBootSpecific", "EC2InstanceConfig", "EC2InstanceData", "EC2InstanceType", + "EC2InsufficientCapacityError", "EC2NotConnectedError", "EC2RuntimeError", "EC2Tags", "Resources", "SimcoreEC2API", ) - # nopycln: file diff --git a/packages/aws-library/src/aws_library/ec2/_client.py b/packages/aws-library/src/aws_library/ec2/_client.py index 970d6130e69a..911d04067ac4 100644 --- a/packages/aws-library/src/aws_library/ec2/_client.py +++ b/packages/aws-library/src/aws_library/ec2/_client.py @@ -13,10 +13,17 @@ from settings_library.ec2 import EC2Settings from types_aiobotocore_ec2 import EC2Client from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType -from types_aiobotocore_ec2.type_defs import FilterTypeDef, TagTypeDef +from types_aiobotocore_ec2.type_defs import ( + FilterTypeDef, + TagTypeDef, +) from ._error_handler import ec2_exception_handler -from ._errors import EC2InstanceNotFoundError, EC2TooManyInstancesError +from ._errors import ( + EC2InstanceNotFoundError, + EC2InsufficientCapacityError, + EC2SubnetsNotEnoughIPsError, +) from ._models 
import ( AWSTagKey, EC2InstanceConfig, @@ -25,7 +32,13 @@ EC2Tags, Resources, ) -from ._utils import compose_user_data, ec2_instance_data_from_aws_instance +from ._utils import ( + check_max_number_of_instances_not_exceeded, + compose_user_data, + ec2_instance_data_from_aws_instance, + get_subnet_azs, + get_subnet_capacity, +) _logger = logging.getLogger(__name__) @@ -92,6 +105,11 @@ async def get_ec2_instance_capabilities( list_instances: list[EC2InstanceType] = [] for instance in instance_types.get("InstanceTypes", []): with contextlib.suppress(KeyError): + assert "InstanceType" in instance # nosec + assert "VCpuInfo" in instance # nosec + assert "DefaultVCpus" in instance["VCpuInfo"] # nosec + assert "MemoryInfo" in instance # nosec + assert "SizeInMiB" in instance["MemoryInfo"] # nosec list_instances.append( EC2InstanceType( name=instance["InstanceType"], @@ -118,94 +136,145 @@ async def launch_instances( Arguments: instance_config -- The EC2 instance configuration - min_number_of_instances -- the minimal number of instances needed (fails if this amount cannot be reached) + min_number_of_instances -- the minimal number of instances required (fails if this amount cannot be reached) number_of_instances -- the ideal number of instances needed (it it cannot be reached AWS will return a number >=min_number_of_instances) - - Keyword Arguments: - max_total_number_of_instances -- The total maximum allowed number of instances for this given instance_config (default: {10}) + max_total_number_of_instances -- The total maximum allowed number of instances for this given instance_config Raises: - EC2TooManyInstancesError: + EC2TooManyInstancesError: max_total_number_of_instances would be exceeded + EC2SubnetsNotEnoughIPsError: not enough IPs in the subnets + EC2InsufficientCapacityError: not enough capacity in the subnets + Returns: The created instance data infos """ + with log_context( _logger, logging.INFO, - msg=f"launch {number_of_instances} AWS instance(s) 
{instance_config.type.name} with {instance_config.tags=}", + msg=f"launch {number_of_instances} AWS instance(s) {instance_config.type.name}" + f" with {instance_config.tags=} in {instance_config.subnet_ids=}", ): # first check the max amount is not already reached - current_instances = await self.get_instances( - key_names=[instance_config.key_name], tags=instance_config.tags + await check_max_number_of_instances_not_exceeded( + self, + instance_config, + required_number_instances=number_of_instances, + max_total_number_of_instances=max_total_number_of_instances, ) - if ( - len(current_instances) + number_of_instances - > max_total_number_of_instances - ): - raise EC2TooManyInstancesError( - num_instances=max_total_number_of_instances + + # NOTE: checking subnets capacity is not strictly needed as AWS will do it for us + # but it gives us a chance to give early feedback to the user + # and avoid trying to launch instances in subnets that are already full + # and also allows to circumvent a moto bug that does not raise + # InsufficientInstanceCapacity when a subnet is full + subnet_id_to_available_ips = await get_subnet_capacity( + self.client, subnet_ids=instance_config.subnet_ids + ) + + total_available_ips = sum(subnet_id_to_available_ips.values()) + if total_available_ips < min_number_of_instances: + raise EC2SubnetsNotEnoughIPsError( + subnet_ids=instance_config.subnet_ids, + instance_type=instance_config.type.name, + available_ips=total_available_ips, ) + # now let's not try to run instances in subnets that have not enough IPs + subnet_ids_with_capacity = [ + subnet_id + for subnet_id, capacity in subnet_id_to_available_ips.items() + if capacity >= min_number_of_instances + ] + resource_tags: list[TagTypeDef] = [ {"Key": tag_key, "Value": tag_value} for tag_key, tag_value in instance_config.tags.items() ] - instances = await self.client.run_instances( - ImageId=instance_config.ami_id, - MinCount=min_number_of_instances, - MaxCount=number_of_instances, - 
IamInstanceProfile=( - {"Arn": instance_config.iam_instance_profile} - if instance_config.iam_instance_profile - else {} - ), - InstanceType=instance_config.type.name, - InstanceInitiatedShutdownBehavior="terminate", - KeyName=instance_config.key_name, - TagSpecifications=[ - {"ResourceType": "instance", "Tags": resource_tags}, - {"ResourceType": "volume", "Tags": resource_tags}, - {"ResourceType": "network-interface", "Tags": resource_tags}, - ], - UserData=compose_user_data(instance_config.startup_script), - NetworkInterfaces=[ - { - "AssociatePublicIpAddress": True, - "DeviceIndex": 0, - "SubnetId": instance_config.subnet_id, - "Groups": instance_config.security_group_ids, - } - ], - ) - instance_ids = [i["InstanceId"] for i in instances["Instances"]] - _logger.info( - "%s New instances launched: %s, waiting for them to start now...", - len(instance_ids), - instance_ids, - ) + # Try each subnet in order until one succeeds + for subnet_id in subnet_ids_with_capacity: + try: + _logger.debug( + "Attempting to launch instances in subnet %s", subnet_id + ) - # wait for the instance to be in a pending state - # NOTE: reference to EC2 states https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html - waiter = self.client.get_waiter("instance_exists") - await waiter.wait(InstanceIds=instance_ids) - _logger.debug("instances %s exists now.", instance_ids) + instances = await self.client.run_instances( + ImageId=instance_config.ami_id, + MinCount=min_number_of_instances, + MaxCount=number_of_instances, + IamInstanceProfile=( + {"Arn": instance_config.iam_instance_profile} + if instance_config.iam_instance_profile + else {} + ), + InstanceType=instance_config.type.name, + InstanceInitiatedShutdownBehavior="terminate", + KeyName=instance_config.key_name, + TagSpecifications=[ + {"ResourceType": "instance", "Tags": resource_tags}, + {"ResourceType": "volume", "Tags": resource_tags}, + { + "ResourceType": "network-interface", + "Tags": resource_tags, + }, 
+ ], + UserData=compose_user_data(instance_config.startup_script), + NetworkInterfaces=[ + { + "AssociatePublicIpAddress": True, + "DeviceIndex": 0, + "SubnetId": subnet_id, + "Groups": instance_config.security_group_ids, + } + ], + ) + # If we get here, the launch succeeded + break + except botocore.exceptions.ClientError as exc: + error_code = exc.response.get("Error", {}).get("Code") + if error_code == "InsufficientInstanceCapacity": + _logger.warning( + "Insufficient capacity in subnet %s for instance type %s, trying next subnet", + subnet_id, + instance_config.type.name, + ) + continue + # For any other ClientError, re-raise to let the decorator handle it + raise + + else: + subnet_zones = await get_subnet_azs( + self.client, subnet_ids=subnet_ids_with_capacity + ) + raise EC2InsufficientCapacityError( + availability_zones=subnet_zones, + instance_type=instance_config.type.name, + ) + instance_ids = [ + i["InstanceId"] # pyright: ignore[reportTypedDictNotRequiredAccess] + for i in instances["Instances"] + ] + with log_context( + _logger, + logging.INFO, + msg=f"{len(instance_ids)} instances: {instance_ids=} launched. 
Wait to reach pending state", ): # wait for the instance to be in a pending state # NOTE: reference to EC2 states https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html waiter = self.client.get_waiter("instance_exists") await waiter.wait(InstanceIds=instance_ids) - # NOTE: waiting for pending ensure we get all the IPs back + # NOTE: waiting for pending ensures we get all the IPs back described_instances = await self.client.describe_instances( InstanceIds=instance_ids ) assert "Instances" in described_instances["Reservations"][0] # nosec - instance_datas = [ + return [ await ec2_instance_data_from_aws_instance(self, i) for i in described_instances["Reservations"][0]["Instances"] ] - _logger.info( - "%s are pending now", - f"{instance_ids=}", - ) - return instance_datas @ec2_exception_handler(_logger) async def get_instances( diff --git a/packages/aws-library/src/aws_library/ec2/_error_handler.py b/packages/aws-library/src/aws_library/ec2/_error_handler.py index 8984cf6a0a36..a8ed79717aa7 100644 --- a/packages/aws-library/src/aws_library/ec2/_error_handler.py +++ b/packages/aws-library/src/aws_library/ec2/_error_handler.py @@ -1,7 +1,16 @@ import functools import logging +import re from collections.abc import Callable, Coroutine -from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar +from typing import ( + TYPE_CHECKING, + Any, + Concatenate, + Final, + ParamSpec, + TypeVar, + cast, +) from botocore import exceptions as botocore_exc @@ -9,6 +18,7 @@ EC2AccessError, EC2InstanceNotFoundError, EC2InstanceTypeInvalidError, + EC2InsufficientCapacityError, EC2NotConnectedError, EC2RuntimeError, EC2TimeoutError, @@ -26,30 +36,46 @@ Self = TypeVar("Self", bound="SimcoreEC2API") +_INSUFFICIENT_CAPACITY_ERROR_MSG_PATTERN: Final[re.Pattern] = re.compile( + r"sufficient (?P<instance_type>\S+) capacity in the Availability Zone you requested " + r"\((?P<failed_az>\S+)\)" +) + + def _map_botocore_client_exception( botocore_error: botocore_exc.ClientError,
*args, # pylint: disable=unused-argument # noqa: ARG001 **kwargs, # pylint: disable=unused-argument # noqa: ARG001 ) -> EC2AccessError: - status_code = int( - botocore_error.response.get("ResponseMetadata", {}).get("HTTPStatusCode") - or botocore_error.response.get("Error", {}).get("Code", -1) + # see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html#parsing-error-responses-and-catching-exceptions-from-aws-services + status_code = cast( + int, + botocore_error.response.get("ResponseMetadata", {}).get("HTTPStatusCode", "-1"), ) + error_code = botocore_error.response.get("Error", {}).get("Code", "Unknown") + error_msg = botocore_error.response.get("Error", {}).get("Message", "Unknown") operation_name = botocore_error.operation_name - match status_code, operation_name: - case 400, "StartInstances": + match error_code: + case "InvalidInstanceID.NotFound": return EC2InstanceNotFoundError() - case 400, "StopInstances": - return EC2InstanceNotFoundError() - case 400, "TerminateInstances": - return EC2InstanceNotFoundError() - case 400, "DescribeInstanceTypes": + case "InvalidInstanceType": return EC2InstanceTypeInvalidError() + case "InsufficientInstanceCapacity": + availability_zone = "unknown" + instance_type = "unknown" + if match := re.search(_INSUFFICIENT_CAPACITY_ERROR_MSG_PATTERN, error_msg): + instance_type = match.group("instance_type") + availability_zone = match.group("failed_az") + + raise EC2InsufficientCapacityError( + availability_zones=availability_zone, instance_type=instance_type + ) case _: return EC2AccessError( + status_code=status_code, operation_name=operation_name, - code=status_code, - error=f"{botocore_error}", + code=error_code, + error=error_msg, ) diff --git a/packages/aws-library/src/aws_library/ec2/_errors.py b/packages/aws-library/src/aws_library/ec2/_errors.py index 4fb0e611ed2b..81a0d0c1d695 100644 --- a/packages/aws-library/src/aws_library/ec2/_errors.py +++ 
b/packages/aws-library/src/aws_library/ec2/_errors.py @@ -16,7 +16,7 @@ class EC2NotConnectedError(EC2RuntimeError): class EC2AccessError(EC2RuntimeError): msg_template: str = ( - "Unexpected error while accessing EC2 backend: {operation_name}:{code}:{error}" + "Unexpected error while accessing EC2 backend responded with {status_code}: {operation_name}:{code}:{error}" ) @@ -36,3 +36,16 @@ class EC2TooManyInstancesError(EC2AccessError): msg_template: str = ( "The maximum amount of instances {num_instances} is already reached!" ) + + +class EC2InsufficientCapacityError(EC2AccessError): + msg_template: str = ( + "Insufficient capacity in {availability_zones} for {instance_type}" + ) + + +class EC2SubnetsNotEnoughIPsError(EC2AccessError): + msg_template: str = ( + "Not enough free IPs in subnet(s) {subnet_ids} for instance type {instance_type}" + ". Only {available_ips} IPs available." + ) diff --git a/packages/aws-library/src/aws_library/ec2/_models.py b/packages/aws-library/src/aws_library/ec2/_models.py index ada7482452e8..b136fb503007 100644 --- a/packages/aws-library/src/aws_library/ec2/_models.py +++ b/packages/aws-library/src/aws_library/ec2/_models.py @@ -17,6 +17,7 @@ StringConstraints, field_validator, ) +from pydantic.config import JsonDict from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType @@ -134,7 +135,7 @@ class EC2InstanceConfig: ami_id: str key_name: str security_group_ids: list[str] - subnet_id: str + subnet_ids: list[str] iam_instance_profile: str @@ -179,68 +180,74 @@ def validate_bash_calls(cls, v): temp_file.writelines(v) temp_file.flush() # NOTE: this will not capture runtime errors, but at least some syntax errors such as invalid quotes - sh.bash("-n", temp_file.name) + sh.bash("-n", temp_file.name) # pyright: ignore[reportCallIssue] # sh is untyped, but this call is safe for bash syntax checking except sh.ErrorReturnCode as exc: msg = f"Invalid bash call in custom_boot_scripts: {v}, Error: {exc.stderr}" raise
ValueError(msg) from exc return v + @staticmethod + def _update_json_schema_extra(schema: JsonDict) -> None: + schema.update( + { + "examples": [ + { + # just AMI + "ami_id": "ami-123456789abcdef", + }, + { + # AMI + scripts + "ami_id": "ami-123456789abcdef", + "custom_boot_scripts": ["ls -tlah", "echo blahblah"], + }, + { + # AMI + scripts + pre-pull + "ami_id": "ami-123456789abcdef", + "custom_boot_scripts": ["ls -tlah", "echo blahblah"], + "pre_pull_images": [ + "nginx:latest", + "itisfoundation/my-very-nice-service:latest", + "simcore/services/dynamic/another-nice-one:2.4.5", + "asd", + ], + }, + { + # AMI + pre-pull + "ami_id": "ami-123456789abcdef", + "pre_pull_images": [ + "nginx:latest", + "itisfoundation/my-very-nice-service:latest", + "simcore/services/dynamic/another-nice-one:2.4.5", + "asd", + ], + }, + { + # AMI + pre-pull + cron + "ami_id": "ami-123456789abcdef", + "pre_pull_images": [ + "nginx:latest", + "itisfoundation/my-very-nice-service:latest", + "simcore/services/dynamic/another-nice-one:2.4.5", + "asd", + ], + "pre_pull_images_cron_interval": "01:00:00", + }, + { + # AMI + pre-pull + buffer count + "ami_id": "ami-123456789abcdef", + "pre_pull_images": [ + "nginx:latest", + "itisfoundation/my-very-nice-service:latest", + "simcore/services/dynamic/another-nice-one:2.4.5", + "asd", + ], + "buffer_count": 10, + }, + ] + } + ) + model_config = ConfigDict( - json_schema_extra={ - "examples": [ - { - # just AMI - "ami_id": "ami-123456789abcdef", - }, - { - # AMI + scripts - "ami_id": "ami-123456789abcdef", - "custom_boot_scripts": ["ls -tlah", "echo blahblah"], - }, - { - # AMI + scripts + pre-pull - "ami_id": "ami-123456789abcdef", - "custom_boot_scripts": ["ls -tlah", "echo blahblah"], - "pre_pull_images": [ - "nginx:latest", - "itisfoundation/my-very-nice-service:latest", - "simcore/services/dynamic/another-nice-one:2.4.5", - "asd", - ], - }, - { - # AMI + pre-pull - "ami_id": "ami-123456789abcdef", - "pre_pull_images": [ - "nginx:latest", - 
"itisfoundation/my-very-nice-service:latest", - "simcore/services/dynamic/another-nice-one:2.4.5", - "asd", - ], - }, - { - # AMI + pre-pull + cron - "ami_id": "ami-123456789abcdef", - "pre_pull_images": [ - "nginx:latest", - "itisfoundation/my-very-nice-service:latest", - "simcore/services/dynamic/another-nice-one:2.4.5", - "asd", - ], - "pre_pull_images_cron_interval": "01:00:00", - }, - { - # AMI + pre-pull + buffer count - "ami_id": "ami-123456789abcdef", - "pre_pull_images": [ - "nginx:latest", - "itisfoundation/my-very-nice-service:latest", - "simcore/services/dynamic/another-nice-one:2.4.5", - "asd", - ], - "buffer_count": 10, - }, - ] - } + json_schema_extra=_update_json_schema_extra, ) diff --git a/packages/aws-library/src/aws_library/ec2/_utils.py b/packages/aws-library/src/aws_library/ec2/_utils.py index d16be2cf9ead..33e31c5356b4 100644 --- a/packages/aws-library/src/aws_library/ec2/_utils.py +++ b/packages/aws-library/src/aws_library/ec2/_utils.py @@ -1,9 +1,14 @@ from textwrap import dedent from typing import TYPE_CHECKING, cast -from types_aiobotocore_ec2.type_defs import InstanceTypeDef +from types_aiobotocore_ec2 import EC2Client +from types_aiobotocore_ec2.type_defs import ( + InstanceTypeDef, + SubnetTypeDef, +) -from ._models import EC2InstanceData, EC2Tags +from ._errors import EC2TooManyInstancesError +from ._models import EC2InstanceConfig, EC2InstanceData, EC2Tags if TYPE_CHECKING: from ._client import SimcoreEC2API @@ -43,3 +48,60 @@ async def ec2_instance_data_from_aws_instance( resources=ec2_instance_types[0].resources, tags=cast(EC2Tags, {tag["Key"]: tag["Value"] for tag in instance["Tags"]}), ) + + +async def check_max_number_of_instances_not_exceeded( + ec2_client: "SimcoreEC2API", + instance_config: EC2InstanceConfig, + *, + required_number_instances: int, + max_total_number_of_instances: int, +) -> None: + current_instances = await ec2_client.get_instances( + key_names=[instance_config.key_name], tags=instance_config.tags + ) + if ( 
+ len(current_instances) + required_number_instances + > max_total_number_of_instances + ): + raise EC2TooManyInstancesError(num_instances=max_total_number_of_instances) + + +async def get_subnet_capacity( + aioboto_ec2_client: EC2Client, *, subnet_ids: list[str] +) -> dict[str, int]: + subnets = await aioboto_ec2_client.describe_subnets(SubnetIds=subnet_ids) + assert "Subnets" in subnets # nosec + subnet_id_to_subnet_map: dict[str, SubnetTypeDef] = { + subnet["SubnetId"]: subnet # pyright: ignore[reportTypedDictNotRequiredAccess] + for subnet in subnets["Subnets"] + } + # preserve the order of instance_config.subnet_ids + + subnet_id_to_available_ips: dict[str, int] = { + subnet_id: subnet_id_to_subnet_map[subnet_id][ + "AvailableIpAddressCount" + ] # pyright: ignore[reportTypedDictNotRequiredAccess] + for subnet_id in subnet_ids + } + return subnet_id_to_available_ips + + +async def get_subnet_azs( + aioboto_ec2_client: EC2Client, *, subnet_ids: list[str] +) -> list[str]: + subnets = await aioboto_ec2_client.describe_subnets(SubnetIds=subnet_ids) + assert "Subnets" in subnets # nosec + subnet_id_to_subnet_map: dict[str, SubnetTypeDef] = { + subnet["SubnetId"]: subnet # pyright: ignore[reportTypedDictNotRequiredAccess] + for subnet in subnets["Subnets"] + } + # preserve the order of instance_config.subnet_ids + + subnet_azs: list[str] = [ + subnet_id_to_subnet_map[subnet_id][ + "AvailabilityZone" + ] # pyright: ignore[reportTypedDictNotRequiredAccess] + for subnet_id in subnet_ids + ] + return subnet_azs diff --git a/packages/aws-library/tests/test_ec2_client.py b/packages/aws-library/tests/test_ec2_client.py index a1cbdf55c570..535421f65358 100644 --- a/packages/aws-library/tests/test_ec2_client.py +++ b/packages/aws-library/tests/test_ec2_client.py @@ -4,9 +4,9 @@ import random -from collections.abc import AsyncIterator, Callable +from collections.abc import AsyncIterator, Awaitable, Callable from dataclasses import fields -from typing import cast, get_args 
+from typing import Any, Final, cast, get_args import botocore.exceptions import pytest @@ -14,6 +14,8 @@ from aws_library.ec2._errors import ( EC2InstanceNotFoundError, EC2InstanceTypeInvalidError, + EC2InsufficientCapacityError, + EC2SubnetsNotEnoughIPsError, EC2TooManyInstancesError, ) from aws_library.ec2._models import ( @@ -25,13 +27,15 @@ ) from faker import Faker from moto.server import ThreadedMotoServer +from pydantic import TypeAdapter +from pytest_mock import MockerFixture from settings_library.ec2 import EC2Settings from types_aiobotocore_ec2 import EC2Client from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType def _ec2_allowed_types() -> list[InstanceTypeType]: - return ["t2.nano", "m5.12xlarge", "g4dn.4xlarge"] + return ["m5.12xlarge"] @pytest.fixture(scope="session") @@ -97,7 +101,7 @@ def ec2_instance_config( ami_id=aws_ami_id, key_name=faker.pystr(), security_group_ids=[aws_security_group_id], - subnet_id=aws_subnet_id, + subnet_ids=[aws_subnet_id], iam_instance_profile="", ) @@ -529,7 +533,8 @@ async def test_set_instance_tags( # now remove some, this should do nothing await simcore_ec2_api.remove_instances_tags( - created_instances, tag_keys=[AWSTagKey("whatever_i_dont_exist")] + created_instances, + tag_keys=[TypeAdapter(AWSTagKey).validate_python("whatever_i_dont_exist")], ) await _assert_instances_in_ec2( ec2_client, @@ -575,3 +580,424 @@ async def test_remove_instance_tags_not_existing_raises( await simcore_ec2_api.remove_instances_tags( [fake_ec2_instance_data()], tag_keys=[] ) + + +async def test_launch_instances_insufficient_capacity_fallback( + simcore_ec2_api: SimcoreEC2API, + ec2_client: EC2Client, + fake_ec2_instance_type: EC2InstanceType, + faker: Faker, + aws_subnet_id: str, + create_aws_subnet_id: Callable[[], Awaitable[str]], + aws_security_group_id: str, + aws_ami_id: str, + mocker: MockerFixture, +): + await _assert_no_instances_in_ec2(ec2_client) + + # Create additional valid subnets for testing + 
subnet1_id = aws_subnet_id + subnet2_id = await create_aws_subnet_id() + + # Create a config with multiple valid subnet IDs + ec2_instance_config = EC2InstanceConfig( + type=fake_ec2_instance_type, + tags=faker.pydict(allowed_types=(str,)), + startup_script=faker.pystr(), + ami_id=aws_ami_id, + key_name=faker.pystr(), + security_group_ids=[aws_security_group_id], + subnet_ids=[subnet1_id, subnet2_id], + iam_instance_profile="", + ) + + # Mock the EC2 client to simulate InsufficientInstanceCapacity on first subnet + original_run_instances = simcore_ec2_api.client.run_instances + call_count = 0 + + async def mock_run_instances(*args, **kwargs) -> Any: + nonlocal call_count + call_count += 1 + if call_count == 1: + assert kwargs["NetworkInterfaces"][0]["SubnetId"] == subnet1_id + # First call (first subnet) - simulate insufficient capacity + error_response: dict[str, Any] = { + "Error": { + "Code": "InsufficientInstanceCapacity", + "Message": "An error occurred (InsufficientInstanceCapacity) when calling the RunInstances operation (reached max retries: 4): We currently do not have sufficient g4dn.4xlarge capacity in the Availability Zone you requested (us-east-1a). Our system will be working on provisioning additional capacity. 
You can currently get g4dn.4xlarge capacity by not specifying an Availability Zone in your request or choosing us-east-1b, us-east-1c, us-east-1d, us-east-1f", + }, + } + raise botocore.exceptions.ClientError(error_response, "RunInstances") # type: ignore + # Second call (second subnet) - succeed normally + assert kwargs["NetworkInterfaces"][0]["SubnetId"] == subnet2_id + return await original_run_instances(*args, **kwargs) + + # Apply the mock + mocker.patch.object( + simcore_ec2_api.client, "run_instances", side_effect=mock_run_instances + ) + instances = await simcore_ec2_api.launch_instances( + ec2_instance_config, + min_number_of_instances=1, + number_of_instances=1, + ) + + # Verify that run_instances was called twice (once for each subnet) + assert call_count == 2 + + # Verify that the instance was created (in the second subnet) + await _assert_instances_in_ec2( + ec2_client, + expected_num_reservations=1, + expected_num_instances=1, + expected_instance_type=ec2_instance_config.type, + expected_tags=ec2_instance_config.tags, + expected_state="running", + ) + + # Verify the instance was created in the second subnet (since first failed) + instance_details = await ec2_client.describe_instances( + InstanceIds=[instances[0].id] + ) + assert "Reservations" in instance_details + assert len(instance_details["Reservations"]) >= 1 + assert "Instances" in instance_details["Reservations"][0] + assert len(instance_details["Reservations"][0]["Instances"]) >= 1 + instance = instance_details["Reservations"][0]["Instances"][0] + assert "SubnetId" in instance + assert instance["SubnetId"] == subnet2_id + + +async def test_launch_instances_all_subnets_insufficient_capacity_raises_error( + simcore_ec2_api: SimcoreEC2API, + ec2_client: EC2Client, + fake_ec2_instance_type: EC2InstanceType, + faker: Faker, + aws_subnet_id: str, + create_aws_subnet_id: Callable[[], Awaitable[str]], + aws_security_group_id: str, + aws_ami_id: str, + mocker: MockerFixture, +): + await 
_assert_no_instances_in_ec2(ec2_client) + + # Create additional valid subnets for testing + subnet1_id = aws_subnet_id + subnet2_id = await create_aws_subnet_id() + subnet3_id = await create_aws_subnet_id() + + # Create a config with multiple valid subnet IDs + ec2_instance_config = EC2InstanceConfig( + type=fake_ec2_instance_type, + tags=faker.pydict(allowed_types=(str,)), + startup_script=faker.pystr(), + ami_id=aws_ami_id, + key_name=faker.pystr(), + security_group_ids=[aws_security_group_id], + subnet_ids=[subnet1_id, subnet2_id, subnet3_id], + iam_instance_profile="", + ) + + # Mock the EC2 client to simulate InsufficientInstanceCapacity on ALL subnets + call_count = 0 + + async def mock_run_instances(*args, **kwargs) -> Any: + nonlocal call_count + call_count += 1 + # Always simulate insufficient capacity + error_response = { + "Error": { + "Code": "InsufficientInstanceCapacity", + "Message": "An error occurred (InsufficientInstanceCapacity) when calling the RunInstances operation (reached max retries: 4): We currently do not have sufficient g4dn.4xlarge capacity in the Availability Zone you requested (us-east-1a). Our system will be working on provisioning additional capacity. 
You can currently get g4dn.4xlarge capacity by not specifying an Availability Zone in your request or choosing us-east-1b, us-east-1c, us-east-1d, us-east-1f", + }, + } + raise botocore.exceptions.ClientError(error_response, "RunInstances") # type: ignore + + # Apply the mock and expect EC2InsufficientCapacityError + mocker.patch.object( + simcore_ec2_api.client, "run_instances", side_effect=mock_run_instances + ) + with pytest.raises(EC2InsufficientCapacityError) as exc_info: + await simcore_ec2_api.launch_instances( + ec2_instance_config, + min_number_of_instances=1, + number_of_instances=1, + ) + + # Verify that run_instances was called for both subnets + assert call_count == 3 + + # Verify the error contains the expected information + assert hasattr(exc_info.value, "instance_type") + assert exc_info.value.instance_type == fake_ec2_instance_type.name # type: ignore + + # Verify no instances were created + await _assert_no_instances_in_ec2(ec2_client) + + +async def test_launch_instances_partial_capacity_then_insufficient_capacity( + simcore_ec2_api: SimcoreEC2API, + ec2_client: EC2Client, + fake_ec2_instance_type: EC2InstanceType, + faker: Faker, + aws_subnet_id: str, + aws_security_group_id: str, + aws_ami_id: str, + mocker: MockerFixture, +): + """Test that launch_instances handles partial capacity correctly. 
+ + First call: ask for 3 instances (min 1) -> should get 2, no error + Second call: ask for 3 instances (min 1) -> should raise EC2InsufficientCapacityError + """ + await _assert_no_instances_in_ec2(ec2_client) + + # Create a config with a single subnet (as requested) + ec2_instance_config = EC2InstanceConfig( + type=fake_ec2_instance_type, + tags=faker.pydict(allowed_types=(str,)), + startup_script=faker.pystr(), + ami_id=aws_ami_id, + key_name=faker.pystr(), + security_group_ids=[aws_security_group_id], + subnet_ids=[aws_subnet_id], # Single subnet only + iam_instance_profile="", + ) + + # Mock the EC2 client to simulate partial capacity behavior + original_run_instances = simcore_ec2_api.client.run_instances + call_count = 0 + + async def mock_run_instances(*args, **kwargs): + nonlocal call_count + call_count += 1 + + if call_count == 1: + # First call: return only 2 instances when 3 were requested + # Simulate that the subnet has capacity for only 2 machines + required_instances = kwargs["MaxCount"] + kwargs_copy = kwargs.copy() + kwargs_copy["MinCount"] = required_instances - 1 + kwargs_copy["MaxCount"] = required_instances - 1 + return await original_run_instances(*args, **kwargs_copy) + + # Second call: simulate insufficient capacity (subnet is full) + error_response = { + "Error": { + "Code": "InsufficientInstanceCapacity", + "Message": "An error occurred (InsufficientInstanceCapacity) when calling the RunInstances operation (reached max retries: 4): We currently do not have sufficient g4dn.4xlarge capacity in the Availability Zone you requested (us-east-1a). Our system will be working on provisioning additional capacity. 
You can currently get g4dn.4xlarge capacity by not specifying an Availability Zone in your request or choosing us-east-1b, us-east-1c, us-east-1d, us-east-1f", + }, + } + raise botocore.exceptions.ClientError(error_response, "RunInstances") # type: ignore + + # Apply the mock for the first call + mocker.patch.object( + simcore_ec2_api.client, "run_instances", side_effect=mock_run_instances + ) + # First call: ask for 3 instances (min 1) -> should get 2, no error + instances = await simcore_ec2_api.launch_instances( + ec2_instance_config, + min_number_of_instances=1, + number_of_instances=3, + ) + + # Verify we got 2 instances (partial capacity) + assert len(instances) == 2 + assert call_count == 1 + + # Verify instances were created + await _assert_instances_in_ec2( + ec2_client, + expected_num_reservations=1, + expected_num_instances=2, + expected_instance_type=ec2_instance_config.type, + expected_tags=ec2_instance_config.tags, + expected_state="running", + ) + + # Second call: ask for 3 instances (min 1) -> should raise EC2InsufficientCapacityError + with pytest.raises(EC2InsufficientCapacityError) as exc_info: + await simcore_ec2_api.launch_instances( + ec2_instance_config, + min_number_of_instances=1, + number_of_instances=3, + ) + + # Verify that run_instances was called twice total + assert call_count == 2 + + # Verify the error contains the expected information + subnet_desc = await ec2_client.describe_subnets(SubnetIds=[aws_subnet_id]) + assert hasattr(exc_info.value, "instance_type") + assert exc_info.value.instance_type == fake_ec2_instance_type.name # type: ignore + assert exc_info.value.availability_zones == [ # type: ignore + subnet_desc["Subnets"][0]["AvailabilityZone"] # type: ignore + ] + + # Verify still only 2 instances exist (no new ones were created) + await _assert_instances_in_ec2( + ec2_client, + expected_num_reservations=1, + expected_num_instances=2, + expected_instance_type=ec2_instance_config.type, + 
expected_tags=ec2_instance_config.tags, + expected_state="running", + ) + + +_RESERVED_IPS: Final[int] = 5 # AWS reserves 5 IPs in each subnet + + +@pytest.fixture +async def with_small_subnet( + create_aws_subnet_id: Callable[..., Awaitable[str]], +) -> tuple[str, int]: + """Creates a subnet with a single IP address to simulate InsufficientInstanceCapacity""" + single_ip_cidr = ( + "10.0.11.0/29" # /29 is the minimum allowed by AWS, gives 8 addresses + ) + return ( + await create_aws_subnet_id(single_ip_cidr), + 8 - _RESERVED_IPS, + ) # 5 are reserved by AWS + + +async def test_launch_instances_with_small_subnet( + simcore_ec2_api: SimcoreEC2API, + ec2_client: EC2Client, + fake_ec2_instance_type: EC2InstanceType, + faker: Faker, + with_small_subnet: tuple[str, int], + aws_subnet_id: str, + aws_security_group_id: str, + aws_ami_id: str, + mocker: MockerFixture, +): + await _assert_no_instances_in_ec2(ec2_client) + small_subnet_id, capacity = with_small_subnet + # Create a config with a single subnet (as requested) + ec2_instance_config = EC2InstanceConfig( + type=fake_ec2_instance_type, + tags=faker.pydict(allowed_types=(str,)), + startup_script=faker.pystr(), + ami_id=aws_ami_id, + key_name=faker.pystr(), + security_group_ids=[aws_security_group_id], + subnet_ids=[small_subnet_id, aws_subnet_id], + iam_instance_profile="", + ) + + # first call shall work in the first subnet + instances = await simcore_ec2_api.launch_instances( + ec2_instance_config, + min_number_of_instances=capacity, + number_of_instances=capacity, + ) + + # Verify we got 2 instances (partial capacity) + assert len(instances) == capacity + + # Verify instances were created + await _assert_instances_in_ec2( + ec2_client, + expected_num_reservations=1, + expected_num_instances=capacity, + expected_instance_type=ec2_instance_config.type, + expected_tags=ec2_instance_config.tags, + expected_state="running", + ) + + instances = await simcore_ec2_api.launch_instances( + ec2_instance_config, + 
min_number_of_instances=1, + number_of_instances=1, + ) + + +async def test_launch_instances_raises_ec2_subnets_not_enough_ips_error( + simcore_ec2_api: SimcoreEC2API, + ec2_client: EC2Client, + fake_ec2_instance_type: EC2InstanceType, + faker: Faker, + create_aws_subnet_id: Callable[..., Awaitable[str]], + aws_security_group_id: str, + aws_ami_id: str, + mocker: MockerFixture, +) -> None: + """Test that EC2SubnetsNotEnoughIPsError is raised when subnets don't have enough IPs.""" + await _assert_no_instances_in_ec2(ec2_client) + + # Create additional small subnets + subnet1_id = await create_aws_subnet_id("10.0.200.0/29") # 3 usable IPs + subnet2_id = await create_aws_subnet_id("10.0.201.0/29") # 3 usable IPs + + ec2_instance_config = EC2InstanceConfig( + type=fake_ec2_instance_type, + tags=faker.pydict(allowed_types=(str,)), + startup_script=faker.pystr(), + ami_id=aws_ami_id, + key_name=faker.pystr(), + security_group_ids=[aws_security_group_id], + subnet_ids=[subnet1_id, subnet2_id], + iam_instance_profile="", + ) + + with pytest.raises(EC2SubnetsNotEnoughIPsError) as exc_info: + await simcore_ec2_api.launch_instances( + ec2_instance_config, + min_number_of_instances=7, + number_of_instances=7, + ) + + error = exc_info.value + assert error.subnet_ids == [subnet1_id, subnet2_id] # type: ignore + assert error.instance_type == fake_ec2_instance_type.name # type: ignore + assert error.available_ips == 6 # type: ignore + + +@pytest.mark.xfail( + reason="if the user asks for a minimum number of instances that cannot fit a subnet, then it currently raises! 
" + "it is currently not required that the instances are distributed among subnets" +) +async def test_launch_instances_distributes_instances_among_subnets( + simcore_ec2_api: SimcoreEC2API, + ec2_client: EC2Client, + fake_ec2_instance_type: EC2InstanceType, + faker: Faker, + create_aws_subnet_id: Callable[..., Awaitable[str]], + aws_security_group_id: str, + aws_ami_id: str, + mocker: MockerFixture, +) -> None: + """Test that EC2SubnetsNotEnoughIPsError is raised when subnets don't have enough IPs.""" + await _assert_no_instances_in_ec2(ec2_client) + + # Create additional small subnets + subnet1_id = await create_aws_subnet_id("10.0.200.0/29") # 3 usable IPs + subnet2_id = await create_aws_subnet_id("10.0.201.0/29") # 3 usable IPs + + ec2_instance_config = EC2InstanceConfig( + type=fake_ec2_instance_type, + tags=faker.pydict(allowed_types=(str,)), + startup_script=faker.pystr(), + ami_id=aws_ami_id, + key_name=faker.pystr(), + security_group_ids=[aws_security_group_id], + subnet_ids=[subnet1_id, subnet2_id], + iam_instance_profile="", + ) + + await simcore_ec2_api.launch_instances( + ec2_instance_config, + min_number_of_instances=5, + number_of_instances=5, + ) + + await _assert_instances_in_ec2( + ec2_client, + expected_num_reservations=1, + expected_num_instances=5, + expected_instance_type=ec2_instance_config.type, + expected_tags=ec2_instance_config.tags, + expected_state="running", + ) diff --git a/packages/pytest-simcore/src/pytest_simcore/aws_ec2_service.py b/packages/pytest-simcore/src/pytest_simcore/aws_ec2_service.py index f971ef9b8f7d..efca123aaa30 100644 --- a/packages/pytest-simcore/src/pytest_simcore/aws_ec2_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/aws_ec2_service.py @@ -5,7 +5,7 @@ import contextlib import datetime import random -from collections.abc import AsyncIterator, Callable +from collections.abc import AsyncIterator, Awaitable, Callable from typing import cast import aioboto3 @@ -60,45 +60,79 @@ async def aws_vpc_id( 
print(f"<-- Deleted Vpc in AWS with {vpc_id=}") -@pytest.fixture(scope="session") -def subnet_cidr_block() -> str: - return "10.0.1.0/24" +@pytest.fixture +def create_subnet_cidr_block(faker: Faker) -> Callable[[], str]: + # Keep track of used subnet numbers to avoid overlaps + used_subnets: set[int] = set() + + def _() -> str: + # Generate subnet CIDR blocks within the VPC range 10.0.0.0/16 + # Using /24 subnets (10.0.X.0/24) where X is between 1-255 + while True: + subnet_number = faker.random_int(min=1, max=255) + if subnet_number not in used_subnets: + used_subnets.add(subnet_number) + return f"10.0.{subnet_number}.0/24" + + return _ @pytest.fixture -async def aws_subnet_id( +def subnet_cidr_block(create_subnet_cidr_block: Callable[[], str]) -> str: + return create_subnet_cidr_block() + + +@pytest.fixture +async def create_aws_subnet_id( aws_vpc_id: str, ec2_client: EC2Client, - subnet_cidr_block: str, -) -> AsyncIterator[str]: - subnet = await ec2_client.create_subnet( - CidrBlock=subnet_cidr_block, VpcId=aws_vpc_id - ) - assert "Subnet" in subnet - assert "SubnetId" in subnet["Subnet"] - subnet_id = subnet["Subnet"]["SubnetId"] - print(f"--> Created Subnet in AWS with {subnet_id=}") + create_subnet_cidr_block: Callable[[], str], +) -> AsyncIterator[Callable[..., Awaitable[str]]]: + created_subnet_ids: set[str] = set() - yield subnet_id + async def _(cidr_override: str | None = None) -> str: + subnet = await ec2_client.create_subnet( + CidrBlock=cidr_override or create_subnet_cidr_block(), VpcId=aws_vpc_id + ) + assert "Subnet" in subnet + assert "SubnetId" in subnet["Subnet"] + subnet_id = subnet["Subnet"]["SubnetId"] + print(f"--> Created Subnet in AWS with {subnet_id=}") + created_subnet_ids.add(subnet_id) + return subnet_id + yield _ + + # cleanup # all the instances in the subnet must be terminated before that works - instances_in_subnet = await ec2_client.describe_instances( - Filters=[{"Name": "subnet-id", "Values": [subnet_id]}] - ) - if 
instances_in_subnet["Reservations"]: - print(f"--> terminating {len(instances_in_subnet)} instances in subnet") - await ec2_client.terminate_instances( - InstanceIds=[ - instance["Instances"][0]["InstanceId"] # type: ignore - for instance in instances_in_subnet["Reservations"] - ] + for subnet_id in created_subnet_ids: + instances_in_subnet = await ec2_client.describe_instances( + Filters=[{"Name": "subnet-id", "Values": [subnet_id]}] ) - print(f"<-- terminated {len(instances_in_subnet)} instances in subnet") + if instances_in_subnet["Reservations"]: + print(f"--> terminating {len(instances_in_subnet)} instances in subnet") + await ec2_client.terminate_instances( + InstanceIds=[ + instance["Instances"][0]["InstanceId"] # type: ignore + for instance in instances_in_subnet["Reservations"] + ] + ) + print(f"<-- terminated {len(instances_in_subnet)} instances in subnet") - await ec2_client.delete_subnet(SubnetId=subnet_id) - subnets = await ec2_client.describe_subnets() - print(f"<-- Deleted Subnet in AWS with {subnet_id=}") - print(f"current {subnets=}") + await ec2_client.delete_subnet(SubnetId=subnet_id) + subnets = await ec2_client.describe_subnets() + print(f"<-- Deleted Subnet in AWS with {subnet_id=}") + print(f"current {subnets=}") + + +@pytest.fixture +async def aws_subnet_id( + aws_vpc_id: str, + ec2_client: EC2Client, + subnet_cidr_block: str, + create_aws_subnet_id: Callable[[], Awaitable[str]], +) -> str: + return await create_aws_subnet_id() @pytest.fixture @@ -133,7 +167,7 @@ def _creator(**overrides) -> EC2InstanceData: return EC2InstanceData( **( { - "launch_time": faker.date_time(tzinfo=datetime.timezone.utc), + "launch_time": faker.date_time(tzinfo=datetime.UTC), "id": faker.uuid4(), "aws_private_dns": f"ip-{faker.ipv4().replace('.', '-')}.ec2.internal", "aws_public_ip": faker.ipv4(), diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/monkeypatch_envs.py b/packages/pytest-simcore/src/pytest_simcore/helpers/monkeypatch_envs.py index 
d81356144304..13ca659d2a4a 100644 --- a/packages/pytest-simcore/src/pytest_simcore/helpers/monkeypatch_envs.py +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/monkeypatch_envs.py @@ -3,6 +3,7 @@ """ import os +from collections.abc import Mapping from io import StringIO from pathlib import Path @@ -17,7 +18,7 @@ def setenvs_from_dict( - monkeypatch: pytest.MonkeyPatch, envs: dict[str, str | bool] + monkeypatch: pytest.MonkeyPatch, envs: Mapping[str, str | bool] ) -> EnvVarsDict: env_vars = {} diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py index f39f7b0208b6..c18bf519362b 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py @@ -112,13 +112,14 @@ class EC2InstancesSettings(BaseCustomSettings): " this is required to start a new EC2 instance", ), ] - EC2_INSTANCES_SUBNET_ID: Annotated[ - str, + EC2_INSTANCES_SUBNET_IDS: Annotated[ + list[str], Field( min_length=1, description="A subnet is a range of IP addresses in your VPC " " (https://docs.aws.amazon.com/vpc/latest/userguide/configure-subnets.html), " - "this is required to start a new EC2 instance", + "this is required to start a new EC2 instance. 
The subnets are used in the given order " + "until the capacity is used up.", ), ] EC2_INSTANCES_TIME_BEFORE_DRAINING: Annotated[ diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py index 05d2589dcf2d..c2d714ebbaa2 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py @@ -15,7 +15,7 @@ EC2Tags, Resources, ) -from aws_library.ec2._errors import EC2TooManyInstancesError +from aws_library.ec2._errors import EC2AccessError, EC2TooManyInstancesError from fastapi import FastAPI from models_library.generated_models.docker_rest_api import Node from models_library.rabbitmq_messages import ProgressType @@ -421,7 +421,7 @@ async def _activate_drained_nodes( ) -async def _start_warm_buffer_instances( +async def _try_start_warm_buffer_instances( app: FastAPI, cluster: Cluster, auto_scaling_mode: AutoscalingProvider ) -> Cluster: """starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed""" @@ -471,9 +471,21 @@ async def _start_warm_buffer_instances( with log_context( _logger, logging.INFO, f"start {len(instances_to_start)} warm buffer machines" ): - started_instances = await get_ec2_client(app).start_instances( - instances_to_start - ) + try: + started_instances = await get_ec2_client(app).start_instances( + instances_to_start + ) + except EC2AccessError: + _logger.warning( + "Could not start warm buffer instances! " + "TIP: This can happen in case of Insufficient " + "Capacity on AWS AvailabilityZone(s) where the warm buffers were originally created. 
" + "Until https://github.com/ITISFoundation/osparc-simcore/issues/8273 is fixed this " + "will prevent fulfilling this instance type need.", + exc_info=True, + ) + # we need to re-assign the tasks assigned to the warm buffer instances + return cluster # NOTE: first start the instance and then set the tags in case the instance cannot start (e.g. InsufficientInstanceCapacity) await get_ec2_client(app).set_instances_tags( started_instances, @@ -816,7 +828,7 @@ async def _launch_instances( ].ami_id, key_name=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME, security_group_ids=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SECURITY_GROUP_IDS, - subnet_id=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SUBNET_ID, + subnet_ids=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SUBNET_IDS, iam_instance_profile=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ATTACHED_IAM_PROFILE, ), min_number_of_instances=1, # NOTE: we want at least 1 if possible @@ -1231,7 +1243,7 @@ async def _autoscale_cluster( cluster = await _activate_drained_nodes(app, cluster) # 3. start warm buffer instances to cover the remaining tasks - cluster = await _start_warm_buffer_instances(app, cluster, auto_scaling_mode) + cluster = await _try_start_warm_buffer_instances(app, cluster, auto_scaling_mode) # 4. 
scale down unused instances cluster = await _scale_down_unused_cluster_instances( diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_warm_buffer_machines_pool_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_warm_buffer_machines_pool_core.py index f8203fe8d239..7dcaa48dd61d 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_warm_buffer_machines_pool_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_warm_buffer_machines_pool_core.py @@ -311,7 +311,7 @@ async def _add_remove_buffer_instances( ami_id=ec2_boot_specific.ami_id, key_name=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME, security_group_ids=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SECURITY_GROUP_IDS, - subnet_id=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SUBNET_ID, + subnet_ids=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SUBNET_IDS, iam_instance_profile=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ATTACHED_IAM_PROFILE, ), min_number_of_instances=1, # NOTE: we want at least 1 diff --git a/services/autoscaling/tests/manual/.env-devel b/services/autoscaling/tests/manual/.env-devel index e654a4df5236..f19312b8eb30 100644 --- a/services/autoscaling/tests/manual/.env-devel +++ b/services/autoscaling/tests/manual/.env-devel @@ -21,7 +21,7 @@ EC2_INSTANCES_ATTACHED_IAM_PROFILE=XXXXXXXXX EC2_INSTANCES_KEY_NAME=XXXXXXXXXX EC2_INSTANCES_NAME_PREFIX=testing-osparc-computational-cluster EC2_INSTANCES_SECURITY_GROUP_IDS=["XXXXXXXXXX"] -EC2_INSTANCES_SUBNET_ID=XXXXXXXXXX +EC2_INSTANCES_SUBNET_IDS=["XXXXXXXXXX"] EC2_INSTANCES_CUSTOM_TAGS={"special": "testing"} EC2_INSTANCES_TIME_BEFORE_DRAINING=00:00:20 EC2_INSTANCES_TIME_BEFORE_TERMINATION=00:01:00 diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 3a2f89d7ab2d..8af62e808c8f 100644 --- 
a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -8,6 +8,7 @@ import json import logging import random +import secrets from collections.abc import AsyncIterator, Awaitable, Callable, Iterator from copy import deepcopy from pathlib import Path @@ -244,11 +245,6 @@ def app_environment( delenvs_from_dict(monkeypatch, mock_env_devel_environment, raising=False) return setenvs_from_dict(monkeypatch, {**external_envfile_dict}) - assert "json_schema_extra" in EC2InstanceBootSpecific.model_config - assert isinstance(EC2InstanceBootSpecific.model_config["json_schema_extra"], dict) - assert isinstance( - EC2InstanceBootSpecific.model_config["json_schema_extra"]["examples"], list - ) envs = setenvs_from_dict( monkeypatch, { @@ -261,21 +257,19 @@ def app_environment( "SSM_ACCESS_KEY_ID": faker.pystr(), "SSM_SECRET_ACCESS_KEY": faker.pystr(), "EC2_INSTANCES_KEY_NAME": faker.pystr(), - "EC2_INSTANCES_SECURITY_GROUP_IDS": json.dumps( + "EC2_INSTANCES_SECURITY_GROUP_IDS": json_dumps( faker.pylist(allowed_types=(str,)) ), - "EC2_INSTANCES_SUBNET_ID": faker.pystr(), - "EC2_INSTANCES_ALLOWED_TYPES": json.dumps( + "EC2_INSTANCES_SUBNET_IDS": json_dumps(faker.pylist(allowed_types=(str,))), + "EC2_INSTANCES_ALLOWED_TYPES": json_dumps( { ec2_type_name: random.choice( # noqa: S311 - EC2InstanceBootSpecific.model_config["json_schema_extra"][ - "examples" - ] + EC2InstanceBootSpecific.model_json_schema()["examples"] ) for ec2_type_name in aws_allowed_ec2_instance_type_names } ), - "EC2_INSTANCES_CUSTOM_TAGS": json.dumps(ec2_instance_custom_tags), + "EC2_INSTANCES_CUSTOM_TAGS": json_dumps(ec2_instance_custom_tags), "EC2_INSTANCES_ATTACHED_IAM_PROFILE": faker.pystr(), }, ) @@ -292,25 +286,18 @@ def mocked_ec2_instances_envs( aws_allowed_ec2_instance_type_names: list[InstanceTypeType], aws_instance_profile: str, ) -> EnvVarsDict: - assert "json_schema_extra" in EC2InstanceBootSpecific.model_config - assert 
isinstance(EC2InstanceBootSpecific.model_config["json_schema_extra"], dict) - assert isinstance( - EC2InstanceBootSpecific.model_config["json_schema_extra"]["examples"], list - ) envs = setenvs_from_dict( monkeypatch, { "EC2_INSTANCES_KEY_NAME": "osparc-pytest", - "EC2_INSTANCES_SECURITY_GROUP_IDS": json.dumps([aws_security_group_id]), - "EC2_INSTANCES_SUBNET_ID": aws_subnet_id, - "EC2_INSTANCES_ALLOWED_TYPES": json.dumps( + "EC2_INSTANCES_SECURITY_GROUP_IDS": json_dumps([aws_security_group_id]), + "EC2_INSTANCES_SUBNET_IDS": json_dumps([aws_subnet_id]), + "EC2_INSTANCES_ALLOWED_TYPES": json_dumps( { ec2_type_name: cast( dict, - random.choice( # noqa: S311 - EC2InstanceBootSpecific.model_config["json_schema_extra"][ - "examples" - ] + secrets.choice( + EC2InstanceBootSpecific.model_json_schema()["examples"] ), ) | {"ami_id": aws_ami_id} @@ -371,11 +358,11 @@ def enabled_dynamic_mode( monkeypatch, { "AUTOSCALING_NODES_MONITORING": "{}", - "NODES_MONITORING_NODE_LABELS": json.dumps(["pytest.fake-node-label"]), - "NODES_MONITORING_SERVICE_LABELS": json.dumps( + "NODES_MONITORING_NODE_LABELS": json_dumps(["pytest.fake-node-label"]), + "NODES_MONITORING_SERVICE_LABELS": json_dumps( ["pytest.fake-service-label"] ), - "NODES_MONITORING_NEW_NODES_LABELS": json.dumps( + "NODES_MONITORING_NEW_NODES_LABELS": json_dumps( ["pytest.fake-new-node-label"] ), }, @@ -756,9 +743,9 @@ async def _() -> None: assert tasks, f"no tasks available for {found_service['Spec']['Name']}" assert len(tasks) == 1 service_task = tasks[0] - assert service_task["Status"]["State"] in expected_states, ( - f"service {found_service['Spec']['Name']}'s task is {service_task['Status']['State']}" - ) + assert ( + service_task["Status"]["State"] in expected_states + ), f"service {found_service['Spec']['Name']}'s task is {service_task['Status']['State']}" ctx.logger.info( "%s", f"service {found_service['Spec']['Name']} is now {service_task['Status']['State']} {'.' 
* number_of_success['count']}", @@ -792,7 +779,7 @@ def aws_allowed_ec2_instance_type_names_env( aws_allowed_ec2_instance_type_names: list[InstanceTypeType], ) -> EnvVarsDict: changed_envs: dict[str, str | bool] = { - "EC2_INSTANCES_ALLOWED_TYPES": json.dumps(aws_allowed_ec2_instance_type_names), + "EC2_INSTANCES_ALLOWED_TYPES": json_dumps(aws_allowed_ec2_instance_type_names), } return app_environment | setenvs_from_dict(monkeypatch, changed_envs) @@ -985,9 +972,7 @@ def _creator( assert ( datetime.timedelta(seconds=10) < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION - ), ( - "this tests relies on the fact that the time before termination is above 10 seconds" - ) + ), "this tests relies on the fact that the time before termination is above 10 seconds" assert app_settings.AUTOSCALING_EC2_INSTANCES seconds_delta = ( -datetime.timedelta(seconds=10) @@ -1128,12 +1113,12 @@ def ec2_instances_allowed_types_with_only_1_buffered( allowed_ec2_types.items(), ) ) - assert allowed_ec2_types_with_buffer_defined, ( - "one type with buffer is needed for the tests!" - ) - assert len(allowed_ec2_types_with_buffer_defined) == 1, ( - "more than one type with buffer is disallowed in this test!" - ) + assert ( + allowed_ec2_types_with_buffer_defined + ), "one type with buffer is needed for the tests!" + assert ( + len(allowed_ec2_types_with_buffer_defined) == 1 + ), "more than one type with buffer is disallowed in this test!" return { TypeAdapter(InstanceTypeType).validate_python(k): v for k, v in allowed_ec2_types_with_buffer_defined.items() @@ -1157,9 +1142,9 @@ def _by_buffer_count( filter(_by_buffer_count, allowed_ec2_types.items()) ) assert allowed_ec2_types_with_buffer_defined, "you need one type with buffer" - assert len(allowed_ec2_types_with_buffer_defined) == 1, ( - "more than one type with buffer is disallowed in this test!" 
- ) + assert ( + len(allowed_ec2_types_with_buffer_defined) == 1 + ), "more than one type with buffer is disallowed in this test!" return next(iter(allowed_ec2_types_with_buffer_defined.values())).buffer_count @@ -1209,7 +1194,9 @@ async def _do( InstanceType=instance_type, KeyName=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME, SecurityGroupIds=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SECURITY_GROUP_IDS, - SubnetId=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SUBNET_ID, + SubnetId=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SUBNET_IDS[ + 0 + ], IamInstanceProfile={ "Arn": app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ATTACHED_IAM_PROFILE }, diff --git a/services/autoscaling/tests/unit/test_core_settings.py b/services/autoscaling/tests/unit/test_core_settings.py index a7d3b70fe27a..9d0abdae9e33 100644 --- a/services/autoscaling/tests/unit/test_core_settings.py +++ b/services/autoscaling/tests/unit/test_core_settings.py @@ -9,6 +9,7 @@ $ pytest --external-envfile=.secrets --pdb tests/unit/test_core_settings.py """ + import datetime import json import logging @@ -16,6 +17,7 @@ from typing import Final import pytest +from common_library.json_serialization import json_dumps from faker import Faker from pydantic import ValidationError from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict @@ -72,6 +74,16 @@ def test_settings(app_environment: EnvVarsDict): assert settings.AUTOSCALING_REDIS +def test_settings_multiple_subnets( + app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, faker: Faker +): + subnets = [faker.pystr() for _ in range(3)] + monkeypatch.setenv("EC2_INSTANCES_SUBNET_IDS", json_dumps(subnets)) + settings = ApplicationSettings.create_from_envs() + assert settings.AUTOSCALING_EC2_INSTANCES + assert subnets == settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SUBNET_IDS + + def test_settings_dynamic_mode(enabled_dynamic_mode: EnvVarsDict): settings = 
ApplicationSettings.create_from_envs() assert settings.AUTOSCALING_EC2_ACCESS @@ -172,7 +184,6 @@ def test_EC2_INSTANCES_ALLOWED_TYPES_passing_invalid_image_tags( # noqa: N802 ) with caplog.at_level(logging.WARNING): - settings = ApplicationSettings.create_from_envs() assert settings.AUTOSCALING_EC2_INSTANCES is None @@ -265,7 +276,6 @@ def test_EC2_INSTANCES_ALLOWED_TYPES_empty_not_allowed_with_main_field_env_var( # NOTE: input captured via EnvSettingsWithAutoDefaultSource # default env factory -> None with caplog.at_level(logging.WARNING): - settings = ApplicationSettings.create_from_envs() assert settings.AUTOSCALING_EC2_INSTANCES is None @@ -293,7 +303,6 @@ def test_EC2_INSTANCES_ALLOWED_TYPES_empty_not_allowed_without_main_field_env_va # removing any value for AUTOSCALING_EC2_INSTANCES caplog.clear() with caplog.at_level(logging.WARNING): - settings = ApplicationSettings.create_from_envs() assert settings.AUTOSCALING_EC2_INSTANCES is None @@ -330,7 +339,6 @@ def test_EC2_INSTANCES_ALLOWED_TYPES_invalid_instance_names( # noqa: N802 ) caplog.clear() with caplog.at_level(logging.WARNING): - settings = ApplicationSettings.create_from_envs() assert settings.AUTOSCALING_EC2_INSTANCES is None diff --git a/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py index fea09027a191..874ec6b733c7 100644 --- a/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py +++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py @@ -18,9 +18,11 @@ import aiodocker import arrow +import botocore.exceptions import pytest import tenacity from aws_library.ec2 import EC2InstanceBootSpecific, EC2InstanceData, Resources +from common_library.json_serialization import json_dumps from fastapi import FastAPI from models_library.docker import ( DockerGenericTag, @@ -68,6 +70,7 @@ AutoscalingDocker, get_docker_client, ) +from simcore_service_autoscaling.modules.ec2 
import get_ec2_client from simcore_service_autoscaling.utils.utils_docker import ( _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY, _OSPARC_NODE_TERMINATION_PROCESS_LABEL_KEY, @@ -522,9 +525,9 @@ async def _test_cluster_scaling_up_and_down( # noqa: PLR0915 all_instances = await ec2_client.describe_instances(Filters=instance_type_filters) assert not all_instances["Reservations"] - assert scale_up_params.expected_num_instances == 1, ( - "This test is not made to work with more than 1 expected instance. so please adapt if needed" - ) + assert ( + scale_up_params.expected_num_instances == 1 + ), "This test is not made to work with more than 1 expected instance. so please adapt if needed" # create the service(s) created_docker_services = await create_services_batch(scale_up_params) @@ -1254,7 +1257,7 @@ async def test_cluster_scaling_up_starts_multiple_instances( expected_instance_type="g4dn.8xlarge", # 32CPUs, 128GiB expected_num_instances=7, ), - id="A batch of services requiring g3.4xlarge and a batch requiring g4dn.8xlarge", + id="A batch of services requiring g4dn.2xlarge and a batch requiring g4dn.8xlarge", ), ], ) @@ -1283,9 +1286,7 @@ async def test_cluster_adapts_machines_on_the_fly( # noqa: PLR0915 assert ( scale_up_params1.num_services >= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES - ), ( - "this test requires to run a first batch of more services than the maximum number of instances allowed" - ) + ), "this test requires to run a first batch of more services than the maximum number of instances allowed" # we have nothing running now all_instances = await ec2_client.describe_instances() assert not all_instances["Reservations"] @@ -1502,9 +1503,7 @@ async def test_cluster_adapts_machines_on_the_fly( # noqa: PLR0915 assert "Instances" in reservation1 assert len(reservation1["Instances"]) == ( app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES - ), ( - f"expected {app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES} EC2 
instances, found {len(reservation1['Instances'])}" - ) + ), f"expected {app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES} EC2 instances, found {len(reservation1['Instances'])}" for instance in reservation1["Instances"]: assert "InstanceType" in instance assert instance["InstanceType"] == scale_up_params1.expected_instance_type @@ -1518,9 +1517,9 @@ async def test_cluster_adapts_machines_on_the_fly( # noqa: PLR0915 reservation2 = all_instances["Reservations"][1] assert "Instances" in reservation2 - assert len(reservation2["Instances"]) == 1, ( - f"expected 1 EC2 instances, found {len(reservation2['Instances'])}" - ) + assert ( + len(reservation2["Instances"]) == 1 + ), f"expected 1 EC2 instances, found {len(reservation2['Instances'])}" for instance in reservation2["Instances"]: assert "InstanceType" in instance assert instance["InstanceType"] == scale_up_params2.expected_instance_type @@ -2086,7 +2085,7 @@ async def test_warm_buffers_are_started_to_replace_missing_hot_buffers( ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], indirect=True, ) -async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7071( +async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7071( # noqa: PLR0915 patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, minimal_configuration: None, with_instances_machines_hot_buffer: EnvVarsDict, @@ -2247,9 +2246,9 @@ async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7 # BUG REPRODUCTION # # start a service that imposes same type as the hot buffer - assert hot_buffer_instance_type == "t2.xlarge", ( - "the test is hard-coded for this type and accordingly resource. If this changed then the resource shall be changed too" - ) + assert ( + hot_buffer_instance_type == "t2.xlarge" + ), "the test is hard-coded for this type and accordingly resource. 
If this changed then the resource shall be changed too" scale_up_params = _ScaleUpParams( imposed_instance_type=hot_buffer_instance_type, service_resources=Resources( @@ -2335,3 +2334,195 @@ async def _check_autoscaling_is_stable() -> None: with pytest.raises(tenacity.RetryError): await _check_autoscaling_is_stable() + + +@pytest.fixture +async def with_multiple_small_subnet_ids( + create_aws_subnet_id: Callable[..., Awaitable[str]], monkeypatch: pytest.MonkeyPatch +) -> tuple[str, ...]: + subnet_1 = await create_aws_subnet_id("10.0.200.0/29") # 3 usable IPs + subnet_2 = await create_aws_subnet_id("10.0.201.0/29") # 3 usable IPs + monkeypatch.setenv("EC2_INSTANCES_SUBNET_IDS", json_dumps([subnet_1, subnet_2])) + return subnet_1, subnet_2 + + +@pytest.mark.parametrize( + "scale_up_params", + [ + pytest.param( + _ScaleUpParams( + imposed_instance_type=None, + service_resources=Resources( + cpus=5, ram=TypeAdapter(ByteSize).validate_python("36Gib") + ), + num_services=1, + expected_instance_type="r5n.4xlarge", # 1 GPU, 16 CPUs, 128GiB + expected_num_instances=1, + ), + ), + ], +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) +async def test_fresh_instance_is_started_in_second_subnet_if_warm_buffers_used_up_all_ips_in_first_subnet( + patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, + minimal_configuration: None, + with_multiple_small_subnet_ids: tuple[str, ...], + initialized_app: FastAPI, + app_settings: ApplicationSettings, + create_buffer_machines: Callable[ + [int, InstanceTypeType, InstanceStateNameType, list[DockerGenericTag] | None], + Awaitable[list[str]], + ], + 
ec2_client: EC2Client, + scale_up_params: _ScaleUpParams, + create_services_batch: Callable[[_ScaleUpParams], Awaitable[list[Service]]], + ec2_instance_custom_tags: dict[str, str], + instance_type_filters: Sequence[FilterTypeDef], +): + # we have nothing running now + all_instances = await ec2_client.describe_instances() + assert not all_instances["Reservations"] + + # have warm buffers in the first subnet *fixture uses subnet_1 by default*, this will use all the IPs in the first subnet + assert app_settings.AUTOSCALING_EC2_INSTANCES + await create_buffer_machines( + 3, + cast( + InstanceTypeType, + next( + iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES) + ), + ), + "stopped", + None, + ) + + # create several tasks that needs more power + await create_services_batch(scale_up_params) + # now autoscale shall create machines in the second subnet + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider() + ) + # check the instances were started + created_instances = await assert_autoscaled_dynamic_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, + expected_instance_state="running", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + instance_filters=instance_type_filters, + ) + # check the instance is in the second subnet + assert created_instances + assert "SubnetId" in created_instances[0] + assert created_instances[0]["SubnetId"] == with_multiple_small_subnet_ids[1] + + +@pytest.fixture +def mock_start_instances_to_raise_insufficient_capacity_error( + initialized_app: FastAPI, + mocker: MockerFixture, +) -> mock.Mock: + async def _raise_insufficient_capacity_error(*args: Any, **kwargs: Any) -> None: + raise botocore.exceptions.ClientError( + error_response={ + "Error": { + "Code": "InsufficientInstanceCapacity", + "Message": "An error occurred 
(InsufficientInstanceCapacity) when calling the RunInstances operation (reached max retries: 4): We currently do not have sufficient g4dn.4xlarge capacity in the Availability Zone you requested (us-east-1a). Our system will be working on provisioning additional capacity. You can currently get g4dn.4xlarge capacity by not specifying an Availability Zone in your request or choosing us-east-1b, us-east-1c, us-east-1d, us-east-1f", + } + }, + operation_name="StartInstances", + ) + + return mocker.patch.object( + get_ec2_client(initialized_app).client, + "start_instances", + autospec=True, + side_effect=_raise_insufficient_capacity_error, + ) + + +@pytest.mark.xfail( + reason="bug described in https://github.com/ITISFoundation/osparc-simcore/issues/8273" +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) +async def test_fresh_instance_is_launched_if_warm_buffers_cannot_start_due_to_insufficient_capacity_error( + patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, + minimal_configuration: None, + with_multiple_small_subnet_ids: tuple[str, ...], + initialized_app: FastAPI, + mock_start_instances_to_raise_insufficient_capacity_error: None, + app_settings: ApplicationSettings, + create_buffer_machines: Callable[ + [int, InstanceTypeType, InstanceStateNameType, list[DockerGenericTag] | None], + Awaitable[list[str]], + ], + ec2_client: EC2Client, + create_services_batch: Callable[[_ScaleUpParams], Awaitable[list[Service]]], + ec2_instance_custom_tags: dict[str, str], + instance_type_filters: Sequence[FilterTypeDef], +): + # we have nothing running now + all_instances = await 
ec2_client.describe_instances() + assert not all_instances["Reservations"] + + # have warm buffers in the first subnet *fixture uses subnet_1 by default*, this will use all the IPs in the first subnet + assert app_settings.AUTOSCALING_EC2_INSTANCES + warm_buffer_instance_type = cast( + InstanceTypeType, + next(iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES)), + ) + await create_buffer_machines(1, warm_buffer_instance_type, "stopped", None) + + # create several tasks that need more power + scale_up_params = _ScaleUpParams( + imposed_instance_type=warm_buffer_instance_type, + service_resources=Resources( + cpus=1, ram=TypeAdapter(ByteSize).validate_python("1Gib") + ), + num_services=1, + expected_instance_type=warm_buffer_instance_type, + expected_num_instances=1, + ) + await create_services_batch(scale_up_params) + # now autoscale shall create machines in the second subnet + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider() + ) + # check the instances were started + created_instances = await assert_autoscaled_dynamic_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, + expected_instance_state="running", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + instance_filters=instance_type_filters, + ) + # check the instance is in the second subnet + assert created_instances + assert "SubnetId" in created_instances[0] + assert created_instances[0]["SubnetId"] == with_multiple_small_subnet_ids[1] diff --git a/services/autoscaling/tests/unit/test_modules_ec2.py b/services/autoscaling/tests/unit/test_modules_ec2.py index aab1747983a5..0ec83790d7e5 100644 --- a/services/autoscaling/tests/unit/test_modules_ec2.py +++ b/services/autoscaling/tests/unit/test_modules_ec2.py @@ -51,7 +51,7 @@ def _(instance_type: InstanceTypeType) -> EC2InstanceConfig: 
ami_id=aws_ami_id, key_name=faker.pystr(), security_group_ids=[aws_security_group_id], - subnet_id=aws_subnet_id, + subnet_ids=[aws_subnet_id], iam_instance_profile="", ) @@ -69,7 +69,7 @@ def _assert_metrics( *, expected_num_samples: int, check_sample_index: int | None, - expected_sample: _ExpectedSample | None + expected_sample: _ExpectedSample | None, ) -> None: collected_metrics = list(metrics_to_collect.collect()) assert len(collected_metrics) == 1 diff --git a/services/clusters-keeper/.env-devel b/services/clusters-keeper/.env-devel index 1c103bc8dd29..cd6039eec611 100644 --- a/services/clusters-keeper/.env-devel +++ b/services/clusters-keeper/.env-devel @@ -6,7 +6,7 @@ EC2_CLUSTERS_KEEPER_ACCESS_KEY_ID=XXXXXXXXXX PRIMARY_EC2_INSTANCES_ALLOWED_TYPES='{"t2.medium":"ami_id": "XXXXXXXXXX", "custom_boot_scripts": ["whoami"]}}' PRIMARY_EC2_INSTANCES_KEY_NAME=XXXXXXXXXX PRIMARY_EC2_INSTANCES_SECURITY_GROUP_IDS=XXXXXXXXXX -PRIMARY_EC2_INSTANCES_SUBNET_ID=XXXXXXXXXX +PRIMARY_EC2_INSTANCES_SUBNET_IDS='["XXXXXXXXXX"]' EC2_CLUSTERS_KEEPER_SECRET_ACCESS_KEY=XXXXXXXXXX CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX="testing" LOG_FORMAT_LOCAL_DEV_ENABLED=True diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py index 2fde9fcbcd8c..612abeabf771 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py @@ -118,8 +118,8 @@ class WorkersEC2InstancesSettings(BaseCustomSettings): " this is required to start a new EC2 instance", ), ] - WORKERS_EC2_INSTANCES_SUBNET_ID: Annotated[ - str, + WORKERS_EC2_INSTANCES_SUBNET_IDS: Annotated[ + list[str], Field( min_length=1, description="A subnet is a range of IP addresses in your VPC " @@ -186,8 +186,8 @@ class PrimaryEC2InstancesSettings(BaseCustomSettings): " this is required to start a new EC2 instance", ), ] - 
PRIMARY_EC2_INSTANCES_SUBNET_ID: Annotated[ - str, + PRIMARY_EC2_INSTANCES_SUBNET_IDS: Annotated[ + list[str], Field( min_length=1, description="A subnet is a range of IP addresses in your VPC " diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml b/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml index 6ba13f58eac9..8ddbc592f19b 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml @@ -111,7 +111,7 @@ services: EC2_INSTANCES_MAX_INSTANCES: ${WORKERS_EC2_INSTANCES_MAX_INSTANCES} EC2_INSTANCES_NAME_PREFIX: ${EC2_INSTANCES_NAME_PREFIX} EC2_INSTANCES_SECURITY_GROUP_IDS: ${WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS} - EC2_INSTANCES_SUBNET_ID: ${WORKERS_EC2_INSTANCES_SUBNET_ID} + EC2_INSTANCES_SUBNET_IDS: ${WORKERS_EC2_INSTANCES_SUBNET_IDS} EC2_INSTANCES_TIME_BEFORE_DRAINING: ${WORKERS_EC2_INSTANCES_TIME_BEFORE_DRAINING} EC2_INSTANCES_TIME_BEFORE_TERMINATION: ${WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION} LOG_FORMAT_LOCAL_DEV_ENABLED: 1 diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters.py index 89860549fd3a..a95c879a2a34 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters.py @@ -46,10 +46,10 @@ async def _get_primary_ec2_params( app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_ALLOWED_TYPES.items() ) ) - ec2_instance_types: list[ - EC2InstanceType - ] = await ec2_client.get_ec2_instance_capabilities( - instance_type_names={ec2_type_name} + ec2_instance_types: list[EC2InstanceType] = ( + await ec2_client.get_ec2_instance_capabilities( + instance_type_names={ec2_type_name} + ) ) assert 
ec2_instance_types # nosec assert len(ec2_instance_types) == 1 # nosec @@ -77,7 +77,7 @@ async def create_cluster( ami_id=ec2_instance_boot_specs.ami_id, key_name=app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_KEY_NAME, security_group_ids=app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_SECURITY_GROUP_IDS, - subnet_id=app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_SUBNET_ID, + subnet_ids=app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_SUBNET_IDS, iam_instance_profile=app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_ATTACHED_IAM_PROFILE, ) new_ec2_instance_data: list[EC2InstanceData] = await ec2_client.launch_instances( diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py index 81bd7279399f..c1d42b9ebfc9 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py @@ -102,7 +102,7 @@ def _convert_to_env_dict(entries: dict[str, Any]) -> str: f"WORKERS_EC2_INSTANCES_KEY_NAME={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_KEY_NAME}", f"WORKERS_EC2_INSTANCES_MAX_INSTANCES={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_MAX_INSTANCES}", f"WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS={_convert_to_env_list(app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS)}", - f"WORKERS_EC2_INSTANCES_SUBNET_ID={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_SUBNET_ID}", + f"WORKERS_EC2_INSTANCES_SUBNET_IDS={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_SUBNET_IDS}", 
f"WORKERS_EC2_INSTANCES_TIME_BEFORE_DRAINING={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_TIME_BEFORE_DRAINING}", f"WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION}", f"AUTOSCALING_RABBITMQ={_convert_to_env_dict(model_dump_with_secrets(app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_RABBIT, show_secrets=True)) if app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_RABBIT else 'null'}", diff --git a/services/clusters-keeper/tests/manual/README.md b/services/clusters-keeper/tests/manual/README.md index 4ef8e0bd72c6..6fd9ce5efb68 100644 --- a/services/clusters-keeper/tests/manual/README.md +++ b/services/clusters-keeper/tests/manual/README.md @@ -70,14 +70,14 @@ PRIMARY_EC2_INSTANCES_ALLOWED_TYPES='{"t2.medium":"ami_id": "XXXXXXXX", "custom_ PRIMARY_EC2_INSTANCES_KEY_NAME=XXXXXXX PRIMARY_EC2_INSTANCES_MAX_INSTANCES=10 PRIMARY_EC2_INSTANCES_SECURITY_GROUP_IDS="[\"XXXXXXX\"]" -PRIMARY_EC2_INSTANCES_SUBNET_ID=XXXXXXX +PRIMARY_EC2_INSTANCES_SUBNET_IDS="[\"XXXXXXX\"]" CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES={} WORKERS_EC2_INSTANCES_ALLOWED_TYPES='{"g4dn.xlarge": {"ami_id": "XXXXXXXX", "custom_boot_scripts": ["whoami"], "pre_pull_images": ["ubuntu:latest"]}}' WORKERS_EC2_INSTANCES_KEY_NAME=XXXXXXX WORKERS_EC2_INSTANCES_MAX_INSTANCES=10 WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS="[\"XXXXXXX\"]" -WORKERS_EC2_INSTANCES_SUBNET_ID=XXXXXXX +WORKERS_EC2_INSTANCES_SUBNET_IDS="[\"XXXXXXX\"]" WORKERS_EC2_INSTANCES_TIME_BEFORE_DRAINING="00:00:20" WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION="00:03:00" WORKERS_EC2_INSTANCES_CUSTOM_TAGS='{"osparc-tag": "some fun tag value"}' diff --git a/services/clusters-keeper/tests/unit/conftest.py b/services/clusters-keeper/tests/unit/conftest.py index 15c52ee996d3..cbcbd09ee5a3 100644 --- a/services/clusters-keeper/tests/unit/conftest.py +++ 
b/services/clusters-keeper/tests/unit/conftest.py @@ -3,7 +3,6 @@ # pylint:disable=redefined-outer-name import importlib.resources -import json import random from collections.abc import AsyncIterator, Awaitable, Callable, Iterator from pathlib import Path @@ -17,6 +16,7 @@ import yaml from asgi_lifespan import LifespanManager from aws_library.ec2 import EC2InstanceBootSpecific +from common_library.json_serialization import json_dumps from faker import Faker from fakeredis.aioredis import FakeRedis from fastapi import FastAPI @@ -134,22 +134,22 @@ def app_environment( "CLUSTERS_KEEPER_DASK_WORKER_SATURATION": f"{faker.pyfloat(min_value=0.1)}", "CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH": "{}", "PRIMARY_EC2_INSTANCES_KEY_NAME": faker.pystr(), - "PRIMARY_EC2_INSTANCES_SECURITY_GROUP_IDS": json.dumps( + "PRIMARY_EC2_INSTANCES_SECURITY_GROUP_IDS": json_dumps( faker.pylist(allowed_types=(str,)) ), - "PRIMARY_EC2_INSTANCES_SUBNET_ID": faker.pystr(), - "PRIMARY_EC2_INSTANCES_ALLOWED_TYPES": json.dumps( + "PRIMARY_EC2_INSTANCES_SUBNET_IDS": json_dumps( + faker.pylist(allowed_types=(str,)) + ), + "PRIMARY_EC2_INSTANCES_ALLOWED_TYPES": json_dumps( { random.choice( # noqa: S311 ec2_instances - ): EC2InstanceBootSpecific.model_config["json_schema_extra"][ - "examples" - ][ + ): EC2InstanceBootSpecific.model_json_schema()["examples"][ 1 ] # NOTE: we use example with custom script } ), - "PRIMARY_EC2_INSTANCES_CUSTOM_TAGS": json.dumps( + "PRIMARY_EC2_INSTANCES_CUSTOM_TAGS": json_dumps( {"osparc-tag": "the pytest tag is here"} ), "PRIMARY_EC2_INSTANCES_ATTACHED_IAM_PROFILE": "", # must be empty since we would need to add it to moto as well @@ -159,22 +159,22 @@ def app_environment( "PRIMARY_EC2_INSTANCES_PROMETHEUS_USERNAME": faker.user_name(), "PRIMARY_EC2_INSTANCES_PROMETHEUS_PASSWORD": faker.password(), "CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES": "{}", - "WORKERS_EC2_INSTANCES_ALLOWED_TYPES": json.dumps( + "WORKERS_EC2_INSTANCES_ALLOWED_TYPES": json_dumps( { 
ec2_type_name: random.choice( # noqa: S311 - EC2InstanceBootSpecific.model_config["json_schema_extra"][ - "examples" - ] + EC2InstanceBootSpecific.model_json_schema()["examples"] ) for ec2_type_name in ec2_instances } ), - "WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS": json.dumps( + "WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS": json_dumps( + faker.pylist(allowed_types=(str,)) + ), + "WORKERS_EC2_INSTANCES_SUBNET_IDS": json_dumps( faker.pylist(allowed_types=(str,)) ), - "WORKERS_EC2_INSTANCES_SUBNET_ID": faker.pystr(), "WORKERS_EC2_INSTANCES_KEY_NAME": faker.pystr(), - "WORKERS_EC2_INSTANCES_CUSTOM_TAGS": json.dumps( + "WORKERS_EC2_INSTANCES_CUSTOM_TAGS": json_dumps( {"osparc-tag": "the pytest worker tag value is here"} ), }, @@ -194,10 +194,10 @@ def mocked_primary_ec2_instances_envs( monkeypatch, { "PRIMARY_EC2_INSTANCES_KEY_NAME": "osparc-pytest", - "PRIMARY_EC2_INSTANCES_SECURITY_GROUP_IDS": json.dumps( + "PRIMARY_EC2_INSTANCES_SECURITY_GROUP_IDS": json_dumps( [aws_security_group_id] ), - "PRIMARY_EC2_INSTANCES_SUBNET_ID": aws_subnet_id, + "PRIMARY_EC2_INSTANCES_SUBNET_IDS": json_dumps([aws_subnet_id]), }, ) return app_environment | envs diff --git a/services/clusters-keeper/tests/unit/test_core_settings.py b/services/clusters-keeper/tests/unit/test_core_settings.py index bbd7073d77f2..914f097cc979 100644 --- a/services/clusters-keeper/tests/unit/test_core_settings.py +++ b/services/clusters-keeper/tests/unit/test_core_settings.py @@ -4,7 +4,7 @@ import json -import random +import secrets import pytest from aws_library.ec2 import EC2InstanceBootSpecific @@ -53,10 +53,8 @@ def test_multiple_primary_ec2_instances_raises( { "PRIMARY_EC2_INSTANCES_ALLOWED_TYPES": json.dumps( { - ec2_type_name: random.choice( # noqa: S311 - EC2InstanceBootSpecific.model_config["json_schema_extra"][ - "examples" - ] + ec2_type_name: secrets.choice( + EC2InstanceBootSpecific.model_json_schema()["examples"] ) for ec2_type_name in ec2_instances } diff --git a/services/docker-compose.yml 
b/services/docker-compose.yml index 7153a9122b39..7b78dd6df21d 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -1,10 +1,10 @@ x-dask-tls-secrets: &dask_tls_secrets - source: dask_tls_key target: ${DASK_TLS_KEY} - # mode: 444 # not supported by docker stack compose as of 26.0.0 + # mode: 444 # not supported by docker stack compose as of 26.0.0 - source: dask_tls_cert target: ${DASK_TLS_CERT} - # mode: 444 # not supported by docker stack compose as of 26.0.0 +# mode: 444 # not supported by docker stack compose as of 26.0.0 x-tracing-open-telemetry: &tracing_open_telemetry_environs TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT: ${TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT} @@ -108,7 +108,7 @@ services: EC2_INSTANCES_MAX_START_TIME: ${EC2_INSTANCES_MAX_START_TIME} EC2_INSTANCES_NAME_PREFIX: ${EC2_INSTANCES_NAME_PREFIX} EC2_INSTANCES_SECURITY_GROUP_IDS: ${EC2_INSTANCES_SECURITY_GROUP_IDS} - EC2_INSTANCES_SUBNET_ID: ${EC2_INSTANCES_SUBNET_ID} + EC2_INSTANCES_SUBNET_IDS: ${EC2_INSTANCES_SUBNET_IDS} EC2_INSTANCES_KEY_NAME: ${EC2_INSTANCES_KEY_NAME} EC2_INSTANCES_TIME_BEFORE_DRAINING: ${EC2_INSTANCES_TIME_BEFORE_DRAINING} EC2_INSTANCES_TIME_BEFORE_TERMINATION: ${EC2_INSTANCES_TIME_BEFORE_TERMINATION} @@ -223,7 +223,7 @@ services: PRIMARY_EC2_INSTANCES_KEY_NAME: ${PRIMARY_EC2_INSTANCES_KEY_NAME} PRIMARY_EC2_INSTANCES_MAX_INSTANCES: ${PRIMARY_EC2_INSTANCES_MAX_INSTANCES} PRIMARY_EC2_INSTANCES_SECURITY_GROUP_IDS: ${PRIMARY_EC2_INSTANCES_SECURITY_GROUP_IDS} - PRIMARY_EC2_INSTANCES_SUBNET_ID: ${PRIMARY_EC2_INSTANCES_SUBNET_ID} + PRIMARY_EC2_INSTANCES_SUBNET_IDS: ${PRIMARY_EC2_INSTANCES_SUBNET_IDS} PRIMARY_EC2_INSTANCES_CUSTOM_TAGS: ${PRIMARY_EC2_INSTANCES_CUSTOM_TAGS} PRIMARY_EC2_INSTANCES_ATTACHED_IAM_PROFILE: ${PRIMARY_EC2_INSTANCES_ATTACHED_IAM_PROFILE} PRIMARY_EC2_INSTANCES_SSM_TLS_DASK_CA: ${PRIMARY_EC2_INSTANCES_SSM_TLS_DASK_CA} @@ -253,7 +253,7 @@ services: WORKERS_EC2_INSTANCES_MAX_INSTANCES: ${WORKERS_EC2_INSTANCES_MAX_INSTANCES} 
WORKERS_EC2_INSTANCES_MAX_START_TIME: ${WORKERS_EC2_INSTANCES_MAX_START_TIME} WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS: ${WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS} - WORKERS_EC2_INSTANCES_SUBNET_ID: ${WORKERS_EC2_INSTANCES_SUBNET_ID} + WORKERS_EC2_INSTANCES_SUBNET_IDS: ${WORKERS_EC2_INSTANCES_SUBNET_IDS} WORKERS_EC2_INSTANCES_CUSTOM_TAGS: ${WORKERS_EC2_INSTANCES_CUSTOM_TAGS} CLUSTERS_KEEPER_TRACING: ${CLUSTERS_KEEPER_TRACING} secrets: *dask_tls_secrets