Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,10 @@ async def _scale_down_unused_cluster_instances(
cluster: Cluster,
auto_scaling_mode: AutoscalingProvider,
) -> Cluster:
await auto_scaling_mode.try_retire_nodes(app)
if any(not instance.has_assigned_tasks() for instance in cluster.active_nodes):
# ask the provider to try to retire nodes actively
with log_catch(_logger, reraise=False):
await auto_scaling_mode.try_retire_nodes(app)
cluster = await _deactivate_empty_nodes(app, cluster)
return await _try_scale_down_cluster(app, cluster)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ def mock_dask_is_worker_connected(mocker: MockerFixture) -> mock.Mock:
)


@pytest.fixture
def spy_dask_try_retire_nodes(mocker: MockerFixture) -> mock.Mock:
from simcore_service_autoscaling.modules import dask

return mocker.spy(dask, "try_retire_nodes")


async def _create_task_with_resources(
ec2_client: EC2Client,
dask_task_imposed_ec2_type: InstanceTypeType | None,
Expand Down Expand Up @@ -491,6 +498,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
mock_dask_get_worker_has_results_in_memory: mock.Mock,
mock_dask_get_worker_used_resources: mock.Mock,
mock_dask_is_worker_connected: mock.Mock,
spy_dask_try_retire_nodes: mock.Mock,
mocker: MockerFixture,
dask_spec_local_cluster: distributed.SpecCluster,
with_drain_nodes_labelled: bool,
Expand Down Expand Up @@ -527,6 +535,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
mock_dask_get_worker_has_results_in_memory.assert_not_called()
mock_dask_get_worker_used_resources.assert_not_called()
mock_dask_is_worker_connected.assert_not_called()
spy_dask_try_retire_nodes.assert_not_called()
# check rabbit messages were sent
_assert_rabbit_autoscaling_message_sent(
mock_rabbitmq_post_message,
Expand All @@ -547,6 +556,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
mock_dask_get_worker_used_resources.assert_called_once()
mock_dask_get_worker_used_resources.reset_mock()
mock_dask_is_worker_connected.assert_not_called()
spy_dask_try_retire_nodes.assert_not_called()
instances = await assert_autoscaled_computational_ec2_instances(
ec2_client,
expected_num_reservations=1,
Expand Down Expand Up @@ -667,6 +677,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
assert mock_docker_tag_node.call_count == num_useless_calls
mock_docker_tag_node.reset_mock()
mock_docker_set_node_availability.assert_not_called()
spy_dask_try_retire_nodes.assert_not_called()
# check the number of instances did not change and is still running
await assert_autoscaled_computational_ec2_instances(
ec2_client,
Expand Down Expand Up @@ -697,6 +708,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
mock_dask_get_worker_used_resources.reset_mock()
# the node shall be waiting before draining
mock_docker_set_node_availability.assert_not_called()
spy_dask_try_retire_nodes.assert_called_once()
spy_dask_try_retire_nodes.reset_mock()
mock_docker_tag_node.assert_called_once_with(
get_docker_client(initialized_app),
fake_attached_node,
Expand Down Expand Up @@ -731,6 +744,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
mock_dask_get_worker_used_resources.reset_mock()
# the node shall be set to drain, but not yet terminated
mock_docker_set_node_availability.assert_not_called()
spy_dask_try_retire_nodes.assert_called_once()
spy_dask_try_retire_nodes.reset_mock()
mock_docker_tag_node.assert_called_once_with(
get_docker_client(initialized_app),
fake_attached_node,
Expand Down Expand Up @@ -827,6 +842,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
)
.datetime.isoformat()
)
spy_dask_try_retire_nodes.assert_not_called()
await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode)
mocked_docker_remove_node.assert_called_once_with(
mock.ANY, nodes=[fake_attached_node], force=True
Expand All @@ -839,7 +855,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
expected_instance_state="terminated",
expected_additional_tag_keys=list(ec2_instance_custom_tags),
)

spy_dask_try_retire_nodes.assert_not_called()
# this call should never be used in computational mode
mock_docker_compute_node_used_resources.assert_not_called()

Expand Down
Loading