Commit 88c77bd

Author: Andrei Neagu (committed)
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate-dy-scheduler-part-1
2 parents 8c79dde + 2ac6d38

File tree: 11 files changed, 235 additions and 52 deletions

packages/aws-library/tests/test_ec2_client.py

Lines changed: 60 additions & 1 deletion

@@ -419,6 +419,63 @@ async def test_stop_start_instances(
         assert getattr(s, f.name) == getattr(c, f.name)


+async def test_start_instances_with_insufficient_instance_capacity(
+    simcore_ec2_api: SimcoreEC2API,
+    ec2_client: EC2Client,
+    faker: Faker,
+    ec2_instance_config: EC2InstanceConfig,
+    mocker: MockerFixture,
+):
+    # we have nothing running now in ec2
+    await _assert_no_instances_in_ec2(ec2_client)
+    # create some instances
+    _NUM_INSTANCES = 10
+    num_instances = faker.pyint(min_value=1, max_value=_NUM_INSTANCES)
+    created_instances = await simcore_ec2_api.launch_instances(
+        ec2_instance_config,
+        min_number_of_instances=num_instances,
+        number_of_instances=num_instances,
+    )
+    await _assert_instances_in_ec2(
+        ec2_client,
+        expected_num_reservations=1,
+        expected_num_instances=num_instances,
+        expected_instance_type=ec2_instance_config.type,
+        expected_tags=ec2_instance_config.tags,
+        expected_state="running",
+    )
+    # stop the instances
+    await simcore_ec2_api.stop_instances(created_instances)
+    await _assert_instances_in_ec2(
+        ec2_client,
+        expected_num_reservations=1,
+        expected_num_instances=num_instances,
+        expected_instance_type=ec2_instance_config.type,
+        expected_tags=ec2_instance_config.tags,
+        expected_state="stopped",
+    )
+
+    # Mock the EC2 client to simulate InsufficientInstanceCapacity on first subnet
+    async def mock_start_instances(*args, **kwargs) -> Any:
+        # no more machines, simulate insufficient capacity
+        error_response: dict[str, Any] = {
+            "Error": {
+                "Code": "InsufficientInstanceCapacity",
+                "Message": "An error occurred (InsufficientInstanceCapacity) when calling the StartInstances operation (reached max retries: 4): Insufficient capacity.",
+            },
+        }
+        raise botocore.exceptions.ClientError(error_response, "StartInstances")  # type: ignore
+
+    # Apply the mock
+    mocker.patch.object(
+        simcore_ec2_api.client, "start_instances", side_effect=mock_start_instances
+    )
+
+    # start the instances now
+    with pytest.raises(EC2InsufficientCapacityError):
+        await simcore_ec2_api.start_instances(created_instances)
+
+
 async def test_terminate_instance(
     simcore_ec2_api: SimcoreEC2API,
     ec2_client: EC2Client,

@@ -717,7 +774,9 @@ async def mock_run_instances(*args, **kwargs) -> Any:
     mocker.patch.object(
         simcore_ec2_api.client, "run_instances", side_effect=mock_run_instances
     )
-    with pytest.raises(EC2InsufficientCapacityError) as exc_info:
+    with pytest.raises(
+        EC2InsufficientCapacityError, match=fake_ec2_instance_type.name
+    ) as exc_info:
         await simcore_ec2_api.launch_instances(
             ec2_instance_config,
             min_number_of_instances=1,
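
The new test relies on SimcoreEC2API translating botocore's "InsufficientInstanceCapacity" ClientError into EC2InsufficientCapacityError. As a rough sketch only (this is not the library's actual implementation; the wrapper function and error class below are hypothetical stand-ins named after what the test exercises), such a translation layer typically looks like this:

import botocore.exceptions


class EC2InsufficientCapacityError(RuntimeError):
    """Hypothetical stand-in for the aws-library error raised on capacity shortage."""


async def start_instances_translating_errors(client, instance_ids: list[str]):
    try:
        # EC2's start_instances takes the instance ids to start
        return await client.start_instances(InstanceIds=instance_ids)
    except botocore.exceptions.ClientError as exc:
        # botocore carries the AWS error code in the response payload
        if exc.response.get("Error", {}).get("Code") == "InsufficientInstanceCapacity":
            raise EC2InsufficientCapacityError(str(exc)) from exc
        raise
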
Lines changed: 53 additions & 0 deletions (new Alembic migration 'add state type unknown')

@@ -0,0 +1,53 @@
+"""add state type unknown
+
+Revision ID: 06eafd25d004
+Revises: ec4f62595e0c
+Create Date: 2025-09-01 12:25:25.617790+00:00
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "06eafd25d004"
+down_revision = "ec4f62595e0c"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.execute("ALTER TYPE statetype ADD VALUE 'UNKNOWN'")
+
+
+def downgrade() -> None:
+    # NOTE: PostgreSQL doesn't support removing enum values directly.
+    # This downgrade only ensures that StateType.UNKNOWN is no longer used.
+    #
+
+    # Find all tables and columns that use the statetype enum
+    result = op.get_bind().execute(
+        sa.DDL(
+            """
+            SELECT t.table_name, c.column_name, c.column_default
+            FROM information_schema.columns c
+            JOIN information_schema.tables t ON c.table_name = t.table_name
+            WHERE c.udt_name = 'statetype'
+            AND t.table_schema = 'public'
+            """
+        )
+    )
+
+    tables_columns = result.fetchall()
+
+    # Update UNKNOWN states to FAILED in all affected tables
+    for table_name, column_name, _ in tables_columns:
+        op.execute(
+            sa.DDL(
+                f"""
+                UPDATE {table_name}
+                SET {column_name} = 'FAILED'
+                WHERE {column_name} = 'UNKNOWN'
+                """
+            )
+        )
packages/postgres-database/src/simcore_postgres_database/models/comp_pipeline.py

Lines changed: 2 additions & 2 deletions

@@ -1,6 +1,5 @@
-""" Computational Pipeline Table
+"""Computational Pipeline Table"""

-"""
 import enum
 import uuid


@@ -24,6 +23,7 @@ class StateType(enum.Enum):
     ABORTED = "ABORTED"
     WAITING_FOR_RESOURCES = "WAITING_FOR_RESOURCES"
     WAITING_FOR_CLUSTER = "WAITING_FOR_CLUSTER"
+    UNKNOWN = "UNKNOWN"


 def _new_uuid():

services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py

Lines changed: 79 additions & 24 deletions

@@ -15,7 +15,11 @@
     EC2Tags,
     Resources,
 )
-from aws_library.ec2._errors import EC2AccessError, EC2TooManyInstancesError
+from aws_library.ec2._errors import (
+    EC2AccessError,
+    EC2InsufficientCapacityError,
+    EC2TooManyInstancesError,
+)
 from fastapi import FastAPI
 from models_library.generated_models.docker_rest_api import Node
 from models_library.rabbitmq_messages import ProgressType

@@ -421,10 +425,46 @@ async def _activate_drained_nodes(
     )


+def _de_assign_tasks_from_warm_buffer_ec2s(
+    cluster: Cluster, instances_to_start: list[EC2InstanceData]
+) -> tuple[Cluster, list]:
+    # de-assign tasks from the warm buffer instances that could not be started
+    deassigned_tasks = list(
+        itertools.chain.from_iterable(
+            i.assigned_tasks
+            for i in cluster.warm_buffer_ec2s
+            if i.ec2_instance in instances_to_start
+        )
+    )
+    # update the cluster
+    return (
+        dataclasses.replace(
+            cluster,
+            warm_buffer_ec2s=[
+                (
+                    dataclasses.replace(i, assigned_tasks=[])
+                    if i.ec2_instance in instances_to_start
+                    else i
+                )
+                for i in cluster.warm_buffer_ec2s
+            ],
+        ),
+        deassigned_tasks,
+    )
+
+
 async def _try_start_warm_buffer_instances(
     app: FastAPI, cluster: Cluster, auto_scaling_mode: AutoscalingProvider
-) -> Cluster:
-    """starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed"""
+) -> tuple[Cluster, list]:
+    """
+    starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed
+
+    Returns:
+        A tuple containing:
+        - The updated cluster instance after attempting to start warm buffer instances.
+        - In case warm buffers could not be started, a list of de-assigned tasks (tasks whose resource requirements can no longer be fulfilled by warm buffers).
+
+    """

     app_settings = get_application_settings(app)
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec

@@ -466,26 +506,34 @@ async def _try_start_warm_buffer_instances(
     ]

     if not instances_to_start:
-        return cluster
+        return cluster, []

     with log_context(
-        _logger, logging.INFO, f"start {len(instances_to_start)} warm buffer machines"
+        _logger,
+        logging.INFO,
+        f"start {len(instances_to_start)} warm buffer machines '{[i.id for i in instances_to_start]}'",
     ):
         try:
             started_instances = await get_ec2_client(app).start_instances(
                 instances_to_start
             )
-        except EC2AccessError:
+        except EC2InsufficientCapacityError:
+            # NOTE: this warning is only raised if none of the instances could be started due to InsufficientCapacity
             _logger.warning(
-                "Could not start warm buffer instances! "
-                "TIP: This can happen in case of Insufficient "
-                "Capacity on AWS AvailabilityZone(s) where the warm buffers were originally created. "
-                "Until https://github.com/ITISFoundation/osparc-simcore/issues/8273 is fixed this "
-                "will prevent fulfilling this instance type need.",
-                exc_info=True,
+                "Could not start warm buffer instances: %s due to Insufficient Capacity in the current AWS Availability Zone! "
+                "The warm buffer assigned tasks will be moved to new instances if possible.",
+                [i.id for i in instances_to_start],
             )
-            # we need to re-assign the tasks assigned to the warm buffer instances
-            return cluster
+            return _de_assign_tasks_from_warm_buffer_ec2s(cluster, instances_to_start)
+
+        except EC2AccessError:
+            _logger.exception(
+                "Could not start warm buffer instances %s! TIP: This needs to be analysed! "
+                "The warm buffer assigned tasks will be moved to new instances if possible.",
+                [i.id for i in instances_to_start],
+            )
+            return _de_assign_tasks_from_warm_buffer_ec2s(cluster, instances_to_start)
+
     # NOTE: first start the instance and then set the tags in case the instance cannot start (e.g. InsufficientInstanceCapacity)
     await get_ec2_client(app).set_instances_tags(
         started_instances,

@@ -495,15 +543,18 @@ async def _try_start_warm_buffer_instances(
     )
     started_instance_ids = [i.id for i in started_instances]

-    return dataclasses.replace(
-        cluster,
-        warm_buffer_ec2s=[
-            i
-            for i in cluster.warm_buffer_ec2s
-            if i.ec2_instance.id not in started_instance_ids
-        ],
-        pending_ec2s=cluster.pending_ec2s
-        + [NonAssociatedInstance(ec2_instance=i) for i in started_instances],
+    return (
+        dataclasses.replace(
+            cluster,
+            warm_buffer_ec2s=[
+                i
+                for i in cluster.warm_buffer_ec2s
+                if i.ec2_instance.id not in started_instance_ids
+            ],
+            pending_ec2s=cluster.pending_ec2s
+            + [NonAssociatedInstance(ec2_instance=i) for i in started_instances],
+        ),
+        [],
     )


@@ -1243,7 +1294,11 @@ async def _autoscale_cluster(
     cluster = await _activate_drained_nodes(app, cluster)

     # 3. start warm buffer instances to cover the remaining tasks
-    cluster = await _try_start_warm_buffer_instances(app, cluster, auto_scaling_mode)
+    cluster, de_assigned_tasks = await _try_start_warm_buffer_instances(
+        app, cluster, auto_scaling_mode
+    )
+    # 3.1 if some tasks were de-assigned, we need to add them to the still pending tasks
+    still_pending_tasks.extend(de_assigned_tasks)

     # 4. scale down unused instances
     cluster = await _scale_down_unused_cluster_instances(
services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py

Lines changed: 1 addition & 4 deletions

@@ -2454,9 +2454,6 @@ async def _raise_insufficient_capacity_error(*args: Any, **kwargs: Any) -> None:
     )


-@pytest.mark.xfail(
-    reason="bug described in https://github.com/ITISFoundation/osparc-simcore/issues/8273"
-)
 @pytest.mark.parametrize(
     # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options
     "with_docker_join_drained",

@@ -2495,7 +2492,7 @@ async def test_fresh_instance_is_launched_if_warm_buffers_cannot_start_due_to_in
         InstanceTypeType,
         next(iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES)),
     )
-    await create_buffer_machines(1, warm_buffer_instance_type, "stopped", None)
+    await create_buffer_machines(3, warm_buffer_instance_type, "stopped", None)

     # create several tasks that needs more power
     scale_up_params = _ScaleUpParams(

services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py

Lines changed: 1 addition & 1 deletion

@@ -102,7 +102,7 @@ def _convert_to_env_dict(entries: dict[str, Any]) -> str:
        f"WORKERS_EC2_INSTANCES_KEY_NAME={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_KEY_NAME}",
        f"WORKERS_EC2_INSTANCES_MAX_INSTANCES={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_MAX_INSTANCES}",
        f"WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS={_convert_to_env_list(app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS)}",
-       f"WORKERS_EC2_INSTANCES_SUBNET_IDS={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_SUBNET_IDS}",
+       f"WORKERS_EC2_INSTANCES_SUBNET_IDS={_convert_to_env_list(app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_SUBNET_IDS)}",
        f"WORKERS_EC2_INSTANCES_TIME_BEFORE_DRAINING={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_TIME_BEFORE_DRAINING}",
        f"WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION}",
        f"AUTOSCALING_RABBITMQ={_convert_to_env_dict(model_dump_with_secrets(app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_RABBIT, show_secrets=True)) if app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_RABBIT else 'null'}",

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py

Lines changed: 9 additions & 9 deletions

@@ -95,11 +95,12 @@ def _handle_foreign_key_violation(

 def _resolve_grouped_state(states: list[RunningState]) -> RunningState:
     # If any state is not a final state, return STARTED
+
     final_states = {
         RunningState.FAILED,
         RunningState.ABORTED,
         RunningState.SUCCESS,
-        RunningState.UNKNOWN,
+        RunningState.UNKNOWN,  # NOTE: this is NOT a final state, but happens when tasks are missing
     }
     if any(state not in final_states for state in states):
         return RunningState.STARTED

@@ -399,7 +400,6 @@ async def list_all_collection_run_ids_for_user_currently_running_computations(
     product_name: str,
     user_id: UserID,
 ) -> list[CollectionRunID]:
-
     list_query = (
         sa.select(
             comp_runs.c.collection_run_id,

@@ -493,17 +493,17 @@ async def list_group_by_collection_run_id(
         total_count = await conn.scalar(count_query)
         items = []
         async for row in await conn.stream(list_query):
-            db_states = [DB_TO_RUNNING_STATE[s] for s in row["states"]]
+            db_states = [DB_TO_RUNNING_STATE[s] for s in row.states]
             resolved_state = _resolve_grouped_state(db_states)
             items.append(
                 ComputationCollectionRunRpcGet(
-                    collection_run_id=row["collection_run_id"],
-                    project_ids=row["project_ids"],
+                    collection_run_id=row.collection_run_id,
+                    project_ids=row.project_ids,
                     state=resolved_state,
-                    info={} if row["info"] is None else row["info"],
-                    submitted_at=row["submitted_at"],
-                    started_at=row["started_at"],
-                    ended_at=row["ended_at"],
+                    info={} if row.info is None else row.info,
+                    submitted_at=row.submitted_at,
+                    started_at=row.started_at,
+                    ended_at=row.ended_at,
                 )
             )
         return cast(int, total_count), items
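
The note added next to RunningState.UNKNOWN explains why it stays in the final-state set: without it, a collection with missing tasks would keep resolving to STARTED. A minimal sketch of that grouping rule follows (hypothetical trimmed-down enum; the hunk only shows the first half of _resolve_grouped_state, so the final-state selection below is a placeholder, not the real logic):

import enum


class RunningState(str, enum.Enum):
    STARTED = "STARTED"
    FAILED = "FAILED"
    ABORTED = "ABORTED"
    SUCCESS = "SUCCESS"
    UNKNOWN = "UNKNOWN"


_FINAL_STATES = {
    RunningState.FAILED,
    RunningState.ABORTED,
    RunningState.SUCCESS,
    RunningState.UNKNOWN,  # not truly final, but present when tasks are missing
}


def resolve_grouped_state(states: list[RunningState]) -> RunningState:
    # behaviour visible in the hunk: any non-final member forces STARTED
    if any(s not in _FINAL_STATES for s in states):
        return RunningState.STARTED
    # the real function continues by picking a representative final state
    # (not shown in the hunk); as a placeholder, prefer FAILED when present
    return RunningState.FAILED if RunningState.FAILED in states else RunningState.SUCCESS


assert resolve_grouped_state([RunningState.SUCCESS, RunningState.UNKNOWN]) != RunningState.STARTED
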

services/director-v2/src/simcore_service_director_v2/utils/computations.py

Lines changed: 1 addition & 0 deletions

@@ -65,6 +65,7 @@
         RunningState.PENDING,
         RunningState.NOT_STARTED,
         RunningState.WAITING_FOR_CLUSTER,
+        RunningState.WAITING_FOR_RESOURCES,
     ): RunningState.STARTED,
 }

services/director-v2/src/simcore_service_director_v2/utils/db.py

Lines changed: 2 additions & 3 deletions

@@ -16,11 +16,10 @@
     StateType.ABORTED: RunningState.ABORTED,
     StateType.WAITING_FOR_RESOURCES: RunningState.WAITING_FOR_RESOURCES,
     StateType.WAITING_FOR_CLUSTER: RunningState.WAITING_FOR_CLUSTER,
+    StateType.UNKNOWN: RunningState.UNKNOWN,
 }

-RUNNING_STATE_TO_DB = {v: k for k, v in DB_TO_RUNNING_STATE.items()} | {
-    RunningState.UNKNOWN: StateType.FAILED
-}
+RUNNING_STATE_TO_DB = {v: k for k, v in DB_TO_RUNNING_STATE.items()}

 _logger = logging.getLogger(__name__)
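
With StateType.UNKNOWN now mapped to RunningState.UNKNOWN, the reverse mapping no longer needs the special case that silently wrote UNKNOWN back as FAILED. A small sketch (simplified stand-in enums, not the project's full state sets) of the round-trip property this restores:

import enum


class StateType(enum.Enum):
    FAILED = "FAILED"
    SUCCESS = "SUCCESS"
    UNKNOWN = "UNKNOWN"


class RunningState(enum.Enum):
    FAILED = "FAILED"
    SUCCESS = "SUCCESS"
    UNKNOWN = "UNKNOWN"


DB_TO_RUNNING_STATE = {
    StateType.FAILED: RunningState.FAILED,
    StateType.SUCCESS: RunningState.SUCCESS,
    StateType.UNKNOWN: RunningState.UNKNOWN,
}
RUNNING_STATE_TO_DB = {v: k for k, v in DB_TO_RUNNING_STATE.items()}

# every DB state survives a DB -> running -> DB round trip
assert all(RUNNING_STATE_TO_DB[DB_TO_RUNNING_STATE[s]] is s for s in StateType)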
