66
77from __future__ import annotations
88
9+ import logging
10+ import shlex
11+ import time
12+ from collections .abc import Sequence
13+
914import boto3
1015import click
1116from botocore .exceptions import ClientError
1217
18+ from lib .amazon import as_client , ec2
19+ from lib .ce_utils import are_you_sure
1320from lib .cli import cli
1421from lib .env import Config
22+ from lib .ssh import exec_remote , exec_remote_all
23+
24+ LOGGER = logging .getLogger (__name__ )
1525
1626
1727@cli .group ()
@@ -20,6 +30,49 @@ def ce_router():
2030 pass
2131
2232
class CERouterInstance:
    """Adapter exposing a CE Router EC2 instance in the shape the SSH utilities use.

    Besides holding the wrapped instance object, it carries a few fixed
    placeholder attributes (``elb_health``, ``service_status``,
    ``running_version``).
    NOTE(review): these presumably mirror fields the SSH helpers read from
    other instance wrappers - confirm against lib.ssh.
    """

    def __init__(self, instance) -> None:
        """Store ``instance`` and initialise the placeholder attributes."""
        # The wrapped object; __str__ reads its ``id`` and ``private_ip_address``.
        self.instance = instance
        # Constant placeholders - nothing in this class ever updates them.
        self.elb_health = "unknown"
        self.service_status = {"SubState": "unknown"}
        self.running_version = "ce-router"

    def __str__(self) -> str:
        """Return ``<instance id>@<private ip address>``."""
        wrapped = self.instance
        return "{}@{}".format(wrapped.id, wrapped.private_ip_address)
44+
45+
def _get_ce_router_instances(cfg: Config) -> list[CERouterInstance]:
    """Return a wrapper for every instance in the environment's CE Router ASG.

    The ASG name is ``ce-router-<env name, lower-cased>``. Each member's EC2
    resource is loaded and wrapped in a ``CERouterInstance``.

    Returns an empty list when the ASG does not exist (a warning is logged),
    when it has no instances, or when AWS raises a ``ClientError`` (an error
    is logged).
    """
    group_name = f"ce-router-{cfg.env.name.lower()}"

    try:
        groups = as_client.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])["AutoScalingGroups"]
        if not groups:
            LOGGER.warning(f"ASG '{group_name}' not found")
            return []

        # Collect all ids up front, then load and wrap them one at a time.
        member_ids = [member["InstanceId"] for member in groups[0]["Instances"]]

        wrapped: list[CERouterInstance] = []
        for member_id in member_ids:
            resource = ec2.Instance(id=member_id)
            resource.load()
            wrapped.append(CERouterInstance(resource))
        # An ASG with no members falls straight through to an empty list.
        return wrapped

    except ClientError as err:
        LOGGER.error(f"Error getting CE Router instances: {err}")
        return []
74+
75+
def _get_alb_client():
    """Build and return a boto3 client for the ``elbv2`` (ALB) service."""
    elbv2_client = boto3.client("elbv2")
    return elbv2_client
@@ -421,3 +474,162 @@ def status(cfg: Config):
421474
422475 except ClientError as e :
423476 click .echo (f"CE-ROUTER | ❌ ERROR: { str (e )} " )
477+
478+
@ce_router.command(name="exec_all")
@click.pass_obj
@click.argument("remote_cmd", required=True, nargs=-1)
def exec_all(cfg: Config, remote_cmd: Sequence[str]):
    """
    Execute REMOTE_CMD on all CE Router instances.

    Examples:
        ce ce-router exec_all uptime
        ce ce-router exec_all sudo systemctl status ce-router
        ce ce-router exec_all curl -f http://localhost:10240/healthcheck
    """
    targets = _get_ce_router_instances(cfg)
    target_count = len(targets)

    if target_count == 0:
        click.echo(f"No CE Router instances found for environment {cfg.env.name}")
        return

    # Shell-quoted rendering, used only for the prompt and the progress line;
    # the remote call below still receives the original argument sequence.
    quoted_cmd = shlex.join(remote_cmd)

    confirmed = are_you_sure(f"exec command {quoted_cmd} on all {target_count} CE Router instances", cfg)
    if not confirmed:
        return

    click.echo(f"Running '{quoted_cmd}' on {target_count} CE Router instances...")
    exec_remote_all(targets, remote_cmd)
503+
504+
@ce_router.command(name="version")
@click.pass_obj
def version(cfg: Config):
    """
    Show the installed CE Router version on all instances.

    Example:
        ce ce-router version
    """
    routers = _get_ce_router_instances(cfg)

    if not routers:
        click.echo(f"No CE Router instances found for environment {cfg.env.name}")
        return

    click.echo(f"CE Router versions for {cfg.env.name}:")
    click.echo("")

    # Path of the version marker file read on each instance.
    read_version_cmd = ["cat", "/infra/.deploy/ce-router-version"]

    for router in routers:
        try:
            raw_output = exec_remote(router, read_version_cmd, ignore_errors=True)
            # Empty or missing output is reported as "unknown".
            if raw_output:
                shown = raw_output.strip()
            else:
                shown = "unknown"
            click.echo(f"  {router}: {shown}")
        except RuntimeError:
            click.echo(f"  {router}: error reading version")
