@@ -393,6 +393,38 @@ async def _activate_and_notify(
393393 return dataclasses .replace (drained_node , node = updated_node )
394394
395395
async def _cancel_previous_pulling_command_if_any(
    app: FastAPI,
    instance: EC2InstanceData,
) -> None:
    """Cancel a previously issued pulling command on ``instance``, if any.

    The function only acts when the instance carries both
    ``MACHINE_PULLING_EC2_TAG_KEY`` and
    ``MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY``; otherwise it returns
    immediately. The command referenced by the command-id tag is looked up
    through the SSM client and cancelled when its status is ``"Pending"`` or
    ``"InProgress"``. The two pulling tags, together with the keys returned by
    ``list_pre_pulled_images_tag_keys``, are then removed from the instance
    through the EC2 client.

    Args:
        app: application used to resolve the SSM and EC2 clients.
        instance: EC2 instance whose tags are inspected and cleaned up.
    """
    instance_tags = instance.tags
    if (
        MACHINE_PULLING_EC2_TAG_KEY not in instance_tags
        or MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY not in instance_tags
    ):
        # at least one of the pulling tags is missing: nothing to cancel
        return

    ssm = get_ssm_client(app)
    ec2 = get_ec2_client(app)
    pulling_command_id = instance_tags[MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY]
    pulling_command = await ssm.get_command(instance.id, pulling_command_id)

    # only a command that has not finished yet needs to be cancelled
    still_running = pulling_command.status in ("Pending", "InProgress")
    if still_running:
        with log_context(
            _logger,
            logging.INFO,
            msg=f"cancelling previous pulling {pulling_command_id} on {instance.id} ",
        ):
            await ssm.cancel_command(instance.id, pulling_command_id)

    # NOTE(review): the tag cleanup is assumed to run whatever the command
    # status was -- confirm the nesting against the original file
    await ec2.remove_instances_tags(
        [instance],
        tags=[
            MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY,
            MACHINE_PULLING_EC2_TAG_KEY,
            *list_pre_pulled_images_tag_keys(instance_tags),
        ],
    )
427+
396428async def _activate_drained_nodes (
397429 app : FastAPI ,
398430 cluster : Cluster ,
@@ -413,6 +445,12 @@ async def _activate_drained_nodes(
413445 logging .INFO ,
414446 f"activate { len (nodes_to_activate )} drained nodes { [n .ec2_instance .id for n in nodes_to_activate ]} " ,
415447 ):
448+ await asyncio .gather (
449+ * (
450+ _cancel_previous_pulling_command_if_any (app , n .ec2_instance )
451+ for n in nodes_to_activate
452+ )
453+ )
416454 activated_nodes = await asyncio .gather (
417455 * (_activate_and_notify (app , node ) for node in nodes_to_activate )
418456 )
0 commit comments