@@ -42,8 +42,8 @@ def _get_instance_last_heartbeat(instance: EC2InstanceData) -> datetime.datetime
4242async def _get_all_associated_worker_instances (
4343 app : FastAPI ,
4444 primary_instances : Iterable [EC2InstanceData ],
45- ) -> list [EC2InstanceData ]:
46- worker_instances = []
45+ ) -> set [EC2InstanceData ]:
46+ worker_instances : set [ EC2InstanceData ] = set ()
4747 for instance in primary_instances :
4848 assert "user_id" in instance .tags # nosec
4949 user_id = UserID (instance .tags [_USER_ID_TAG_KEY ])
@@ -55,20 +55,20 @@ async def _get_all_associated_worker_instances(
5555 else None
5656 )
5757
58- worker_instances .extend (
58+ worker_instances .update (
5959 await get_cluster_workers (app , user_id = user_id , wallet_id = wallet_id )
6060 )
6161 return worker_instances
6262
6363
6464async def _find_terminateable_instances (
6565 app : FastAPI , instances : Iterable [EC2InstanceData ]
66- ) -> list [EC2InstanceData ]:
66+ ) -> set [EC2InstanceData ]:
6767 app_settings = get_application_settings (app )
6868 assert app_settings .CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES # nosec
6969
7070 # get the corresponding ec2 instance data
71- terminateable_instances : list [EC2InstanceData ] = []
71+ terminateable_instances : set [EC2InstanceData ] = set ()
7272
7373 time_to_wait_before_termination = (
7474 app_settings .CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION
@@ -82,7 +82,7 @@ async def _find_terminateable_instances(
8282 elapsed_time_since_heartbeat = arrow .utcnow ().datetime - last_heartbeat
8383 allowed_time_to_wait = time_to_wait_before_termination
8484 if elapsed_time_since_heartbeat >= allowed_time_to_wait :
85- terminateable_instances .append (instance )
85+ terminateable_instances .add (instance )
8686 else :
8787 _logger .info (
8888 "%s has still %ss before being terminateable" ,
@@ -93,14 +93,14 @@ async def _find_terminateable_instances(
9393 elapsed_time_since_startup = arrow .utcnow ().datetime - instance .launch_time
9494 allowed_time_to_wait = startup_delay
9595 if elapsed_time_since_startup >= allowed_time_to_wait :
96- terminateable_instances .append (instance )
96+ terminateable_instances .add (instance )
9797
9898 # get all terminateable instances associated worker instances
9999 worker_instances = await _get_all_associated_worker_instances (
100100 app , terminateable_instances
101101 )
102102
103- return terminateable_instances + worker_instances
103+ return terminateable_instances . union ( worker_instances )
104104
105105
106106async def check_clusters (app : FastAPI ) -> None :
0 commit comments