Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions packages/aws-library/src/aws_library/ec2/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from collections.abc import Iterable, Sequence
from dataclasses import dataclass
from typing import cast
from typing import Literal, cast

import aioboto3
import botocore.exceptions
Expand Down Expand Up @@ -66,20 +66,28 @@ async def ping(self) -> bool:
@ec2_exception_handler(_logger)
async def get_ec2_instance_capabilities(
self,
instance_type_names: set[InstanceTypeType],
instance_type_names: set[InstanceTypeType] | Literal["ALL"],
) -> list[EC2InstanceType]:
"""returns the ec2 instance types from a list of instance type names
NOTE: the order might differ!
"""Returns the ec2 instance types from a list of instance type names (sorted by name)

Arguments:
instance_type_names -- the types to filter with
instance_type_names -- the types to filter with or "ALL", to return all EC2 possible instances

Raises:
Ec2InstanceTypeInvalidError: some invalid types were used as filter
ClustersKeeperRuntimeError: unexpected error communicating with EC2

"""
if instance_type_names == "ALL":
selection_or_all_if_empty = []
else:
selection_or_all_if_empty = list(instance_type_names)
if len(selection_or_all_if_empty) == 0:
msg = "`instance_type_names` cannot be an empty set. Use either a selection or 'ALL'"
raise ValueError(msg)

instance_types = await self.client.describe_instance_types(
InstanceTypes=list(instance_type_names)
InstanceTypes=selection_or_all_if_empty
)
list_instances: list[EC2InstanceType] = []
for instance in instance_types.get("InstanceTypes", []):
Expand All @@ -95,7 +103,7 @@ async def get_ec2_instance_capabilities(
),
)
)
return list_instances
return sorted(list_instances, key=lambda i: i.name)

@ec2_exception_handler(_logger)
async def launch_instances(
Expand Down
18 changes: 10 additions & 8 deletions packages/aws-library/tests/test_ec2_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,23 +116,25 @@ async def test_get_ec2_instance_capabilities(
)
)
assert instance_types
assert len(instance_types) == len(ec2_allowed_instances)
assert [_.name for _ in instance_types] == sorted(ec2_allowed_instances)

# all the instance names are found and valid
assert all(i.name in ec2_allowed_instances for i in instance_types)
for instance_type_name in ec2_allowed_instances:
assert any(i.name == instance_type_name for i in instance_types)


async def test_get_ec2_instance_capabilities_empty_list_returns_all_options(
async def test_get_ec2_instance_capabilities_returns_all_options(
simcore_ec2_api: SimcoreEC2API,
):
instance_types = await simcore_ec2_api.get_ec2_instance_capabilities(set())
instance_types = await simcore_ec2_api.get_ec2_instance_capabilities("ALL")
assert instance_types
# NOTE: this might need adaptation when moto is updated
assert 700 < len(instance_types) < 828


async def test_get_ec2_instance_capabilities_raise_with_empty_set(
    simcore_ec2_api: SimcoreEC2API,
):
    # An empty selection is ambiguous (did the caller mean "no filter"?), so the
    # API rejects it and requires an explicit non-empty set or the literal "ALL".
    with pytest.raises(ValueError, match="instance_type_names"):
        await simcore_ec2_api.get_ec2_instance_capabilities(set())


async def test_get_ec2_instance_capabilities_with_invalid_type_raises(
simcore_ec2_api: SimcoreEC2API,
faker: Faker,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,6 @@ def metadata_file(project_slug_dir: Path, request: pytest.FixtureRequest) -> Pat
return metadata_file


def get_expected_files(docker_name: str) -> tuple[str, ...]:
    """Return the repo paths expected in a cookiecutter-generated service.

    Entries containing a ``:`` denote ``directory:filename`` pairs; plain
    entries are files at the project root.

    Arguments:
        docker_name -- name of the docker subfolder holding the service files
    """
    project_root_files = (
        ".cookiecutterrc",
        ".dockerignore",
        "metadata:metadata.yml",
    )
    docker_folder_files = (
        f"docker/{docker_name}:entrypoint.sh",
        f"docker/{docker_name}:Dockerfile",
    )
    compose_and_cli_files = (
        "service.cli:execute.sh",
        "docker-compose-build.yml",
        "docker-compose-meta.yml",
        "docker-compose.devel.yml",
        "docker-compose.yml",
    )
    return project_root_files + docker_folder_files + compose_and_cli_files


def assert_path_in_repo(expected_path: str, project_slug_dir: Path):

if ":" in expected_path:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Literal

from models_library.api_schemas_clusters_keeper import CLUSTERS_KEEPER_RPC_NAMESPACE
from models_library.api_schemas_clusters_keeper.ec2_instances import EC2InstanceTypeGet
from models_library.rabbitmq_basic_types import RPCMethodName
Expand All @@ -7,7 +9,7 @@


async def get_instance_type_details(
client: RabbitMQRPCClient, *, instance_type_names: set[str]
client: RabbitMQRPCClient, *, instance_type_names: set[str] | Literal["ALL"]
) -> list[EC2InstanceTypeGet]:
"""**Remote method**
Expand Down
20 changes: 0 additions & 20 deletions services/autoscaling/.cookiecutterrc

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class Config(EC2Settings.Config):
class EC2InstancesSettings(BaseCustomSettings):
EC2_INSTANCES_ALLOWED_TYPES: dict[str, EC2InstanceBootSpecific] = Field(
...,
description="Defines which EC2 instances are considered as candidates for new EC2 instance and their respective boot specific parameters",
description="Defines which EC2 instances are considered as candidates for new EC2 instance and their respective boot specific parameters"
"NOTE: minimum length >0",
)

