diff --git a/.github/workflows/ci-testing-deploy.yml b/.github/workflows/ci-testing-deploy.yml
index 37fb97ab572..e4fd7e19706 100644
--- a/.github/workflows/ci-testing-deploy.yml
+++ b/.github/workflows/ci-testing-deploy.yml
@@ -587,7 +587,7 @@ jobs:
   unit-test-autoscaling:
     needs: changes
     if: ${{ needs.changes.outputs.autoscaling == 'true' || github.event_name == 'push' || github.event.inputs.force_all_builds == 'true' }}
-    timeout-minutes: 22 # temporary: mypy takes a huge amount of time to run here, maybe we should cache it
+    timeout-minutes: 18 # if this timeout gets too small, then split the tests
     name: "[unit] autoscaling"
     runs-on: ${{ matrix.os }}
     strategy:
diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/autoscaling.py b/packages/pytest-simcore/src/pytest_simcore/helpers/autoscaling.py
index 2d6c278d92c..3648284faa7 100644
--- a/packages/pytest-simcore/src/pytest_simcore/helpers/autoscaling.py
+++ b/packages/pytest-simcore/src/pytest_simcore/helpers/autoscaling.py
@@ -39,7 +39,7 @@ def create_fake_association(
 ):
     fake_node_to_instance_map = {}
 
-    async def _fake_node_creator(
+    def _fake_node_creator(
         _nodes: list[Node], ec2_instances: list[EC2InstanceData]
     ) -> tuple[list[AssociatedInstance], list[EC2InstanceData]]:
         def _create_fake_node_with_labels(instance: EC2InstanceData) -> Node:
diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/application.py b/services/autoscaling/src/simcore_service_autoscaling/core/application.py
index 95fcff3b4b7..f0c9d7f3c32 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/core/application.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/core/application.py
@@ -17,8 +17,12 @@
     APP_STARTED_DYNAMIC_BANNER_MSG,
 )
 from ..api.routes import setup_api_routes
-from ..modules.auto_scaling_task import setup as setup_auto_scaler_background_task
-from ..modules.buffer_machines_pool_task import setup as setup_buffer_machines_pool_task
+from ..modules.cluster_scaling.auto_scaling_task import (
+    setup as setup_auto_scaler_background_task,
+)
+from ..modules.cluster_scaling.warm_buffer_machines_pool_task import (
+    setup as setup_warm_buffer_machines_pool_task,
+)
 from ..modules.docker import setup as setup_docker
 from ..modules.ec2 import setup as setup_ec2
 from ..modules.instrumentation import setup as setup_instrumentation
@@ -78,7 +82,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
         initialize_fastapi_app_tracing(app)
 
     setup_auto_scaler_background_task(app)
-    setup_buffer_machines_pool_task(app)
+    setup_warm_buffer_machines_pool_task(app)
 
     # ERROR HANDLERS
diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py b/services/autoscaling/src/simcore_service_autoscaling/models.py
index c77f9fe349c..7645b300e8d 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/models.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/models.py
@@ -47,8 +47,7 @@ class AssociatedInstance(_BaseInstance):
 
 
 @dataclass(frozen=True, kw_only=True, slots=True)
-class NonAssociatedInstance(_BaseInstance):
-    ...
+class NonAssociatedInstance(_BaseInstance): ...
 
 
 @dataclass(frozen=True, kw_only=True, slots=True)
@@ -68,9 +67,9 @@ class Cluster:  # pylint: disable=too-many-instance-attributes
             "description": "This is a EC2-backed docker node which is drained (cannot accept tasks)"
         }
     )
-    buffer_drained_nodes: list[AssociatedInstance] = field(
+    hot_buffer_drained_nodes: list[AssociatedInstance] = field(
         metadata={
-            "description": "This is a EC2-backed docker node which is drained in the reserve if this is enabled (with no tasks)"
+            "description": "This is an EC2-backed docker node which is drained in the reserve if this is enabled (with no tasks, a.k.a. hot buffer)"
         }
     )
     pending_ec2s: list[NonAssociatedInstance] = field(
@@ -83,9 +82,9 @@ class Cluster:  # pylint: disable=too-many-instance-attributes
             "description": "This is an existing EC2 instance that never properly joined the cluster and is deemed as broken and will be terminated"
         }
     )
-    buffer_ec2s: list[NonAssociatedInstance] = field(
+    warm_buffer_ec2s: list[NonAssociatedInstance] = field(
         metadata={
-            "description": "This is a prepared stopped EC2 instance, not yet associated to a docker node, ready to be used"
+            "description": "This is a prepared stopped EC2 instance, not yet associated with a docker node, ready to be used (a.k.a. warm buffer)"
         }
     )
     disconnected_nodes: list[Node] = field(
@@ -121,7 +120,7 @@ def total_number_of_machines(self) -> int:
             len(self.active_nodes)
             + len(self.pending_nodes)
             + len(self.drained_nodes)
-            + len(self.buffer_drained_nodes)
+            + len(self.hot_buffer_drained_nodes)
             + len(self.pending_ec2s)
             + len(self.broken_ec2s)
             + len(self.terminating_nodes)
@@ -138,10 +137,10 @@ def _get_instance_ids(
             f"Cluster(active-nodes: count={len(self.active_nodes)} {_get_instance_ids(self.active_nodes)}, "
             f"pending-nodes: count={len(self.pending_nodes)} {_get_instance_ids(self.pending_nodes)}, "
             f"drained-nodes: count={len(self.drained_nodes)} {_get_instance_ids(self.drained_nodes)}, "
-            f"reserve-drained-nodes: count={len(self.buffer_drained_nodes)} {_get_instance_ids(self.buffer_drained_nodes)}, "
+            f"hot-buffer-drained-nodes: count={len(self.hot_buffer_drained_nodes)} {_get_instance_ids(self.hot_buffer_drained_nodes)}, "
             f"pending-ec2s: count={len(self.pending_ec2s)} {_get_instance_ids(self.pending_ec2s)}, "
             f"broken-ec2s: count={len(self.broken_ec2s)} {_get_instance_ids(self.broken_ec2s)}, "
-            f"buffer-ec2s: count={len(self.buffer_ec2s)} {_get_instance_ids(self.buffer_ec2s)}, "
+            f"warm-buffer-ec2s: count={len(self.warm_buffer_ec2s)} {_get_instance_ids(self.warm_buffer_ec2s)}, "
             f"disconnected-nodes: count={len(self.disconnected_nodes)}, "
             f"terminating-nodes: count={len(self.terminating_nodes)} {_get_instance_ids(self.terminating_nodes)}, "
             f"retired-nodes: count={len(self.retired_nodes)} {_get_instance_ids(self.retired_nodes)}, "
@@ -159,7 +158,7 @@ class DaskTask:
 
 
 @dataclass(kw_only=True, slots=True)
-class BufferPool:
+class WarmBufferPool:
     ready_instances: set[EC2InstanceData] = field(default_factory=set)
     pending_instances: set[EC2InstanceData] = field(default_factory=set)
     waiting_to_pull_instances: set[EC2InstanceData] = field(default_factory=set)
@@ -170,7 +169,7 @@ class BufferPool:
 
     def __repr__(self) -> str:
         return (
-            f"BufferPool(ready-count={len(self.ready_instances)}, "
+            f"WarmBufferPool(ready-count={len(self.ready_instances)}, "
             f"pending-count={len(self.pending_instances)}, "
             f"waiting-to-pull-count={len(self.waiting_to_pull_instances)}, "
             f"waiting-to-stop-count={len(self.waiting_to_stop_instances)}, "
@@ -213,20 +212,20 @@ def remove_instance(self, instance: EC2InstanceData) -> None:
 
 
 @dataclass
-class BufferPoolManager:
-    buffer_pools: dict[InstanceTypeType, BufferPool] = field(
-        default_factory=lambda: defaultdict(BufferPool)
+class WarmBufferPoolManager:
+    buffer_pools: dict[InstanceTypeType, WarmBufferPool] = field(
+        default_factory=lambda: defaultdict(WarmBufferPool)
     )
 
     def __repr__(self) -> str:
-        return f"BufferPoolManager({dict(self.buffer_pools)})"
+        return f"WarmBufferPoolManager({dict(self.buffer_pools)})"
 
-    def flatten_buffer_pool(self) -> BufferPool:
+    def flatten_buffer_pool(self) -> WarmBufferPool:
         """returns a flattened buffer pool with all the EC2InstanceData"""
-        flat_pool = BufferPool()
+        flat_pool = WarmBufferPool()
         for buffer_pool in self.buffer_pools.values():
-            for f in fields(BufferPool):
+            for f in fields(WarmBufferPool):
                 getattr(flat_pool, f.name).update(getattr(buffer_pool, f.name))
 
         return flat_pool
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py
deleted file mode 100644
index b9df042c622..00000000000
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py
+++ /dev/null
@@ -1,80 +0,0 @@
-from abc import ABC, abstractmethod
-from dataclasses import dataclass
-
-from aws_library.ec2 import EC2InstanceData, EC2Tags, Resources
-from fastapi import FastAPI
-from models_library.docker import DockerLabelKey
-from models_library.generated_models.docker_rest_api import Node as DockerNode
-from types_aiobotocore_ec2.literals import InstanceTypeType
-
-from ..models import AssociatedInstance
-from ..utils import utils_docker
-
-
-@dataclass
-class BaseAutoscaling(ABC):  # pragma: no cover
-    @staticmethod
-    @abstractmethod
-    async def get_monitored_nodes(app: FastAPI) -> list[DockerNode]: ...
-
-    @staticmethod
-    @abstractmethod
-    def get_ec2_tags(app: FastAPI) -> EC2Tags: ...
-
-    @staticmethod
-    @abstractmethod
-    def get_new_node_docker_tags(
-        app: FastAPI, ec2_instance_data: EC2InstanceData
-    ) -> dict[DockerLabelKey, str]: ...
-
-    @staticmethod
-    @abstractmethod
-    async def list_unrunnable_tasks(app: FastAPI) -> list: ...
-
-    @staticmethod
-    @abstractmethod
-    def get_task_required_resources(task) -> Resources: ...
-
-    @staticmethod
-    @abstractmethod
-    async def get_task_defined_instance(
-        app: FastAPI, task
-    ) -> InstanceTypeType | None: ...
-
-    @staticmethod
-    @abstractmethod
-    async def compute_node_used_resources(
-        app: FastAPI, instance: AssociatedInstance
-    ) -> Resources: ...
-
-    @staticmethod
-    @abstractmethod
-    async def compute_cluster_used_resources(
-        app: FastAPI, instances: list[AssociatedInstance]
-    ) -> Resources: ...
-
-    @staticmethod
-    @abstractmethod
-    async def compute_cluster_total_resources(
-        app: FastAPI, instances: list[AssociatedInstance]
-    ) -> Resources: ...
-
-    @staticmethod
-    @abstractmethod
-    async def is_instance_active(
-        app: FastAPI, instance: AssociatedInstance
-    ) -> bool: ...
-
-    @staticmethod
-    @abstractmethod
-    async def is_instance_retired(
-        app: FastAPI, instance: AssociatedInstance
-    ) -> bool: ...
-
-    @staticmethod
-    def is_instance_drained(instance: AssociatedInstance) -> bool:
-        return not utils_docker.is_node_osparc_ready(instance.node)
-
-    @staticmethod
-    @abstractmethod
-    async def try_retire_nodes(app: FastAPI) -> None: ...
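
Reviewer note on the `WarmBufferPoolManager.flatten_buffer_pool` implementation kept above: it merges every set-typed field across the per-instance-type pools by iterating `dataclasses.fields`. A minimal, self-contained sketch of that pattern (hypothetical `Pool`/`Manager` names, not the service's real models):

```python
from collections import defaultdict
from dataclasses import dataclass, field, fields


@dataclass(kw_only=True)
class Pool:
    ready_instances: set[str] = field(default_factory=set)
    pending_instances: set[str] = field(default_factory=set)


@dataclass
class Manager:
    # defaultdict lazily creates an empty Pool per instance type on first access
    pools: dict[str, Pool] = field(default_factory=lambda: defaultdict(Pool))

    def flatten(self) -> Pool:
        # iterate the dataclass fields so new Pool fields are merged automatically
        flat = Pool()
        for pool in self.pools.values():
            for f in fields(Pool):
                getattr(flat, f.name).update(getattr(pool, f.name))
        return flat


manager = Manager()
manager.pools["t2.micro"].ready_instances.add("i-aaa")
manager.pools["g4dn.xlarge"].pending_instances.add("i-bbb")
assert manager.flatten().ready_instances == {"i-aaa"}
```

One upside of the `fields()` loop is that adding a new state set to the pool dataclass requires no change to the flattening code.
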
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/__init__.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py
similarity index 91%
rename from services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
rename to services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py
index a3a34e7b5d0..05d2589dcf2 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py
@@ -17,63 +17,57 @@
 )
 from aws_library.ec2._errors import EC2TooManyInstancesError
 from fastapi import FastAPI
-from models_library.generated_models.docker_rest_api import Node, NodeState
+from models_library.generated_models.docker_rest_api import Node
 from models_library.rabbitmq_messages import ProgressType
 from servicelib.logging_utils import log_catch, log_context
 from servicelib.utils import limited_gather
 from servicelib.utils_formatting import timedelta_as_minute_second
 from types_aiobotocore_ec2.literals import InstanceTypeType
 
-from ..constants import DOCKER_JOIN_COMMAND_EC2_TAG_KEY, DOCKER_JOIN_COMMAND_NAME
-from ..core.errors import (
+from ...constants import DOCKER_JOIN_COMMAND_EC2_TAG_KEY, DOCKER_JOIN_COMMAND_NAME
+from ...core.errors import (
     Ec2InvalidDnsNameError,
     TaskBestFittingInstanceNotFoundError,
     TaskRequirementsAboveRequiredEC2InstanceTypeError,
     TaskRequiresUnauthorizedEC2InstanceTypeError,
 )
-from ..core.settings import ApplicationSettings, get_application_settings
-from ..models import (
+from ...core.settings import ApplicationSettings, get_application_settings
+from ...models import (
     AssignedTasksToInstanceType,
     AssociatedInstance,
     Cluster,
     NonAssociatedInstance,
 )
-from ..utils import utils_docker, utils_ec2
-from ..utils.auto_scaling_core import (
+from ...utils import utils_docker, utils_ec2
+from ...utils.cluster_scaling import (
     associate_ec2_instances_with_nodes,
     ec2_startup_script,
     find_selected_instance_type_for_task,
-    get_machine_buffer_type,
-    node_host_name_from_ec2_private_dns,
+    get_hot_buffer_type,
     sort_drained_nodes,
 )
-from ..utils.buffer_machines_pool_core import (
-    get_activated_buffer_ec2_tags,
-    get_deactivated_buffer_ec2_tags,
-    is_buffer_machine,
-)
-from ..utils.rabbitmq import (
+from ...utils.rabbitmq import (
     post_autoscaling_status_message,
     post_tasks_log_message,
     post_tasks_progress_message,
 )
-from .auto_scaling_mode_base import BaseAutoscaling
-from .docker import get_docker_client
-from .ec2 import get_ec2_client
-from .instrumentation import get_instrumentation, has_instrumentation
-from .ssm import get_ssm_client
+from ...utils.warm_buffer_machines import (
+    get_activated_warm_buffer_ec2_tags,
+    get_deactivated_warm_buffer_ec2_tags,
+    is_warm_buffer_machine,
+)
+from ..docker import get_docker_client
+from ..ec2 import get_ec2_client
+from ..instrumentation import get_instrumentation, has_instrumentation
+from ..ssm import get_ssm_client
+from ._provider_protocol import AutoscalingProvider
 
 _logger = logging.getLogger(__name__)
 
 
-def _node_not_ready(node: Node) -> bool:
-    assert node.status  # nosec
-    return bool(node.status.state != NodeState.ready)
-
-
 async def _analyze_current_cluster(
     app: FastAPI,
-    auto_scaling_mode: BaseAutoscaling,
+    auto_scaling_mode: AutoscalingProvider,
     allowed_instance_types: list[EC2InstanceType],
 ) -> Cluster:
     app_settings = get_application_settings(app)
@@ -95,13 +89,13 @@ async def _analyze_current_cluster(
         state_names=["terminated"],
     )
-    buffer_ec2_instances = await get_ec2_client(app).get_instances(
+    warm_buffer_ec2_instances = await get_ec2_client(app).get_instances(
         key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME],
-        tags=get_deactivated_buffer_ec2_tags(app, auto_scaling_mode),
+        tags=get_deactivated_warm_buffer_ec2_tags(auto_scaling_mode.get_ec2_tags(app)),
         state_names=["stopped"],
     )
 
-    attached_ec2s, pending_ec2s = await associate_ec2_instances_with_nodes(
+    attached_ec2s, pending_ec2s = associate_ec2_instances_with_nodes(
         docker_nodes, existing_ec2_instances
     )
 
@@ -141,7 +135,7 @@ async def _analyze_current_cluster(
                     - node_used_resources,
                 )
             )
-        elif auto_scaling_mode.is_instance_drained(instance):
+        elif utils_docker.is_instance_drained(instance):
             all_drained_nodes.append(instance)
         elif await auto_scaling_mode.is_instance_retired(app, instance):
             # it should be drained, but it is not, so we force it to be drained such that it might be re-used if needed
@@ -149,24 +143,26 @@ async def _analyze_current_cluster(
         else:
             pending_nodes.append(instance)
 
-    drained_nodes, buffer_drained_nodes, terminating_nodes = sort_drained_nodes(
+    drained_nodes, hot_buffer_drained_nodes, terminating_nodes = sort_drained_nodes(
         app_settings, all_drained_nodes, allowed_instance_types
     )
     cluster = Cluster(
         active_nodes=active_nodes,
         pending_nodes=pending_nodes,
         drained_nodes=drained_nodes,
-        buffer_drained_nodes=buffer_drained_nodes,
+        hot_buffer_drained_nodes=hot_buffer_drained_nodes,
         pending_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in pending_ec2s],
         broken_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in broken_ec2s],
-        buffer_ec2s=[
-            NonAssociatedInstance(ec2_instance=i) for i in buffer_ec2_instances
+        warm_buffer_ec2s=[
+            NonAssociatedInstance(ec2_instance=i) for i in warm_buffer_ec2_instances
         ],
         terminating_nodes=terminating_nodes,
         terminated_instances=[
             NonAssociatedInstance(ec2_instance=i) for i in terminated_ec2_instances
         ],
-        disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)],
+        disconnected_nodes=[
+            n for n in docker_nodes if not utils_docker.is_node_ready(n)
+        ],
         retired_nodes=retired_nodes,
     )
     _logger.info("current state: %s", f"{cluster!r}")
@@ -207,7 +203,7 @@ async def _terminate_broken_ec2s(app: FastAPI, cluster: Cluster) -> Cluster:
     )
 
 
-async def _make_pending_buffer_ec2s_join_cluster(
+async def _make_pending_warm_buffer_ec2s_join_cluster(
     app: FastAPI,
     cluster: Cluster,
 ) -> Cluster:
@@ -215,7 +211,7 @@ async def _make_pending_buffer_ec2s_join_cluster(
     if buffer_ec2s_pending := [
         i.ec2_instance
         for i in cluster.pending_ec2s
-        if is_buffer_machine(i.ec2_instance.tags)
+        if is_warm_buffer_machine(i.ec2_instance.tags)
         and (DOCKER_JOIN_COMMAND_EC2_TAG_KEY not in i.ec2_instance.tags)
     ]:
         # started buffer instance shall be asked to join the cluster once they are running
@@ -278,7 +274,7 @@
 async def _try_attach_pending_ec2s(
     app: FastAPI,
     cluster: Cluster,
-    auto_scaling_mode: BaseAutoscaling,
+    auto_scaling_mode: AutoscalingProvider,
     allowed_instance_types: list[EC2InstanceType],
 ) -> Cluster:
     """label the drained instances that connected to the swarm which are missing the monitoring labels"""
@@ -288,7 +284,7 @@ async def _try_attach_pending_ec2s(
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
     for instance_data in cluster.pending_ec2s:
         try:
-            node_host_name = node_host_name_from_ec2_private_dns(
+            node_host_name = utils_ec2.node_host_name_from_ec2_private_dns(
                 instance_data.ec2_instance
             )
             if new_node := await utils_docker.find_node_with_name(
@@ -317,15 +313,15 @@ async def _try_attach_pending_ec2s(
             _logger.exception("Unexpected EC2 private dns")
     # NOTE: first provision the reserve drained nodes if possible
     all_drained_nodes = (
-        cluster.drained_nodes + cluster.buffer_drained_nodes + new_found_instances
+        cluster.drained_nodes + cluster.hot_buffer_drained_nodes + new_found_instances
     )
-    drained_nodes, buffer_drained_nodes, _ = sort_drained_nodes(
+    drained_nodes, hot_buffer_drained_nodes, _ = sort_drained_nodes(
         app_settings, all_drained_nodes, allowed_instance_types
     )
     return dataclasses.replace(
         cluster,
         drained_nodes=drained_nodes,
-        buffer_drained_nodes=buffer_drained_nodes,
+        hot_buffer_drained_nodes=hot_buffer_drained_nodes,
         pending_ec2s=still_pending_ec2s,
     )
 
@@ -389,7 +385,9 @@ async def _activate_drained_nodes(
 ) -> Cluster:
     nodes_to_activate = [
         node
-        for node in itertools.chain(cluster.drained_nodes, cluster.buffer_drained_nodes)
+        for node in itertools.chain(
+            cluster.drained_nodes, cluster.hot_buffer_drained_nodes
+        )
         if node.assigned_tasks
     ]
 
@@ -412,19 +410,19 @@ async def _activate_drained_nodes(
     ]
     remaining_reserved_drained_nodes = [
         node
-        for node in cluster.buffer_drained_nodes
+        for node in cluster.hot_buffer_drained_nodes
         if node.ec2_instance.id not in new_active_node_ids
     ]
     return dataclasses.replace(
         cluster,
         active_nodes=cluster.active_nodes + activated_nodes,
         drained_nodes=remaining_drained_nodes,
-        buffer_drained_nodes=remaining_reserved_drained_nodes,
+        hot_buffer_drained_nodes=remaining_reserved_drained_nodes,
     )
 
 
 async def _start_warm_buffer_instances(
-    app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling
+    app: FastAPI, cluster: Cluster, auto_scaling_mode: AutoscalingProvider
 ) -> Cluster:
     """starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed"""
 
@@ -432,11 +430,11 @@ async def _start_warm_buffer_instances(
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
 
     instances_to_start = [
-        i.ec2_instance for i in cluster.buffer_ec2s if i.assigned_tasks
+        i.ec2_instance for i in cluster.warm_buffer_ec2s if i.assigned_tasks
     ]
 
     if (
-        len(cluster.buffer_drained_nodes)
+        len(cluster.hot_buffer_drained_nodes)
         < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
     ):
         # check if we can migrate warm buffers to hot buffers
@@ -448,7 +446,7 @@ async def _start_warm_buffer_instances(
         )
         free_startable_warm_buffers_to_replace_hot_buffers = [
             warm_buffer.ec2_instance
-            for warm_buffer in cluster.buffer_ec2s
+            for warm_buffer in cluster.warm_buffer_ec2s
             if (warm_buffer.ec2_instance.type == hot_buffer_instance_type)
             and not warm_buffer.assigned_tasks
         ]
@@ -462,7 +460,7 @@ async def _start_warm_buffer_instances(
         instances_to_start += free_startable_warm_buffers_to_replace_hot_buffers[
             : app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
-            - len(cluster.buffer_drained_nodes)
+            - len(cluster.hot_buffer_drained_nodes)
             - len(unnassigned_pending_ec2s)
             - len(unnassigned_pending_nodes)
         ]
 
@@ -471,7 +469,7 @@ async def _start_warm_buffer_instances(
         return cluster
 
     with log_context(
-        _logger, logging.INFO, f"start {len(instances_to_start)} buffer machines"
+        _logger, logging.INFO, f"start {len(instances_to_start)} warm buffer machines"
{len(instances_to_start)} warm buffer machines" ): started_instances = await get_ec2_client(app).start_instances( instances_to_start @@ -479,15 +477,17 @@ async def _start_warm_buffer_instances( # NOTE: first start the instance and then set the tags in case the instance cannot start (e.g. InsufficientInstanceCapacity) await get_ec2_client(app).set_instances_tags( started_instances, - tags=get_activated_buffer_ec2_tags(app, auto_scaling_mode), + tags=get_activated_warm_buffer_ec2_tags( + auto_scaling_mode.get_ec2_tags(app) + ), ) started_instance_ids = [i.id for i in started_instances] return dataclasses.replace( cluster, - buffer_ec2s=[ + warm_buffer_ec2s=[ i - for i in cluster.buffer_ec2s + for i in cluster.warm_buffer_ec2s if i.ec2_instance.id not in started_instance_ids ], pending_ec2s=cluster.pending_ec2s @@ -547,7 +547,7 @@ async def _assign_tasks_to_current_cluster( app: FastAPI, tasks: list, cluster: Cluster, - auto_scaling_mode: BaseAutoscaling, + auto_scaling_mode: AutoscalingProvider, ) -> tuple[list, Cluster]: """ Evaluates whether a task can be executed on any instance within the cluster. If the task's resource requirements are met, the task is *denoted* as assigned to the cluster. @@ -563,10 +563,10 @@ async def _assign_tasks_to_current_cluster( functools.partial(_try_assign_task_to_ec2_instance, instances=instances) for instances in ( cluster.active_nodes, - cluster.drained_nodes + cluster.buffer_drained_nodes, + cluster.drained_nodes + cluster.hot_buffer_drained_nodes, cluster.pending_nodes, cluster.pending_ec2s, - cluster.buffer_ec2s, + cluster.warm_buffer_ec2s, ) ] @@ -605,7 +605,7 @@ async def _find_needed_instances( unassigned_tasks: list, available_ec2_types: list[EC2InstanceType], cluster: Cluster, - auto_scaling_mode: BaseAutoscaling, + auto_scaling_mode: AutoscalingProvider, ) -> dict[EC2InstanceType, int]: # 1. check first the pending task needs needed_new_instance_types_for_tasks: list[AssignedTasksToInstanceType] = [] @@ -634,8 +634,8 @@ async def _find_needed_instances( defined_ec2 = find_selected_instance_type_for_task( task_required_ec2_instance, available_ec2_types, - auto_scaling_mode, task, + auto_scaling_mode.get_task_required_resources(task), ) needed_new_instance_types_for_tasks.append( AssignedTasksToInstanceType( @@ -684,13 +684,13 @@ async def _find_needed_instances( ), ) - # 2. check the buffer needs + # 2. 
check the hot buffer needs app_settings = get_application_settings(app) assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec if ( num_missing_nodes := ( app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER - - len(cluster.buffer_drained_nodes) + - len(cluster.hot_buffer_drained_nodes) ) ) > 0: # check if some are already pending @@ -699,9 +699,9 @@ async def _find_needed_instances( ] + [i.ec2_instance for i in cluster.pending_nodes if not i.assigned_tasks] if len(remaining_pending_instances) < ( app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER - - len(cluster.buffer_drained_nodes) + - len(cluster.hot_buffer_drained_nodes) ): - default_instance_type = get_machine_buffer_type(available_ec2_types) + default_instance_type = get_hot_buffer_type(available_ec2_types) num_instances_per_type[default_instance_type] += num_missing_nodes return num_instances_per_type @@ -778,7 +778,7 @@ async def _launch_instances( app: FastAPI, needed_instances: dict[EC2InstanceType, int], tasks: list, - auto_scaling_mode: BaseAutoscaling, + auto_scaling_mode: AutoscalingProvider, ) -> list[EC2InstanceData]: ec2_client = get_ec2_client(app) app_settings = get_application_settings(app) @@ -847,8 +847,8 @@ async def _launch_instances( new_pending_instances.append(r) log_message = ( - f"{sum(n for n in capped_needed_machines.values())} new machines launched" - ", it might take up to 3 minutes to start, Please wait..." + f"{sum(capped_needed_machines.values())} new machines launched" + f", it might take up to {timedelta_as_minute_second(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME)} minutes to start, Please wait..." ) await post_tasks_log_message( app, tasks=tasks, message=log_message, level=logging.INFO @@ -951,7 +951,7 @@ async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster: ) -async def _find_terminateable_instances( +def _find_terminateable_instances( app: FastAPI, cluster: Cluster ) -> list[AssociatedInstance]: app_settings: ApplicationSettings = app.state.settings @@ -996,7 +996,7 @@ async def _try_scale_down_cluster(app: FastAPI, cluster: Cluster) -> Cluster: assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec # instances found to be terminateable will now start the termination process. 
     new_terminating_instances = []
-    for instance in await _find_terminateable_instances(app, cluster):
+    for instance in _find_terminateable_instances(app, cluster):
         assert instance.node.description is not None  # nosec
         with (
             log_context(
@@ -1150,7 +1150,7 @@ async def _drain_retired_nodes(
 
 async def _scale_down_unused_cluster_instances(
     app: FastAPI,
     cluster: Cluster,
-    auto_scaling_mode: BaseAutoscaling,
+    auto_scaling_mode: AutoscalingProvider,
 ) -> Cluster:
     await auto_scaling_mode.try_retire_nodes(app)
     cluster = await _deactivate_empty_nodes(app, cluster)
@@ -1160,14 +1160,14 @@
 async def _scale_up_cluster(
     app: FastAPI,
     cluster: Cluster,
-    auto_scaling_mode: BaseAutoscaling,
+    auto_scaling_mode: AutoscalingProvider,
     allowed_instance_types: list[EC2InstanceType],
     unassigned_tasks: list,
 ) -> Cluster:
     app_settings = get_application_settings(app)
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
     if not unassigned_tasks and (
-        len(cluster.buffer_drained_nodes)
+        len(cluster.hot_buffer_drained_nodes)
         >= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
     ):
         return cluster
@@ -1212,7 +1212,7 @@
 async def _autoscale_cluster(
     app: FastAPI,
     cluster: Cluster,
-    auto_scaling_mode: BaseAutoscaling,
+    auto_scaling_mode: AutoscalingProvider,
     allowed_instance_types: list[EC2InstanceType],
 ) -> Cluster:
     # 1. check if we have pending tasks
@@ -1245,11 +1245,13 @@ async def _autoscale_cluster(
 
 
 async def _notify_autoscaling_status(
-    app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling
+    app: FastAPI, cluster: Cluster, auto_scaling_mode: AutoscalingProvider
 ) -> None:
     monitored_instances = list(
         itertools.chain(
-            cluster.active_nodes, cluster.drained_nodes, cluster.buffer_drained_nodes
+            cluster.active_nodes,
+            cluster.drained_nodes,
+            cluster.hot_buffer_drained_nodes,
         )
     )
 
@@ -1274,7 +1276,7 @@ async def _notify_autoscaling_status(
 
 
 async def auto_scale_cluster(
-    *, app: FastAPI, auto_scaling_mode: BaseAutoscaling
+    *, app: FastAPI, auto_scaling_mode: AutoscalingProvider
 ) -> None:
     """Check that there are no pending tasks requiring additional resources in the cluster (docker swarm)
     If there are such tasks, this method will allocate new machines in AWS to cope with
@@ -1289,7 +1291,7 @@ async def auto_scale_cluster(
     # cleanup
     cluster = await _cleanup_disconnected_nodes(app, cluster)
     cluster = await _terminate_broken_ec2s(app, cluster)
-    cluster = await _make_pending_buffer_ec2s_join_cluster(app, cluster)
+    cluster = await _make_pending_warm_buffer_ec2s_join_cluster(app, cluster)
     cluster = await _try_attach_pending_ec2s(
         app, cluster, auto_scaling_mode, allowed_instance_types
     )
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_computational.py
similarity index 75%
rename from services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py
rename to services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_computational.py
index 2fb2344f22f..d5d28f99710 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_computational.py
@@ -14,18 +14,17 @@
 from servicelib.utils import logged_gather
 from types_aiobotocore_ec2.literals import InstanceTypeType
 
-from ..core.errors import (
+from ...core.errors import (
     DaskNoWorkersError,
     DaskSchedulerNotFoundError,
     DaskWorkerNotFoundError,
 )
-from ..core.settings import get_application_settings
-from ..models import AssociatedInstance, DaskTask
-from ..utils import computational_scaling as utils
-from ..utils import utils_docker, utils_ec2
-from . import dask
-from .auto_scaling_mode_base import BaseAutoscaling
-from .docker import get_docker_client
+from ...core.settings import get_application_settings
+from ...models import AssociatedInstance, DaskTask
+from ...utils import utils_docker, utils_ec2
+from .. import dask
+from ..docker import get_docker_client
+from . import _utils_computational as utils
 
 _logger = logging.getLogger(__name__)
 
@@ -42,27 +41,27 @@ def _scheduler_auth(app: FastAPI) -> ClusterAuthentication:
     return app_settings.AUTOSCALING_DASK.DASK_SCHEDULER_AUTH
 
 
-class ComputationalAutoscaling(BaseAutoscaling):
-    @staticmethod
-    async def get_monitored_nodes(app: FastAPI) -> list[Node]:
+class ComputationalAutoscalingProvider:
+    async def get_monitored_nodes(self, app: FastAPI) -> list[Node]:
+        assert self  # nosec
         return await utils_docker.get_worker_nodes(get_docker_client(app))
 
-    @staticmethod
-    def get_ec2_tags(app: FastAPI) -> EC2Tags:
+    def get_ec2_tags(self, app: FastAPI) -> EC2Tags:
+        assert self  # nosec
         app_settings = get_application_settings(app)
         return utils_ec2.get_ec2_tags_computational(app_settings)
 
-    @staticmethod
     def get_new_node_docker_tags(
-        app: FastAPI, ec2_instance_data: EC2InstanceData
+        self, app: FastAPI, ec2_instance_data: EC2InstanceData
     ) -> dict[DockerLabelKey, str]:
+        assert self  # nosec
         assert app  # nosec
         return {
             DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: ec2_instance_data.type
         }
 
-    @staticmethod
-    async def list_unrunnable_tasks(app: FastAPI) -> list[DaskTask]:
+    async def list_unrunnable_tasks(self, app: FastAPI) -> list[DaskTask]:
+        assert self  # nosec
         try:
             unrunnable_tasks = await dask.list_unrunnable_tasks(
                 _scheduler_url(app), _scheduler_auth(app)
@@ -87,19 +86,21 @@ async def list_unrunnable_tasks(app: FastAPI) -> list[DaskTask]:
             )
             return []
 
-    @staticmethod
-    def get_task_required_resources(task) -> Resources:
+    def get_task_required_resources(self, task) -> Resources:
+        assert self  # nosec
         return utils.resources_from_dask_task(task)
 
-    @staticmethod
-    async def get_task_defined_instance(app: FastAPI, task) -> InstanceTypeType | None:
+    async def get_task_defined_instance(
+        self, app: FastAPI, task
+    ) -> InstanceTypeType | None:
+        assert self  # nosec
         assert app  # nosec
         return cast(InstanceTypeType | None, utils.get_task_instance_restriction(task))
 
-    @staticmethod
     async def compute_node_used_resources(
-        app: FastAPI, instance: AssociatedInstance
+        self, app: FastAPI, instance: AssociatedInstance
     ) -> Resources:
+        assert self  # nosec
         try:
             resource = await dask.get_worker_used_resources(
                 _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
@@ -127,25 +128,22 @@ async def compute_node_used_resources(
             _logger.debug("no resource found for %s", f"{instance.ec2_instance.id}")
             return Resources.create_as_empty()
 
-    @staticmethod
     async def compute_cluster_used_resources(
-        app: FastAPI, instances: list[AssociatedInstance]
+        self, app: FastAPI, instances: list[AssociatedInstance]
     ) -> Resources:
+        assert self  # nosec
         list_of_used_resources: list[Resources] = await logged_gather(
-            *(
-                ComputationalAutoscaling.compute_node_used_resources(app, i)
-                for i in instances
-            )
+            *(self.compute_node_used_resources(app, i) for i in instances)
         )
-        counter = collections.Counter({k: 0 for k in Resources.model_fields})
+        counter = collections.Counter(dict.fromkeys(Resources.model_fields, 0))
         for result in list_of_used_resources:
             counter.update(result.model_dump())
         return Resources.model_validate(dict(counter))
 
-    @staticmethod
     async def compute_cluster_total_resources(
-        app: FastAPI, instances: list[AssociatedInstance]
+        self, app: FastAPI, instances: list[AssociatedInstance]
     ) -> Resources:
+        assert self  # nosec
         try:
             return await dask.compute_cluster_total_resources(
                 _scheduler_url(app), _scheduler_auth(app), instances
@@ -153,8 +151,10 @@
         except DaskNoWorkersError:
             return Resources.create_as_empty()
 
-    @staticmethod
-    async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool:
+    async def is_instance_active(
+        self, app: FastAPI, instance: AssociatedInstance
+    ) -> bool:
+        assert self  # nosec
         if not utils_docker.is_node_osparc_ready(instance.node):
             return False
 
@@ -163,14 +163,16 @@
             _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
         )
 
-    @staticmethod
-    async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool:
+    async def is_instance_retired(
+        self, app: FastAPI, instance: AssociatedInstance
+    ) -> bool:
+        assert self  # nosec
         if not utils_docker.is_node_osparc_ready(instance.node):
             return False
         return await dask.is_worker_retired(
             _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
         )
 
-    @staticmethod
-    async def try_retire_nodes(app: FastAPI) -> None:
+    async def try_retire_nodes(self, app: FastAPI) -> None:
+        assert self  # nosec
         await dask.try_retire_nodes(_scheduler_url(app), _scheduler_auth(app))
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_dynamic.py
similarity index 64%
rename from services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py
rename to services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_dynamic.py
index a8dcd7552ac..e6dbca840e3 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_dynamic.py
@@ -4,16 +4,15 @@
 from models_library.generated_models.docker_rest_api import Node, Task
 from types_aiobotocore_ec2.literals import InstanceTypeType
 
-from ..core.settings import get_application_settings
-from ..models import AssociatedInstance
-from ..utils import utils_docker, utils_ec2
-from .auto_scaling_mode_base import BaseAutoscaling
-from .docker import get_docker_client
+from ...core.settings import get_application_settings
+from ...models import AssociatedInstance
+from ...utils import utils_docker, utils_ec2
+from ..docker import get_docker_client
 
 
-class DynamicAutoscaling(BaseAutoscaling):
-    @staticmethod
-    async def get_monitored_nodes(app: FastAPI) -> list[Node]:
+class DynamicAutoscalingProvider:
+    async def get_monitored_nodes(self, app: FastAPI) -> list[Node]:
+        assert self  # nosec
         app_settings = get_application_settings(app)
         assert app_settings.AUTOSCALING_NODES_MONITORING  # nosec
         return await utils_docker.get_monitored_nodes(
@@ -21,20 +20,20 @@ async def get_monitored_nodes(app: FastAPI) -> list[Node]:
             node_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS,
         )
 
-    @staticmethod
-    def get_ec2_tags(app: FastAPI) -> EC2Tags:
+    def get_ec2_tags(self, app: FastAPI) -> EC2Tags:
+        assert self  # nosec
         app_settings = get_application_settings(app)
         return utils_ec2.get_ec2_tags_dynamic(app_settings)
 
-    @staticmethod
     def get_new_node_docker_tags(
-        app: FastAPI, ec2_instance_data: EC2InstanceData
+        self, app: FastAPI, ec2_instance_data: EC2InstanceData
     ) -> dict[DockerLabelKey, str]:
+        assert self  # nosec
         app_settings = get_application_settings(app)
         return utils_docker.get_new_node_docker_tags(app_settings, ec2_instance_data)
 
-    @staticmethod
-    async def list_unrunnable_tasks(app: FastAPI) -> list[Task]:
+    async def list_unrunnable_tasks(self, app: FastAPI) -> list[Task]:
+        assert self  # nosec
         app_settings = get_application_settings(app)
         assert app_settings.AUTOSCALING_NODES_MONITORING  # nosec
         return await utils_docker.pending_service_tasks_with_insufficient_resources(
@@ -42,20 +41,22 @@ async def list_unrunnable_tasks(app: FastAPI) -> list[Task]:
             service_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS,
         )
 
-    @staticmethod
-    def get_task_required_resources(task) -> Resources:
+    def get_task_required_resources(self, task) -> Resources:
+        assert self  # nosec
         return utils_docker.get_max_resources_from_docker_task(task)
 
-    @staticmethod
-    async def get_task_defined_instance(app: FastAPI, task) -> InstanceTypeType | None:
+    async def get_task_defined_instance(
+        self, app: FastAPI, task
+    ) -> InstanceTypeType | None:
+        assert self  # nosec
         return await utils_docker.get_task_instance_restriction(
             get_docker_client(app), task
         )
 
-    @staticmethod
     async def compute_node_used_resources(
-        app: FastAPI, instance: AssociatedInstance
+        self, app: FastAPI, instance: AssociatedInstance
     ) -> Resources:
+        assert self  # nosec
         docker_client = get_docker_client(app)
         app_settings = get_application_settings(app)
         assert app_settings.AUTOSCALING_NODES_MONITORING  # nosec
@@ -65,37 +66,41 @@ async def compute_node_used_resources(
             service_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS,
         )
 
-    @staticmethod
     async def compute_cluster_used_resources(
-        app: FastAPI, instances: list[AssociatedInstance]
+        self, app: FastAPI, instances: list[AssociatedInstance]
     ) -> Resources:
+        assert self  # nosec
         docker_client = get_docker_client(app)
         return await utils_docker.compute_cluster_used_resources(
             docker_client, [i.node for i in instances]
         )
 
-    @staticmethod
     async def compute_cluster_total_resources(
-        app: FastAPI, instances: list[AssociatedInstance]
+        self, app: FastAPI, instances: list[AssociatedInstance]
    ) -> Resources:
+        assert self  # nosec
         assert app  # nosec
         return await utils_docker.compute_cluster_total_resources(
             [i.node for i in instances]
         )
 
-    @staticmethod
-    async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool:
+    async def is_instance_active(
+        self, app: FastAPI, instance: AssociatedInstance
+    ) -> bool:
+        assert self  # nosec
         assert app  # nosec
         return utils_docker.is_node_osparc_ready(instance.node)
 
-    @staticmethod
-    async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool:
+    async def is_instance_retired(
+        self, app: FastAPI, instance: AssociatedInstance
+    ) -> bool:
+        assert self  # nosec
         assert app  # nosec
         assert instance  # nosec
         # nothing to do here
         return False
 
-    @staticmethod
-    async def try_retire_nodes(app: FastAPI) -> None:
+    async def try_retire_nodes(self, app: FastAPI) -> None:
+        assert self  # nosec
         assert app  # nosec
         # nothing to do here
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_protocol.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_protocol.py
new file mode 100644
index 00000000000..355394b9f1d
--- /dev/null
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_protocol.py
@@ -0,0 +1,49 @@
+from typing import Protocol
+
+from aws_library.ec2 import EC2InstanceData, EC2Tags, Resources
+from fastapi import FastAPI
+from models_library.docker import DockerLabelKey
+from models_library.generated_models.docker_rest_api import Node as DockerNode
+from types_aiobotocore_ec2.literals import InstanceTypeType
+
+from ...models import AssociatedInstance
+
+
+class AutoscalingProvider(Protocol):
+    async def get_monitored_nodes(self, app: FastAPI) -> list[DockerNode]: ...
+
+    def get_ec2_tags(self, app: FastAPI) -> EC2Tags: ...
+
+    def get_new_node_docker_tags(
+        self, app: FastAPI, ec2_instance_data: EC2InstanceData
+    ) -> dict[DockerLabelKey, str]: ...
+
+    async def list_unrunnable_tasks(self, app: FastAPI) -> list: ...
+
+    def get_task_required_resources(self, task) -> Resources: ...
+
+    async def get_task_defined_instance(
+        self, app: FastAPI, task
+    ) -> InstanceTypeType | None: ...
+
+    async def compute_node_used_resources(
+        self, app: FastAPI, instance: AssociatedInstance
+    ) -> Resources: ...
+
+    async def compute_cluster_used_resources(
+        self, app: FastAPI, instances: list[AssociatedInstance]
+    ) -> Resources: ...
+
+    async def compute_cluster_total_resources(
+        self, app: FastAPI, instances: list[AssociatedInstance]
+    ) -> Resources: ...
+
+    async def is_instance_active(
+        self, app: FastAPI, instance: AssociatedInstance
+    ) -> bool: ...
+
+    async def is_instance_retired(
+        self, app: FastAPI, instance: AssociatedInstance
+    ) -> bool: ...
+
+    async def try_retire_nodes(self, app: FastAPI) -> None: ...
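
For orientation: the new `_provider_protocol.py` above replaces the deleted `BaseAutoscaling` ABC with a `typing.Protocol`, so the two providers satisfy the interface structurally instead of by inheritance. A minimal sketch of the difference (hypothetical names, not the real service API):

```python
from typing import Protocol


class ProviderLike(Protocol):
    """Trimmed-down stand-in for AutoscalingProvider."""

    def get_ec2_tags(self) -> dict[str, str]: ...


class DynamicLike:
    # no base class needed: matching the method signature is enough
    def get_ec2_tags(self) -> dict[str, str]:
        return {"mode": "dynamic"}


def describe(provider: ProviderLike) -> dict[str, str]:
    # mypy accepts any object whose shape matches the protocol
    return provider.get_ec2_tags()


print(describe(DynamicLike()))  # {'mode': 'dynamic'}
```

This also suggests why the providers gained `assert self  # nosec` lines: once the methods stop being `@staticmethod`s, the otherwise-unused `self` parameter would trigger no-self-use style lint warnings, and the assert keeps the protocol-compatible instance-method signatures while silencing them.
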
diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_utils_computational.py
similarity index 95%
rename from services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py
rename to services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_utils_computational.py
index 07c55bf746a..4fb76ee5e12 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_utils_computational.py
@@ -6,7 +6,7 @@
     get_ec2_instance_type_from_resources,
 )
 
-from ..models import DaskTask
+from ...models import DaskTask
 
 _logger = logging.getLogger(__name__)
 
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_warm_buffer_machines_pool_core.py
similarity index 76%
rename from services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_core.py
rename to services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_warm_buffer_machines_pool_core.py
index d9f1c550568..79de4ccdc16 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_core.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_warm_buffer_machines_pool_core.py
@@ -20,7 +20,6 @@
 import arrow
 from aws_library.ec2 import (
-    AWSTagValue,
     EC2InstanceConfig,
     EC2InstanceData,
     EC2InstanceType,
@@ -35,94 +34,132 @@
 from servicelib.logging_utils import log_context
 from types_aiobotocore_ec2.literals import InstanceTypeType
 
-from ..constants import (
+from ...constants import (
     BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY,
     BUFFER_MACHINE_PULLING_EC2_TAG_KEY,
     DOCKER_PULL_COMMAND,
     PREPULL_COMMAND_NAME,
 )
-from ..core.settings import get_application_settings
-from ..models import BufferPool, BufferPoolManager
-from ..utils.auto_scaling_core import ec2_buffer_startup_script
-from ..utils.buffer_machines_pool_core import (
+from ...core.settings import get_application_settings
+from ...models import WarmBufferPool, WarmBufferPoolManager
+from ...utils.warm_buffer_machines import (
     dump_pre_pulled_images_as_tags,
-    get_deactivated_buffer_ec2_tags,
+    ec2_warm_buffer_startup_script,
+    get_deactivated_warm_buffer_ec2_tags,
     load_pre_pulled_images_from_tags,
 )
-from .auto_scaling_mode_base import BaseAutoscaling
-from .ec2 import get_ec2_client
-from .instrumentation import get_instrumentation, has_instrumentation
-from .ssm import get_ssm_client
+from ..ec2 import get_ec2_client
+from ..instrumentation import get_instrumentation, has_instrumentation
+from ..ssm import get_ssm_client
+from ._provider_protocol import AutoscalingProvider
 
 _logger = logging.getLogger(__name__)
 
 
-async def _analyze_running_instance_state(
-    app: FastAPI, *, buffer_pool: BufferPool, instance: EC2InstanceData
-):
-    ssm_client = get_ssm_client(app)
+def _record_instance_ready_metrics(app: FastAPI, *, instance: EC2InstanceData) -> None:
+    """Record metrics for instances ready to pull images."""
+    if has_instrumentation(app):
+        get_instrumentation(
+            app
+        ).buffer_machines_pools_metrics.instances_ready_to_pull_seconds.labels(
+            instance_type=instance.type
+        ).observe(
+            (arrow.utcnow().datetime - instance.launch_time).total_seconds()
+        )
+
+
+def _handle_completed_cloud_init_instance(
+    app: FastAPI, *, buffer_pool: WarmBufferPool, instance: EC2InstanceData
+) -> None:
+    """Handle instance that has completed cloud init."""
     app_settings = get_application_settings(app)
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
-    if BUFFER_MACHINE_PULLING_EC2_TAG_KEY in instance.tags:
-        buffer_pool.pulling_instances.add(instance)
-    elif await ssm_client.is_instance_connected_to_ssm_server(instance.id):
-        try:
-            if await ssm_client.wait_for_has_instance_completed_cloud_init(instance.id):
-                if has_instrumentation(app):
-                    get_instrumentation(
-                        app
-                    ).buffer_machines_pools_metrics.instances_ready_to_pull_seconds.labels(
-                        instance_type=instance.type
-                    ).observe(
-                        (arrow.utcnow().datetime - instance.launch_time).total_seconds()
-                    )
-                if app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[
-                    instance.type
-                ].pre_pull_images:
-                    buffer_pool.waiting_to_pull_instances.add(instance)
-                else:
-                    buffer_pool.waiting_to_stop_instances.add(instance)
-            else:
-                buffer_pool.pending_instances.add(instance)
-        except (
-            SSMCommandExecutionResultError,
-            SSMCommandExecutionTimeoutError,
-        ):
-            _logger.exception(
-                "Unnexpected error when checking EC2 cloud initialization completion!. "
-                "The machine will be terminated. TIP: check the initialization phase for errors."
-            )
-            buffer_pool.broken_instances.add(instance)
+    _record_instance_ready_metrics(app, instance=instance)
+
+    if app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[
+        instance.type
+    ].pre_pull_images:
+        buffer_pool.waiting_to_pull_instances.add(instance)
     else:
-        is_broken = bool(
-            (arrow.utcnow().datetime - instance.launch_time)
-            > app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME
-        )
+        buffer_pool.waiting_to_stop_instances.add(instance)
+
 
-        if is_broken:
-            _logger.error(
-                "The machine does not connect to the SSM server after %s. It will be terminated. TIP: check the initialization phase for errors.",
-                app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME,
+async def _handle_ssm_connected_instance(
+    app: FastAPI, *, buffer_pool: WarmBufferPool, instance: EC2InstanceData
+) -> None:
+    """Handle instance connected to SSM server."""
+    ssm_client = get_ssm_client(app)
+
+    try:
+        if await ssm_client.wait_for_has_instance_completed_cloud_init(instance.id):
+            _handle_completed_cloud_init_instance(
+                app, buffer_pool=buffer_pool, instance=instance
             )
-            buffer_pool.broken_instances.add(instance)
         else:
             buffer_pool.pending_instances.add(instance)
+    except (
+        SSMCommandExecutionResultError,
+        SSMCommandExecutionTimeoutError,
+    ):
+        _logger.exception(
+            "Unexpected error when checking EC2 cloud initialization completion! "
+            "The machine will be terminated. TIP: check the initialization phase for errors."
+        )
+        buffer_pool.broken_instances.add(instance)
+
+
+def _handle_unconnected_instance(
+    app: FastAPI, *, buffer_pool: WarmBufferPool, instance: EC2InstanceData
+) -> None:
+    """Handle instance not connected to SSM server."""
+    app_settings = get_application_settings(app)
+    assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
+
+    is_broken = bool(
+        (arrow.utcnow().datetime - instance.launch_time)
+        > app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME
+    )
+
+    if is_broken:
+        _logger.error(
+            "The machine did not connect to the SSM server within %s. It will be terminated. TIP: check the initialization phase for errors.",
+            app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME,
+        )
+        buffer_pool.broken_instances.add(instance)
+    else:
+        buffer_pool.pending_instances.add(instance)
+
+
+async def _analyze_running_instance_state(
+    app: FastAPI, *, buffer_pool: WarmBufferPool, instance: EC2InstanceData
+) -> None:
+    """Analyze and categorize running instance based on its current state."""
+    ssm_client = get_ssm_client(app)
+
+    if BUFFER_MACHINE_PULLING_EC2_TAG_KEY in instance.tags:
+        buffer_pool.pulling_instances.add(instance)
+    elif await ssm_client.is_instance_connected_to_ssm_server(instance.id):
+        await _handle_ssm_connected_instance(
+            app, buffer_pool=buffer_pool, instance=instance
+        )
+    else:
+        _handle_unconnected_instance(app, buffer_pool=buffer_pool, instance=instance)
 
 
 async def _analyse_current_state(
-    app: FastAPI, *, auto_scaling_mode: BaseAutoscaling
-) -> BufferPoolManager:
+    app: FastAPI, *, auto_scaling_mode: AutoscalingProvider
+) -> WarmBufferPoolManager:
     ec2_client = get_ec2_client(app)
     app_settings = get_application_settings(app)
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
 
     all_buffer_instances = await ec2_client.get_instances(
         key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME],
-        tags=get_deactivated_buffer_ec2_tags(app, auto_scaling_mode),
+        tags=get_deactivated_warm_buffer_ec2_tags(auto_scaling_mode.get_ec2_tags(app)),
         state_names=["stopped", "pending", "running", "stopping"],
     )
 
-    buffers_manager = BufferPoolManager()
+    buffers_manager = WarmBufferPoolManager()
     for instance in all_buffer_instances:
         match instance.state:
             case "stopped":
@@ -150,8 +187,8 @@ async def _analyse_current_state(
 
 async def _terminate_unneeded_pools(
     app: FastAPI,
-    buffers_manager: BufferPoolManager,
-) -> BufferPoolManager:
+    buffers_manager: WarmBufferPoolManager,
+) -> WarmBufferPoolManager:
     ec2_client = get_ec2_client(app)
     app_settings = get_application_settings(app)
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
@@ -178,8 +215,8 @@ async def _terminate_unneeded_pools(
 
 
 async def _terminate_instances_with_invalid_pre_pulled_images(
-    app: FastAPI, buffers_manager: BufferPoolManager
-) -> BufferPoolManager:
+    app: FastAPI, buffers_manager: WarmBufferPoolManager
+) -> WarmBufferPoolManager:
     ec2_client = get_ec2_client(app)
     app_settings = get_application_settings(app)
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
@@ -212,8 +249,8 @@ async def _terminate_instances_with_invalid_pre_pulled_images(
 
 
 async def _terminate_broken_instances(
-    app: FastAPI, buffers_manager: BufferPoolManager
-) -> BufferPoolManager:
+    app: FastAPI, buffers_manager: WarmBufferPoolManager
+) -> WarmBufferPoolManager:
     ec2_client = get_ec2_client(app)
     termineatable_instances = set()
     for pool in buffers_manager.buffer_pools.values():
@@ -227,10 +264,10 @@ async def _terminate_broken_instances(
 
 async def _add_remove_buffer_instances(
     app: FastAPI,
-    buffers_manager: BufferPoolManager,
+    buffers_manager: WarmBufferPoolManager,
     *,
-    auto_scaling_mode: BaseAutoscaling,
-) -> BufferPoolManager:
+    auto_scaling_mode: AutoscalingProvider,
+) -> WarmBufferPoolManager:
     ec2_client = get_ec2_client(app)
     app_settings = get_application_settings(app)
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
@@ -265,8 +302,10 @@ async def _add_remove_buffer_instances(
                     name=ec2_type,
                     resources=Resources.create_as_empty(),  # fake resources
                 ),
-                tags=get_deactivated_buffer_ec2_tags(app, auto_scaling_mode),
-                startup_script=ec2_buffer_startup_script(
+                tags=get_deactivated_warm_buffer_ec2_tags(
+                    auto_scaling_mode.get_ec2_tags(app)
+                ),
+                startup_script=ec2_warm_buffer_startup_script(
                     ec2_boot_specific, app_settings
                 ),
                 ami_id=ec2_boot_specific.ami_id,
@@ -291,7 +330,7 @@ async def _add_remove_buffer_instances(
 
 
 async def _handle_pool_image_pulling(
-    app: FastAPI, instance_type: InstanceTypeType, pool: BufferPool
+    app: FastAPI, instance_type: InstanceTypeType, pool: WarmBufferPool
 ) -> tuple[InstancesToStop, InstancesToTerminate]:
     ec2_client = get_ec2_client(app)
     ssm_client = get_ssm_client(app)
@@ -305,10 +344,8 @@ async def _handle_pool_image_pulling(
         await ec2_client.set_instances_tags(
             tuple(pool.waiting_to_pull_instances),
             tags={
-                BUFFER_MACHINE_PULLING_EC2_TAG_KEY: AWSTagValue("true"),
-                BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: AWSTagValue(
-                    ssm_command.command_id
-                ),
+                BUFFER_MACHINE_PULLING_EC2_TAG_KEY: "true",
+                BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: ssm_command.command_id,
             },
         )
 
@@ -361,7 +398,7 @@ async def _handle_pool_image_pulling(
 
 
 async def _handle_image_pre_pulling(
-    app: FastAPI, buffers_manager: BufferPoolManager
+    app: FastAPI, buffers_manager: WarmBufferPoolManager
 ) -> None:
     ec2_client = get_ec2_client(app)
     instances_to_stop: set[EC2InstanceData] = set()
@@ -397,7 +434,7 @@ async def _handle_image_pre_pulling(
 
 
 async def monitor_buffer_machines(
-    app: FastAPI, *, auto_scaling_mode: BaseAutoscaling
+    app: FastAPI, *, auto_scaling_mode: AutoscalingProvider
 ) -> None:
     """Buffer machine creation works like so:
     1. a EC2 is created with an EBS attached volume wO auto prepulling and wO auto connect to swarm
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/auto_scaling_task.py
similarity index 82%
rename from services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py
rename to services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/auto_scaling_task.py
index b3d5ab8e193..fd3f43975c4 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/auto_scaling_task.py
@@ -7,12 +7,12 @@
 from servicelib.background_task import create_periodic_task
 from servicelib.redis import exclusive
 
-from ..core.settings import ApplicationSettings
-from ..utils.redis import create_lock_key_and_value
-from .auto_scaling_core import auto_scale_cluster
-from .auto_scaling_mode_computational import ComputationalAutoscaling
-from .auto_scaling_mode_dynamic import DynamicAutoscaling
-from .redis import get_redis_client
+from ...core.settings import ApplicationSettings
+from ...utils.redis import create_lock_key_and_value
+from ..redis import get_redis_client
+from ._auto_scaling_core import auto_scale_cluster
+from ._provider_computational import ComputationalAutoscalingProvider
+from ._provider_dynamic import DynamicAutoscalingProvider
 
 _TASK_NAME: Final[str] = "Autoscaling EC2 instances"
 
@@ -33,9 +33,9 @@ async def _startup() -> None:
             task_name=_TASK_NAME,
             app=app,
             auto_scaling_mode=(
-                DynamicAutoscaling()
+                DynamicAutoscalingProvider()
                 if app_settings.AUTOSCALING_NODES_MONITORING is not None
-                else ComputationalAutoscaling()
+                else ComputationalAutoscalingProvider()
             ),
         )
 
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/warm_buffer_machines_pool_task.py
similarity index 88%
rename from services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py
rename to services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/warm_buffer_machines_pool_task.py
index 625c0170476..26ea0c4e0f8 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/warm_buffer_machines_pool_task.py
@@ -7,11 +7,11 @@
 from servicelib.background_task import create_periodic_task
 from servicelib.redis import exclusive
 
-from ..core.settings import ApplicationSettings
-from ..utils.redis import create_lock_key_and_value
-from .auto_scaling_mode_dynamic import DynamicAutoscaling
-from .buffer_machines_pool_core import monitor_buffer_machines
-from .redis import get_redis_client
+from ...core.settings import ApplicationSettings
+from ...utils.redis import create_lock_key_and_value
+from ..redis import get_redis_client
+from ._provider_dynamic import DynamicAutoscalingProvider
+from ._warm_buffer_machines_pool_core import monitor_buffer_machines
 
 _TASK_NAME_BUFFER: Final[str] = "Autoscaling Buffer Machines Pool"
 
@@ -35,7 +35,7 @@ async def _startup() -> None:
             interval=app_settings.AUTOSCALING_POLL_INTERVAL,
             task_name=_TASK_NAME_BUFFER,
             app=app,
-            auto_scaling_mode=(DynamicAutoscaling()),
+            auto_scaling_mode=(DynamicAutoscalingProvider()),
         )
 
     return _startup
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py
index d57508babf8..966593295e8 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py
@@ -21,7 +21,7 @@
     DaskWorkerNotFoundError,
 )
 from ..models import AssociatedInstance, DaskTask, DaskTaskId
-from ..utils.auto_scaling_core import (
+from ..utils.utils_ec2 import (
     node_host_name_from_ec2_private_dns,
     node_ip_from_ec2_private_dns,
 )
@@ -30,7 +30,7 @@
 
 
 async def _wrap_client_async_routine(
-    client_coroutine: Coroutine[Any, Any, Any] | Any | None
+    client_coroutine: Coroutine[Any, Any, Any] | Any | None,
 ) -> Any:
     """Dask async behavior does not go well with Pylance as it returns a union of types.
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py
index d57508babf8..966593295e8 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py
@@ -21,7 +21,7 @@
     DaskWorkerNotFoundError,
 )
 from ..models import AssociatedInstance, DaskTask, DaskTaskId
-from ..utils.auto_scaling_core import (
+from ..utils.utils_ec2 import (
     node_host_name_from_ec2_private_dns,
     node_ip_from_ec2_private_dns,
 )
@@ -30,7 +30,7 @@


 async def _wrap_client_async_routine(
-    client_coroutine: Coroutine[Any, Any, Any] | Any | None
+    client_coroutine: Coroutine[Any, Any, Any] | Any | None,
 ) -> Any:
     """Dask async behavior does not go well with Pylance as it returns a union of types.
     this wrapper makes both mypy and pylance happy"""
@@ -96,7 +96,7 @@ def _dask_worker_from_ec2_instance(
     # dict is of type dask_worker_address: worker_details
     def _find_by_worker_host(
-        dask_worker: tuple[DaskWorkerUrl, DaskWorkerDetails]
+        dask_worker: tuple[DaskWorkerUrl, DaskWorkerDetails],
     ) -> bool:
         _, details = dask_worker
         if match := re.match(DASK_NAME_PATTERN, details["name"]):
@@ -173,9 +173,9 @@ def _list_tasks(
         }

     async with _scheduler_client(scheduler_url, authentication) as client:
-        list_of_tasks: dict[
-            dask.typing.Key, DaskTaskResources
-        ] = await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks))
+        list_of_tasks: dict[dask.typing.Key, DaskTaskResources] = (
+            await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks))
+        )
     _logger.debug("found unrunnable tasks: %s", list_of_tasks)
     return [
         DaskTask(
@@ -207,10 +207,10 @@ def _list_processing_tasks(
         return worker_to_processing_tasks

     async with _scheduler_client(scheduler_url, authentication) as client:
-        worker_to_tasks: dict[
-            str, list[tuple[dask.typing.Key, DaskTaskResources]]
-        ] = await _wrap_client_async_routine(
-            client.run_on_scheduler(_list_processing_tasks)
+        worker_to_tasks: dict[str, list[tuple[dask.typing.Key, DaskTaskResources]]] = (
+            await _wrap_client_async_routine(
+                client.run_on_scheduler(_list_processing_tasks)
+            )
         )

     _logger.debug("found processing tasks: %s", worker_to_tasks)
     tasks_per_worker = defaultdict(list)
@@ -276,12 +276,12 @@ def _list_processing_tasks_on_worker(
     _logger.debug("looking for processing tasks for %s", f"{worker_url=}")

     # now get the used resources
-    worker_processing_tasks: list[
-        tuple[dask.typing.Key, DaskTaskResources]
-    ] = await _wrap_client_async_routine(
-        client.run_on_scheduler(
-            _list_processing_tasks_on_worker, worker_url=worker_url
-        ),
+    worker_processing_tasks: list[tuple[dask.typing.Key, DaskTaskResources]] = (
+        await _wrap_client_async_routine(
+            client.run_on_scheduler(
+                _list_processing_tasks_on_worker, worker_url=worker_url
+            ),
+        )
     )

     total_resources_used: collections.Counter[str] = collections.Counter()
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py
index 1224ea71907..cc4c2a2126d 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py
@@ -20,7 +20,7 @@
         "Number of EC2-backed docker nodes which are drained",
         EC2_INSTANCE_LABELS,
     ),
-    "buffer_drained_nodes": (
+    "hot_buffer_drained_nodes": (
         "Number of EC2-backed docker nodes which are drained and in buffer/reserve",
         EC2_INSTANCE_LABELS,
     ),
@@ -32,7 +32,7 @@
         "Number of EC2 instances that failed joining the cluster",
         EC2_INSTANCE_LABELS,
     ),
-    "buffer_ec2s": (
+    "warm_buffer_ec2s": (
         "Number of buffer EC2 instances prepared, stopped, and ready to be activated",
         EC2_INSTANCE_LABELS,
     ),
@@ -54,7 +54,7 @@
     ),
 }

-BUFFER_POOLS_METRICS_DEFINITIONS: Final[dict[str, tuple[str, tuple[str, ...]]]] = {
+WARM_BUFFER_POOLS_METRICS_DEFINITIONS: Final[dict[str, tuple[str, tuple[str, ...]]]] = {
     "ready_instances": (
         "Number of EC2 buffer instances that are ready for use",
         EC2_INSTANCE_LABELS,
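[editor's note] The trailing-comma and assignment reshuffling in dask.py above all funnel through `_wrap_client_async_routine`, whose docstring explains that distributed's client calls are typed as a union of a plain value and a coroutine. A self-contained sketch of that awaiting pattern (assumed, simplified; the repo's helper simply awaits):

    import asyncio
    import inspect
    from typing import Any


    async def wrap_maybe_awaitable(value: Any) -> Any:
        # await only when needed, keeping both mypy and pylance satisfied
        if inspect.isawaitable(value):
            return await value
        return value


    async def _demo() -> None:
        assert await wrap_maybe_awaitable(42) == 42
        assert await wrap_maybe_awaitable(asyncio.sleep(0, result="ok")) == "ok"


    asyncio.run(_demo())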
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py
index 3831b33b826..82e1790880c 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py
@@ -4,12 +4,12 @@
 from prometheus_client import CollectorRegistry, Counter, Histogram
 from servicelib.instrumentation import MetricsBase

-from ...models import BufferPoolManager, Cluster
+from ...models import Cluster, WarmBufferPoolManager
 from ._constants import (
-    BUFFER_POOLS_METRICS_DEFINITIONS,
     CLUSTER_METRICS_DEFINITIONS,
     EC2_INSTANCE_LABELS,
     METRICS_NAMESPACE,
+    WARM_BUFFER_POOLS_METRICS_DEFINITIONS,
 )
 from ._utils import TrackedGauge, create_gauge

@@ -19,10 +19,10 @@ class ClusterMetrics(MetricsBase):  # pylint: disable=too-many-instance-attribut
     active_nodes: TrackedGauge = field(init=False)
     pending_nodes: TrackedGauge = field(init=False)
     drained_nodes: TrackedGauge = field(init=False)
-    buffer_drained_nodes: TrackedGauge = field(init=False)
+    hot_buffer_drained_nodes: TrackedGauge = field(init=False)
     pending_ec2s: TrackedGauge = field(init=False)
     broken_ec2s: TrackedGauge = field(init=False)
-    buffer_ec2s: TrackedGauge = field(init=False)
+    warm_buffer_ec2s: TrackedGauge = field(init=False)
     disconnected_nodes: TrackedGauge = field(init=False)
     terminating_nodes: TrackedGauge = field(init=False)
     retired_nodes: TrackedGauge = field(init=False)
@@ -110,7 +110,7 @@ def instance_terminated(self, instance_type: str) -> None:


 @dataclass(slots=True, kw_only=True)
-class BufferPoolsMetrics(MetricsBase):
+class WarmBufferPoolsMetrics(MetricsBase):
     ready_instances: TrackedGauge = field(init=False)
     pending_instances: TrackedGauge = field(init=False)
     waiting_to_pull_instances: TrackedGauge = field(init=False)
@@ -124,7 +124,7 @@ class BufferPoolsMetrics(MetricsBase):
     def __post_init__(self) -> None:
         buffer_pools_subsystem = f"{self.subsystem}_buffer_machines_pools"
-        for field_name, definition in BUFFER_POOLS_METRICS_DEFINITIONS.items():
+        for field_name, definition in WARM_BUFFER_POOLS_METRICS_DEFINITIONS.items():
             setattr(
                 self,
                 field_name,
@@ -165,11 +165,11 @@ def __post_init__(self) -> None:
             )

     def update_from_buffer_pool_manager(
-        self, buffer_pool_manager: BufferPoolManager
+        self, buffer_pool_manager: WarmBufferPoolManager
     ) -> None:
         flat_pool = buffer_pool_manager.flatten_buffer_pool()

-        for field_name in BUFFER_POOLS_METRICS_DEFINITIONS:
+        for field_name in WARM_BUFFER_POOLS_METRICS_DEFINITIONS:
             tracked_gauge = getattr(self, field_name)
             assert isinstance(tracked_gauge, TrackedGauge)  # nosec
             instances = getattr(flat_pool, field_name)
@@ -183,19 +183,15 @@ class AutoscalingInstrumentation(MetricsBase):

     cluster_metrics: ClusterMetrics = field(init=False)
     ec2_client_metrics: EC2ClientMetrics = field(init=False)
-    buffer_machines_pools_metrics: BufferPoolsMetrics = field(init=False)
+    buffer_machines_pools_metrics: WarmBufferPoolsMetrics = field(init=False)

     def __post_init__(self) -> None:
         self.cluster_metrics = ClusterMetrics(  # pylint: disable=unexpected-keyword-arg
             subsystem=self.subsystem, registry=self.registry
         )
-        self.ec2_client_metrics = (
-            EC2ClientMetrics(  # pylint: disable=unexpected-keyword-arg
-                subsystem=self.subsystem, registry=self.registry
-            )
+        self.ec2_client_metrics = EC2ClientMetrics(  # pylint: disable=unexpected-keyword-arg
+            subsystem=self.subsystem, registry=self.registry
         )
-        self.buffer_machines_pools_metrics = (
-            BufferPoolsMetrics(  # pylint: disable=unexpected-keyword-arg
-                subsystem=self.subsystem, registry=self.registry
-            )
+        self.buffer_machines_pools_metrics = WarmBufferPoolsMetrics(  # pylint: disable=unexpected-keyword-arg
+            subsystem=self.subsystem, registry=self.registry
         )
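[editor's note] `WarmBufferPoolsMetrics.__post_init__` above drives gauge creation from the renamed definitions table via `setattr`. A reduced sketch of that table-driven pattern using prometheus_client directly (names and label sets here are illustrative, not the repo's):

    from dataclasses import dataclass, field

    from prometheus_client import CollectorRegistry, Gauge

    DEFINITIONS = {
        "ready_instances": ("Buffer instances ready for use", ("instance_type",)),
        "pending_instances": ("Buffer instances still pending", ("instance_type",)),
    }


    @dataclass
    class PoolMetrics:
        registry: CollectorRegistry = field(default_factory=CollectorRegistry)

        def __post_init__(self) -> None:
            for name, (doc, labels) in DEFINITIONS.items():
                # one labelled gauge per table entry, attached as an attribute
                setattr(self, name, Gauge(name, doc, labels, registry=self.registry))


    metrics = PoolMetrics()
    metrics.ready_instances.labels(instance_type="t3.large").set(2)  # type: ignore[attr-defined]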
diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py
similarity index 63%
rename from services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py
rename to services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py
index d7f69d50b54..d8a43eecfe2 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py
@@ -1,9 +1,13 @@
 import functools
 import logging
-import re
-from typing import Final
+from typing import TypeAlias

-from aws_library.ec2 import EC2InstanceBootSpecific, EC2InstanceData, EC2InstanceType
+from aws_library.ec2 import (
+    EC2InstanceBootSpecific,
+    EC2InstanceData,
+    EC2InstanceType,
+    Resources,
+)
 from models_library.generated_models.docker_rest_api import Node
 from types_aiobotocore_ec2.literals import InstanceTypeType

@@ -14,41 +18,12 @@
 )
 from ..core.settings import ApplicationSettings
 from ..models import AssociatedInstance
-from ..modules.auto_scaling_mode_base import BaseAutoscaling
-from . import utils_docker
+from . import utils_docker, utils_ec2

-_EC2_INTERNAL_DNS_RE: Final[re.Pattern] = re.compile(r"^(?P<host_name>ip-[^.]+).*$")
 _logger = logging.getLogger(__name__)


-def node_host_name_from_ec2_private_dns(
-    ec2_instance_data: EC2InstanceData,
-) -> str:
-    """returns the node host name 'ip-10-2-3-22' from the ec2 private dns
-    Raises:
-        Ec2InvalidDnsNameError: if the dns name does not follow the expected pattern
-    """
-    if match := re.match(_EC2_INTERNAL_DNS_RE, ec2_instance_data.aws_private_dns):
-        host_name: str = match.group("host_name")
-        return host_name
-    raise Ec2InvalidDnsNameError(aws_private_dns_name=ec2_instance_data.aws_private_dns)
-
-
-def node_ip_from_ec2_private_dns(
-    ec2_instance_data: EC2InstanceData,
-) -> str:
-    """returns the node ipv4 from the ec2 private dns string
-    Raises:
-        Ec2InvalidDnsNameError: if the dns name does not follow the expected pattern
-    """
-    return (
-        node_host_name_from_ec2_private_dns(ec2_instance_data)
-        .removeprefix("ip-")
-        .replace("-", ".")
-    )
-
-
-async def associate_ec2_instances_with_nodes(
+def associate_ec2_instances_with_nodes(
     nodes: list[Node], ec2_instances: list[EC2InstanceData]
 ) -> tuple[list[AssociatedInstance], list[EC2InstanceData]]:
     """returns the associated and non-associated instances"""
@@ -61,7 +36,9 @@ def _find_node_with_name(node: Node) -> bool:

     for instance_data in ec2_instances:
         try:
-            docker_node_name = node_host_name_from_ec2_private_dns(instance_data)
+            docker_node_name = utils_ec2.node_host_name_from_ec2_private_dns(
+                instance_data
+            )
         except Ec2InvalidDnsNameError:
             _logger.exception("Unexpected EC2 private dns name")
             non_associated_instances.append(instance_data)
@@ -108,38 +85,17 @@ async def ec2_startup_script(
     return " && ".join(startup_commands)


-def ec2_buffer_startup_script(
-    ec2_boot_specific: EC2InstanceBootSpecific, app_settings: ApplicationSettings
-) -> str:
-    startup_commands = ec2_boot_specific.custom_boot_scripts.copy()
-    if ec2_boot_specific.pre_pull_images:
-        assert app_settings.AUTOSCALING_REGISTRY  # nosec
-        startup_commands.extend(
-            (
-                utils_docker.get_docker_login_on_start_bash_command(
-                    app_settings.AUTOSCALING_REGISTRY
-                ),
-                utils_docker.write_compose_file_command(
-                    ec2_boot_specific.pre_pull_images
-                ),
-            )
-        )
-    return " && ".join(startup_commands)
-
-
 def _instance_type_by_type_name(
     ec2_type: EC2InstanceType, *, type_name: InstanceTypeType | None
 ) -> bool:
-    if type_name is None:
-        return True
-    return bool(ec2_type.name == type_name)
+    return type_name is None or ec2_type.name == type_name


 def find_selected_instance_type_for_task(
     instance_type_name: InstanceTypeType,
     available_ec2_types: list[EC2InstanceType],
-    auto_scaling_mode: BaseAutoscaling,
     task,
+    task_required_resources: Resources,
 ) -> EC2InstanceType:
     filtered_instances = list(
         filter(
@@ -158,36 +114,33 @@ def find_selected_instance_type_for_task(
     selected_instance = filtered_instances[0]

     # check that the assigned resources and the machine resource fit
-    if (
-        auto_scaling_mode.get_task_required_resources(task)
-        > selected_instance.resources
-    ):
+    if task_required_resources > selected_instance.resources:
         raise TaskRequirementsAboveRequiredEC2InstanceTypeError(
             task=task,
             instance_type=selected_instance,
-            resources=auto_scaling_mode.get_task_required_resources(task),
+            resources=task_required_resources,
         )

     return selected_instance


-def get_machine_buffer_type(
+def get_hot_buffer_type(
     available_ec2_types: list[EC2InstanceType],
 ) -> EC2InstanceType:
     assert len(available_ec2_types) > 0  # nosec
     return available_ec2_types[0]


-DrainedNodes = list[AssociatedInstance]
-BufferDrainedNodes = list[AssociatedInstance]
-TerminatingNodes = list[AssociatedInstance]
+DrainedNodes: TypeAlias = list[AssociatedInstance]
+HotBufferDrainedNodes: TypeAlias = list[AssociatedInstance]
+TerminatingNodes: TypeAlias = list[AssociatedInstance]


 def sort_drained_nodes(
     app_settings: ApplicationSettings,
     all_drained_nodes: list[AssociatedInstance],
     available_ec2_types: list[EC2InstanceType],
-) -> tuple[DrainedNodes, BufferDrainedNodes, TerminatingNodes]:
+) -> tuple[DrainedNodes, HotBufferDrainedNodes, TerminatingNodes]:
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
     # first sort out the drained nodes that started termination
     terminating_nodes = [
@@ -199,15 +152,15 @@ def sort_drained_nodes(
         n for n in all_drained_nodes if n not in terminating_nodes
     ]
     # we need to keep in reserve only the drained nodes of the right type
-    machine_buffer_type = get_machine_buffer_type(available_ec2_types)
+    hot_buffer_type = get_hot_buffer_type(available_ec2_types)
     # NOTE: we keep only in buffer the drained nodes with the right EC2 type, AND the right amount
-    buffer_drained_nodes = [
+    hot_buffer_drained_nodes = [
         node
         for node in remaining_drained_nodes
-        if node.ec2_instance.type == machine_buffer_type.name
+        if node.ec2_instance.type == hot_buffer_type.name
     ][: app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER]
     # all the others are "normal" drained nodes and may be terminated at some point
     other_drained_nodes = [
-        node for node in remaining_drained_nodes if node not in buffer_drained_nodes
+        node for node in remaining_drained_nodes if node not in hot_buffer_drained_nodes
     ]
-    return (other_drained_nodes, buffer_drained_nodes, terminating_nodes)
+    return (other_drained_nodes, hot_buffer_drained_nodes, terminating_nodes)
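[editor's note] `sort_drained_nodes` above performs a three-way split: terminating nodes first, then up to N hot buffers of the primary instance type, then everything else. A minimal sketch of that partitioning with simplified stand-in types:

    from dataclasses import dataclass


    @dataclass(frozen=True)
    class Drained:
        instance_type: str
        terminating: bool = False


    def split_drained(
        nodes: list[Drained], hot_buffer_type: str, max_buffer: int
    ) -> tuple[list[Drained], list[Drained], list[Drained]]:
        terminating = [n for n in nodes if n.terminating]
        remaining = [n for n in nodes if not n.terminating]
        # only the primary type counts as hot buffer, capped at max_buffer
        hot_buffer = [n for n in remaining if n.instance_type == hot_buffer_type][:max_buffer]
        others = [n for n in remaining if n not in hot_buffer]
        return others, hot_buffer, terminating


    others, hot, term = split_drained(
        [Drained("t3.large"), Drained("m5.large"), Drained("t3.large", terminating=True)],
        hot_buffer_type="t3.large",
        max_buffer=1,
    )
    assert [n.instance_type for n in hot] == ["t3.large"] and len(term) == 1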
diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py b/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py
index b01c0853bb2..0bcad456561 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py
@@ -151,9 +151,9 @@ async def _create_autoscaling_status_message(
     total_nodes = (
         len(cluster.active_nodes)
         + len(cluster.drained_nodes)
-        + len(cluster.buffer_drained_nodes)
+        + len(cluster.hot_buffer_drained_nodes)
     )
-    drained_nodes = len(cluster.drained_nodes) + len(cluster.buffer_drained_nodes)
+    drained_nodes = len(cluster.drained_nodes) + len(cluster.hot_buffer_drained_nodes)

     return RabbitAutoscalingStatusMessage.model_construct(
         origin=origin,
diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py
index 9c3f187a78f..ac3ff4325c5 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py
@@ -35,6 +35,7 @@
 from types_aiobotocore_ec2.literals import InstanceTypeType

 from ..core.settings import ApplicationSettings
+from ..models import AssociatedInstance
 from ..modules.docker import AutoscalingDocker

 logger = logging.getLogger(__name__)
@@ -278,24 +279,32 @@ def get_max_resources_from_docker_task(task: Task) -> Resources:
     return Resources(
         cpus=max(
             (
-                task.spec.resources.reservations
-                and task.spec.resources.reservations.nano_cp_us
+                (
+                    task.spec.resources.reservations
+                    and task.spec.resources.reservations.nano_cp_us
+                )
                 or 0
             ),
             (
-                task.spec.resources.limits
-                and task.spec.resources.limits.nano_cp_us
+                (
+                    task.spec.resources.limits
+                    and task.spec.resources.limits.nano_cp_us
+                )
                 or 0
             ),
         )
         / _NANO_CPU,
         ram=TypeAdapter(ByteSize).validate_python(
             max(
-                task.spec.resources.reservations
-                and task.spec.resources.reservations.memory_bytes
+                (
+                    task.spec.resources.reservations
+                    and task.spec.resources.reservations.memory_bytes
+                )
                 or 0,
-                task.spec.resources.limits
-                and task.spec.resources.limits.memory_bytes
+                (
+                    task.spec.resources.limits
+                    and task.spec.resources.limits.memory_bytes
+                )
                 or 0,
             )
         ),
@@ -382,7 +391,7 @@ async def compute_cluster_used_resources(
     list_of_used_resources = await logged_gather(
         *(compute_node_used_resources(docker_client, node) for node in nodes)
     )
-    counter = collections.Counter({k: 0 for k in list(Resources.model_fields)})
+    counter = collections.Counter(dict.fromkeys(list(Resources.model_fields), 0))
     for result in list_of_used_resources:
         counter.update(result.model_dump())

@@ -570,14 +579,14 @@ def get_new_node_docker_tags(
 ) -> dict[DockerLabelKey, str]:
     assert app_settings.AUTOSCALING_NODES_MONITORING  # nosec
     return (
-        {
-            tag_key: "true"
-            for tag_key in app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS
-        }
-        | {
-            tag_key: "true"
-            for tag_key in app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS
-        }
+        dict.fromkeys(
+            app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS,
+            "true",
+        )
+        | dict.fromkeys(
+            app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS,
+            "true",
+        )
         | {DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: ec2_instance.type}
     )

@@ -601,6 +610,10 @@ def is_node_osparc_ready(node: Node) -> bool:
     )


+def is_instance_drained(instance: AssociatedInstance) -> bool:
+    return not is_node_osparc_ready(instance.node)
+
+
 async def set_node_osparc_ready(
     app_settings: ApplicationSettings,
     docker_client: AutoscalingDocker,
@@ -702,3 +715,8 @@ async def attach_node(
         tags=new_tags,
         available=app_settings.AUTOSCALING_DRAIN_NODES_WITH_LABELS,  # NOTE: full drain sometimes impede on performance
     )
+
+
+def is_node_ready(node: Node) -> bool:
+    assert node.status  # nosec
+    return bool(node.status.state is NodeState.ready)
diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_ec2.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_ec2.py
index b3b76a48717..4e72b5493b4 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_ec2.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_ec2.py
@@ -1,18 +1,27 @@
 """Free helper functions for AWS API"""

 import logging
+import re
 from collections import OrderedDict
 from collections.abc import Callable
 from textwrap import dedent
+from typing import Final

 from aws_library.ec2 import AWSTagKey, AWSTagValue, EC2InstanceType, EC2Tags, Resources
+from aws_library.ec2._models import EC2InstanceData
 from common_library.json_serialization import json_dumps

 from .._meta import VERSION
-from ..core.errors import ConfigurationError, TaskBestFittingInstanceNotFoundError
+from ..core.errors import (
+    ConfigurationError,
+    Ec2InvalidDnsNameError,
+    TaskBestFittingInstanceNotFoundError,
+)
 from ..core.settings import ApplicationSettings

-logger = logging.getLogger(__name__)
+_logger = logging.getLogger(__name__)
+
+_EC2_INTERNAL_DNS_RE: Final[re.Pattern] = re.compile(r"^(?P<host_name>ip-[^.]+)\..+$")


 def get_ec2_tags_dynamic(app_settings: ApplicationSettings) -> EC2Tags:
@@ -105,3 +114,30 @@ def find_best_fitting_ec2_instance(
         raise TaskBestFittingInstanceNotFoundError(needed_resources=resources)

     return instance
+
+
+def node_host_name_from_ec2_private_dns(
+    ec2_instance_data: EC2InstanceData,
+) -> str:
+    """returns the node host name 'ip-10-2-3-22' from the ec2 private dns
+    Raises:
+        Ec2InvalidDnsNameError: if the dns name does not follow the expected pattern
+    """
+    if match := re.match(_EC2_INTERNAL_DNS_RE, ec2_instance_data.aws_private_dns):
+        host_name: str = match.group("host_name")
+        return host_name
+    raise Ec2InvalidDnsNameError(aws_private_dns_name=ec2_instance_data.aws_private_dns)
+
+
+def node_ip_from_ec2_private_dns(
+    ec2_instance_data: EC2InstanceData,
+) -> str:
+    """returns the node ipv4 from the ec2 private dns string
+    Raises:
+        Ec2InvalidDnsNameError: if the dns name does not follow the expected pattern
+    """
+    return (
+        node_host_name_from_ec2_private_dns(ec2_instance_data)
+        .removeprefix("ip-")
+        .replace("-", ".")
+    )
diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/buffer_machines_pool_core.py b/services/autoscaling/src/simcore_service_autoscaling/utils/warm_buffer_machines.py
similarity index 64%
rename from services/autoscaling/src/simcore_service_autoscaling/utils/buffer_machines_pool_core.py
rename to services/autoscaling/src/simcore_service_autoscaling/utils/warm_buffer_machines.py
index 66ff7972306..3e331bc2f97 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/utils/buffer_machines_pool_core.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/utils/warm_buffer_machines.py
@@ -3,8 +3,8 @@
 from typing import Final

 from aws_library.ec2 import AWS_TAG_VALUE_MAX_LENGTH, AWSTagKey, AWSTagValue, EC2Tags
+from aws_library.ec2._models import EC2InstanceBootSpecific
 from common_library.json_serialization import json_dumps
-from fastapi import FastAPI
 from models_library.docker import DockerGenericTag
 from pydantic import TypeAdapter

@@ -15,30 +15,25 @@
     PRE_PULLED_IMAGES_EC2_TAG_KEY,
     PRE_PULLED_IMAGES_RE,
 )
-from ..modules.auto_scaling_mode_base import BaseAutoscaling
+from ..core.settings import ApplicationSettings
+from . import utils_docker

 _NAME_EC2_TAG_KEY: Final[AWSTagKey] = TypeAdapter(AWSTagKey).validate_python("Name")


-def get_activated_buffer_ec2_tags(
-    app: FastAPI, auto_scaling_mode: BaseAutoscaling
-) -> EC2Tags:
-    return auto_scaling_mode.get_ec2_tags(app) | ACTIVATED_BUFFER_MACHINE_EC2_TAGS
+def get_activated_warm_buffer_ec2_tags(base_ec2_tags: EC2Tags) -> EC2Tags:
+    return base_ec2_tags | ACTIVATED_BUFFER_MACHINE_EC2_TAGS


-def get_deactivated_buffer_ec2_tags(
-    app: FastAPI, auto_scaling_mode: BaseAutoscaling
-) -> EC2Tags:
-    base_ec2_tags = (
-        auto_scaling_mode.get_ec2_tags(app) | DEACTIVATED_BUFFER_MACHINE_EC2_TAGS
+def get_deactivated_warm_buffer_ec2_tags(base_ec2_tags: EC2Tags) -> EC2Tags:
+    new_base_ec2_tags = base_ec2_tags | DEACTIVATED_BUFFER_MACHINE_EC2_TAGS
+    new_base_ec2_tags[_NAME_EC2_TAG_KEY] = TypeAdapter(AWSTagValue).validate_python(
+        f"{new_base_ec2_tags[_NAME_EC2_TAG_KEY]}-buffer"
     )
-    base_ec2_tags[_NAME_EC2_TAG_KEY] = AWSTagValue(
-        f"{base_ec2_tags[_NAME_EC2_TAG_KEY]}-buffer"
-    )
-    return base_ec2_tags
+    return new_base_ec2_tags


-def is_buffer_machine(tags: EC2Tags) -> bool:
+def is_warm_buffer_machine(tags: EC2Tags) -> bool:
     return bool(BUFFER_MACHINE_TAG_KEY in tags)

@@ -93,3 +88,22 @@ def load_pre_pulled_images_from_tags(tags: EC2Tags) -> list[DockerGenericTag]:
         if assembled_json:
             return TypeAdapter(list[DockerGenericTag]).validate_json(assembled_json)
     return []
+
+
+def ec2_warm_buffer_startup_script(
+    ec2_boot_specific: EC2InstanceBootSpecific, app_settings: ApplicationSettings
+) -> str:
+    startup_commands = ec2_boot_specific.custom_boot_scripts.copy()
+    if ec2_boot_specific.pre_pull_images:
+        assert app_settings.AUTOSCALING_REGISTRY  # nosec
+        startup_commands.extend(
+            (
+                utils_docker.get_docker_login_on_start_bash_command(
+                    app_settings.AUTOSCALING_REGISTRY
+                ),
+                utils_docker.write_compose_file_command(
+                    ec2_boot_specific.pre_pull_images
+                ),
+            )
+        )
+    return " && ".join(startup_commands)
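[editor's note] `get_deactivated_warm_buffer_ec2_tags` above merges the provider's base tags with a deactivated-buffer marker and suffixes the Name tag so buffers are recognizable in the EC2 console. A plain-dict sketch of that transformation (the marker key below is a made-up stand-in, not the repo's constant):

    DEACTIVATED_MARKER = {"io.example.buffer_machine": "true"}  # assumed marker tag


    def deactivated_buffer_tags(base: dict[str, str]) -> dict[str, str]:
        tags = base | DEACTIVATED_MARKER        # merge, keeping base untouched
        tags["Name"] = f"{tags['Name']}-buffer"  # visible suffix for warm buffers
        return tags


    assert deactivated_buffer_tags({"Name": "autoscaling-worker"})["Name"] == "autoscaling-worker-buffer"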
"simcore_service_autoscaling.modules.cluster_scaling.auto_scaling_task.create_periodic_task", autospec=True, ) mocker.patch( - "simcore_service_autoscaling.modules.auto_scaling_task.cancel_wait_task", + "simcore_service_autoscaling.modules.cluster_scaling.auto_scaling_task.cancel_wait_task", autospec=True, ) @@ -338,12 +338,12 @@ def disable_autoscaling_background_task(mocker: MockerFixture) -> None: @pytest.fixture def disable_buffers_pool_background_task(mocker: MockerFixture) -> None: mocker.patch( - "simcore_service_autoscaling.modules.buffer_machines_pool_task.create_periodic_task", + "simcore_service_autoscaling.modules.cluster_scaling.warm_buffer_machines_pool_task.create_periodic_task", autospec=True, ) mocker.patch( - "simcore_service_autoscaling.modules.buffer_machines_pool_task.cancel_wait_task", + "simcore_service_autoscaling.modules.cluster_scaling.warm_buffer_machines_pool_task.cancel_wait_task", autospec=True, ) @@ -446,10 +446,10 @@ def service_monitored_labels( app_settings: ApplicationSettings, ) -> dict[DockerLabelKey, str]: assert app_settings.AUTOSCALING_NODES_MONITORING - return { - key: "true" - for key in app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS - } + return dict.fromkeys( + app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS, + "true", + ) @pytest.fixture @@ -849,10 +849,10 @@ def _creator(**cluter_overrides) -> Cluster: active_nodes=[], pending_nodes=[], drained_nodes=[], - buffer_drained_nodes=[], + hot_buffer_drained_nodes=[], pending_ec2s=[], broken_ec2s=[], - buffer_ec2s=[], + warm_buffer_ec2s=[], disconnected_nodes=[], terminating_nodes=[], retired_nodes=[], @@ -903,7 +903,7 @@ async def _fake_set_node_availability( return returned_node return mocker.patch( - "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.set_node_availability", + "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.set_node_availability", autospec=True, side_effect=_fake_set_node_availability, ) @@ -927,7 +927,7 @@ async def fake_tag_node( return updated_node return mocker.patch( - "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.tag_node", + "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.tag_node", autospec=True, side_effect=fake_tag_node, ) @@ -1044,7 +1044,7 @@ def hot_buffer_instance_type(app_settings: ApplicationSettings) -> InstanceTypeT @pytest.fixture def mock_find_node_with_name_returns_none(mocker: MockerFixture) -> Iterator[mock.Mock]: return mocker.patch( - "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.find_node_with_name", + "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.find_node_with_name", autospec=True, return_value=None, ) @@ -1071,18 +1071,18 @@ def with_short_ec2_instances_max_start_time( @pytest.fixture async def spied_cluster_analysis(mocker: MockerFixture) -> MockType: - return mocker.spy(auto_scaling_core, "_analyze_current_cluster") + return mocker.spy(_auto_scaling_core, "_analyze_current_cluster") @pytest.fixture -async def mocked_associate_ec2_instances_with_nodes(mocker: MockerFixture) -> mock.Mock: - async def _( +def mocked_associate_ec2_instances_with_nodes(mocker: MockerFixture) -> mock.Mock: + def _( nodes: list[DockerNode], ec2_instances: list[EC2InstanceData] ) -> tuple[list[AssociatedInstance], list[EC2InstanceData]]: return [], ec2_instances return mocker.patch( - 
"simcore_service_autoscaling.modules.auto_scaling_core.associate_ec2_instances_with_nodes", + "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.associate_ec2_instances_with_nodes", autospec=True, side_effect=_, ) @@ -1185,8 +1185,8 @@ async def _do( resource_tags: list[TagTypeDef] = [ {"Key": tag_key, "Value": tag_value} - for tag_key, tag_value in get_deactivated_buffer_ec2_tags( - initialized_app, DynamicAutoscaling() + for tag_key, tag_value in get_deactivated_warm_buffer_ec2_tags( + DynamicAutoscalingProvider().get_ec2_tags(initialized_app) ).items() ] if pre_pull_images is not None and instance_state_name == "stopped": diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_task.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_auto_scaling_task.py similarity index 96% rename from services/autoscaling/tests/unit/test_modules_auto_scaling_task.py rename to services/autoscaling/tests/unit/test_modules_cluster_scaling_auto_scaling_task.py index 4a3d3e85bae..8778996d9e6 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_task.py +++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_auto_scaling_task.py @@ -40,7 +40,7 @@ def app_environment( @pytest.fixture def mock_background_task(mocker: MockerFixture) -> mock.Mock: return mocker.patch( - "simcore_service_autoscaling.modules.auto_scaling_task.auto_scale_cluster", + "simcore_service_autoscaling.modules.cluster_scaling.auto_scaling_task.auto_scale_cluster", autospec=True, ) diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py similarity index 97% rename from services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py rename to services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py index 8a9f82ec847..dfafec0b21f 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py +++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py @@ -27,9 +27,14 @@ from faker import Faker from fastapi import FastAPI from models_library.docker import DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY -from models_library.generated_models.docker_rest_api import Availability +from models_library.generated_models.docker_rest_api import ( + Availability, +) from models_library.generated_models.docker_rest_api import Node as DockerNode -from models_library.generated_models.docker_rest_api import NodeState, NodeStatus +from models_library.generated_models.docker_rest_api import ( + NodeState, + NodeStatus, +) from models_library.rabbitmq_messages import RabbitAutoscalingStatusMessage from pydantic import ByteSize, TypeAdapter from pytest_mock import MockerFixture, MockType @@ -41,9 +46,11 @@ from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict from simcore_service_autoscaling.core.settings import ApplicationSettings from simcore_service_autoscaling.models import EC2InstanceData -from simcore_service_autoscaling.modules.auto_scaling_core import auto_scale_cluster -from simcore_service_autoscaling.modules.auto_scaling_mode_computational import ( - ComputationalAutoscaling, +from simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core import ( + auto_scale_cluster, +) +from simcore_service_autoscaling.modules.cluster_scaling._provider_computational import ( + ComputationalAutoscalingProvider, ) from 
 from simcore_service_autoscaling.modules.dask import DaskTaskResources
 from simcore_service_autoscaling.modules.docker import get_docker_client
@@ -128,7 +135,7 @@ def mock_docker_find_node_with_name_returns_fake_node(
     mocker: MockerFixture, fake_node: DockerNode
 ) -> Iterator[mock.Mock]:
     return mocker.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.find_node_with_name",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.find_node_with_name",
         autospec=True,
         return_value=fake_node,
     )
@@ -137,7 +144,7 @@ def mock_docker_find_node_with_name_returns_fake_node(
 @pytest.fixture
 def mock_docker_compute_node_used_resources(mocker: MockerFixture) -> mock.Mock:
     return mocker.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.compute_node_used_resources",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.compute_node_used_resources",
         autospec=True,
         return_value=Resources.create_as_empty(),
     )
@@ -326,7 +333,7 @@ async def test_cluster_scaling_with_no_tasks_does_nothing(
     dask_spec_local_cluster: distributed.SpecCluster,
 ):
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )
     mock_launch_instances.assert_not_called()
     mock_terminate_instances.assert_not_called()
@@ -364,7 +371,7 @@ async def test_cluster_scaling_with_disabled_ssm_does_not_block_autoscaling(
     dask_spec_local_cluster: distributed.SpecCluster,
 ):
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )
     mock_launch_instances.assert_not_called()
     mock_terminate_instances.assert_not_called()
@@ -405,7 +412,7 @@ async def test_cluster_scaling_with_task_with_too_much_resources_starts_nothing(
     assert dask_future

     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )
     mock_launch_instances.assert_not_called()
     mock_terminate_instances.assert_not_called()
@@ -497,7 +504,7 @@ async def test_cluster_scaling_up_and_down(  # noqa: PLR0915
     assert dask_futures

     # this should trigger a scaling up as we have no nodes
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )

     # check the instance was started and we have exactly 1
@@ -531,7 +538,7 @@ async def test_cluster_scaling_up_and_down(  # noqa: PLR0915
     # 2. running this again should not scale again, but tag the node and make it available
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )
     mock_dask_get_worker_has_results_in_memory.assert_called_once()
     mock_dask_get_worker_has_results_in_memory.reset_mock()
@@ -629,7 +636,7 @@ async def test_cluster_scaling_up_and_down(  # noqa: PLR0915
     assert fake_attached_node.description
     fake_attached_node.description.hostname = internal_dns_name

-    auto_scaling_mode = ComputationalAutoscaling()
+    auto_scaling_mode = ComputationalAutoscalingProvider()
     mocker.patch.object(
         auto_scaling_mode,
         "get_monitored_nodes",
@@ -766,7 +773,7 @@ async def test_cluster_scaling_up_and_down(  # noqa: PLR0915
         < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
     )
     mocked_docker_remove_node = mocker.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.remove_nodes",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.remove_nodes",
         return_value=None,
         autospec=True,
     )
@@ -873,7 +880,7 @@ async def test_cluster_does_not_scale_up_if_defined_instance_is_not_allowed(

     # this should trigger a scaling up as we have no nodes
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )

     # nothing runs
@@ -924,7 +931,7 @@ async def test_cluster_does_not_scale_up_if_defined_instance_is_not_fitting_reso

     # this should trigger a scaling up as we have no nodes
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )

     # nothing runs
@@ -991,7 +998,7 @@ async def test_cluster_scaling_up_starts_multiple_instances(

     # run the code
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )

     # check the instances were started
@@ -1083,7 +1090,7 @@ async def test_cluster_scaling_up_more_than_allowed_max_starts_max_instances_and

     # this should trigger a scaling up as we have no nodes
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )
     await assert_autoscaled_computational_ec2_instances(
         ec2_client,
@@ -1115,7 +1122,7 @@ async def test_cluster_scaling_up_more_than_allowed_max_starts_max_instances_and
     num_useless_calls = 10
     for _ in range(num_useless_calls):
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+            app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
         )
     await assert_autoscaled_computational_ec2_instances(
         ec2_client,
@@ -1184,7 +1191,7 @@ async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_star

     # this should trigger a scaling up as we have no nodes
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )

     # one of each type is created with some that will have 2 instances
@@ -1228,7 +1235,7 @@ async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_star
     num_useless_calls = 10
     for _ in range(num_useless_calls):
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+            app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
         )
     all_instances = await ec2_client.describe_instances()
     assert len(all_instances["Reservations"]) == len(
@@ -1294,7 +1301,7 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted(

     # this should trigger a scaling up as we have no nodes
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )

     # check the instance was started and we have exactly 1
@@ -1338,7 +1345,7 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted(
     # 2. running again several times the autoscaler, the node does not join
     for i in range(7):
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+            app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
         )
         # there should be no scaling up, since there is already a pending instance
         instances = await assert_autoscaled_computational_ec2_instances(
@@ -1382,7 +1389,7 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted(
     )
     # scaling now will terminate the broken ec2 that did not connect, and directly create a replacement
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )
     # we have therefore 2 reservations, first instance is terminated and a second one started
     all_instances = await ec2_client.describe_instances()
@@ -1485,7 +1492,7 @@ async def test_cluster_adapts_machines_on_the_fly(

     # it will only scale once and do nothing else
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )
     await assert_autoscaled_computational_ec2_instances(
         ec2_client,
@@ -1512,7 +1519,7 @@ async def test_cluster_adapts_machines_on_the_fly(
     #
     # 2. now the machines are associated
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )
     analyzed_cluster = assert_cluster_state(
         spied_cluster_analysis,
@@ -1535,7 +1542,7 @@ async def test_cluster_adapts_machines_on_the_fly(
     # scaling will do nothing since we have hit the maximum number of machines
     for _ in range(3):
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+            app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
         )
     await assert_autoscaled_computational_ec2_instances(
         ec2_client,
@@ -1565,11 +1572,11 @@ async def test_cluster_adapts_machines_on_the_fly(

     # first call to auto_scale_cluster will mark 1 node as empty
     with mock.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.set_node_found_empty",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.set_node_found_empty",
         autospec=True,
     ) as mock_docker_set_node_found_empty:
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+            app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
         )
     analyzed_cluster = assert_cluster_state(
         spied_cluster_analysis,
@@ -1587,14 +1594,14 @@ async def test_cluster_adapts_machines_on_the_fly(

     # now we mock the get_node_found_empty so the next call will actually drain the machine
     with mock.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_empty_since",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.get_node_empty_since",
         autospec=True,
         return_value=arrow.utcnow().datetime
         - 1.5
         * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING,
     ) as mocked_get_node_empty_since:
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+            app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
         )
     mocked_get_node_empty_since.assert_called_once()
     analyzed_cluster = assert_cluster_state(
@@ -1610,7 +1617,7 @@ async def test_cluster_adapts_machines_on_the_fly(
         create_fake_node, drained_machine_instance_id, None
     )
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )
     analyzed_cluster = assert_cluster_state(
         spied_cluster_analysis,
@@ -1622,7 +1629,7 @@ async def test_cluster_adapts_machines_on_the_fly(

     # this will initiate termination now
     with mock.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_last_readyness_update",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.get_node_last_readyness_update",
         autospec=True,
         return_value=arrow.utcnow().datetime
         - 1.5
@@ -1630,7 +1637,7 @@ async def test_cluster_adapts_machines_on_the_fly(
     ):
         mock_docker_tag_node.reset_mock()
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+            app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
         )
     analyzed_cluster = assert_cluster_state(
         spied_cluster_analysis,
@@ -1649,7 +1656,7 @@ async def test_cluster_adapts_machines_on_the_fly(
         create_fake_node, drained_machine_instance_id, drained_machine_instance_id
     )
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
     )
     analyzed_cluster = assert_cluster_state(
         spied_cluster_analysis,
@@ -1662,19 +1669,19 @@ async def test_cluster_adapts_machines_on_the_fly(

     # now this will terminate it and straight away start a new machine type
     with mock.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_termination_started_since",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.get_node_termination_started_since",
         autospec=True,
         return_value=arrow.utcnow().datetime
         - 1.5
         * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION,
     ):
         mocked_docker_remove_node = mocker.patch(
-            "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.remove_nodes",
+            "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.remove_nodes",
             return_value=None,
             autospec=True,
         )
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+            app=initialized_app, auto_scaling_mode=ComputationalAutoscalingProvider()
        )
         mocked_docker_remove_node.assert_called_once()
diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py
similarity index 96%
rename from services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py
rename to services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py
index 19bb4c69c89..9dc439d694a 100644
--- a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py
+++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py
@@ -53,28 +53,28 @@
 from simcore_service_autoscaling.constants import BUFFER_MACHINE_TAG_KEY
 from simcore_service_autoscaling.core.settings import ApplicationSettings
 from simcore_service_autoscaling.models import AssociatedInstance, Cluster
-from simcore_service_autoscaling.modules.auto_scaling_core import (
+from simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core import (
     _activate_drained_nodes,
     _find_terminateable_instances,
     _try_scale_down_cluster,
     auto_scale_cluster,
 )
-from simcore_service_autoscaling.modules.auto_scaling_mode_dynamic import (
-    DynamicAutoscaling,
+from simcore_service_autoscaling.modules.cluster_scaling._provider_dynamic import (
+    DynamicAutoscalingProvider,
 )
 from simcore_service_autoscaling.modules.docker import (
     AutoscalingDocker,
     get_docker_client,
 )
-from simcore_service_autoscaling.utils.auto_scaling_core import (
-    node_host_name_from_ec2_private_dns,
-)
 from simcore_service_autoscaling.utils.utils_docker import (
     _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY,
     _OSPARC_NODE_TERMINATION_PROCESS_LABEL_KEY,
     _OSPARC_SERVICE_READY_LABEL_KEY,
     _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY,
 )
+from simcore_service_autoscaling.utils.utils_ec2 import (
+    node_host_name_from_ec2_private_dns,
+)
 from types_aiobotocore_ec2.client import EC2Client
 from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType
 from types_aiobotocore_ec2.type_defs import FilterTypeDef, InstanceTypeDef
@@ -113,7 +113,7 @@ def mock_find_node_with_name_returns_fake_node(
     mocker: MockerFixture, fake_node: Node
 ) -> Iterator[mock.Mock]:
     return mocker.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.find_node_with_name",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.find_node_with_name",
         autospec=True,
         return_value=fake_node,
     )
@@ -122,7 +122,7 @@ def mock_find_node_with_name_returns_fake_node(
 @pytest.fixture
 def mock_remove_nodes(mocker: MockerFixture) -> mock.Mock:
     return mocker.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.remove_nodes",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.remove_nodes",
         autospec=True,
     )

@@ -130,7 +130,7 @@ def mock_remove_nodes(mocker: MockerFixture) -> mock.Mock:
 @pytest.fixture
 def mock_compute_node_used_resources(mocker: MockerFixture) -> mock.Mock:
     return mocker.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.compute_node_used_resources",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.compute_node_used_resources",
         autospec=True,
         return_value=Resources.create_as_empty(),
     )
@@ -323,7 +323,7 @@ async def test_cluster_scaling_with_no_services_does_nothing(
     mock_rabbitmq_post_message: mock.Mock,
 ):
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     mock_launch_instances.assert_not_called()
     mock_terminate_instances.assert_not_called()
@@ -362,7 +362,7 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect
 ):
     assert app_settings.AUTOSCALING_EC2_INSTANCES
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     await assert_autoscaled_dynamic_ec2_instances(
         ec2_client,
@@ -387,7 +387,7 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect
     mock_rabbitmq_post_message.reset_mock()
     # calling again should attach the new nodes to the reserve, but nothing should start
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     await assert_autoscaled_dynamic_ec2_instances(
         ec2_client,
@@ -426,7 +426,7 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect
     # calling it again should not create anything new
     for _ in range(10):
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+            app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
         )
     await assert_autoscaled_dynamic_ec2_instances(
         ec2_client,
@@ -486,7 +486,7 @@ async def test_cluster_scaling_with_service_asking_for_too_much_resources_starts
     await create_services_batch(scale_up_params)

     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     mock_launch_instances.assert_not_called()
     mock_terminate_instances.assert_not_called()
@@ -529,7 +529,7 @@ async def _test_cluster_scaling_up_and_down(  # noqa: PLR0915

     # this should trigger a scaling up as we have no nodes
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     assert_cluster_state(
         spied_cluster_analysis, expected_calls=1, expected_num_machines=0
     )
@@ -578,7 +578,7 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]:
     # 2. running this again should not scale again, but tag the node and make it available
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     assert_cluster_state(
         spied_cluster_analysis, expected_calls=1, expected_num_machines=1
     )
@@ -591,13 +591,11 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]:
     )
     assert fake_attached_node.spec.labels
     assert app_settings.AUTOSCALING_NODES_MONITORING
-    expected_docker_node_tags = {
-        tag_key: "true"
-        for tag_key in (
-            app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS
-            + app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS
-        )
-    } | {
+    expected_docker_node_tags = dict.fromkeys(
+        app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS
+        + app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS,
+        "true",
+    ) | {
         DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: scale_up_params.expected_instance_type
     }
     fake_attached_node.spec.labels |= expected_docker_node_tags | {
@@ -713,7 +711,7 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]:
     fake_attached_node.spec.availability = Availability.active
     fake_attached_node.description.hostname = internal_dns_name

-    auto_scaling_mode = DynamicAutoscaling()
+    auto_scaling_mode = DynamicAutoscalingProvider()
     mocker.patch.object(
         auto_scaling_mode,
         "get_monitored_nodes",
@@ -862,7 +860,7 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]:
         < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
     )
     mocked_docker_remove_node = mocker.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.remove_nodes",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.remove_nodes",
         return_value=None,
         autospec=True,
     )
@@ -1192,7 +1190,7 @@ async def test_cluster_scaling_up_starts_multiple_instances(

     # run the code
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )

     # check the instances were started
@@ -1294,7 +1292,7 @@ async def test_cluster_adapts_machines_on_the_fly(  # noqa: PLR0915

     # it will only scale once and do nothing else
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     await assert_autoscaled_dynamic_ec2_instances(
         ec2_client,
@@ -1319,7 +1317,7 @@ async def test_cluster_adapts_machines_on_the_fly(  # noqa: PLR0915
     #
     # 2. now the machines are associated
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     analyzed_cluster = assert_cluster_state(
         spied_cluster_analysis,
@@ -1341,7 +1339,7 @@ async def test_cluster_adapts_machines_on_the_fly(  # noqa: PLR0915
     # scaling will do nothing since we have hit the maximum number of machines
     for _ in range(3):
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+            app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
         )
     await assert_autoscaled_dynamic_ec2_instances(
         ec2_client,
@@ -1380,11 +1378,11 @@ async def test_cluster_adapts_machines_on_the_fly(  # noqa: PLR0915

     # first call to auto_scale_cluster will mark 1 node as empty
     with mock.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.set_node_found_empty",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.set_node_found_empty",
         autospec=True,
     ) as mock_docker_set_node_found_empty:
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+            app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
         )
     analyzed_cluster = assert_cluster_state(
         spied_cluster_analysis,
@@ -1402,14 +1400,14 @@ async def test_cluster_adapts_machines_on_the_fly(  # noqa: PLR0915

     # now we mock the get_node_found_empty so the next call will actually drain the machine
     with mock.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_empty_since",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.get_node_empty_since",
         autospec=True,
         return_value=arrow.utcnow().datetime
         - 1.5
         * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING,
     ) as mocked_get_node_empty_since:
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+            app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
         )
     mocked_get_node_empty_since.assert_called_once()
     analyzed_cluster = assert_cluster_state(
@@ -1425,7 +1423,7 @@ async def test_cluster_adapts_machines_on_the_fly(  # noqa: PLR0915
         create_fake_node, drained_machine_instance_id, None
     )
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     analyzed_cluster = assert_cluster_state(
         spied_cluster_analysis,
@@ -1437,7 +1435,7 @@ async def test_cluster_adapts_machines_on_the_fly(  # noqa: PLR0915

     # this will initiate termination now
     with mock.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_last_readyness_update",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.get_node_last_readyness_update",
         autospec=True,
         return_value=arrow.utcnow().datetime
         - 1.5
@@ -1445,7 +1443,7 @@ async def test_cluster_adapts_machines_on_the_fly(  # noqa: PLR0915
     ):
         mock_docker_tag_node.reset_mock()
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+            app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
         )
     analyzed_cluster = assert_cluster_state(
         spied_cluster_analysis,
@@ -1464,7 +1462,7 @@ async def test_cluster_adapts_machines_on_the_fly(  # noqa: PLR0915
         create_fake_node, drained_machine_instance_id, drained_machine_instance_id
     )
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     analyzed_cluster = assert_cluster_state(
         spied_cluster_analysis,
@@ -1477,19 +1475,19 @@ async def test_cluster_adapts_machines_on_the_fly(  # noqa: PLR0915

     # now this will terminate it and straight away start a new machine type
     with mock.patch(
-        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_termination_started_since",
+        "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.get_node_termination_started_since",
         autospec=True,
         return_value=arrow.utcnow().datetime
         - 1.5
         * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION,
     ):
         mocked_docker_remove_node = mocker.patch(
-            "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.remove_nodes",
+            "simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core.utils_docker.remove_nodes",
             return_value=None,
             autospec=True,
         )
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+            app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
         )
         mocked_docker_remove_node.assert_called_once()
@@ -1578,7 +1576,7 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted(

     # this should trigger a scaling up as we have no nodes
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )

     # check the instance was started and we have exactly 1
@@ -1622,7 +1620,7 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted(
     # 2. running again several times the autoscaler, the node does not join
     for i in range(7):
         await auto_scale_cluster(
-            app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+            app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
         )
         # there should be no scaling up, since there is already a pending instance
         instances = await assert_autoscaled_dynamic_ec2_instances(
@@ -1666,7 +1664,7 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted(
     )
     # scaling now will terminate the broken ec2 that did not connect, and directly create a replacement
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     # we have therefore 2 reservations, first instance is terminated and a second one started
     all_instances = await ec2_client.describe_instances()
@@ -1720,11 +1718,11 @@ async def test__find_terminateable_nodes_with_no_hosts(
             AssociatedInstance(node=host_node, ec2_instance=fake_ec2_instance_data())
         ],
         drained_nodes=[],
-        buffer_drained_nodes=[
+        hot_buffer_drained_nodes=[
            AssociatedInstance(node=host_node, ec2_instance=fake_ec2_instance_data())
         ],
     )
-    assert await _find_terminateable_instances(initialized_app, active_cluster) == []
+    assert _find_terminateable_instances(initialized_app, active_cluster) == []


 @pytest.mark.parametrize(
@@ -1754,7 +1752,7 @@ async def test__try_scale_down_cluster_with_no_nodes(
         drained_nodes=[
             create_associated_instance(drained_host_node, False)  # noqa: FBT003
         ],
-        buffer_drained_nodes=[
+        hot_buffer_drained_nodes=[
             create_associated_instance(drained_host_node, True)  # noqa: FBT003
         ],
     )
@@ -1795,7 +1793,7 @@ async def test__activate_drained_nodes_with_no_tasks(
         drained_nodes=[
             create_associated_instance(drained_host_node, True)  # noqa: FBT003
         ],
-        buffer_drained_nodes=[
+        hot_buffer_drained_nodes=[
@@ -1795,7 +1793,7 @@ async def test__activate_drained_nodes_with_no_tasks(
         drained_nodes=[
             create_associated_instance(drained_host_node, True)  # noqa: FBT003
         ],
-        buffer_drained_nodes=[
+        hot_buffer_drained_nodes=[
             create_associated_instance(drained_host_node, True)  # noqa: FBT003
         ],
     )
@@ -2006,7 +2004,7 @@ async def test_warm_buffers_are_started_to_replace_missing_hot_buffers(

     # let's autoscale, this should move the warm buffers to hot buffers
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     mock_docker_tag_node.assert_not_called()
     # at analysis time, we had no machines running
@@ -2016,8 +2014,8 @@ async def test_warm_buffers_are_started_to_replace_missing_hot_buffers(
         expected_num_machines=0,
     )
     assert not analyzed_cluster.active_nodes
-    assert analyzed_cluster.buffer_ec2s
-    assert len(analyzed_cluster.buffer_ec2s) == len(buffer_machines)
+    assert analyzed_cluster.warm_buffer_ec2s
+    assert len(analyzed_cluster.warm_buffer_ec2s) == len(buffer_machines)

     # now we should have a warm buffer moved to the hot buffer
     await assert_autoscaled_dynamic_ec2_instances(
@@ -2041,7 +2039,7 @@ async def test_warm_buffers_are_started_to_replace_missing_hot_buffers(

     # let's autoscale again, to check the cluster analysis and tag the nodes
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     mock_docker_tag_node.assert_called()
     assert (
@@ -2055,14 +2053,14 @@ async def test_warm_buffers_are_started_to_replace_missing_hot_buffers(
         expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER,
     )
     assert not analyzed_cluster.active_nodes
-    assert len(analyzed_cluster.buffer_ec2s) == max(
+    assert len(analyzed_cluster.warm_buffer_ec2s) == max(
         0,
         buffer_count
         - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER,
     ), (
         "the warm buffers were not used as expected there should be"
         f" {buffer_count - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER} remaining, "
-        f"found {len(analyzed_cluster.buffer_ec2s)}"
+        f"found {len(analyzed_cluster.warm_buffer_ec2s)}"
     )
     assert (
         len(analyzed_cluster.pending_ec2s)
@@ -2124,7 +2122,7 @@ async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7

     # ensure we get our running hot buffer
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     await assert_autoscaled_dynamic_ec2_instances(
         ec2_client,
@@ -2137,7 +2135,7 @@ async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7
     )
     # this brings a new analysis
     await auto_scale_cluster(
-        app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     spied_cluster = assert_cluster_state(
         spied_cluster_analysis, expected_calls=2, expected_num_machines=5
@@ -2150,13 +2148,11 @@ async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7
     )
     assert fake_attached_node_base.spec.labels
     assert app_settings.AUTOSCALING_NODES_MONITORING
-    expected_docker_node_tags = {
-        tag_key: "true"
-        for tag_key in (
-            app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS
-            + app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS
-        )
-    } | {
+    expected_docker_node_tags = dict.fromkeys(
+        app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS
+        + app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS,
+        "true",
+    ) | {
        DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: f"{hot_buffer_instance_type}"
     }
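The dict-comprehension-to-`dict.fromkeys` change above is behavior-preserving: `dict.fromkeys(keys, value)` builds the same mapping as `{k: value for k in keys}` whenever the value is a shared constant, which holds here since the value is the immutable string `"true"`. A self-contained check (the label names are illustrative only):

```python
labels = ["io.simcore.monitored", "io.simcore.new-node"]  # hypothetical keys

# dict.fromkeys assigns the *same* value object to every key...
assert dict.fromkeys(labels, "true") == {k: "true" for k in labels}

# ...which is only safe for immutable values; with a mutable default the
# comprehension form should be preferred:
shared = dict.fromkeys(labels, [])
shared[labels[0]].append("x")
assert shared[labels[1]] == ["x"]  # both keys share one list
```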
     fake_attached_node_base.spec.labels |= expected_docker_node_tags | {
@@ -2172,7 +2168,7 @@ async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7
             spied_cluster.pending_ec2s[i].ec2_instance
         )
         fake_hot_buffer_nodes.append(node)
-    auto_scaling_mode = DynamicAutoscaling()
+    auto_scaling_mode = DynamicAutoscalingProvider()
     mocker.patch.object(
         auto_scaling_mode,
         "get_monitored_nodes",
@@ -2193,8 +2189,8 @@ async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7
     spied_cluster = assert_cluster_state(
         spied_cluster_analysis, expected_calls=1, expected_num_machines=5
     )
-    assert len(spied_cluster.buffer_drained_nodes) == num_hot_buffer
-    assert not spied_cluster.buffer_ec2s
+    assert len(spied_cluster.hot_buffer_drained_nodes) == num_hot_buffer
+    assert not spied_cluster.warm_buffer_ec2s

     # have a few warm buffers ready with the same type as the hot buffer machines
     await create_buffer_machines(
@@ -2238,8 +2234,8 @@ async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7
     spied_cluster = assert_cluster_state(
         spied_cluster_analysis, expected_calls=1, expected_num_machines=5
     )
-    assert len(spied_cluster.buffer_drained_nodes) == num_hot_buffer
-    assert len(spied_cluster.buffer_ec2s) == buffer_count
+    assert len(spied_cluster.hot_buffer_drained_nodes) == num_hot_buffer
+    assert len(spied_cluster.warm_buffer_ec2s) == buffer_count

     #
     # BUG REPRODUCTION
@@ -2307,8 +2303,8 @@ async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7
     spied_cluster = assert_cluster_state(
         spied_cluster_analysis, expected_calls=2, expected_num_machines=6
     )
-    assert len(spied_cluster.buffer_drained_nodes) == num_hot_buffer - 1
-    assert len(spied_cluster.buffer_ec2s) == buffer_count - 1
+    assert len(spied_cluster.hot_buffer_drained_nodes) == num_hot_buffer - 1
+    assert len(spied_cluster.warm_buffer_ec2s) == buffer_count - 1
     assert len(spied_cluster.active_nodes) == 1
     assert len(spied_cluster.pending_ec2s) == 1

@@ -2326,8 +2322,8 @@ async def _check_autoscaling_is_stable() -> None:
         spied_cluster = assert_cluster_state(
             spied_cluster_analysis, expected_calls=1, expected_num_machines=6
         )
-        assert len(spied_cluster.buffer_drained_nodes) == num_hot_buffer - 1
-        assert len(spied_cluster.buffer_ec2s) == buffer_count - 1
+        assert len(spied_cluster.hot_buffer_drained_nodes) == num_hot_buffer - 1
+        assert len(spied_cluster.warm_buffer_ec2s) == buffer_count - 1
         assert len(spied_cluster.active_nodes) == 1
         assert len(spied_cluster.pending_ec2s) == 1

diff --git a/services/autoscaling/tests/unit/test_utils_computational_scaling.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_utils_computational.py
similarity index 95%
rename from services/autoscaling/tests/unit/test_utils_computational_scaling.py
rename to services/autoscaling/tests/unit/test_modules_cluster_scaling_utils_computational.py
index b5744f17053..e412487f4ea 100644
--- a/services/autoscaling/tests/unit/test_utils_computational_scaling.py
+++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_utils_computational.py
@@ -8,7 +8,7 @@
 from aws_library.ec2 import Resources
 from pydantic import ByteSize, TypeAdapter
 from simcore_service_autoscaling.models import DaskTask, DaskTaskResources
-from simcore_service_autoscaling.utils.computational_scaling import (
+from simcore_service_autoscaling.modules.cluster_scaling._utils_computational import (
     _DEFAULT_MAX_CPU,
     _DEFAULT_MAX_RAM,
     resources_from_dask_task,
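The computational-scaling helpers now live in a module whose leading underscore marks it as internal to the `cluster_scaling` package, and the renamed test imports them from there. For reference, the import the test now uses (reaching into a private module is reasonable in the package's own unit tests, less so for external callers):

```python
# new location introduced by this PR; the leading underscore signals
# "internal to modules.cluster_scaling" to consumers outside the package
from simcore_service_autoscaling.modules.cluster_scaling._utils_computational import (
    resources_from_dask_task,
)
```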
diff --git a/services/autoscaling/tests/unit/test_modules_buffer_machine_core.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_warm_buffer_machine_core.py
similarity index 97%
rename from services/autoscaling/tests/unit/test_modules_buffer_machine_core.py
rename to services/autoscaling/tests/unit/test_modules_cluster_scaling_warm_buffer_machine_core.py
index 32d38c0eea9..14e56d509db 100644
--- a/services/autoscaling/tests/unit/test_modules_buffer_machine_core.py
+++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_warm_buffer_machine_core.py
@@ -28,10 +28,10 @@
 from pytest_simcore.helpers.logging_tools import log_context
 from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
 from simcore_service_autoscaling.constants import PRE_PULLED_IMAGES_EC2_TAG_KEY
-from simcore_service_autoscaling.modules.auto_scaling_mode_dynamic import (
-    DynamicAutoscaling,
+from simcore_service_autoscaling.modules.cluster_scaling._provider_dynamic import (
+    DynamicAutoscalingProvider,
 )
-from simcore_service_autoscaling.modules.buffer_machines_pool_core import (
+from simcore_service_autoscaling.modules.cluster_scaling._warm_buffer_machines_pool_core import (
     monitor_buffer_machines,
 )
 from types_aiobotocore_ec2 import EC2Client
@@ -95,7 +95,7 @@ async def test_if_send_command_is_mocked_by_moto(

     # 1. run, this will create as many buffer machines as needed
     await monitor_buffer_machines(
-        initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     await assert_autoscaled_dynamic_warm_pools_ec2_instances(
         ec2_client,
@@ -112,7 +112,7 @@ async def test_if_send_command_is_mocked_by_moto(

     # 2. this should generate a failure as current version of moto does not handle this
     await monitor_buffer_machines(
-        initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
@@ -168,7 +168,7 @@ async def _test_monitor_buffer_machines(
     # 1. run, this will create as many buffer machines as needed
     with log_context(logging.INFO, "create buffer machines"):
         await monitor_buffer_machines(
-            initialized_app, auto_scaling_mode=DynamicAutoscaling()
+            initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
         )
     with log_context(
         logging.INFO, f"waiting for {buffer_count} buffer instances to be running"
@@ -211,7 +211,7 @@ async def _assert_buffer_machines_running() -> None:
     )
     async def _assert_run_ssm_command_for_pulling() -> None:
         await monitor_buffer_machines(
-            initialized_app, auto_scaling_mode=DynamicAutoscaling()
+            initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
         )
         await assert_autoscaled_dynamic_warm_pools_ec2_instances(
             ec2_client,
@@ -248,7 +248,7 @@ async def _assert_run_ssm_command_for_pulling() -> None:
     )
     async def _assert_wait_for_ssm_command_to_finish() -> None:
         await monitor_buffer_machines(
-            initialized_app, auto_scaling_mode=DynamicAutoscaling()
+            initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
         )
         await assert_autoscaled_dynamic_warm_pools_ec2_instances(
             ec2_client,
@@ -354,7 +354,7 @@ async def test_monitor_buffer_machines_terminates_supernumerary_instances(
     )
     # this will terminate the supernumerary instances and start new ones
     await monitor_buffer_machines(
-        initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     await assert_autoscaled_dynamic_warm_pools_ec2_instances(
         ec2_client,
@@ -414,7 +414,7 @@ async def test_monitor_buffer_machines_terminates_instances_with_incorrect_pre_p
     )
     # this will terminate the wrong instances and start new ones and pre-pull the new set of images
     await monitor_buffer_machines(
-        initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     await assert_autoscaled_dynamic_warm_pools_ec2_instances(
         ec2_client,
@@ -491,7 +491,7 @@ async def test_monitor_buffer_machines_terminates_unneeded_pool(

     # this will terminate the unwanted buffer pool and replace with the expected ones
     await monitor_buffer_machines(
-        initialized_app, auto_scaling_mode=DynamicAutoscaling()
+        initialized_app, auto_scaling_mode=DynamicAutoscalingProvider()
     )
     await assert_autoscaled_dynamic_warm_pools_ec2_instances(
         ec2_client,
diff --git a/services/autoscaling/tests/unit/test_modules_instrumentation_models.py b/services/autoscaling/tests/unit/test_modules_instrumentation_models.py
index 78824bd8fb5..f96efae4574 100644
--- a/services/autoscaling/tests/unit/test_modules_instrumentation_models.py
+++ b/services/autoscaling/tests/unit/test_modules_instrumentation_models.py
@@ -1,14 +1,14 @@
 from dataclasses import is_dataclass

 import pytest
-from simcore_service_autoscaling.models import BufferPool, Cluster
+from simcore_service_autoscaling.models import Cluster, WarmBufferPool
 from simcore_service_autoscaling.modules.instrumentation._constants import (
-    BUFFER_POOLS_METRICS_DEFINITIONS,
     CLUSTER_METRICS_DEFINITIONS,
+    WARM_BUFFER_POOLS_METRICS_DEFINITIONS,
 )
 from simcore_service_autoscaling.modules.instrumentation._models import (
-    BufferPoolsMetrics,
     ClusterMetrics,
+    WarmBufferPoolsMetrics,
 )


@@ -16,7 +16,7 @@
     "class_name, metrics_class_name, metrics_definitions",
     [
         (Cluster, ClusterMetrics, CLUSTER_METRICS_DEFINITIONS),
-        (BufferPool, BufferPoolsMetrics, BUFFER_POOLS_METRICS_DEFINITIONS),
+        (WarmBufferPool, WarmBufferPoolsMetrics, WARM_BUFFER_POOLS_METRICS_DEFINITIONS),
     ],
 )
 def test_models_are_in_sync(
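The renamed metrics classes survive this refactoring mechanically because `test_models_are_in_sync` (next hunk) only iterates over `__dataclass_fields__`. The pattern generalizes: when a metrics container must mirror a domain dataclass field-for-field, a test like this catches drift at the first rename. A stripped-down sketch with hypothetical classes:

```python
from dataclasses import dataclass, field, fields

@dataclass
class Domain:  # hypothetical domain model
    active_nodes: list = field(default_factory=list)
    pending_nodes: list = field(default_factory=list)

@dataclass
class DomainMetrics:  # hypothetical metrics mirror, one member per domain field
    active_nodes: int = 0
    pending_nodes: int = 0

def test_mirror_is_in_sync() -> None:
    domain_fields = {f.name for f in fields(Domain)}
    metrics_fields = {f.name for f in fields(DomainMetrics)}
    # symmetric difference pinpoints whichever side drifted
    assert domain_fields == metrics_fields, f"drifted: {domain_fields ^ metrics_fields}"
```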
@@ -27,9 +27,9 @@
     assert is_dataclass(class_name)
     assert is_dataclass(metrics_class_name)
     for field in class_name.__dataclass_fields__:
-        assert (
-            field in metrics_definitions
-        ), f"{metrics_definitions.__qualname__} is missing {field}"
-        assert hasattr(
-            metrics_class_name, field
-        ), f"{metrics_class_name.__qualname__} is missing {field}"
+        assert field in metrics_definitions, (
+            f"{metrics_definitions.__qualname__} is missing {field}"
+        )
+        assert hasattr(metrics_class_name, field), (
+            f"{metrics_class_name.__qualname__} is missing {field}"
+        )
diff --git a/services/autoscaling/tests/unit/test_utils_auto_scaling_core.py b/services/autoscaling/tests/unit/test_utils_cluster_scaling.py
similarity index 89%
rename from services/autoscaling/tests/unit/test_utils_auto_scaling_core.py
rename to services/autoscaling/tests/unit/test_utils_cluster_scaling.py
index 54d4f1b44e0..7afa42f770d 100644
--- a/services/autoscaling/tests/unit/test_utils_auto_scaling_core.py
+++ b/services/autoscaling/tests/unit/test_utils_cluster_scaling.py
@@ -19,14 +19,12 @@
 from pydantic import TypeAdapter
 from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
 from pytest_simcore.helpers.typing_env import EnvVarsDict
-from simcore_service_autoscaling.core.errors import Ec2InvalidDnsNameError
 from simcore_service_autoscaling.core.settings import ApplicationSettings
 from simcore_service_autoscaling.models import AssociatedInstance, EC2InstanceData
-from simcore_service_autoscaling.utils.auto_scaling_core import (
+from simcore_service_autoscaling.utils.cluster_scaling import (
     associate_ec2_instances_with_nodes,
     ec2_startup_script,
-    get_machine_buffer_type,
-    node_host_name_from_ec2_private_dns,
+    get_hot_buffer_type,
     sort_drained_nodes,
 )
 from simcore_service_autoscaling.utils.utils_docker import (
@@ -52,34 +50,6 @@ def _creator(**overrides) -> DockerNode:
     return _creator


-@pytest.mark.parametrize(
-    "aws_private_dns, expected_host_name",
-    [
-        ("ip-10-12-32-3.internal-data", "ip-10-12-32-3"),
-        ("ip-10-12-32-32.internal-data", "ip-10-12-32-32"),
-        ("ip-10-0-3-129.internal-data", "ip-10-0-3-129"),
-        ("ip-10-0-3-12.internal-data", "ip-10-0-3-12"),
-    ],
-)
-def test_node_host_name_from_ec2_private_dns(
-    fake_ec2_instance_data: Callable[..., EC2InstanceData],
-    aws_private_dns: str,
-    expected_host_name: str,
-):
-    instance = fake_ec2_instance_data(
-        aws_private_dns=aws_private_dns,
-    )
-    assert node_host_name_from_ec2_private_dns(instance) == expected_host_name
-
-
-def test_node_host_name_from_ec2_private_dns_raises_with_invalid_name(
-    fake_ec2_instance_data: Callable[..., EC2InstanceData], faker: Faker
-):
-    instance = fake_ec2_instance_data(aws_private_dns=faker.name())
-    with pytest.raises(Ec2InvalidDnsNameError):
-        node_host_name_from_ec2_private_dns(instance)
-
-
 @pytest.mark.parametrize("valid_ec2_dns", [True, False])
 async def test_associate_ec2_instances_with_nodes_with_no_correspondence(
     fake_ec2_instance_data: Callable[..., EC2InstanceData],
@@ -99,7 +69,7 @@ async def test_associate_ec2_instances_with_nodes_with_no_correspondence(
     (
         associated_instances,
         non_associated_instances,
-    ) = await associate_ec2_instances_with_nodes(nodes, ec2_instances)
+    ) = associate_ec2_instances_with_nodes(nodes, ec2_instances)
     assert not associated_instances
     assert non_associated_instances
@@ -122,7 +92,7 @@ async def test_associate_ec2_instances_with_corresponding_nodes(
     (
         associated_instances,
         non_associated_instances,
-    ) = await associate_ec2_instances_with_nodes(nodes, ec2_instances)
+    ) = associate_ec2_instances_with_nodes(nodes, ec2_instances)
     assert associated_instances
     assert not non_associated_instances
@@ -304,7 +274,7 @@ def test_get_machine_buffer_type(
     random_fake_available_instances: list[EC2InstanceType],
 ):
     assert (
-        get_machine_buffer_type(random_fake_available_instances)
+        get_hot_buffer_type(random_fake_available_instances)
         == random_fake_available_instances[0]
     )


@@ -329,7 +299,7 @@ def test_sort_drained_nodes(
     create_fake_node: Callable[..., DockerNode],
     create_associated_instance: Callable[..., AssociatedInstance],
 ):
-    machine_buffer_type = get_machine_buffer_type(random_fake_available_instances)
+    machine_buffer_type = get_hot_buffer_type(random_fake_available_instances)
     _NUM_DRAINED_NODES = 20
     _NUM_NODE_WITH_TYPE_BUFFER = (
         3 * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
diff --git a/services/autoscaling/tests/unit/test_utils_ec2.py b/services/autoscaling/tests/unit/test_utils_ec2.py
index 23c5981acd2..8f56460bc3b 100644
--- a/services/autoscaling/tests/unit/test_utils_ec2.py
+++ b/services/autoscaling/tests/unit/test_utils_ec2.py
@@ -3,18 +3,24 @@
 # pylint: disable=unused-variable


+from collections.abc import Callable
+
 import pytest
 from aws_library.ec2 import EC2InstanceType, Resources
+from aws_library.ec2._models import EC2InstanceData
 from faker import Faker
 from pydantic import ByteSize
 from simcore_service_autoscaling.core.errors import (
     ConfigurationError,
+    Ec2InvalidDnsNameError,
     TaskBestFittingInstanceNotFoundError,
 )
 from simcore_service_autoscaling.utils.utils_ec2 import (
     closest_instance_policy,
     compose_user_data,
     find_best_fitting_ec2_instance,
+    node_host_name_from_ec2_private_dns,
+    node_ip_from_ec2_private_dns,
 )


@@ -70,3 +76,59 @@ def test_compose_user_data(faker: Faker):
     user_data = compose_user_data(command)
     assert user_data.startswith("#!/bin/bash")
     assert command in user_data
+
+
+@pytest.mark.parametrize(
+    "aws_private_dns, expected_host_name",
+    [
+        ("ip-10-12-32-3.internal-data", "ip-10-12-32-3"),
+        ("ip-10-12-32-32.internal-data", "ip-10-12-32-32"),
+        ("ip-10-0-3-129.internal-data", "ip-10-0-3-129"),
+        ("ip-10-0-3-12.internal-data", "ip-10-0-3-12"),
+    ],
+)
+def test_node_host_name_from_ec2_private_dns(
+    fake_ec2_instance_data: Callable[..., EC2InstanceData],
+    aws_private_dns: str,
+    expected_host_name: str,
+):
+    instance = fake_ec2_instance_data(
+        aws_private_dns=aws_private_dns,
+    )
+    assert node_host_name_from_ec2_private_dns(instance) == expected_host_name
+
+
+def test_node_host_name_from_ec2_private_dns_raises_with_invalid_name(
+    fake_ec2_instance_data: Callable[..., EC2InstanceData], faker: Faker
+):
+    instance = fake_ec2_instance_data(aws_private_dns=faker.name())
+    with pytest.raises(Ec2InvalidDnsNameError):
+        node_host_name_from_ec2_private_dns(instance)
+
+
+@pytest.mark.parametrize(
+    "aws_private_dns, expected_host_name",
+    [
+        ("ip-10-12-32-3.internal-data", "10.12.32.3"),
+        ("ip-10-12-32-32.internal-data", "10.12.32.32"),
+        ("ip-10-0-3-129.internal-data", "10.0.3.129"),
+        ("ip-10-0-3-12.internal-data", "10.0.3.12"),
+    ],
+)
+def test_node_ip_from_ec2_private_dns(
+    fake_ec2_instance_data: Callable[..., EC2InstanceData],
+    aws_private_dns: str,
+    expected_host_name: str,
+):
+    instance = fake_ec2_instance_data(
+        aws_private_dns=aws_private_dns,
+    )
+    assert node_ip_from_ec2_private_dns(instance) == expected_host_name
+
+
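The two parametrized tables pin down both derivations from the EC2 private DNS name: the host alias keeps the `ip-a-b-c-d` prefix, while the IP is the same four octets with dashes swapped for dots. A sketch of parsing logic consistent with these cases (the real helpers live in `utils_ec2` and raise `Ec2InvalidDnsNameError`; the regex here is an assumption, not the service's implementation):

```python
import re

# assumed shape: "ip-10-12-32-3.internal-data" -> host "ip-10-12-32-3", ip "10.12.32.3"
_EC2_PRIVATE_DNS_RE = re.compile(r"^(?P<host>ip-(\d{1,3}-){3}\d{1,3})\..+$")

def _host_name(private_dns: str) -> str:
    match = _EC2_PRIVATE_DNS_RE.match(private_dns)
    if not match:
        raise ValueError(private_dns)  # the service raises Ec2InvalidDnsNameError here
    return match.group("host")

def _ip(private_dns: str) -> str:
    # strip the "ip-" prefix and turn the remaining dashes into dots
    return _host_name(private_dns).removeprefix("ip-").replace("-", ".")

assert _host_name("ip-10-0-3-129.internal-data") == "ip-10-0-3-129"
assert _ip("ip-10-0-3-129.internal-data") == "10.0.3.129"
```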
+def test_node_ip_from_ec2_private_dns_raises_with_invalid_name(
+    fake_ec2_instance_data: Callable[..., EC2InstanceData], faker: Faker
+):
+    instance = fake_ec2_instance_data(aws_private_dns=faker.name())
+    with pytest.raises(Ec2InvalidDnsNameError):
+        node_ip_from_ec2_private_dns(instance)
diff --git a/services/autoscaling/tests/unit/test_utils_buffer_machines_pool_core.py b/services/autoscaling/tests/unit/test_utils_warm_buffer_machines.py
similarity index 80%
rename from services/autoscaling/tests/unit/test_utils_buffer_machines_pool_core.py
rename to services/autoscaling/tests/unit/test_utils_warm_buffer_machines.py
index 19cc33c2575..f9cee912bcf 100644
--- a/services/autoscaling/tests/unit/test_utils_buffer_machines_pool_core.py
+++ b/services/autoscaling/tests/unit/test_utils_warm_buffer_machines.py
@@ -2,7 +2,7 @@
 # pylint:disable=unused-argument
 # pylint:disable=redefined-outer-name

 import pytest
-from aws_library.ec2 import AWSTagKey, AWSTagValue, EC2Tags
+from aws_library.ec2 import AWSTagValue, EC2Tags
 from faker import Faker
 from fastapi import FastAPI
 from models_library.docker import DockerGenericTag
@@ -14,17 +14,17 @@
     DEACTIVATED_BUFFER_MACHINE_EC2_TAGS,
     PRE_PULLED_IMAGES_EC2_TAG_KEY,
 )
-from simcore_service_autoscaling.modules.auto_scaling_mode_computational import (
-    ComputationalAutoscaling,
+from simcore_service_autoscaling.modules.cluster_scaling._provider_computational import (
+    ComputationalAutoscalingProvider,
 )
-from simcore_service_autoscaling.modules.auto_scaling_mode_dynamic import (
-    DynamicAutoscaling,
+from simcore_service_autoscaling.modules.cluster_scaling._provider_dynamic import (
+    DynamicAutoscalingProvider,
 )
-from simcore_service_autoscaling.utils.buffer_machines_pool_core import (
+from simcore_service_autoscaling.utils.warm_buffer_machines import (
     dump_pre_pulled_images_as_tags,
-    get_activated_buffer_ec2_tags,
-    get_deactivated_buffer_ec2_tags,
-    is_buffer_machine,
+    get_activated_warm_buffer_ec2_tags,
+    get_deactivated_warm_buffer_ec2_tags,
+    is_warm_buffer_machine,
     load_pre_pulled_images_from_tags,
 )


@@ -37,9 +37,9 @@ def test_get_activated_buffer_ec2_tags_dynamic(
     enabled_dynamic_mode: EnvVarsDict,
     initialized_app: FastAPI,
 ):
-    auto_scaling_mode = DynamicAutoscaling()
-    activated_buffer_tags = get_activated_buffer_ec2_tags(
-        initialized_app, auto_scaling_mode
+    auto_scaling_mode = DynamicAutoscalingProvider()
+    activated_buffer_tags = get_activated_warm_buffer_ec2_tags(
+        auto_scaling_mode.get_ec2_tags(initialized_app)
     )
     assert (
         auto_scaling_mode.get_ec2_tags(initialized_app)
@@ -55,9 +55,9 @@ def test_get_deactivated_buffer_ec2_tags_dynamic(
     enabled_dynamic_mode: EnvVarsDict,
     initialized_app: FastAPI,
 ):
-    auto_scaling_mode = DynamicAutoscaling()
-    deactivated_buffer_tags = get_deactivated_buffer_ec2_tags(
-        initialized_app, auto_scaling_mode
+    auto_scaling_mode = DynamicAutoscalingProvider()
+    deactivated_buffer_tags = get_deactivated_warm_buffer_ec2_tags(
+        auto_scaling_mode.get_ec2_tags(initialized_app)
     )
     # when deactivated the buffer EC2 name has an additional -buffer suffix
     expected_tags = (
@@ -65,8 +65,8 @@ def test_get_deactivated_buffer_ec2_tags_dynamic(
         | DEACTIVATED_BUFFER_MACHINE_EC2_TAGS
     )
     assert "Name" in expected_tags
-    expected_tags[AWSTagKey("Name")] = TypeAdapter(AWSTagValue).validate_python(
-        str(expected_tags[AWSTagKey("Name")]) + "-buffer"
+    expected_tags["Name"] = TypeAdapter(AWSTagValue).validate_python(
+        str(expected_tags["Name"]) + "-buffer"
     )
     assert expected_tags == deactivated_buffer_tags
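Worth noting in these hunks: `get_activated_warm_buffer_ec2_tags` and `get_deactivated_warm_buffer_ec2_tags` no longer receive the FastAPI app plus the autoscaling mode; the caller resolves the EC2 tags first and passes plain data in, which keeps the helpers free of framework wiring and easier to test in isolation. The call-site pattern, as exercised by the tests:

```python
# the provider still knows how to derive the base EC2 tags from the app...
base_tags = auto_scaling_mode.get_ec2_tags(initialized_app)

# ...but the warm-buffer helpers now operate on the tags alone
activated = get_activated_warm_buffer_ec2_tags(base_tags)
deactivated = get_deactivated_warm_buffer_ec2_tags(base_tags)
```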
@@ -79,9 +79,9 @@ def test_get_activated_buffer_ec2_tags_computational(
     enabled_computational_mode: EnvVarsDict,
     initialized_app: FastAPI,
 ):
-    auto_scaling_mode = ComputationalAutoscaling()
-    activated_buffer_tags = get_activated_buffer_ec2_tags(
-        initialized_app, auto_scaling_mode
+    auto_scaling_mode = ComputationalAutoscalingProvider()
+    activated_buffer_tags = get_activated_warm_buffer_ec2_tags(
+        auto_scaling_mode.get_ec2_tags(initialized_app)
     )
     assert (
         auto_scaling_mode.get_ec2_tags(initialized_app)
@@ -97,9 +97,9 @@ def test_get_deactivated_buffer_ec2_tags_computational(
     enabled_computational_mode: EnvVarsDict,
     initialized_app: FastAPI,
 ):
-    auto_scaling_mode = ComputationalAutoscaling()
-    deactivated_buffer_tags = get_deactivated_buffer_ec2_tags(
-        initialized_app, auto_scaling_mode
+    auto_scaling_mode = ComputationalAutoscalingProvider()
+    deactivated_buffer_tags = get_deactivated_warm_buffer_ec2_tags(
+        auto_scaling_mode.get_ec2_tags(initialized_app)
     )
     # when deactivated the buffer EC2 name has an additional -buffer suffix
     expected_tags = (
@@ -107,8 +107,8 @@ def test_get_deactivated_buffer_ec2_tags_computational(
         | DEACTIVATED_BUFFER_MACHINE_EC2_TAGS
     )
     assert "Name" in expected_tags
-    expected_tags[AWSTagKey("Name")] = TypeAdapter(AWSTagValue).validate_python(
-        str(expected_tags[AWSTagKey("Name")]) + "-buffer"
+    expected_tags["Name"] = TypeAdapter(AWSTagValue).validate_python(
+        str(expected_tags["Name"]) + "-buffer"
     )
     assert expected_tags == deactivated_buffer_tags


@@ -121,7 +121,7 @@ def test_get_deactivated_buffer_ec2_tags_computational(
     ],
 )
 def test_is_buffer_machine(tags: EC2Tags, expected_is_buffer: bool):
-    assert is_buffer_machine(tags) is expected_is_buffer
+    assert is_warm_buffer_machine(tags) is expected_is_buffer


 @pytest.mark.parametrize(