Skip to content

Commit ec12cfc

Browse files
authored
🐛Bugfix/autoscaling does not scale above limit (#5129)
1 parent 7c30bc0 commit ec12cfc

File tree

8 files changed

+376
-32
lines changed

8 files changed

+376
-32
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from typing import Any, TypeAlias
2+
3+
from .constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
4+
5+
DaskTaskResources: TypeAlias = dict[str, Any]
6+
7+
8+
def create_ec2_resource_constraint_key(ec2_instance_type: str) -> str:
9+
return f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:{ec2_instance_type}"
10+
11+
12+
def get_ec2_instance_type_from_resources(
13+
task_resources: DaskTaskResources,
14+
) -> str | None:
15+
for resource_name in task_resources:
16+
if resource_name.startswith(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY):
17+
return resource_name.split(":")[-1]
18+
return None
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
2+
from dask_task_models_library.resource_constraints import (
3+
create_ec2_resource_constraint_key,
4+
get_ec2_instance_type_from_resources,
5+
)
6+
from faker import Faker
7+
8+
9+
def test_create_ec2_resource_constraint_key(faker: Faker):
10+
faker_instance_type = faker.pystr()
11+
assert (
12+
create_ec2_resource_constraint_key(faker_instance_type)
13+
== f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:{faker_instance_type}"
14+
)
15+
16+
empty_instance_type = ""
17+
assert (
18+
create_ec2_resource_constraint_key(empty_instance_type)
19+
== f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:"
20+
)
21+
22+
23+
def test_get_ec2_instance_type_from_resources(faker: Faker):
24+
empty_task_resources = {}
25+
assert get_ec2_instance_type_from_resources(empty_task_resources) is None
26+
no_ec2_types_in_resources = {"blahblah": 1}
27+
assert get_ec2_instance_type_from_resources(no_ec2_types_in_resources) is None
28+
29+
faker_instance_type = faker.pystr()
30+
ec2_type_in_resources = {create_ec2_resource_constraint_key(faker_instance_type): 1}
31+
assert (
32+
get_ec2_instance_type_from_resources(ec2_type_in_resources)
33+
== faker_instance_type
34+
)

services/autoscaling/src/simcore_service_autoscaling/models.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from dataclasses import dataclass, field
2-
from typing import Any, TypeAlias
2+
from typing import TypeAlias
33

44
from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources
5+
from dask_task_models_library.resource_constraints import DaskTaskResources
56
from models_library.generated_models.docker_rest_api import Node
67

78

@@ -55,7 +56,6 @@ class Cluster:
5556

5657

5758
DaskTaskId: TypeAlias = str
58-
DaskTaskResources: TypeAlias = dict[str, Any]
5959

6060

6161
@dataclass(frozen=True, kw_only=True)

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
EC2InstanceConfig,
1212
EC2InstanceData,
1313
EC2InstanceType,
14+
EC2Tags,
1415
Resources,
1516
)
1617
from fastapi import FastAPI
@@ -441,6 +442,73 @@ async def _find_needed_instances(
441442
return num_instances_per_type
442443

443444

445+
async def _cap_needed_instances(
446+
app: FastAPI, needed_instances: dict[EC2InstanceType, int], ec2_tags: EC2Tags
447+
) -> dict[EC2InstanceType, int]:
448+
"""caps the needed instances dict[EC2InstanceType, int] to the maximal allowed number of instances by
449+
1. limiting to 1 per asked type
450+
2. increasing each by 1 until the maximum allowed number of instances is reached
451+
NOTE: the maximum allowed number of instances contains the current number of running/pending machines
452+
453+
Raises:
454+
Ec2TooManyInstancesError: raised when the maximum of machines is already running/pending
455+
"""
456+
ec2_client = get_ec2_client(app)
457+
app_settings = get_application_settings(app)
458+
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
459+
current_instances = await ec2_client.get_instances(
460+
key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME],
461+
tags=ec2_tags,
462+
)
463+
current_number_of_instances = len(current_instances)
464+
if (
465+
current_number_of_instances
466+
>= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
467+
):
468+
# ok that is already too much
469+
raise Ec2TooManyInstancesError(
470+
num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
471+
)
472+
473+
total_number_of_needed_instances = sum(needed_instances.values())
474+
if (
475+
current_number_of_instances + total_number_of_needed_instances
476+
<= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
477+
):
478+
# ok that fits no need to do anything here
479+
return needed_instances
480+
481+
# this is asking for too many, so let's cap them
482+
max_number_of_creatable_instances = (
483+
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
484+
- current_number_of_instances
485+
)
486+
487+
# we start with 1 machine of each type until the max
488+
capped_needed_instances = {
489+
k: 1
490+
for count, k in enumerate(needed_instances)
491+
if (count + 1) <= max_number_of_creatable_instances
492+
}
493+
494+
if len(capped_needed_instances) < len(needed_instances):
495+
# there were too many types for the number of possible instances
496+
return capped_needed_instances
497+
498+
# all instance types were added, now create more of them if possible
499+
while sum(capped_needed_instances.values()) < max_number_of_creatable_instances:
500+
for instance_type, num_to_create in needed_instances.items():
501+
if (
502+
sum(capped_needed_instances.values())
503+
== max_number_of_creatable_instances
504+
):
505+
break
506+
if num_to_create > capped_needed_instances[instance_type]:
507+
capped_needed_instances[instance_type] += 1
508+
509+
return capped_needed_instances
510+
511+
444512
async def _start_instances(
445513
app: FastAPI,
446514
needed_instances: dict[EC2InstanceType, int],
@@ -450,14 +518,28 @@ async def _start_instances(
450518
ec2_client = get_ec2_client(app)
451519
app_settings = get_application_settings(app)
452520
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
521+
new_instance_tags = auto_scaling_mode.get_ec2_tags(app)
522+
capped_needed_machines = {}
523+
try:
524+
capped_needed_machines = await _cap_needed_instances(
525+
app, needed_instances, new_instance_tags
526+
)
527+
except Ec2TooManyInstancesError:
528+
await auto_scaling_mode.log_message_from_tasks(
529+
app,
530+
tasks,
531+
"The maximum number of machines in the cluster was reached. Please wait for your running jobs "
532+
"to complete and try again later or contact osparc support if this issue does not resolve.",
533+
level=logging.ERROR,
534+
)
535+
return []
453536

454-
instance_tags = auto_scaling_mode.get_ec2_tags(app)
455537
results = await asyncio.gather(
456538
*[
457539
ec2_client.start_aws_instance(
458540
EC2InstanceConfig(
459541
type=instance_type,
460-
tags=instance_tags,
542+
tags=new_instance_tags,
461543
startup_script=await ec2_startup_script(
462544
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[
463545
instance_type.name
@@ -474,7 +556,7 @@ async def _start_instances(
474556
number_of_instances=instance_num,
475557
max_number_of_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES,
476558
)
477-
for instance_type, instance_num in needed_instances.items()
559+
for instance_type, instance_num in capped_needed_machines.items()
478560
],
479561
return_exceptions=True,
480562
)
@@ -497,7 +579,10 @@ async def _start_instances(
497579
else:
498580
new_pending_instances.append(r)
499581

500-
log_message = f"{sum(n for n in needed_instances.values())} new machines launched, it might take up to 3 minutes to start, Please wait..."
582+
log_message = (
583+
f"{sum(n for n in capped_needed_machines.values())} new machines launched"
584+
", it might take up to 3 minutes to start, Please wait..."
585+
)
501586
await auto_scaling_mode.log_message_from_tasks(
502587
app, tasks, log_message, level=logging.INFO
503588
)

services/autoscaling/src/simcore_service_autoscaling/modules/dask.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55

66
import distributed
77
from aws_library.ec2.models import EC2InstanceData, Resources
8+
from dask_task_models_library.resource_constraints import DaskTaskResources
89
from pydantic import AnyUrl, ByteSize, parse_obj_as
910

1011
from ..core.errors import (
1112
DaskNoWorkersError,
1213
DaskSchedulerNotFoundError,
1314
DaskWorkerNotFoundError,
1415
)
15-
from ..models import AssociatedInstance, DaskTask, DaskTaskId, DaskTaskResources
16+
from ..models import AssociatedInstance, DaskTask, DaskTaskId
1617
from ..utils.auto_scaling_core import (
1718
node_host_name_from_ec2_private_dns,
1819
node_ip_from_ec2_private_dns,

services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
from typing import Final
55

66
from aws_library.ec2.models import Resources
7-
from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
7+
from dask_task_models_library.resource_constraints import (
8+
get_ec2_instance_type_from_resources,
9+
)
810
from fastapi import FastAPI
911
from servicelib.utils_formatting import timedelta_as_minute_second
10-
from types_aiobotocore_ec2.literals import InstanceTypeType
1112

1213
from ..core.settings import get_application_settings
1314
from ..models import (
@@ -30,8 +31,11 @@ def get_max_resources_from_dask_task(task: DaskTask) -> Resources:
3031
)
3132

3233

33-
def get_task_instance_restriction(task: DaskTask) -> InstanceTypeType | None:
34-
return task.required_resources.get(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY)
34+
def get_task_instance_restriction(task: DaskTask) -> str | None:
35+
instance_ec2_type: str | None = get_ec2_instance_type_from_resources(
36+
task.required_resources
37+
)
38+
return instance_ec2_type
3539

3640

3741
def _compute_tasks_needed_resources(tasks: list[DaskTask]) -> Resources:

0 commit comments

Comments
 (0)