|
56 | 56 | _logger = logging.getLogger(__name__) |
57 | 57 |
|
58 | 58 |
|
59 | | -async def _analyze_running_instance_state( |
| 59 | +async def _record_instance_ready_metrics( |
| 60 | + app: FastAPI, *, instance: EC2InstanceData |
| 61 | +) -> None: |
| 62 | + """Record metrics for instances ready to pull images.""" |
| 63 | + if has_instrumentation(app): |
| 64 | + get_instrumentation( |
| 65 | + app |
| 66 | + ).buffer_machines_pools_metrics.instances_ready_to_pull_seconds.labels( |
| 67 | + instance_type=instance.type |
| 68 | + ).observe((arrow.utcnow().datetime - instance.launch_time).total_seconds()) |
| 69 | + |
| 70 | + |
| 71 | +async def _handle_completed_cloud_init_instance( |
60 | 72 | app: FastAPI, *, buffer_pool: WarmBufferPool, instance: EC2InstanceData |
61 | | -): |
| 73 | +) -> None: |
| 74 | + """Handle instance that has completed cloud init.""" |
| 75 | + app_settings = get_application_settings(app) |
| 76 | + assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec |
| 77 | + |
| 78 | + await _record_instance_ready_metrics(app, instance=instance) |
| 79 | + |
| 80 | + if app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ |
| 81 | + instance.type |
| 82 | + ].pre_pull_images: |
| 83 | + buffer_pool.waiting_to_pull_instances.add(instance) |
| 84 | + else: |
| 85 | + buffer_pool.waiting_to_stop_instances.add(instance) |
| 86 | + |
| 87 | + |
| 88 | +async def _handle_ssm_connected_instance( |
| 89 | + app: FastAPI, *, buffer_pool: WarmBufferPool, instance: EC2InstanceData |
| 90 | +) -> None: |
| 91 | + """Handle instance connected to SSM server.""" |
62 | 92 | ssm_client = get_ssm_client(app) |
| 93 | + |
| 94 | + try: |
| 95 | + if await ssm_client.wait_for_has_instance_completed_cloud_init(instance.id): |
| 96 | + await _handle_completed_cloud_init_instance( |
| 97 | + app, buffer_pool=buffer_pool, instance=instance |
| 98 | + ) |
| 99 | + else: |
| 100 | + buffer_pool.pending_instances.add(instance) |
| 101 | + except ( |
| 102 | + SSMCommandExecutionResultError, |
| 103 | + SSMCommandExecutionTimeoutError, |
| 104 | + ): |
| 105 | + _logger.exception( |
| 106 | + "Unnexpected error when checking EC2 cloud initialization completion!. " |
| 107 | + "The machine will be terminated. TIP: check the initialization phase for errors." |
| 108 | + ) |
| 109 | + buffer_pool.broken_instances.add(instance) |
| 110 | + |
| 111 | + |
| 112 | +async def _handle_unconnected_instance( |
| 113 | + app: FastAPI, *, buffer_pool: WarmBufferPool, instance: EC2InstanceData |
| 114 | +) -> None: |
| 115 | + """Handle instance not connected to SSM server.""" |
63 | 116 | app_settings = get_application_settings(app) |
64 | 117 | assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec |
65 | 118 |
|
| 119 | + is_broken = bool( |
| 120 | + (arrow.utcnow().datetime - instance.launch_time) |
| 121 | + > app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME |
| 122 | + ) |
| 123 | + |
| 124 | + if is_broken: |
| 125 | + _logger.error( |
| 126 | + "The machine does not connect to the SSM server after %s. It will be terminated. TIP: check the initialization phase for errors.", |
| 127 | + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME, |
| 128 | + ) |
| 129 | + buffer_pool.broken_instances.add(instance) |
| 130 | + else: |
| 131 | + buffer_pool.pending_instances.add(instance) |
| 132 | + |
| 133 | + |
| 134 | +async def _analyze_running_instance_state( |
| 135 | + app: FastAPI, *, buffer_pool: WarmBufferPool, instance: EC2InstanceData |
| 136 | +) -> None: |
| 137 | + """Analyze and categorize running instance based on its current state.""" |
| 138 | + ssm_client = get_ssm_client(app) |
| 139 | + |
66 | 140 | if BUFFER_MACHINE_PULLING_EC2_TAG_KEY in instance.tags: |
67 | 141 | buffer_pool.pulling_instances.add(instance) |
68 | 142 | elif await ssm_client.is_instance_connected_to_ssm_server(instance.id): |
69 | | - try: |
70 | | - if await ssm_client.wait_for_has_instance_completed_cloud_init(instance.id): |
71 | | - if has_instrumentation(app): |
72 | | - get_instrumentation( |
73 | | - app |
74 | | - ).buffer_machines_pools_metrics.instances_ready_to_pull_seconds.labels( |
75 | | - instance_type=instance.type |
76 | | - ).observe( |
77 | | - (arrow.utcnow().datetime - instance.launch_time).total_seconds() |
78 | | - ) |
79 | | - if app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ |
80 | | - instance.type |
81 | | - ].pre_pull_images: |
82 | | - buffer_pool.waiting_to_pull_instances.add(instance) |
83 | | - else: |
84 | | - buffer_pool.waiting_to_stop_instances.add(instance) |
85 | | - else: |
86 | | - buffer_pool.pending_instances.add(instance) |
87 | | - except ( |
88 | | - SSMCommandExecutionResultError, |
89 | | - SSMCommandExecutionTimeoutError, |
90 | | - ): |
91 | | - _logger.exception( |
92 | | - "Unnexpected error when checking EC2 cloud initialization completion!. " |
93 | | - "The machine will be terminated. TIP: check the initialization phase for errors." |
94 | | - ) |
95 | | - buffer_pool.broken_instances.add(instance) |
| 143 | + await _handle_ssm_connected_instance( |
| 144 | + app, buffer_pool=buffer_pool, instance=instance |
| 145 | + ) |
96 | 146 | else: |
97 | | - is_broken = bool( |
98 | | - (arrow.utcnow().datetime - instance.launch_time) |
99 | | - > app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME |
| 147 | + await _handle_unconnected_instance( |
| 148 | + app, buffer_pool=buffer_pool, instance=instance |
100 | 149 | ) |
101 | 150 |
|
102 | | - if is_broken: |
103 | | - _logger.error( |
104 | | - "The machine does not connect to the SSM server after %s. It will be terminated. TIP: check the initialization phase for errors.", |
105 | | - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME, |
106 | | - ) |
107 | | - buffer_pool.broken_instances.add(instance) |
108 | | - else: |
109 | | - buffer_pool.pending_instances.add(instance) |
110 | | - |
111 | 151 |
|
112 | 152 | async def _analyse_current_state( |
113 | 153 | app: FastAPI, *, auto_scaling_mode: AutoscalingProvider |
|
0 commit comments