|
28 | 28 | from servicelib.utils_formatting import timedelta_as_minute_second |
29 | 29 | from types_aiobotocore_ec2.literals import InstanceTypeType |
30 | 30 |
|
31 | | -from ...constants import DOCKER_JOIN_COMMAND_EC2_TAG_KEY, DOCKER_JOIN_COMMAND_NAME |
| 31 | +from ...constants import ( |
| 32 | + DOCKER_JOIN_COMMAND_EC2_TAG_KEY, |
| 33 | + DOCKER_JOIN_COMMAND_NAME, |
| 34 | + DOCKER_PULL_COMMAND, |
| 35 | + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, |
| 36 | + MACHINE_PULLING_EC2_TAG_KEY, |
| 37 | + PREPULL_COMMAND_NAME, |
| 38 | +) |
32 | 39 | from ...core.errors import ( |
33 | 40 | Ec2InvalidDnsNameError, |
34 | 41 | TaskBestFittingInstanceNotFoundError, |
|
56 | 63 | post_tasks_progress_message, |
57 | 64 | ) |
58 | 65 | from ...utils.warm_buffer_machines import ( |
| 66 | + dump_pre_pulled_images_as_tags, |
59 | 67 | get_activated_warm_buffer_ec2_tags, |
60 | 68 | get_deactivated_warm_buffer_ec2_tags, |
61 | 69 | is_warm_buffer_machine, |
| 70 | + list_pre_pulled_images_tag_keys, |
| 71 | + load_pre_pulled_images_from_tags, |
62 | 72 | ) |
63 | 73 | from ..docker import get_docker_client |
64 | 74 | from ..ec2 import get_ec2_client |
@@ -1348,6 +1358,118 @@ async def _notify_autoscaling_status( |
1348 | 1358 | get_instrumentation(app).cluster_metrics.update_from_cluster(cluster) |
1349 | 1359 |
|
1350 | 1360 |
|
async def _ensure_hot_buffers_have_pre_pulled_images(
    app: FastAPI, cluster: Cluster
) -> None:
    """Keep the drained hot buffer nodes in sync with their configured pre-pull images.

    For every node in ``cluster.hot_buffer_drained_nodes``:

    - if the EC2 instance carries ``MACHINE_PULLING_EC2_TAG_KEY``, the SSM command
      referenced by ``MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY`` is inspected and the
      pulling tags are cleaned up once that command reached a terminal state;
    - otherwise the images recorded in the instance tags are compared with the
      ``pre_pull_images`` configured for the instance type, and a pre-pull is
      triggered through SSM when they differ.

    Each node gets its own SSM command so that the images written and the command
    id stored in its tags always correspond to that node's instance type.
    """
    if not cluster.hot_buffer_drained_nodes:
        return
    ssm_client = get_ssm_client(app)
    ec2_client = get_ec2_client(app)
    app_settings = get_application_settings(app)
    assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
    allowed_instance_types = (
        app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES
    )

    # check if we have hot buffers that need to pull images
    # NOTE: each entry is a (node, desired images) pair
    hot_buffer_nodes_needing_pre_pull = []
    for node in cluster.hot_buffer_drained_nodes:
        if MACHINE_PULLING_EC2_TAG_KEY in node.ec2_instance.tags:
            # check the pulling state
            ssm_command_id = node.ec2_instance.tags.get(
                MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY
            )
            if not ssm_command_id:
                _logger.warning(
                    "%s has pulling tag but no command id, removing tag",
                    node.ec2_instance.id,
                )
                await ec2_client.remove_instances_tags(
                    [node.ec2_instance], tag_keys=[MACHINE_PULLING_EC2_TAG_KEY]
                )
                continue
            ssm_command = await ssm_client.get_command(
                node.ec2_instance.id, command_id=ssm_command_id
            )
            if ssm_command.status == "Success":
                _logger.info("%s finished pre-pulling images", node.ec2_instance.id)
                await ec2_client.remove_instances_tags(
                    [node.ec2_instance],
                    tag_keys=[
                        MACHINE_PULLING_EC2_TAG_KEY,
                        MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY,
                    ],
                )
            elif ssm_command.status in ("Failed", "TimedOut", "Cancelled"):
                # "Cancelled" is terminal as well: without handling it here the
                # node would be reported as "still pre-pulling" forever
                _logger.error(
                    "%s failed pre-pulling images, status is %s. this will be retried later",
                    node.ec2_instance.id,
                    ssm_command.status,
                )
                # the image tags are written when the pull is triggered (see
                # below), so they are dropped here to make a later comparison
                # detect the mismatch and schedule the retry
                await ec2_client.remove_instances_tags(
                    [node.ec2_instance],
                    tag_keys=[
                        MACHINE_PULLING_EC2_TAG_KEY,
                        MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY,
                        *list_pre_pulled_images_tag_keys(node.ec2_instance.tags),
                    ],
                )
            else:
                _logger.info(
                    "%s is still pre-pulling images, status is %s",
                    node.ec2_instance.id,
                    ssm_command.status,
                )
            # the in-memory tags of this node are stale after the removals above,
            # do not evaluate them further in this pass
            continue

        # check what they have
        pre_pulled_images = load_pre_pulled_images_from_tags(node.ec2_instance.tags)
        desired_pre_pulled_images = allowed_instance_types[
            node.ec2_instance.type
        ].pre_pull_images
        if pre_pulled_images != desired_pre_pulled_images:
            _logger.info(
                "%s needs to pre-pull images %s, currently has %s",
                node.ec2_instance.id,
                desired_pre_pulled_images,
                pre_pulled_images,
            )
            hot_buffer_nodes_needing_pre_pull.append(
                (node, desired_pre_pulled_images)
            )

    # now trigger pre-pull on these nodes, one SSM command per node
    for node, desired_pre_pulled_images in hot_buffer_nodes_needing_pre_pull:
        _logger.info(
            "triggering pre-pull of images %s on %s of type %s",
            desired_pre_pulled_images,
            node.ec2_instance.id,
            node.ec2_instance.type,
        )
        change_docker_compose_and_pull_command = " && ".join(
            (
                utils_docker.write_compose_file_command(desired_pre_pulled_images),
                DOCKER_PULL_COMMAND,
            )
        )
        ssm_command = await ssm_client.send_command(
            (node.ec2_instance.id,),
            command=change_docker_compose_and_pull_command,
            command_name=PREPULL_COMMAND_NAME,
        )
        await ec2_client.set_instances_tags(
            (node.ec2_instance,),
            tags={
                MACHINE_PULLING_EC2_TAG_KEY: "true",
                MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: ssm_command.command_id,
            }
            | dump_pre_pulled_images_as_tags(desired_pre_pulled_images),
        )
| 1471 | + |
| 1472 | + |
1351 | 1473 | async def auto_scale_cluster( |
1352 | 1474 | *, app: FastAPI, auto_scaling_mode: AutoscalingProvider |
1353 | 1475 | ) -> None: |
@@ -1375,6 +1497,8 @@ async def auto_scale_cluster( |
1375 | 1497 | app, cluster, auto_scaling_mode, allowed_instance_types |
1376 | 1498 | ) |
1377 | 1499 |
|
| 1500 | + # ensure hot buffers have desired pre-pulled images |
| 1501 | + |
1378 | 1502 | # notify |
1379 | 1503 | await _notify_machine_creation_progress(app, cluster) |
1380 | 1504 | await _notify_autoscaling_status(app, cluster, auto_scaling_mode) |
0 commit comments