 from assertpy import assert_that, soft_assertions
 from remote_command_executor import RemoteCommandExecutionError, RemoteCommandExecutor
 from tests.common.assertions import assert_instance_replaced_or_terminating, assert_no_errors_in_logs
-from tests.common.compute_logs_common import wait_compute_log
 from tests.common.scaling_common import get_compute_nodes_allocation, get_desired_asg_capacity
 from tests.common.schedulers_common import get_scheduler_commands
 from time_utils import minutes, seconds
-from utils import get_compute_nodes_instance_ids, get_instance_ids_to_compute_hostnames_dict
+from utils import get_compute_nodes_instance_ids, get_instance_ids_compute_hostnames_conversion_dict
 
 
 @pytest.mark.skip_schedulers(["awsbatch"])
@@ -83,7 +82,7 @@ def test_nodewatcher_terminates_failing_node(scheduler, region, pcluster_config_
 
     compute_nodes = scheduler_commands.get_compute_nodes()
     instance_ids = get_compute_nodes_instance_ids(cluster.cfn_name, region)
-    instance_ids_to_hostname = get_instance_ids_to_compute_hostnames_dict(instance_ids)
+    hostname_to_instance_id = get_instance_ids_compute_hostnames_conversion_dict(instance_ids, id_to_hostname=False)
 
     logging.info("Testing that nodewatcher will terminate a node in failing state")
     # submit a job to run on all nodes
@@ -95,16 +94,13 @@ def test_nodewatcher_terminates_failing_node(scheduler, region, pcluster_config_
         remote_command_executor.run_remote_script(
             str(test_datadir / "{0}_kill_scheduler_job.sh".format(scheduler)), args=[node]
         )
-    failed_instances = wait_compute_log(remote_command_executor, expected_num_nodes_killed)
-
-    for instance_id in failed_instances:
-        assert_that(nodes_to_remove).contains(instance_ids_to_hostname.get(instance_id))
-        _assert_compute_logs(remote_command_executor, instance_id)
-        assert_instance_replaced_or_terminating(instance_id, region)
 
+    # assert failing nodes are terminated according to ASG
+    _assert_failing_nodes_terminated(nodes_to_remove, hostname_to_instance_id, region)
     nodes_to_retain = [compute for compute in compute_nodes if compute not in nodes_to_remove]
     # verify that desired capacity is still the initial_queue_size
     assert_that(get_desired_asg_capacity(region, cluster.cfn_name)).is_equal_to(initial_queue_size)
+    # assert failing nodes are removed from scheduler config
     _assert_nodes_removed_and_replaced_in_scheduler(
         scheduler_commands, nodes_to_remove, nodes_to_retain, desired_capacity=initial_queue_size
     )
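The new polling assertion above delegates to `assert_instance_replaced_or_terminating` from `tests.common.assertions`, whose body is not part of this diff. A minimal sketch of the idea, assuming a boto3 EC2 state lookup; the function name reuse and the exact states checked are illustrative assumptions, not the repo's actual implementation:

```python
import boto3
from assertpy import assert_that


def assert_instance_replaced_or_terminating(instance_id, region):
    """Illustrative sketch: pass only if the instance is being torn down."""
    ec2 = boto3.client("ec2", region_name=region)
    response = ec2.describe_instances(InstanceIds=[instance_id])
    state = response["Reservations"][0]["Instances"][0]["State"]["Name"]
    # An instance the ASG is replacing transitions through shutting-down to terminated.
    assert_that(state).is_in("shutting-down", "terminated")
```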
@@ -134,6 +130,12 @@ def test_scaling_with_manual_actions(scheduler, region, pcluster_config_reader,
     _test_keep_or_replace_suspended_nodes(scheduler_commands, num_compute_nodes)
 
 
+@retry(wait_fixed=seconds(30), stop_max_delay=minutes(15))
+def _assert_failing_nodes_terminated(nodes_to_remove, hostname_to_instance_id, region):
+    for node in nodes_to_remove:
+        assert_instance_replaced_or_terminating(hostname_to_instance_id.get(node), region)
+
+
 def _assert_initial_conditions(scheduler_commands, num_compute_nodes):
     """Assert cluster is in expected state before test starts; return list of compute nodes."""
     compute_nodes = scheduler_commands.get_compute_nodes()
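`_assert_failing_nodes_terminated` works because the `@retry` decorator (presumably the `retrying` package, given the `wait_fixed`/`stop_max_delay` keywords) swallows the `AssertionError` and re-runs the check every 30 seconds for up to 15 minutes, so an eventually-consistent ASG termination does not fail the test on the first poll. A self-contained sketch of the pattern, assuming `time_utils.seconds`/`minutes` return milliseconds as `retrying` expects (they are redefined locally here only to keep the example runnable):

```python
from retrying import retry


def seconds(s):
    return s * 1000  # retrying's wait/stop knobs are expressed in milliseconds


def minutes(m):
    return m * seconds(60)


@retry(wait_fixed=seconds(30), stop_max_delay=minutes(15))
def poll_until_true(check):
    # Any exception (including AssertionError) triggers another attempt every 30s;
    # once stop_max_delay is exceeded, the last exception is re-raised to the caller.
    assert check()
```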
@@ -150,7 +152,7 @@ def _test_replace_terminated_nodes(scheduler_commands, num_compute_nodes, instan
150152 """Test that slurm nodes are replaced if instances are terminated manually."""
151153 logging .info ("Testing that nodes are replaced when terminated manually" )
152154 compute_nodes = _assert_initial_conditions (scheduler_commands , num_compute_nodes )
153- instance_ids_to_hostname = get_instance_ids_to_compute_hostnames_dict (instance_ids )
155+ instance_ids_to_hostname = get_instance_ids_compute_hostnames_conversion_dict (instance_ids , id_to_hostname = True )
154156 # Run job on all nodes
155157 _submit_sleep_job (scheduler_commands , num_compute_nodes )
156158 nodes_to_retain = [instance_ids_to_hostname [instance_ids [0 ]]]
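Both call sites now share `get_instance_ids_compute_hostnames_conversion_dict`, which replaces the one-way `get_instance_ids_to_compute_hostnames_dict` and maps in either direction via the `id_to_hostname` flag. Its implementation lives in `utils` and is not shown in this diff; a hypothetical sketch of the contract, assuming the scheduler hostname is the short form of the instance's private DNS name:

```python
import boto3


def get_instance_ids_compute_hostnames_conversion_dict(instance_ids, id_to_hostname, region=None):
    """Hypothetical sketch: map instance ids to short hostnames, or the reverse."""
    ec2 = boto3.client("ec2", region_name=region)
    conversion_dict = {}
    for reservation in ec2.describe_instances(InstanceIds=instance_ids)["Reservations"]:
        for instance in reservation["Instances"]:
            # e.g. "ip-10-0-1-23.eu-west-1.compute.internal" -> "ip-10-0-1-23"
            hostname = instance["PrivateDnsName"].split(".")[0]
            if id_to_hostname:
                conversion_dict[instance["InstanceId"]] = hostname
            else:
                conversion_dict[hostname] = instance["InstanceId"]
    return conversion_dict
```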
@@ -280,16 +282,6 @@ def _assert_num_nodes_in_scheduler(scheduler_commands, desired):
     assert_that(len(scheduler_commands.get_compute_nodes())).is_equal_to(desired)
 
 
-def _assert_compute_logs(remote_command_executor, instance_id):
-    remote_command_executor.run_remote_command(
-        "tar -xf /home/logs/compute/{0}.tar.gz --directory /tmp".format(instance_id)
-    )
-    remote_command_executor.run_remote_command("test -f /tmp/var/log/nodewatcher")
-    messages_log = remote_command_executor.run_remote_command("cat /tmp/var/log/nodewatcher", hide=True).stdout
-    assert_that(messages_log).contains("Node is marked as down by scheduler or not attached correctly. Terminating...")
-    assert_that(messages_log).contains("Dumping logs to /home/logs/compute/{0}.tar.gz".format(instance_id))
-
-
 def _assert_scaling_works(
     asg_capacity_time_series, compute_nodes_time_series, expected_asg_capacity, expected_compute_nodes
 ):