@@ -150,10 +150,10 @@ async def _analyze_current_cluster(
150150 active_nodes = active_nodes ,
151151 pending_nodes = pending_nodes ,
152152 drained_nodes = drained_nodes ,
153- buffer_drained_nodes = hot_buffer_drained_nodes ,
153+ hot_buffer_drained_nodes = hot_buffer_drained_nodes ,
154154 pending_ec2s = [NonAssociatedInstance (ec2_instance = i ) for i in pending_ec2s ],
155155 broken_ec2s = [NonAssociatedInstance (ec2_instance = i ) for i in broken_ec2s ],
156- buffer_ec2s = [
156+ warm_buffer_ec2s = [
157157 NonAssociatedInstance (ec2_instance = i ) for i in buffer_ec2_instances
158158 ],
159159 terminating_nodes = terminating_nodes ,
@@ -313,7 +313,7 @@ async def _try_attach_pending_ec2s(
313313 _logger .exception ("Unexpected EC2 private dns" )
314314 # NOTE: first provision the reserve drained nodes if possible
315315 all_drained_nodes = (
316- cluster .drained_nodes + cluster .buffer_drained_nodes + new_found_instances
316+ cluster .drained_nodes + cluster .hot_buffer_drained_nodes + new_found_instances
317317 )
318318 drained_nodes , buffer_drained_nodes , _ = sort_drained_nodes (
319319 app_settings , all_drained_nodes , allowed_instance_types
@@ -385,7 +385,9 @@ async def _activate_drained_nodes(
385385) -> Cluster :
386386 nodes_to_activate = [
387387 node
388- for node in itertools .chain (cluster .drained_nodes , cluster .buffer_drained_nodes )
388+ for node in itertools .chain (
389+ cluster .drained_nodes , cluster .hot_buffer_drained_nodes
390+ )
389391 if node .assigned_tasks
390392 ]
391393
@@ -408,7 +410,7 @@ async def _activate_drained_nodes(
408410 ]
409411 remaining_reserved_drained_nodes = [
410412 node
411- for node in cluster .buffer_drained_nodes
413+ for node in cluster .hot_buffer_drained_nodes
412414 if node .ec2_instance .id not in new_active_node_ids
413415 ]
414416 return dataclasses .replace (
@@ -428,11 +430,11 @@ async def _start_warm_buffer_instances(
428430 assert app_settings .AUTOSCALING_EC2_INSTANCES # nosec
429431
430432 instances_to_start = [
431- i .ec2_instance for i in cluster .buffer_ec2s if i .assigned_tasks
433+ i .ec2_instance for i in cluster .warm_buffer_ec2s if i .assigned_tasks
432434 ]
433435
434436 if (
435- len (cluster .buffer_drained_nodes )
437+ len (cluster .hot_buffer_drained_nodes )
436438 < app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_MACHINES_BUFFER
437439 ):
438440 # check if we can migrate warm buffers to hot buffers
@@ -444,7 +446,7 @@ async def _start_warm_buffer_instances(
444446 )
445447 free_startable_warm_buffers_to_replace_hot_buffers = [
446448 warm_buffer .ec2_instance
447- for warm_buffer in cluster .buffer_ec2s
449+ for warm_buffer in cluster .warm_buffer_ec2s
448450 if (warm_buffer .ec2_instance .type == hot_buffer_instance_type )
449451 and not warm_buffer .assigned_tasks
450452 ]
@@ -458,7 +460,7 @@ async def _start_warm_buffer_instances(
458460
459461 instances_to_start += free_startable_warm_buffers_to_replace_hot_buffers [
460462 : app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_MACHINES_BUFFER
461- - len (cluster .buffer_drained_nodes )
463+ - len (cluster .hot_buffer_drained_nodes )
462464 - len (unnassigned_pending_ec2s )
463465 - len (unnassigned_pending_nodes )
464466 ]
@@ -483,7 +485,7 @@ async def _start_warm_buffer_instances(
483485 cluster ,
484486 buffer_ec2s = [
485487 i
486- for i in cluster .buffer_ec2s
488+ for i in cluster .warm_buffer_ec2s
487489 if i .ec2_instance .id not in started_instance_ids
488490 ],
489491 pending_ec2s = cluster .pending_ec2s
@@ -559,10 +561,10 @@ async def _assign_tasks_to_current_cluster(
559561 functools .partial (_try_assign_task_to_ec2_instance , instances = instances )
560562 for instances in (
561563 cluster .active_nodes ,
562- cluster .drained_nodes + cluster .buffer_drained_nodes ,
564+ cluster .drained_nodes + cluster .hot_buffer_drained_nodes ,
563565 cluster .pending_nodes ,
564566 cluster .pending_ec2s ,
565- cluster .buffer_ec2s ,
567+ cluster .warm_buffer_ec2s ,
566568 )
567569 ]
568570
@@ -686,7 +688,7 @@ async def _find_needed_instances(
686688 if (
687689 num_missing_nodes := (
688690 app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_MACHINES_BUFFER
689- - len (cluster .buffer_drained_nodes )
691+ - len (cluster .hot_buffer_drained_nodes )
690692 )
691693 ) > 0 :
692694 # check if some are already pending
@@ -695,7 +697,7 @@ async def _find_needed_instances(
695697 ] + [i .ec2_instance for i in cluster .pending_nodes if not i .assigned_tasks ]
696698 if len (remaining_pending_instances ) < (
697699 app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_MACHINES_BUFFER
698- - len (cluster .buffer_drained_nodes )
700+ - len (cluster .hot_buffer_drained_nodes )
699701 ):
700702 default_instance_type = get_hot_buffer_type (available_ec2_types )
701703 num_instances_per_type [default_instance_type ] += num_missing_nodes
@@ -1163,7 +1165,7 @@ async def _scale_up_cluster(
11631165 app_settings = get_application_settings (app )
11641166 assert app_settings .AUTOSCALING_EC2_INSTANCES # nosec
11651167 if not unassigned_tasks and (
1166- len (cluster .buffer_drained_nodes )
1168+ len (cluster .hot_buffer_drained_nodes )
11671169 >= app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_MACHINES_BUFFER
11681170 ):
11691171 return cluster
@@ -1245,7 +1247,9 @@ async def _notify_autoscaling_status(
12451247) -> None :
12461248 monitored_instances = list (
12471249 itertools .chain (
1248- cluster .active_nodes , cluster .drained_nodes , cluster .buffer_drained_nodes
1250+ cluster .active_nodes ,
1251+ cluster .drained_nodes ,
1252+ cluster .hot_buffer_drained_nodes ,
12491253 )
12501254 )
12511255
0 commit comments