EC2_INSTANCES_KEY_NAME: str = Field(
Expand Down Expand Up @@ -133,7 +134,7 @@ class EC2InstancesSettings(BaseCustomSettings):

@validator("EC2_INSTANCES_TIME_BEFORE_DRAINING")
@classmethod
def ensure_draining_delay_time_is_in_range(
def _ensure_draining_delay_time_is_in_range(
cls, value: datetime.timedelta
) -> datetime.timedelta:
if value < datetime.timedelta(seconds=10):
Expand All @@ -144,7 +145,7 @@ def ensure_draining_delay_time_is_in_range(

@validator("EC2_INSTANCES_TIME_BEFORE_TERMINATION")
@classmethod
def ensure_termination_delay_time_is_in_range(
def _ensure_termination_delay_time_is_in_range(
cls, value: datetime.timedelta
) -> datetime.timedelta:
if value < datetime.timedelta(minutes=0):
Expand All @@ -155,12 +156,18 @@ def ensure_termination_delay_time_is_in_range(

@validator("EC2_INSTANCES_ALLOWED_TYPES")
@classmethod
def check_valid_instance_names(
def _check_valid_instance_names_and_not_empty(
cls, value: dict[str, EC2InstanceBootSpecific]
) -> dict[str, EC2InstanceBootSpecific]:
# NOTE: needed because of a flaw in BaseCustomSettings
# issubclass raises TypeError if used on Aliases
parse_obj_as(list[InstanceTypeType], list(value))

if not value:
# NOTE: Field( ... , min_items=...) cannot be used to constrain the number of items in a dict
msg = "At least one item expected in EC2_INSTANCES_ALLOWED_TYPES, got none"
raise ValueError(msg)

return value


Expand Down Expand Up @@ -293,12 +300,12 @@ def LOG_LEVEL(self): # noqa: N802

@validator("AUTOSCALING_LOGLEVEL")
@classmethod
def valid_log_level(cls, value: str) -> str:
def _valid_log_level(cls, value: str) -> str:
return cls.validate_log_level(value)

@root_validator()
@classmethod
def exclude_both_dynamic_computational_mode(cls, values):
def _exclude_both_dynamic_computational_mode(cls, values):
if (
values.get("AUTOSCALING_DASK") is not None
and values.get("AUTOSCALING_NODES_MONITORING") is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import collections
import dataclasses
import datetime
import functools
import itertools
import logging
from typing import Final, cast
Expand Down Expand Up @@ -327,30 +328,30 @@ async def _try_attach_pending_ec2s(
)


async def sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]:
async def _sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]:
app_settings: ApplicationSettings = app.state.settings
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
ec2_client = get_ec2_client(app)

# some instances might be able to run several tasks
allowed_instance_type_names = list(
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES
)

assert ( # nosec
allowed_instance_type_names
), "EC2_INSTANCES_ALLOWED_TYPES cannot be empty!"

allowed_instance_types: list[
EC2InstanceType
] = await ec2_client.get_ec2_instance_capabilities(
cast(
set[InstanceTypeType],
set(
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES,
),
)
cast(set[InstanceTypeType], set(allowed_instance_type_names))
)

def _sort_according_to_allowed_types(instance_type: EC2InstanceType) -> int:
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
return list(
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES
).index(f"{instance_type.name}")
def _as_selection(instance_type: EC2InstanceType) -> int:
# NOTE: will raise ValueError if allowed_instance_types not in allowed_instance_type_names
return allowed_instance_type_names.index(f"{instance_type.name}")

allowed_instance_types.sort(key=_sort_according_to_allowed_types)
allowed_instance_types.sort(key=_as_selection)
return allowed_instance_types


Expand Down Expand Up @@ -497,51 +498,44 @@ async def _assign_tasks_to_current_cluster(
cluster: Cluster,
auto_scaling_mode: BaseAutoscaling,
) -> tuple[list, Cluster]:
"""
Evaluates whether a task can be executed on any instance within the cluster. If the task's resource requirements are met, the task is *denoted* as assigned to the cluster.
Note: This is an estimation only since actual scheduling is handled by Dask/Docker (depending on the mode).

Returns:
A tuple containing:
- A list of unassigned tasks (tasks whose resource requirements cannot be fulfilled by the available machines in the cluster).
- The same cluster instance passed as input.
"""
unassigned_tasks = []
assignment_predicates = [
functools.partial(_try_assign_task_to_ec2_instance, instances=instances)
for instances in (
cluster.active_nodes,
cluster.drained_nodes + cluster.buffer_drained_nodes,
cluster.pending_nodes,
cluster.pending_ec2s,
cluster.buffer_ec2s,
)
]

for task in tasks:
task_required_resources = auto_scaling_mode.get_task_required_resources(task)
task_required_ec2_instance = await auto_scaling_mode.get_task_defined_instance(
app, task
)

assignment_functions = [
lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance(
task,
instances=cluster.active_nodes,
task_required_ec2_instance=required_ec2,
task_required_resources=required_resources,
),
lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance(
task,
instances=cluster.drained_nodes + cluster.buffer_drained_nodes,
task_required_ec2_instance=required_ec2,
task_required_resources=required_resources,
),
lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance(
task,
instances=cluster.pending_nodes,
task_required_ec2_instance=required_ec2,
task_required_resources=required_resources,
),
lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance(
task,
instances=cluster.pending_ec2s,
task_required_ec2_instance=required_ec2,
task_required_resources=required_resources,
),
lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance(
task,
instances=cluster.buffer_ec2s,
task_required_ec2_instance=required_ec2,
task_required_resources=required_resources,
),
]

if any(
assignment(task, task_required_ec2_instance, task_required_resources)
for assignment in assignment_functions
is_assigned(
task,
task_required_ec2_instance=task_required_ec2_instance,
task_required_resources=task_required_resources,
)
for is_assigned in assignment_predicates
):
_logger.debug("assigned task to cluster")
_logger.debug(
"task %s is assigned to one instance available in cluster", task
)
else:
unassigned_tasks.append(task)

Expand Down Expand Up @@ -1131,7 +1125,7 @@ async def _autoscale_cluster(
# 1. check if we have pending tasks and resolve them by activating some drained nodes
unrunnable_tasks = await auto_scaling_mode.list_unrunnable_tasks(app)
_logger.info("found %s unrunnable tasks", len(unrunnable_tasks))

# NOTE: this function predicts how dask will assign a task to a machine
queued_or_missing_instance_tasks, cluster = await _assign_tasks_to_current_cluster(
app, unrunnable_tasks, cluster, auto_scaling_mode
)
Expand Down Expand Up @@ -1217,11 +1211,13 @@ async def auto_scale_cluster(
If there are such tasks, this method will allocate new machines in AWS to cope with
the additional load.
"""

allowed_instance_types = await sorted_allowed_instance_types(app)
# current state
allowed_instance_types = await _sorted_allowed_instance_types(app)
cluster = await _analyze_current_cluster(
app, auto_scaling_mode, allowed_instance_types
)

# cleanup
cluster = await _cleanup_disconnected_nodes(app, cluster)
cluster = await _terminate_broken_ec2s(app, cluster)
cluster = await _make_pending_buffer_ec2s_join_cluster(app, cluster)
Expand All @@ -1230,8 +1226,11 @@ async def auto_scale_cluster(
)
cluster = await _drain_retired_nodes(app, cluster)

# desired state
cluster = await _autoscale_cluster(
app, cluster, auto_scaling_mode, allowed_instance_types
)

# notify
await _notify_machine_creation_progress(app, cluster, auto_scaling_mode)
await _notify_autoscaling_status(app, cluster, auto_scaling_mode)
Loading
Loading