530+
531+
@ce_router.command(name="refresh")
@click.option(
    "--min-healthy-percent",
    type=click.IntRange(min=0, max=100),
    metavar="PERCENT",
    help="While updating, ensure at least PERCENT are healthy",
    default=75,
    show_default=True,
)
@click.option("--skip-confirmation", is_flag=True, help="Skip confirmation prompt")
@click.pass_obj
def refresh(cfg: Config, min_healthy_percent: int, skip_confirmation: bool):
    """
    Refresh CE Router instances by replacing them with new ones.

    This starts an AWS instance refresh which will:
    1. Launch new instances with the latest CE Router version
    2. Wait for them to become healthy
    3. Terminate old instances
    4. Repeat until all instances are replaced

    The refresh maintains the specified minimum healthy percentage throughout.

    Example:
        ce ce-router refresh
        ce ce-router refresh --min-healthy-percent 90
    """
    asg_name = f"ce-router-{cfg.env.name.lower()}"

    # Statuses after which a refresh no longer changes. The two rollback
    # outcomes are included so that a refresh which rolled back ends the
    # monitor loop instead of being polled forever.
    terminal_statuses = ("Successful", "Failed", "Cancelled", "RollbackSuccessful", "RollbackFailed")

    try:
        # Check if ASG exists
        response = as_client.describe_auto_scaling_groups(AutoScalingGroupNames=[asg_name])

        if not response["AutoScalingGroups"]:
            click.echo(f"ASG '{asg_name}' not found")
            return

        asg = response["AutoScalingGroups"][0]

        if asg["DesiredCapacity"] == 0:
            click.echo(f"Skipping ASG {asg_name} as it has zero desired capacity")
            return

        # Check for existing refresh: an active one is adopted and monitored
        # rather than starting a second one.
        describe_state = as_client.describe_instance_refreshes(AutoScalingGroupName=asg_name)
        existing_refreshes = [
            x for x in describe_state["InstanceRefreshes"] if x["Status"] in ("Pending", "InProgress")
        ]

        if existing_refreshes:
            refresh_id = existing_refreshes[0]["InstanceRefreshId"]
            click.echo(f"Found existing refresh {refresh_id} for {asg_name}")
        else:
            # Confirmation is only requested when a new refresh would be started.
            if not skip_confirmation and not are_you_sure(
                f"refresh CE Router instances in {asg_name} (min healthy: {min_healthy_percent}%)", cfg
            ):
                return

            click.echo("Starting instance refresh...")
            refresh_result = as_client.start_instance_refresh(
                AutoScalingGroupName=asg_name, Preferences={"MinHealthyPercentage": min_healthy_percent}
            )
            refresh_id = refresh_result["InstanceRefreshId"]
            click.echo(f"Refresh started with ID: {refresh_id}")

        # Monitor progress: poll every 5 seconds, echoing only when the
        # progress line changes.
        last_log = ""
        while True:
            time.sleep(5)
            describe_state = as_client.describe_instance_refreshes(
                AutoScalingGroupName=asg_name, InstanceRefreshIds=[refresh_id]
            )
            refresh_data = describe_state["InstanceRefreshes"][0]
            status = refresh_data["Status"]

            if status == "InProgress":
                # The progress fields are optional in the response, so read
                # them defensively: a missing key would raise KeyError, which
                # the ClientError handler below does not catch.
                log = (
                    f"  {status}, {refresh_data.get('PercentageComplete', 0)}%, "
                    f"{refresh_data.get('InstancesToUpdate', '?')} to update. "
                    f"{refresh_data.get('StatusReason', '')}"
                )
            else:
                log = f"  Status: {status}"

            if log != last_log:
                click.echo(log)
                last_log = log

            if status in terminal_statuses:
                break

        click.echo("")
        if status == "Successful":
            click.echo("Instance refresh completed successfully!")
            click.echo("New instances are now running with the latest CE Router version.")
        elif status == "Failed":
            click.echo(f"Instance refresh failed: {refresh_data.get('StatusReason', 'Unknown reason')}")
        elif status == "Cancelled":
            click.echo("Instance refresh was cancelled")
        else:
            # RollbackSuccessful / RollbackFailed: report the real outcome
            # instead of mislabelling it as a cancellation.
            click.echo(
                f"Instance refresh ended with status {status}: "
                f"{refresh_data.get('StatusReason', 'Unknown reason')}"
            )

    except ClientError as e:
        click.echo(f"Error refreshing CE Router instances: {e}")
0 commit comments