|
52 | 52 | post_tasks_progress_message, |
53 | 53 | ) |
54 | 54 | from ...utils.warm_buffer_machines import ( |
55 | | - get_activated_buffer_ec2_tags, |
| 55 | + get_activated_warm_buffer_ec2_tags, |
56 | 56 | get_deactivated_buffer_ec2_tags, |
57 | 57 | is_warm_buffer_machine, |
58 | 58 | ) |
@@ -89,7 +89,7 @@ async def _analyze_current_cluster( |
89 | 89 | state_names=["terminated"], |
90 | 90 | ) |
91 | 91 |
|
92 | | - buffer_ec2_instances = await get_ec2_client(app).get_instances( |
| 92 | + warm_buffer_ec2_instances = await get_ec2_client(app).get_instances( |
93 | 93 | key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME], |
94 | 94 | tags=get_deactivated_buffer_ec2_tags(auto_scaling_mode.get_ec2_tags(app)), |
95 | 95 | state_names=["stopped"], |
@@ -154,7 +154,7 @@ async def _analyze_current_cluster( |
154 | 154 | pending_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in pending_ec2s], |
155 | 155 | broken_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in broken_ec2s], |
156 | 156 | warm_buffer_ec2s=[ |
157 | | - NonAssociatedInstance(ec2_instance=i) for i in buffer_ec2_instances |
| 157 | + NonAssociatedInstance(ec2_instance=i) for i in warm_buffer_ec2_instances |
158 | 158 | ], |
159 | 159 | terminating_nodes=terminating_nodes, |
160 | 160 | terminated_instances=[ |
@@ -203,7 +203,7 @@ async def _terminate_broken_ec2s(app: FastAPI, cluster: Cluster) -> Cluster: |
203 | 203 | ) |
204 | 204 |
|
205 | 205 |
|
206 | | -async def _make_pending_buffer_ec2s_join_cluster( |
| 206 | +async def _make_pending_warm_buffer_ec2s_join_cluster( |
207 | 207 | app: FastAPI, |
208 | 208 | cluster: Cluster, |
209 | 209 | ) -> Cluster: |
@@ -339,10 +339,10 @@ async def _sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]: |
339 | 339 | allowed_instance_type_names |
340 | 340 | ), "EC2_INSTANCES_ALLOWED_TYPES cannot be empty!" |
341 | 341 |
|
342 | | - allowed_instance_types: list[ |
343 | | - EC2InstanceType |
344 | | - ] = await ec2_client.get_ec2_instance_capabilities( |
345 | | - cast(set[InstanceTypeType], set(allowed_instance_type_names)) |
| 342 | + allowed_instance_types: list[EC2InstanceType] = ( |
| 343 | + await ec2_client.get_ec2_instance_capabilities( |
| 344 | + cast(set[InstanceTypeType], set(allowed_instance_type_names)) |
| 345 | + ) |
346 | 346 | ) |
347 | 347 |
|
348 | 348 | def _as_selection(instance_type: EC2InstanceType) -> int: |
@@ -469,15 +469,17 @@ async def _start_warm_buffer_instances( |
469 | 469 | return cluster |
470 | 470 |
|
471 | 471 | with log_context( |
472 | | - _logger, logging.INFO, f"start {len(instances_to_start)} buffer machines" |
| 472 | + _logger, logging.INFO, f"start {len(instances_to_start)} warm buffer machines" |
473 | 473 | ): |
474 | 474 | started_instances = await get_ec2_client(app).start_instances( |
475 | 475 | instances_to_start |
476 | 476 | ) |
477 | 477 | # NOTE: first start the instance and then set the tags in case the instance cannot start (e.g. InsufficientInstanceCapacity) |
478 | 478 | await get_ec2_client(app).set_instances_tags( |
479 | 479 | started_instances, |
480 | | - tags=get_activated_buffer_ec2_tags(auto_scaling_mode.get_ec2_tags(app)), |
| 480 | + tags=get_activated_warm_buffer_ec2_tags( |
| 481 | + auto_scaling_mode.get_ec2_tags(app) |
| 482 | + ), |
481 | 483 | ) |
482 | 484 | started_instance_ids = [i.id for i in started_instances] |
483 | 485 |
|
@@ -682,7 +684,7 @@ async def _find_needed_instances( |
682 | 684 | ), |
683 | 685 | ) |
684 | 686 |
|
685 | | - # 2. check the buffer needs |
| 687 | + # 2. check the hot buffer needs |
686 | 688 | app_settings = get_application_settings(app) |
687 | 689 | assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec |
688 | 690 | if ( |
@@ -1076,9 +1078,9 @@ async def _notify_based_on_machine_type( |
1076 | 1078 | launch_time_to_tasks: dict[datetime.datetime, list] = collections.defaultdict(list) |
1077 | 1079 | now = datetime.datetime.now(datetime.UTC) |
1078 | 1080 | for instance in instances: |
1079 | | - launch_time_to_tasks[instance.ec2_instance.launch_time] += ( |
1080 | | - instance.assigned_tasks |
1081 | | - ) |
| 1081 | + launch_time_to_tasks[ |
| 1082 | + instance.ec2_instance.launch_time |
| 1083 | + ] += instance.assigned_tasks |
1082 | 1084 |
|
1083 | 1085 | for launch_time, tasks in launch_time_to_tasks.items(): |
1084 | 1086 | time_since_launch = now - launch_time |
@@ -1289,7 +1291,7 @@ async def auto_scale_cluster( |
1289 | 1291 | # cleanup |
1290 | 1292 | cluster = await _cleanup_disconnected_nodes(app, cluster) |
1291 | 1293 | cluster = await _terminate_broken_ec2s(app, cluster) |
1292 | | - cluster = await _make_pending_buffer_ec2s_join_cluster(app, cluster) |
| 1294 | + cluster = await _make_pending_warm_buffer_ec2s_join_cluster(app, cluster) |
1293 | 1295 | cluster = await _try_attach_pending_ec2s( |
1294 | 1296 | app, cluster, auto_scaling_mode, allowed_instance_types |
1295 | 1297 | ) |
|
0 commit comments