|
20 | 20 | EC2InsufficientCapacityError, |
21 | 21 | EC2TooManyInstancesError, |
22 | 22 | ) |
| 23 | +from aws_library.ec2._models import AWSTagKey |
| 24 | +from aws_library.ssm._errors import SSMAccessError |
| 25 | +from common_library.logging.logging_errors import create_troubleshooting_log_kwargs |
23 | 26 | from fastapi import FastAPI |
24 | 27 | from models_library.generated_models.docker_rest_api import Node |
25 | 28 | from models_library.rabbitmq_messages import ProgressType |
@@ -1396,97 +1399,159 @@ async def _notify_autoscaling_status( |
1396 | 1399 | get_instrumentation(app).cluster_metrics.update_from_cluster(cluster) |
1397 | 1400 |
|
1398 | 1401 |
|
1399 | | -async def _ensure_hot_buffers_have_pre_pulled_images( |
1400 | | - app: FastAPI, cluster: Cluster |
1401 | | -) -> None: |
1402 | | - if not cluster.hot_buffer_drained_nodes: |
1403 | | - return |
| 1402 | +async def _handle_pre_pull_status( |
| 1403 | + app: FastAPI, node: AssociatedInstance |
| 1404 | +) -> AssociatedInstance: |
| 1405 | + if MACHINE_PULLING_EC2_TAG_KEY not in node.ec2_instance.tags: |
| 1406 | + return node |
| 1407 | + |
1404 | 1408 | ssm_client = get_ssm_client(app) |
1405 | 1409 | ec2_client = get_ec2_client(app) |
1406 | | - app_settings = get_application_settings(app) |
1407 | | - assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec |
1408 | | - # check if we have hot buffers that need to pull images |
1409 | | - hot_buffer_nodes_needing_pre_pull = [] |
1410 | | - for node in cluster.hot_buffer_drained_nodes: |
1411 | | - if MACHINE_PULLING_EC2_TAG_KEY in node.ec2_instance.tags: |
1412 | | - # check the pulling state |
1413 | | - ssm_command_id = node.ec2_instance.tags.get( |
1414 | | - MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY |
1415 | | - ) |
1416 | | - if not ssm_command_id: |
1417 | | - _logger.warning( |
1418 | | - "%s has pulling tag but no command id, removing tag", |
1419 | | - node.ec2_instance.id, |
1420 | | - ) |
1421 | | - await ec2_client.remove_instances_tags( |
1422 | | - [node.ec2_instance], tag_keys=[MACHINE_PULLING_EC2_TAG_KEY] |
1423 | | - ) |
1424 | | - continue |
1425 | | - ssm_command = await ssm_client.get_command( |
1426 | | - node.ec2_instance.id, command_id=ssm_command_id |
| 1410 | + ssm_command_id = node.ec2_instance.tags.get(MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY) |
| 1411 | + |
| 1412 | + async def _remove_tags_and_return( |
| 1413 | + node: AssociatedInstance, tag_keys: list[AWSTagKey] |
| 1414 | + ) -> AssociatedInstance: |
| 1415 | + await ec2_client.remove_instances_tags( |
| 1416 | + [node.ec2_instance], |
| 1417 | + tag_keys=tag_keys, |
| 1418 | + ) |
| 1419 | + for tag_key in tag_keys: |
| 1420 | + node.ec2_instance.tags.pop(tag_key, None) |
| 1421 | + return node |
| 1422 | + |
| 1423 | + if not ssm_command_id: |
| 1424 | + _logger.error( |
| 1425 | + "%s has '%s' tag key set but no associated command id '%s' tag key, " |
| 1426 | + "this is unexpected but will be cleaned now. Pre-pulling will be retried again later.", |
| 1427 | + node.ec2_instance.id, |
| 1428 | + MACHINE_PULLING_EC2_TAG_KEY, |
| 1429 | + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, |
| 1430 | + ) |
| 1431 | + return await _remove_tags_and_return( |
| 1432 | + node, |
| 1433 | + [ |
| 1434 | + MACHINE_PULLING_EC2_TAG_KEY, |
| 1435 | + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, |
| 1436 | + *list_pre_pulled_images_tag_keys(node.ec2_instance.tags), |
| 1437 | + ], |
| 1438 | + ) |
| 1439 | + |
| 1440 | + try: |
| 1441 | + ssm_command = await ssm_client.get_command( |
| 1442 | + node.ec2_instance.id, command_id=ssm_command_id |
| 1443 | + ) |
| 1444 | + except SSMAccessError as exc: |
| 1445 | + _logger.exception( |
| 1446 | + **create_troubleshooting_log_kwargs( |
| 1447 | + f"Unexpected SSM access error to get status of command {ssm_command_id} on {node.ec2_instance.id}", |
| 1448 | + error=exc, |
| 1449 | + tip="Pre-pulling will be retried again later.", |
1427 | 1450 | ) |
1428 | | - if ssm_command.status == "Success": |
| 1451 | + ) |
| 1452 | + return await _remove_tags_and_return( |
| 1453 | + node, |
| 1454 | + [ |
| 1455 | + MACHINE_PULLING_EC2_TAG_KEY, |
| 1456 | + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, |
| 1457 | + *list_pre_pulled_images_tag_keys(node.ec2_instance.tags), |
| 1458 | + ], |
| 1459 | + ) |
| 1460 | + else: |
| 1461 | + match ssm_command.status: |
| 1462 | + case "Success": |
1429 | 1463 | _logger.info("%s finished pre-pulling images", node.ec2_instance.id) |
1430 | | - await ec2_client.remove_instances_tags( |
1431 | | - [node.ec2_instance], |
1432 | | - tag_keys=[ |
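| + # only the pulling markers are removed on success: the pre-pulled
| + # image tags stay on so the pre-pull check can see what the machine
| + # already holds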
| 1464 | + return await _remove_tags_and_return( |
| 1465 | + node, |
| 1466 | + [ |
1433 | 1467 | MACHINE_PULLING_EC2_TAG_KEY, |
1434 | 1468 | MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, |
1435 | 1469 | ], |
1436 | 1470 | ) |
1437 | | - elif ssm_command.status in ("Failed", "TimedOut"): |
| 1471 | + case "Failed" | "TimedOut": |
1438 | 1472 | _logger.error( |
1439 | 1473 | "%s failed pre-pulling images, status is %s. this will be retried later", |
1440 | 1474 | node.ec2_instance.id, |
1441 | 1475 | ssm_command.status, |
1442 | 1476 | ) |
1443 | | - await ec2_client.remove_instances_tags( |
1444 | | - [node.ec2_instance], |
1445 | | - tag_keys=[ |
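| + # the image tags are cleared as well so that a complete re-pull is
| + # scheduled on the next pass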
| 1477 | + return await _remove_tags_and_return( |
| 1478 | + node, |
| 1479 | + [ |
1446 | 1480 | MACHINE_PULLING_EC2_TAG_KEY, |
1447 | 1481 | MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, |
1448 | 1482 | *list_pre_pulled_images_tag_keys(node.ec2_instance.tags), |
1449 | 1483 | ], |
1450 | 1484 | ) |
1451 | | - else: |
| 1485 | + case _: |
1452 | 1486 | _logger.info( |
1453 | | - "%s is still pre-pulling images, status is %s", |
| 1487 | + "%s is pre-pulling %s, status is %s", |
1454 | 1488 | node.ec2_instance.id, |
| 1489 | + load_pre_pulled_images_from_tags(node.ec2_instance.tags), |
1455 | 1490 | ssm_command.status, |
1456 | 1491 | ) |
1457 | | - continue |
| 1492 | + # skip the instance this time as this is still ongoing |
| 1493 | + return node |
| 1494 | + |
| 1495 | + |
| 1496 | +async def _pre_pull_docker_images_on_idle_hot_buffers( |
| 1497 | + app: FastAPI, cluster: Cluster |
| 1498 | +) -> None: |
| 1499 | + if not cluster.hot_buffer_drained_nodes: |
| 1500 | + return |
| 1501 | + ssm_client = get_ssm_client(app) |
| 1502 | + ec2_client = get_ec2_client(app) |
| 1503 | + app_settings = get_application_settings(app) |
| 1504 | + assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec |
| 1505 | + # check if we have hot buffers that need to pull images |
| 1506 | + hot_buffer_nodes_needing_pre_pull = [] |
| 1507 | + for node in cluster.hot_buffer_drained_nodes: |
| 1508 | + updated_node = await _handle_pre_pull_status(app, node) |
| 1509 | + if MACHINE_PULLING_EC2_TAG_KEY in updated_node.ec2_instance.tags: |
| 1510 | + continue # skip this one as it is still pre-pulling |
1458 | 1511 |
|
1459 | 1512 | # check what they have |
1460 | | - pre_pulled_images = load_pre_pulled_images_from_tags(node.ec2_instance.tags) |
1461 | | - desired_pre_pulled_images = ( |
| 1513 | + pre_pulled_images = load_pre_pulled_images_from_tags( |
| 1514 | + updated_node.ec2_instance.tags |
| 1515 | + ) |
| 1516 | + ec2_boot_specific = ( |
1462 | 1517 | app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ |
1463 | | - node.ec2_instance.type |
1464 | | - ].pre_pull_images |
| 1518 | + updated_node.ec2_instance.type |
| 1519 | + ] |
| 1520 | + ) |
| 1521 | + desired_pre_pulled_images = list( |
| 1522 | + set( |
| 1523 | + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING |
| 1524 | + ) |
| 1525 | + | set(ec2_boot_specific.pre_pull_images), |
1465 | 1526 | ) |
1466 | 1527 | if pre_pulled_images != desired_pre_pulled_images: |
1467 | 1528 | _logger.info( |
1468 | 1529 | "%s needs to pre-pull images %s, currently has %s", |
1469 | | - node.ec2_instance.id, |
| 1530 | + updated_node.ec2_instance.id, |
1470 | 1531 | desired_pre_pulled_images, |
1471 | 1532 | pre_pulled_images, |
1472 | 1533 | ) |
1473 | | - hot_buffer_nodes_needing_pre_pull.append(node) |
| 1534 | + hot_buffer_nodes_needing_pre_pull.append(updated_node) |
1474 | 1535 |
|
1475 | 1536 | # now trigger pre-pull on these nodes |
1476 | 1537 | for node in hot_buffer_nodes_needing_pre_pull: |
1477 | | - _logger.info( |
1478 | | - "triggering pre-pull of images %s on %s of type %s", |
| 1538 | + ec2_boot_specific = ( |
1479 | 1539 | app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ |
1480 | 1540 | node.ec2_instance.type |
1481 | | - ].pre_pull_images, |
| 1541 | + ] |
| 1542 | + ) |
| 1543 | + desired_pre_pulled_images = list( |
| 1544 | + set( |
| 1545 | + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING |
| 1546 | + ) |
| 1547 | + | set(ec2_boot_specific.pre_pull_images), |
| 1548 | + ) |
| 1549 | + _logger.info( |
| 1550 | + "triggering pre-pull of images %s on %s of type %s", |
| 1551 | + desired_pre_pulled_images, |
1482 | 1552 | node.ec2_instance.id, |
1483 | 1553 | node.ec2_instance.type, |
1484 | 1554 | ) |
1485 | | - desired_pre_pulled_images = ( |
1486 | | - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ |
1487 | | - node.ec2_instance.type |
1488 | | - ].pre_pull_images |
1489 | | - ) |
1490 | 1555 | change_docker_compose_and_pull_command = " && ".join( |
1491 | 1556 | ( |
1492 | 1557 | utils_docker.write_compose_file_command(desired_pre_pulled_images), |
@@ -1535,8 +1600,8 @@ async def auto_scale_cluster( |
1535 | 1600 | app, cluster, auto_scaling_mode, allowed_instance_types |
1536 | 1601 | ) |
1537 | 1602 |
|
1538 | | - # ensure hot buffers have desired pre-pulled images |
1539 | | - |
| 1603 | + # take care of hot buffer pre-pulling |
| 1604 | + await _pre_pull_docker_images_on_idle_hot_buffers(app, cluster) |
1540 | 1605 | # notify |
1541 | 1606 | await _notify_machine_creation_progress(app, cluster) |
1542 | 1607 | await _notify_autoscaling_status(app, cluster, auto_scaling_mode) |