@@ -1441,3 +1441,151 @@ async def test_cluster_adapts_machines_on_the_fly(
14411441 )
14421442 assert analyzed_cluster .active_nodes
14431443 assert not analyzed_cluster .drained_nodes
1444+
1445+ #
1446+ # 4. now we simulate that some of the services in the 1st batch have completed and that we are 1 below the max
1447+ # a machine should switch off and another type should be started (just pop the future out of scope)
1448+ for _ in range (
1449+ scale_up_params1 .num_tasks
1450+ - app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_MAX_INSTANCES
1451+ + 1
1452+ ):
1453+ first_batch_tasks .pop ()
1454+
1455+ # first call to auto_scale_cluster will mark 1 node as empty
1456+ with mock .patch (
1457+ "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.set_node_found_empty" ,
1458+ autospec = True ,
1459+ ) as mock_docker_set_node_found_empty :
1460+ await auto_scale_cluster (
1461+ app = initialized_app , auto_scaling_mode = ComputationalAutoscaling ()
1462+ )
1463+ analyzed_cluster = assert_cluster_state (
1464+ spied_cluster_analysis ,
1465+ expected_calls = 1 ,
1466+ expected_num_machines = app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_MAX_INSTANCES ,
1467+ )
1468+ assert analyzed_cluster .active_nodes
1469+ assert not analyzed_cluster .drained_nodes
1470+ # the last machine is found empty
1471+ mock_docker_set_node_found_empty .assert_called_with (
1472+ mock .ANY ,
1473+ analyzed_cluster .active_nodes [- 1 ].node ,
1474+ empty = True ,
1475+ )
1476+
1477+ # now we mock get_node_empty_since so the next call will actually drain the machine
1478+ with mock .patch (
1479+ "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_empty_since" ,
1480+ autospec = True ,
1481+ return_value = arrow .utcnow ().datetime
1482+ - 1.5
1483+ * app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_TIME_BEFORE_DRAINING ,
1484+ ) as mocked_get_node_empty_since :
1485+ await auto_scale_cluster (
1486+ app = initialized_app , auto_scaling_mode = ComputationalAutoscaling ()
1487+ )
1488+ mocked_get_node_empty_since .assert_called_once ()
1489+ analyzed_cluster = assert_cluster_state (
1490+ spied_cluster_analysis ,
1491+ expected_calls = 1 ,
1492+ expected_num_machines = app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_MAX_INSTANCES ,
1493+ )
1494+ assert analyzed_cluster .active_nodes
1495+ assert not analyzed_cluster .drained_nodes
1496+ # now scaling again should find the drained machine
1497+ drained_machine_instance_id = analyzed_cluster .active_nodes [- 1 ].ec2_instance .id
1498+ mocked_associate_ec2_instances_with_nodes .side_effect = create_fake_association (
1499+ create_fake_node , drained_machine_instance_id , None
1500+ )
1501+ await auto_scale_cluster (
1502+ app = initialized_app , auto_scaling_mode = ComputationalAutoscaling ()
1503+ )
1504+ analyzed_cluster = assert_cluster_state (
1505+ spied_cluster_analysis ,
1506+ expected_calls = 1 ,
1507+ expected_num_machines = app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_MAX_INSTANCES ,
1508+ )
1509+ assert analyzed_cluster .active_nodes
1510+ assert analyzed_cluster .drained_nodes
1511+
1512+ # this will initiate termination now
1513+ with mock .patch (
1514+ "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_last_readyness_update" ,
1515+ autospec = True ,
1516+ return_value = arrow .utcnow ().datetime
1517+ - 1.5
1518+ * app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_TIME_BEFORE_TERMINATION ,
1519+ ):
1520+ mock_docker_tag_node .reset_mock ()
1521+ await auto_scale_cluster (
1522+ app = initialized_app , auto_scaling_mode = ComputationalAutoscaling ()
1523+ )
1524+ mock_docker_tag_node .assert_called_with (
1525+ mock .ANY ,
1526+ analyzed_cluster .drained_nodes [- 1 ].node ,
1527+ tags = mock .ANY ,
1528+ available = False ,
1529+ )
1530+
1531+ # scaling again should find the terminating machine
1532+ mocked_associate_ec2_instances_with_nodes .side_effect = create_fake_association (
1533+ create_fake_node , drained_machine_instance_id , drained_machine_instance_id
1534+ )
1535+ await auto_scale_cluster (
1536+ app = initialized_app , auto_scaling_mode = ComputationalAutoscaling ()
1537+ )
1538+ analyzed_cluster = assert_cluster_state (
1539+ spied_cluster_analysis ,
1540+ expected_calls = 1 ,
1541+ expected_num_machines = app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_MAX_INSTANCES ,
1542+ )
1543+ assert analyzed_cluster .active_nodes
1544+ assert not analyzed_cluster .drained_nodes
1545+ assert analyzed_cluster .terminating_nodes
1546+
1547+ # now this will terminate it and straight away start a new machine type
1548+ with mock .patch (
1549+ "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_termination_started_since" ,
1550+ autospec = True ,
1551+ return_value = arrow .utcnow ().datetime
1552+ - 1.5
1553+ * app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_TIME_BEFORE_TERMINATION ,
1554+ ):
1555+ mocked_docker_remove_node = mocker .patch (
1556+ "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.remove_nodes" ,
1557+ return_value = None ,
1558+ autospec = True ,
1559+ )
1560+ await auto_scale_cluster (
1561+ app = initialized_app , auto_scaling_mode = ComputationalAutoscaling ()
1562+ )
1563+ mocked_docker_remove_node .assert_called_once ()
1564+
1565+ # now let's check what we have
1566+ all_instances = await ec2_client .describe_instances ()
1567+ assert len (all_instances ["Reservations" ]) == 2 , "there should be 2 Reservations"
1568+ reservation1 = all_instances ["Reservations" ][0 ]
1569+ assert "Instances" in reservation1
1570+ assert len (reservation1 ["Instances" ]) == (
1571+ app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_MAX_INSTANCES
1572+ ), f"expected { app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_MAX_INSTANCES } EC2 instances, found { len (reservation1 ['Instances' ])} "
1573+ for instance in reservation1 ["Instances" ]:
1574+ assert "InstanceType" in instance
1575+ assert instance ["InstanceType" ] == scale_up_params1 .expected_instance_type
1576+ assert "InstanceId" in instance
1577+ assert "State" in instance
1578+ assert "Name" in instance ["State" ]
1579+ if instance ["InstanceId" ] == drained_machine_instance_id :
1580+ assert instance ["State" ]["Name" ] == "terminated"
1581+ else :
1582+ assert instance ["State" ]["Name" ] == "running"
1583+
1584+ reservation2 = all_instances ["Reservations" ][1 ]
1585+ assert "Instances" in reservation2
1586+ assert (
1587+ len (reservation2 ["Instances" ]) == 1
1588+ ), f"expected 1 EC2 instances, found { len (reservation2 ['Instances' ])} "
1589+ for instance in reservation2 ["Instances" ]:
1590+ assert "InstanceType" in instance
1591+ assert instance ["InstanceType" ] == scale_up_params2 .expected_instance_type
0 commit comments