Skip to content

Commit f4930bb

Browse files
committed
ongoing test
1 parent 1f47bd2 commit f4930bb

File tree

2 files changed

+125
-42
lines changed

2 files changed

+125
-42
lines changed

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,16 +1116,26 @@ async def _drain_retired_nodes(
11161116
)
11171117

11181118

1119+
async def _scale_down_unused_cluster_machines(
1120+
app: FastAPI,
1121+
cluster: Cluster,
1122+
auto_scaling_mode: BaseAutoscaling,
1123+
) -> Cluster:
1124+
await auto_scaling_mode.try_retire_nodes(app)
1125+
cluster = await _deactivate_empty_nodes(app, cluster)
1126+
return await _try_scale_down_cluster(app, cluster)
1127+
1128+
11191129
async def _autoscale_cluster(
11201130
app: FastAPI,
11211131
cluster: Cluster,
11221132
auto_scaling_mode: BaseAutoscaling,
11231133
allowed_instance_types: list[EC2InstanceType],
11241134
) -> Cluster:
1125-
# 1. check if we have pending tasks and resolve them by activating some drained nodes
1135+
# 1. check if we have pending tasks
11261136
unrunnable_tasks = await auto_scaling_mode.list_unrunnable_tasks(app)
11271137
_logger.info("found %s unrunnable tasks", len(unrunnable_tasks))
1128-
# NOTE: this function predicts how dask will assign a task to a machine
1138+
# NOTE: this function predicts how the backend will assign tasks
11291139
queued_or_missing_instance_tasks, cluster = await _assign_tasks_to_current_cluster(
11301140
app, unrunnable_tasks, cluster, auto_scaling_mode
11311141
)
@@ -1135,41 +1145,34 @@ async def _autoscale_cluster(
11351145
# 3. start buffer instances to cover the remaining tasks
11361146
cluster = await _start_buffer_instances(app, cluster, auto_scaling_mode)
11371147

1148+
# 4. scale down unused machines
1149+
cluster = await _scale_down_unused_cluster_machines(app, cluster, auto_scaling_mode)
1150+
11381151
# 5. let's check if there are still pending tasks or if the reserve was used
11391152
app_settings = get_application_settings(app)
11401153
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
1141-
if queued_or_missing_instance_tasks or (
1142-
len(cluster.buffer_drained_nodes)
1143-
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
1144-
):
1145-
if (
1154+
if (
1155+
queued_or_missing_instance_tasks
1156+
or (
1157+
len(cluster.buffer_drained_nodes)
1158+
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
1159+
)
1160+
and (
11461161
cluster.total_number_of_machines()
11471162
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
1148-
):
1149-
_logger.info(
1150-
"%s unrunnable tasks could not be assigned, slowly trying to scale up...",
1151-
len(queued_or_missing_instance_tasks),
1152-
)
1153-
cluster = await _scale_up_cluster(
1154-
app,
1155-
cluster,
1156-
queued_or_missing_instance_tasks,
1157-
auto_scaling_mode,
1158-
allowed_instance_types,
1159-
)
1160-
1161-
elif (
1162-
len(queued_or_missing_instance_tasks) == len(unrunnable_tasks) == 0
1163-
and cluster.can_scale_down()
1163+
)
11641164
):
11651165
_logger.info(
1166-
"there is %s waiting task, slowly and gracefully scaling down...",
1166+
"%s unrunnable tasks could not be assigned, slowly trying to scale up...",
11671167
len(queued_or_missing_instance_tasks),
11681168
)
1169-
# NOTE: we only scale down in case we did not just scale up. The swarm needs some time to adjust
1170-
await auto_scaling_mode.try_retire_nodes(app)
1171-
cluster = await _deactivate_empty_nodes(app, cluster)
1172-
cluster = await _try_scale_down_cluster(app, cluster)
1169+
cluster = await _scale_up_cluster(
1170+
app,
1171+
cluster,
1172+
queued_or_missing_instance_tasks,
1173+
auto_scaling_mode,
1174+
allowed_instance_types,
1175+
)
11731176

11741177
return cluster
11751178

services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py

Lines changed: 94 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1188,6 +1188,20 @@ async def test_cluster_scaling_up_starts_multiple_instances(
11881188
mock_rabbitmq_post_message.reset_mock()
11891189

11901190

1191+
@pytest.fixture
1192+
async def mocked_associate_ec2_instances_with_nodes(mocker: MockerFixture) -> mock.Mock:
1193+
async def _(
1194+
nodes: list[Node], ec2_instances: list[EC2InstanceData]
1195+
) -> tuple[list[AssociatedInstance], list[EC2InstanceData]]:
1196+
return [], ec2_instances
1197+
1198+
return mocker.patch(
1199+
"simcore_service_autoscaling.modules.auto_scaling_core.associate_ec2_instances_with_nodes",
1200+
autospec=True,
1201+
side_effect=_,
1202+
)
1203+
1204+
11911205
@pytest.mark.parametrize(
11921206
"with_docker_join_drained", ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], indirect=True
11931207
)
@@ -1240,6 +1254,11 @@ async def test_cluster_adapts_machines_on_the_fly(
12401254
async_docker_client: aiodocker.Docker,
12411255
scale_up_params1: _ScaleUpParams,
12421256
scale_up_params2: _ScaleUpParams,
1257+
mocked_associate_ec2_instances_with_nodes: mock.Mock,
1258+
create_fake_node: Callable[..., Node],
1259+
mock_docker_tag_node: mock.Mock,
1260+
mock_compute_node_used_resources: mock.Mock,
1261+
spied_cluster_analysis: MockType,
12431262
):
12441263
# pre-requisites
12451264
assert app_settings.AUTOSCALING_EC2_INSTANCES
@@ -1276,20 +1295,73 @@ async def test_cluster_adapts_machines_on_the_fly(
12761295
for _ in range(scale_up_params1.num_services)
12771296
)
12781297
)
1279-
for _ in range(3):
1280-
# it will only scale once and do nothing else
1281-
await auto_scale_cluster(
1282-
app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
1283-
)
1284-
await assert_autoscaled_dynamic_ec2_instances(
1285-
ec2_client,
1286-
expected_num_reservations=1,
1287-
expected_num_instances=scale_up_params1.expected_num_instances,
1288-
expected_instance_type=scale_up_params1.expected_instance_type,
1289-
expected_instance_state="running",
1290-
expected_additional_tag_keys=list(ec2_instance_custom_tags),
1291-
instance_filters=instance_type_filters,
1292-
)
1298+
1299+
# it will only scale once and do nothing else
1300+
await auto_scale_cluster(
1301+
app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
1302+
)
1303+
await assert_autoscaled_dynamic_ec2_instances(
1304+
ec2_client,
1305+
expected_num_reservations=1,
1306+
expected_num_instances=scale_up_params1.expected_num_instances,
1307+
expected_instance_type=scale_up_params1.expected_instance_type,
1308+
expected_instance_state="running",
1309+
expected_additional_tag_keys=list(ec2_instance_custom_tags),
1310+
instance_filters=instance_type_filters,
1311+
)
1312+
_assert_cluster_state(
1313+
spied_cluster_analysis,
1314+
expected_calls=1,
1315+
expected_num_machines=0,
1316+
)
1317+
mocked_associate_ec2_instances_with_nodes.assert_called_once_with([], [])
1318+
mocked_associate_ec2_instances_with_nodes.reset_mock()
1319+
1320+
fake_node_to_instance_map = {}
1321+
1322+
async def _fake_node_creator(
1323+
nodes: list[Node], ec2_instances: list[EC2InstanceData]
1324+
) -> tuple[list[AssociatedInstance], list[EC2InstanceData]]:
1325+
def _create_fake_node_with_labels(instance: EC2InstanceData) -> Node:
1326+
if instance not in fake_node_to_instance_map:
1327+
fake_node = create_fake_node()
1328+
assert fake_node.spec
1329+
fake_node.spec.availability = Availability.active
1330+
assert fake_node.status
1331+
fake_node.status.state = NodeState.ready
1332+
assert fake_node.spec.labels
1333+
fake_node.spec.labels |= {
1334+
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: arrow.utcnow().isoformat(),
1335+
_OSPARC_SERVICE_READY_LABEL_KEY: "true",
1336+
}
1337+
fake_node_to_instance_map[instance] = fake_node
1338+
return fake_node_to_instance_map[instance]
1339+
1340+
associated_instances = [
1341+
AssociatedInstance(node=_create_fake_node_with_labels(i), ec2_instance=i)
1342+
for i in ec2_instances
1343+
]
1344+
1345+
return associated_instances, []
1346+
1347+
mocked_associate_ec2_instances_with_nodes.side_effect = _fake_node_creator
1348+
1349+
#
1350+
# 2. now the machines are associated
1351+
await auto_scale_cluster(
1352+
app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
1353+
)
1354+
_assert_cluster_state(
1355+
spied_cluster_analysis,
1356+
expected_calls=1,
1357+
expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES,
1358+
)
1359+
mocked_associate_ec2_instances_with_nodes.assert_called_once()
1360+
mock_docker_tag_node.assert_called()
1361+
assert (
1362+
mock_docker_tag_node.call_count
1363+
== app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
1364+
)
12931365

12941366
#
12951367
# 3. now we start the second batch of services requiring a different type of machines
@@ -1330,6 +1402,10 @@ async def test_cluster_adapts_machines_on_the_fly(
13301402
instance_filters=instance_type_filters,
13311403
)
13321404

1405+
assert isinstance(spied_cluster_analysis.spy_return, Cluster)
1406+
assert spied_cluster_analysis.spy_return.active_nodes
1407+
assert not spied_cluster_analysis.spy_return.drained_nodes
1408+
13331409
# now we simulate that some of the services in the 1st batch have completed and that we are 1 below the max
13341410
# a machine should switch off and another type should be started
13351411
completed_services_to_stop = random.sample(
@@ -1350,6 +1426,10 @@ async def test_cluster_adapts_machines_on_the_fly(
13501426
await auto_scale_cluster(
13511427
app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
13521428
)
1429+
1430+
assert spied_cluster_analysis.spy_return.active_nodes
1431+
assert not spied_cluster_analysis.spy_return.drained_nodes
1432+
13531433
all_instances = await ec2_client.describe_instances()
13541434
assert len(all_instances["Reservations"]) == 2, "there should be 2 Reservations"
13551435
reservation1 = all_instances["Reservations"][0]

0 commit comments

Comments
 (0)