diff --git a/packages/aws-library/src/aws_library/ec2/_models.py b/packages/aws-library/src/aws_library/ec2/_models.py index b136fb503007..e08e207b0b0e 100644 --- a/packages/aws-library/src/aws_library/ec2/_models.py +++ b/packages/aws-library/src/aws_library/ec2/_models.py @@ -157,16 +157,9 @@ class EC2InstanceBootSpecific(BaseModel): list[DockerGenericTag], Field( default_factory=list, - description="a list of docker image/tags to pull on instance cold start", + description="a list of docker image/tags to pull on the instance", ), ] = DEFAULT_FACTORY - pre_pull_images_cron_interval: Annotated[ - datetime.timedelta, - Field( - description="time interval between pulls of images (minimum is 1 minute) " - "(default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)", - ), - ] = datetime.timedelta(minutes=30) buffer_count: Annotated[ NonNegativeInt, Field(description="number of buffer EC2s to keep (defaults to 0)"), @@ -180,7 +173,9 @@ def validate_bash_calls(cls, v): temp_file.writelines(v) temp_file.flush() # NOTE: this will not capture runtime errors, but at least some syntax errors such as invalid quotes - sh.bash("-n", temp_file.name) # pyright: ignore[reportCallIssue] # sh is untyped, but this call is safe for bash syntax checking + sh.bash( + "-n", temp_file.name + ) # pyright: ignore[reportCallIssue] # sh is untyped, but this call is safe for bash syntax checking except sh.ErrorReturnCode as exc: msg = f"Invalid bash call in custom_boot_scripts: {v}, Error: {exc.stderr}" raise ValueError(msg) from exc @@ -231,7 +226,7 @@ def _update_json_schema_extra(schema: JsonDict) -> None: "simcore/services/dynamic/another-nice-one:2.4.5", "asd", ], - "pre_pull_images_cron_interval": "01:00:00", + "pre_pull_images_cron_interval": "01:00:00", # retired but kept for tests }, { # AMI + pre-pull + buffer count diff --git a/packages/aws-library/src/aws_library/ssm/_client.py b/packages/aws-library/src/aws_library/ssm/_client.py index 2b51a93b82a2..137dbdec9372 100644 --- a/packages/aws-library/src/aws_library/ssm/_client.py +++ b/packages/aws-library/src/aws_library/ssm/_client.py @@ -107,7 +107,6 @@ async def send_command( @log_decorator(_logger, logging.DEBUG) @ssm_exception_handler(_logger) async def get_command(self, instance_id: str, *, command_id: str) -> SSMCommand: - response = await self._client.get_command_invocation( CommandId=command_id, InstanceId=instance_id ) @@ -130,6 +129,13 @@ async def get_command(self, instance_id: str, *, command_id: str) -> SSMCommand: ), ) + @log_decorator(_logger, logging.DEBUG) + @ssm_exception_handler(_logger) + async def cancel_command(self, instance_id: str, *, command_id: str) -> None: + await self._client.cancel_command( + CommandId=command_id, InstanceIds=[instance_id] + ) + @log_decorator(_logger, logging.DEBUG) @ssm_exception_handler(_logger) async def is_instance_connected_to_ssm_server(self, instance_id: str) -> bool: diff --git a/packages/aws-library/tests/test_ec2_client.py b/packages/aws-library/tests/test_ec2_client.py index 76cdf62e0994..63e8437b960a 100644 --- a/packages/aws-library/tests/test_ec2_client.py +++ b/packages/aws-library/tests/test_ec2_client.py @@ -604,7 +604,11 @@ async def test_set_instance_tags( # now remove some real ones tag_key_to_remove = random.choice(list(new_tags)) # noqa: S311 await simcore_ec2_api.remove_instances_tags( - created_instances, tag_keys=[tag_key_to_remove] + created_instances, + tag_keys=[ + tag_key_to_remove, + TypeAdapter(AWSTagKey).validate_python("whatever_i_dont_exist"), + ], ) new_tags.pop(tag_key_to_remove) await _assert_instances_in_ec2( diff --git a/packages/aws-library/tests/test_ssm_client.py b/packages/aws-library/tests/test_ssm_client.py index 4c3a6c0c772a..f915dee4b404 100644 --- a/packages/aws-library/tests/test_ssm_client.py +++ b/packages/aws-library/tests/test_ssm_client.py @@ -37,8 +37,7 @@ async def simcore_ssm_api( await ec2.close() -async def test_ssm_client_lifespan(simcore_ssm_api: SimcoreSSMAPI): - ... +async def test_ssm_client_lifespan(simcore_ssm_api: SimcoreSSMAPI): ... async def test_aiobotocore_ssm_client_when_ssm_server_goes_up_and_down( @@ -125,6 +124,30 @@ async def test_send_command( ) +async def test_cancel_command( + mocked_aws_server: ThreadedMotoServer, + simcore_ssm_api: SimcoreSSMAPI, + faker: Faker, +): + command_name = faker.word() + target_instance_id = faker.pystr() + sent_command = await simcore_ssm_api.send_command( + instance_ids=[target_instance_id], + command=faker.text(), + command_name=command_name, + ) + assert sent_command + assert sent_command.command_id + assert sent_command.name == command_name + assert sent_command.instance_ids == [target_instance_id] + assert sent_command.status == "Success" + + # cancelling a finished command is a no-op but is a bit of a joke as moto does not implement cancel command yet + await simcore_ssm_api.cancel_command( + instance_id=target_instance_id, command_id=sent_command.command_id + ) + + async def test_is_instance_connected_to_ssm_server( mocked_aws_server: ThreadedMotoServer, simcore_ssm_api: SimcoreSSMAPI, diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py b/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py index 5f16fefc801a..7ad6965078c9 100644 --- a/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py @@ -1,7 +1,7 @@ import base64 from collections.abc import Sequence -from common_library.json_serialization import json_dumps +from common_library.json_serialization import json_loads from models_library.docker import DockerGenericTag from types_aiobotocore_ec2 import EC2Client from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType @@ -46,6 +46,7 @@ async def assert_autoscaled_dynamic_ec2_instances( expected_instance_type: InstanceTypeType, expected_instance_state: InstanceStateNameType, expected_additional_tag_keys: list[str], + expected_pre_pulled_images: list[DockerGenericTag] | None = None, instance_filters: Sequence[FilterTypeDef] | None, expected_user_data: list[str] | None = None, check_reservation_index: int | None = None, @@ -64,6 +65,7 @@ async def assert_autoscaled_dynamic_ec2_instances( *expected_additional_tag_keys, ], expected_user_data=expected_user_data, + expected_pre_pulled_images=expected_pre_pulled_images, instance_filters=instance_filters, check_reservation_index=check_reservation_index, ) @@ -142,10 +144,9 @@ def _by_pre_pull_image(ec2_tag: TagTypeDef) -> bool: iter(filter(_by_pre_pull_image, instance["Tags"])) ) assert "Value" in instance_pre_pulled_images_aws_tag - assert ( - instance_pre_pulled_images_aws_tag["Value"] - == f"{json_dumps(expected_pre_pulled_images)}" - ) + assert sorted( + json_loads(instance_pre_pulled_images_aws_tag["Value"]) + ) == sorted(expected_pre_pulled_images) assert "PrivateDnsName" in instance instance_private_dns_name = instance["PrivateDnsName"] diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/moto.py b/packages/pytest-simcore/src/pytest_simcore/helpers/moto.py index 65c589ba1b91..ab2152aa097c 100644 --- a/packages/pytest-simcore/src/pytest_simcore/helpers/moto.py +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/moto.py @@ -55,6 +55,16 @@ def _patch_describe_instance_information( return {"InstanceInformationList": [{"PingStatus": "Online"}]} +def _patch_cancel_command(self, operation_name, api_params) -> dict[str, Any]: + warnings.warn( + "moto is missing the cancel_command function, therefore it is manually mocked." + "TIP: periodically check if it gets updated https://docs.getmoto.org/en/latest/docs/services/ssm.html#ssm", + UserWarning, + stacklevel=1, + ) + return {} + + # Mocked aiobotocore _make_api_call function async def patched_aiobotocore_make_api_call(self, operation_name, api_params): # For example for the Access Analyzer service @@ -63,6 +73,8 @@ async def patched_aiobotocore_make_api_call(self, operation_name, api_params): # Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816 if operation_name == "SendCommand": return await _patch_send_command(self, operation_name, api_params) + if operation_name == "CancelCommand": + return _patch_cancel_command(self, operation_name, api_params) if operation_name == "DescribeInstanceInformation": return _patch_describe_instance_information(self, operation_name, api_params) diff --git a/services/autoscaling/src/simcore_service_autoscaling/constants.py b/services/autoscaling/src/simcore_service_autoscaling/constants.py index 55fe8468bf1b..0feee4532a33 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/constants.py +++ b/services/autoscaling/src/simcore_service_autoscaling/constants.py @@ -1,13 +1,14 @@ import re +from pathlib import Path from typing import Final from aws_library.ec2._models import AWSTagKey, AWSTagValue, EC2Tags from pydantic import TypeAdapter -BUFFER_MACHINE_PULLING_EC2_TAG_KEY: Final[AWSTagKey] = TypeAdapter( - AWSTagKey -).validate_python("pulling") -BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: Final[AWSTagKey] = TypeAdapter( +MACHINE_PULLING_EC2_TAG_KEY: Final[AWSTagKey] = TypeAdapter(AWSTagKey).validate_python( + "pulling" +) +MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: Final[AWSTagKey] = TypeAdapter( AWSTagKey ).validate_python("ssm-command-id") PREPULL_COMMAND_NAME: Final[str] = "docker images pulling" @@ -17,10 +18,14 @@ AWSTagKey ).validate_python("io.simcore.autoscaling.joined_command_sent") +DOCKER_COMPOSE_CMD: Final[str] = "docker compose" +PRE_PULL_COMPOSE_PATH: Final[Path] = Path("/docker-pull.compose.yml") +DOCKER_COMPOSE_PULL_SCRIPT_PATH: Final[Path] = Path("/docker-pull-script.sh") -DOCKER_PULL_COMMAND: Final[ - str -] = "docker compose -f /docker-pull.compose.yml -p buffering pull" + +DOCKER_PULL_COMMAND: Final[str] = ( + f"{DOCKER_COMPOSE_CMD} -f {PRE_PULL_COMPOSE_PATH} -p buffering pull" +) PRE_PULLED_IMAGES_EC2_TAG_KEY: Final[AWSTagKey] = TypeAdapter( AWSTagKey diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/application.py b/services/autoscaling/src/simcore_service_autoscaling/core/application.py index 27ec57257d6f..c63515f1aaee 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/application.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/application.py @@ -1,5 +1,6 @@ import logging +from common_library.json_serialization import json_dumps from fastapi import FastAPI from servicelib.fastapi.tracing import ( initialize_fastapi_app_tracing, @@ -32,7 +33,7 @@ from ..modules.ssm import setup as setup_ssm from .settings import ApplicationSettings -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) def create_app(settings: ApplicationSettings, tracing_config: TracingConfig) -> FastAPI: @@ -49,6 +50,10 @@ def create_app(settings: ApplicationSettings, tracing_config: TracingConfig) -> app.state.settings = settings app.state.tracing_config = tracing_config assert app.state.settings.API_VERSION == API_VERSION # nosec + _logger.info( + "Application settings: %s", + json_dumps(settings, indent=2, sort_keys=True), + ) # PLUGINS SETUP if tracing_config.tracing_enabled: diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py index c240dcc68c32..0ae53b943954 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py @@ -3,11 +3,12 @@ from typing import Annotated, Final, Self, cast from aws_library.ec2 import EC2InstanceBootSpecific, EC2Tags +from common_library.basic_types import DEFAULT_FACTORY from common_library.logging.logging_utils_filtering import LoggerName, MessageSubstring from fastapi import FastAPI from models_library.basic_types import LogLevel, PortInt, VersionTag from models_library.clusters import ClusterAuthentication -from models_library.docker import DockerLabelKey +from models_library.docker import DockerGenericTag, DockerLabelKey from pydantic import ( AliasChoices, AnyUrl, @@ -63,6 +64,14 @@ class EC2InstancesSettings(BaseCustomSettings): ), ] + EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING: Annotated[ + list[DockerGenericTag], + Field( + description="List of docker images to pre-pull on cold started new EC2 instances", + default_factory=list, + ), + ] = DEFAULT_FACTORY + EC2_INSTANCES_KEY_NAME: Annotated[ str, Field( diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py index a7a027a89452..ff4b0ad4f5b6 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py @@ -20,6 +20,9 @@ EC2InsufficientCapacityError, EC2TooManyInstancesError, ) +from aws_library.ec2._models import AWSTagKey +from aws_library.ssm._errors import SSMAccessError +from common_library.logging.logging_errors import create_troubleshooting_log_kwargs from fastapi import FastAPI from models_library.generated_models.docker_rest_api import Node from models_library.rabbitmq_messages import ProgressType @@ -28,7 +31,14 @@ from servicelib.utils_formatting import timedelta_as_minute_second from types_aiobotocore_ec2.literals import InstanceTypeType -from ...constants import DOCKER_JOIN_COMMAND_EC2_TAG_KEY, DOCKER_JOIN_COMMAND_NAME +from ...constants import ( + DOCKER_JOIN_COMMAND_EC2_TAG_KEY, + DOCKER_JOIN_COMMAND_NAME, + DOCKER_PULL_COMMAND, + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, + MACHINE_PULLING_EC2_TAG_KEY, + PREPULL_COMMAND_NAME, +) from ...core.errors import ( Ec2InvalidDnsNameError, TaskBestFittingInstanceNotFoundError, @@ -56,9 +66,12 @@ post_tasks_progress_message, ) from ...utils.warm_buffer_machines import ( + dump_pre_pulled_images_as_tags, get_activated_warm_buffer_ec2_tags, get_deactivated_warm_buffer_ec2_tags, is_warm_buffer_machine, + list_pre_pulled_images_tag_keys, + load_pre_pulled_images_from_tags, ) from ..docker import get_docker_client from ..ec2 import get_ec2_client @@ -383,6 +396,38 @@ async def _activate_and_notify( return dataclasses.replace(drained_node, node=updated_node) +async def _cancel_previous_pulling_command_if_any( + app: FastAPI, + instance: EC2InstanceData, +) -> None: + if not ( + (MACHINE_PULLING_EC2_TAG_KEY in instance.tags) + and (MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY in instance.tags) + ): + # nothing to do + return + + ssm_client = get_ssm_client(app) + ec2_client = get_ec2_client(app) + command_id = instance.tags[MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY] + command = await ssm_client.get_command(instance.id, command_id=command_id) + if command.status in ("Pending", "InProgress"): + with log_context( + _logger, + logging.INFO, + msg=f"cancelling previous pulling {command_id} on {instance.id}", + ): + await ssm_client.cancel_command(instance.id, command_id=command_id) + await ec2_client.remove_instances_tags( + [instance], + tag_keys=[ + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, + MACHINE_PULLING_EC2_TAG_KEY, + *list_pre_pulled_images_tag_keys(instance.tags), + ], + ) + + async def _activate_drained_nodes( app: FastAPI, cluster: Cluster, @@ -403,6 +448,12 @@ async def _activate_drained_nodes( logging.INFO, f"activate {len(nodes_to_activate)} drained nodes {[n.ec2_instance.id for n in nodes_to_activate]}", ): + await asyncio.gather( + *( + _cancel_previous_pulling_command_if_any(app, n.ec2_instance) + for n in nodes_to_activate + ) + ) activated_nodes = await asyncio.gather( *(_activate_and_notify(app, node) for node in nodes_to_activate) ) @@ -1348,6 +1399,174 @@ async def _notify_autoscaling_status( get_instrumentation(app).cluster_metrics.update_from_cluster(cluster) +async def _handle_pre_pull_status( + app: FastAPI, node: AssociatedInstance +) -> AssociatedInstance: + if MACHINE_PULLING_EC2_TAG_KEY not in node.ec2_instance.tags: + return node + + ssm_client = get_ssm_client(app) + ec2_client = get_ec2_client(app) + ssm_command_id = node.ec2_instance.tags.get(MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY) + + async def _remove_tags_and_return( + node: AssociatedInstance, tag_keys: list[AWSTagKey] + ) -> AssociatedInstance: + await ec2_client.remove_instances_tags( + [node.ec2_instance], + tag_keys=tag_keys, + ) + for tag_key in tag_keys: + node.ec2_instance.tags.pop(tag_key, None) + return node + + if not ssm_command_id: + _logger.error( + "%s has '%s' tag key set but no associated command id '%s' tag key, " + "this is unexpected but will be cleaned now. Pre-pulling will be retried again later.", + node.ec2_instance.id, + MACHINE_PULLING_EC2_TAG_KEY, + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, + ) + return await _remove_tags_and_return( + node, + [ + MACHINE_PULLING_EC2_TAG_KEY, + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, + *list_pre_pulled_images_tag_keys(node.ec2_instance.tags), + ], + ) + + try: + ssm_command = await ssm_client.get_command( + node.ec2_instance.id, command_id=ssm_command_id + ) + except SSMAccessError as exc: + _logger.exception( + **create_troubleshooting_log_kwargs( + f"Unexpected SSM access error to get status of command {ssm_command_id} on {node.ec2_instance.id}", + error=exc, + tip="Pre-pulling will be retried again later.", + ) + ) + return await _remove_tags_and_return( + node, + [ + MACHINE_PULLING_EC2_TAG_KEY, + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, + *list_pre_pulled_images_tag_keys(node.ec2_instance.tags), + ], + ) + match ssm_command.status: + case "Success": + _logger.info("%s finished pre-pulling images", node.ec2_instance.id) + return await _remove_tags_and_return( + node, + [ + MACHINE_PULLING_EC2_TAG_KEY, + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, + ], + ) + case "Failed" | "TimedOut": + _logger.error( + "%s failed pre-pulling images, status is %s. this will be retried later", + node.ec2_instance.id, + ssm_command.status, + ) + return await _remove_tags_and_return( + node, + [ + MACHINE_PULLING_EC2_TAG_KEY, + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, + *list_pre_pulled_images_tag_keys(node.ec2_instance.tags), + ], + ) + case _: + _logger.info( + "%s is pre-pulling %s, status is %s", + node.ec2_instance.id, + load_pre_pulled_images_from_tags(node.ec2_instance.tags), + ssm_command.status, + ) + # skip the instance this time as this is still ongoing + return node + + +async def _pre_pull_docker_images_on_idle_hot_buffers( + app: FastAPI, cluster: Cluster +) -> None: + if not cluster.hot_buffer_drained_nodes: + return + ssm_client = get_ssm_client(app) + ec2_client = get_ec2_client(app) + app_settings = get_application_settings(app) + assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec + # check if we have hot buffers that need to pull images + hot_buffer_nodes_needing_pre_pull = [] + for node in cluster.hot_buffer_drained_nodes: + updated_node = await _handle_pre_pull_status(app, node) + if MACHINE_PULLING_EC2_TAG_KEY in updated_node.ec2_instance.tags: + continue # skip this one as it is still pre-pulling + + # check what they have + pre_pulled_images = load_pre_pulled_images_from_tags( + updated_node.ec2_instance.tags + ) + ec2_boot_specific = ( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ + updated_node.ec2_instance.type + ] + ) + desired_pre_pulled_images = utils_docker.compute_full_list_of_pre_pulled_images( + ec2_boot_specific, app_settings + ) + + if pre_pulled_images != desired_pre_pulled_images: + _logger.info( + "%s needs to pre-pull images %s, currently has %s", + updated_node.ec2_instance.id, + desired_pre_pulled_images, + pre_pulled_images, + ) + hot_buffer_nodes_needing_pre_pull.append(updated_node) + + # now trigger pre-pull on these nodes + for node in hot_buffer_nodes_needing_pre_pull: + ec2_boot_specific = ( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ + node.ec2_instance.type + ] + ) + desired_pre_pulled_images = utils_docker.compute_full_list_of_pre_pulled_images( + ec2_boot_specific, app_settings + ) + _logger.info( + "triggering pre-pull of images %s on %s of type %s", + desired_pre_pulled_images, + node.ec2_instance.id, + node.ec2_instance.type, + ) + change_docker_compose_and_pull_command = " && ".join( + ( + utils_docker.write_compose_file_command(desired_pre_pulled_images), + DOCKER_PULL_COMMAND, + ) + ) + ssm_command = await ssm_client.send_command( + (node.ec2_instance.id,), + command=change_docker_compose_and_pull_command, + command_name=PREPULL_COMMAND_NAME, + ) + await ec2_client.set_instances_tags( + (node.ec2_instance,), + tags={ + MACHINE_PULLING_EC2_TAG_KEY: "true", + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: ssm_command.command_id, + } + | dump_pre_pulled_images_as_tags(desired_pre_pulled_images), + ) + + async def auto_scale_cluster( *, app: FastAPI, auto_scaling_mode: AutoscalingProvider ) -> None: @@ -1375,6 +1594,8 @@ async def auto_scale_cluster( app, cluster, auto_scaling_mode, allowed_instance_types ) + # take care of hot buffer pre-pulling + await _pre_pull_docker_images_on_idle_hot_buffers(app, cluster) # notify await _notify_machine_creation_progress(app, cluster) await _notify_autoscaling_status(app, cluster, auto_scaling_mode) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_warm_buffer_machines_pool_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_warm_buffer_machines_pool_core.py index 7dcaa48dd61d..f881b9a140ac 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_warm_buffer_machines_pool_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_warm_buffer_machines_pool_core.py @@ -35,13 +35,14 @@ from types_aiobotocore_ec2.literals import InstanceTypeType from ...constants import ( - BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, - BUFFER_MACHINE_PULLING_EC2_TAG_KEY, DOCKER_PULL_COMMAND, + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, + MACHINE_PULLING_EC2_TAG_KEY, PREPULL_COMMAND_NAME, ) from ...core.settings import get_application_settings from ...models import WarmBufferPool, WarmBufferPoolManager +from ...utils import utils_docker from ...utils.warm_buffer_machines import ( dump_pre_pulled_images_as_tags, ec2_warm_buffer_startup_script, @@ -76,10 +77,13 @@ def _handle_completed_cloud_init_instance( assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec _record_instance_ready_metrics(app, instance=instance) - - if app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ - instance.type - ].pre_pull_images: + pre_pull_images = utils_docker.compute_full_list_of_pre_pulled_images( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ + instance.type + ], + app_settings, + ) + if pre_pull_images: buffer_pool.waiting_to_pull_instances.add(instance) else: buffer_pool.waiting_to_stop_instances.add(instance) @@ -137,7 +141,7 @@ async def _analyze_running_instance_state( """Analyze and categorize running instance based on its current state.""" ssm_client = get_ssm_client(app) - if BUFFER_MACHINE_PULLING_EC2_TAG_KEY in instance.tags: + if MACHINE_PULLING_EC2_TAG_KEY in instance.tags: buffer_pool.pulling_instances.add(instance) elif await ssm_client.is_instance_connected_to_ssm_server(instance.id): await _handle_ssm_connected_instance( @@ -230,14 +234,18 @@ async def _terminate_instances_with_invalid_pre_pulled_images( instance_type ].pre_pulled_instances() + desired_pre_pull_images = utils_docker.compute_full_list_of_pre_pulled_images( + ec2_boot_config, app_settings + ) + for instance in all_pre_pulled_instances: pre_pulled_images = load_pre_pulled_images_from_tags(instance.tags) if ( pre_pulled_images is not None - ) and pre_pulled_images != ec2_boot_config.pre_pull_images: + ) and pre_pulled_images != desired_pre_pull_images: _logger.info( "%s", - f"{instance.id=} has invalid {pre_pulled_images=}, expected is {ec2_boot_config.pre_pull_images=}", + f"{instance.id=} has invalid {pre_pulled_images=}, expected is {desired_pre_pull_images=}", ) terminateable_instances.add(instance) @@ -344,8 +352,8 @@ async def _handle_pool_image_pulling( await ec2_client.set_instances_tags( tuple(pool.waiting_to_pull_instances), tags={ - BUFFER_MACHINE_PULLING_EC2_TAG_KEY: "true", - BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: ssm_command.command_id, + MACHINE_PULLING_EC2_TAG_KEY: "true", + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: ssm_command.command_id, }, ) @@ -353,9 +361,7 @@ async def _handle_pool_image_pulling( broken_instances_to_terminate: set[EC2InstanceData] = set() # wait for the image pulling to complete for instance in pool.pulling_instances: - if ssm_command_id := instance.tags.get( - BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY - ): + if ssm_command_id := instance.tags.get(MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY): ssm_command = await ssm_client.get_command( instance.id, command_id=ssm_command_id ) @@ -389,9 +395,12 @@ async def _handle_pool_image_pulling( await ec2_client.set_instances_tags( tuple(instances_to_stop), tags=dump_pre_pulled_images_as_tags( - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ - instance_type - ].pre_pull_images + utils_docker.compute_full_list_of_pre_pulled_images( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ + instance_type + ], + app_settings, + ) ), ) return instances_to_stop, broken_instances_to_terminate @@ -418,8 +427,8 @@ async def _handle_image_pre_pulling( "pending buffer instances completed pulling of images, stopping them", ): tag_keys_to_remove = ( - BUFFER_MACHINE_PULLING_EC2_TAG_KEY, - BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, + MACHINE_PULLING_EC2_TAG_KEY, + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, ) await ec2_client.remove_instances_tags( tuple(instances_to_stop), diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py index ae78873cb55e..13c25dcd2112 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py @@ -63,25 +63,18 @@ async def ec2_startup_script( join_as_drained=app_settings.AUTOSCALING_DOCKER_JOIN_DRAINED ) ) - if app_settings.AUTOSCALING_REGISTRY: # noqa: SIM102 + assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec + if app_settings.AUTOSCALING_REGISTRY: + startup_commands.append( + utils_docker.get_docker_login_on_start_bash_command( + app_settings.AUTOSCALING_REGISTRY + ) + ) + if pull_image_cmd := utils_docker.get_docker_pull_images_on_start_bash_command( - ec2_boot_specific.pre_pull_images + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING ): - startup_commands.append( - " && ".join( - [ - utils_docker.get_docker_login_on_start_bash_command( - app_settings.AUTOSCALING_REGISTRY - ), - pull_image_cmd, - ] - ) - ) - startup_commands.append( - utils_docker.get_docker_pull_images_crontab( - ec2_boot_specific.pre_pull_images_cron_interval - ), - ) + startup_commands.append(pull_image_cmd) return " && ".join(startup_commands) diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py index 90c0c3807519..a48951986763 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py @@ -8,12 +8,12 @@ import re from contextlib import suppress from copy import deepcopy -from pathlib import Path from typing import Final, cast import arrow import yaml from aws_library.ec2 import EC2InstanceData, Resources +from aws_library.ec2._models import EC2InstanceBootSpecific from models_library.docker import ( DockerGenericTag, DockerLabelKey, @@ -36,6 +36,11 @@ from settings_library.docker_registry import RegistrySettings from types_aiobotocore_ec2.literals import InstanceTypeType +from ..constants import ( + DOCKER_COMPOSE_CMD, + DOCKER_COMPOSE_PULL_SCRIPT_PATH, + PRE_PULL_COMPOSE_PATH, +) from ..core.settings import ApplicationSettings from ..models import AssociatedInstance from ..modules.docker import AutoscalingDocker @@ -443,12 +448,6 @@ def get_docker_login_on_start_bash_command(registry_settings: RegistrySettings) ) -_DOCKER_COMPOSE_CMD: Final[str] = "docker compose" -_PRE_PULL_COMPOSE_PATH: Final[Path] = Path("/docker-pull.compose.yml") -_DOCKER_COMPOSE_PULL_SCRIPT_PATH: Final[Path] = Path("/docker-pull-script.sh") -_CRONJOB_LOGS_PATH: Final[Path] = Path("/var/log/docker-pull-cronjob.log") - - def write_compose_file_command( docker_tags: list[DockerGenericTag], ) -> str: @@ -459,7 +458,7 @@ def write_compose_file_command( }, } compose_yaml = yaml.safe_dump(compose) - return " ".join(["echo", f'"{compose_yaml}"', ">", f"{_PRE_PULL_COMPOSE_PATH}"]) + return " ".join(["echo", f'"{compose_yaml}"', ">", f"{PRE_PULL_COMPOSE_PATH}"]) def get_docker_pull_images_on_start_bash_command( @@ -471,15 +470,15 @@ def get_docker_pull_images_on_start_bash_command( write_docker_compose_pull_script_cmd = " ".join( [ "echo", - f'"#!/bin/sh\necho Pulling started at \\$(date)\n{_DOCKER_COMPOSE_CMD} --project-name=autoscaleprepull --file={_PRE_PULL_COMPOSE_PATH} pull --ignore-pull-failures"', + f'"#!/bin/sh\necho Pulling started at \\$(date)\n{DOCKER_COMPOSE_CMD} --project-name=autoscaleprepull --file={PRE_PULL_COMPOSE_PATH} pull --ignore-pull-failures"', ">", - f"{_DOCKER_COMPOSE_PULL_SCRIPT_PATH}", + f"{DOCKER_COMPOSE_PULL_SCRIPT_PATH}", ] ) make_docker_compose_script_executable = " ".join( - ["chmod", "+x", f"{_DOCKER_COMPOSE_PULL_SCRIPT_PATH}"] + ["chmod", "+x", f"{DOCKER_COMPOSE_PULL_SCRIPT_PATH}"] ) - docker_compose_pull_cmd = " ".join([f".{_DOCKER_COMPOSE_PULL_SCRIPT_PATH}"]) + docker_compose_pull_cmd = " ".join([f".{DOCKER_COMPOSE_PULL_SCRIPT_PATH}"]) return " && ".join( [ write_compose_file_command(docker_tags), @@ -490,23 +489,6 @@ def get_docker_pull_images_on_start_bash_command( ) -def get_docker_pull_images_crontab(interval: datetime.timedelta) -> str: - # check the interval is within 1 < 60 minutes - checked_interval = round(interval.total_seconds() / 60) - - crontab_entry = " ".join( - [ - "echo", - f'"*/{checked_interval or 1} * * * * root', - f"{_DOCKER_COMPOSE_PULL_SCRIPT_PATH}", - f'>> {_CRONJOB_LOGS_PATH} 2>&1"', - ">>", - "/etc/crontab", - ] - ) - return " && ".join([crontab_entry]) - - async def find_node_with_name( docker_client: AutoscalingDocker, name: str ) -> Node | None: @@ -722,3 +704,13 @@ async def attach_node( def is_node_ready(node: Node) -> bool: assert node.status # nosec return bool(node.status.state is NodeState.ready) + + +def compute_full_list_of_pre_pulled_images( + ec2_boot_specific: EC2InstanceBootSpecific, app_settings: ApplicationSettings +) -> list[DockerGenericTag]: + assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec + common_images = ( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING + ) + return sorted(set(common_images) | set(ec2_boot_specific.pre_pull_images)) diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/warm_buffer_machines.py b/services/autoscaling/src/simcore_service_autoscaling/utils/warm_buffer_machines.py index 3e331bc2f97a..6d047398a5aa 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/warm_buffer_machines.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/warm_buffer_machines.py @@ -68,8 +68,10 @@ def load_pre_pulled_images_from_tags(tags: EC2Tags) -> list[DockerGenericTag]: # AWS Tag values are limited to 256 characters so we chunk the images if PRE_PULLED_IMAGES_EC2_TAG_KEY in tags: # read directly - return TypeAdapter(list[DockerGenericTag]).validate_json( - tags[PRE_PULLED_IMAGES_EC2_TAG_KEY] + return sorted( + TypeAdapter(list[DockerGenericTag]).validate_json( + tags[PRE_PULLED_IMAGES_EC2_TAG_KEY] + ) ) assembled_json = "".join( @@ -86,24 +88,35 @@ def load_pre_pulled_images_from_tags(tags: EC2Tags) -> list[DockerGenericTag]: ) ) if assembled_json: - return TypeAdapter(list[DockerGenericTag]).validate_json(assembled_json) + return sorted(TypeAdapter(list[DockerGenericTag]).validate_json(assembled_json)) return [] +def list_pre_pulled_images_tag_keys(tags: EC2Tags) -> list[AWSTagKey]: + return [ + TypeAdapter(AWSTagKey).validate_python(key) + for key in tags + if PRE_PULLED_IMAGES_EC2_TAG_KEY in key + ] + + def ec2_warm_buffer_startup_script( ec2_boot_specific: EC2InstanceBootSpecific, app_settings: ApplicationSettings ) -> str: startup_commands = ec2_boot_specific.custom_boot_scripts.copy() - if ec2_boot_specific.pre_pull_images: + assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec + desired_pre_pull_images = utils_docker.compute_full_list_of_pre_pulled_images( + ec2_boot_specific, app_settings + ) + if desired_pre_pull_images: assert app_settings.AUTOSCALING_REGISTRY # nosec + startup_commands.extend( ( utils_docker.get_docker_login_on_start_bash_command( app_settings.AUTOSCALING_REGISTRY ), - utils_docker.write_compose_file_command( - ec2_boot_specific.pre_pull_images - ), + utils_docker.write_compose_file_command(desired_pre_pull_images), ) ) return " && ".join(startup_commands) diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 0b4482dd066b..57c9b381fc2d 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -1034,6 +1034,38 @@ def hot_buffer_instance_type(app_settings: ApplicationSettings) -> InstanceTypeT ) +@pytest.fixture +def hot_buffer_has_pre_pull( + app_settings: ApplicationSettings, + hot_buffer_instance_type: InstanceTypeType, +) -> bool: + assert app_settings.AUTOSCALING_EC2_INSTANCES + return bool( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING + or app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ + hot_buffer_instance_type + ].pre_pull_images + ) + + +@pytest.fixture +def hot_buffer_expected_pre_pulled_images( + app_settings: ApplicationSettings, + hot_buffer_instance_type: InstanceTypeType, +) -> list[DockerGenericTag]: + assert app_settings.AUTOSCALING_EC2_INSTANCES + return sorted( + set( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING + ) + | set( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ + hot_buffer_instance_type + ].pre_pull_images + ) + ) + + @pytest.fixture def mock_find_node_with_name_returns_none(mocker: MockerFixture) -> Iterator[mock.Mock]: return mocker.patch( @@ -1243,3 +1275,24 @@ async def _do( return instance_ids return _do + + +@pytest.fixture +def with_ec2_instances_cold_start_docker_images_pre_pulling( + app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, faker: Faker +) -> EnvVarsDict: + images = TypeAdapter(list[DockerGenericTag]).validate_python( + [ + "nginx:latest", + "itisfoundation/my-very-nice-service-in-common:latest", + "simcore/services/dynamic/another-nice-one:2.4.5161", + "asd", + ] + ) + envs = setenvs_from_dict( + monkeypatch, + { + "EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING": json.dumps(images), + }, + ) + return app_environment | envs diff --git a/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py index 40b3f6b3b90c..8ba17f3f34ff 100644 --- a/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py +++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py @@ -54,7 +54,12 @@ ) from pytest_simcore.helpers.logging_tools import log_context from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict -from simcore_service_autoscaling.constants import BUFFER_MACHINE_TAG_KEY +from simcore_service_autoscaling.constants import ( + BUFFER_MACHINE_TAG_KEY, + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, + MACHINE_PULLING_EC2_TAG_KEY, + PRE_PULLED_IMAGES_EC2_TAG_KEY, +) from simcore_service_autoscaling.core.settings import ApplicationSettings from simcore_service_autoscaling.models import AssociatedInstance, Cluster from simcore_service_autoscaling.modules.cluster_scaling._auto_scaling_core import ( @@ -205,6 +210,7 @@ def minimal_configuration( mocked_ssm_server_envs: EnvVarsDict, enabled_dynamic_mode: EnvVarsDict, mocked_ec2_instances_envs: EnvVarsDict, + with_ec2_instances_cold_start_docker_images_pre_pulling: EnvVarsDict, disabled_rabbitmq: None, disable_autoscaling_background_task: None, disable_buffers_pool_background_task: None, @@ -364,6 +370,9 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect ec2_client: EC2Client, ec2_instance_custom_tags: dict[str, str], instance_type_filters: Sequence[FilterTypeDef], + hot_buffer_instance_type: InstanceTypeType, + hot_buffer_has_pre_pull: bool, + hot_buffer_expected_pre_pulled_images: list[DockerGenericTag], ): assert app_settings.AUTOSCALING_EC2_INSTANCES await auto_scale_cluster( @@ -373,12 +382,7 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect ec2_client, expected_num_reservations=1, expected_num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER, - expected_instance_type=cast( - InstanceTypeType, - next( - iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES) - ), - ), + expected_instance_type=hot_buffer_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), instance_filters=instance_type_filters, @@ -391,6 +395,16 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect ) mock_rabbitmq_post_message.reset_mock() # calling again should attach the new nodes to the reserve, but nothing should start + # it will also trigger pre-pulling of images if there is pre-pulling + expected_pre_pull_tag_keys = ( + [ + MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY, + MACHINE_PULLING_EC2_TAG_KEY, + PRE_PULLED_IMAGES_EC2_TAG_KEY, + ] + if hot_buffer_has_pre_pull + else [] + ) await auto_scale_cluster( app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider() ) @@ -398,14 +412,12 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect ec2_client, expected_num_reservations=1, expected_num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER, - expected_instance_type=cast( - InstanceTypeType, - next( - iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES) - ), - ), + expected_instance_type=hot_buffer_instance_type, expected_instance_state="running", - expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_additional_tag_keys=list( + ec2_instance_custom_tags.keys() | expected_pre_pull_tag_keys + ), + expected_pre_pulled_images=hot_buffer_expected_pre_pulled_images or None, instance_filters=instance_type_filters, ) assert fake_node.description @@ -428,7 +440,10 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect }, ) - # calling it again should not create anything new + # calling it again should not create anything new, pre-pulling should be done + expected_pre_pull_tag_keys = ( + [PRE_PULLED_IMAGES_EC2_TAG_KEY] if hot_buffer_has_pre_pull else [] + ) for _ in range(10): await auto_scale_cluster( app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider() @@ -437,14 +452,12 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect ec2_client, expected_num_reservations=1, expected_num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER, - expected_instance_type=cast( - InstanceTypeType, - next( - iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES) - ), - ), + expected_instance_type=hot_buffer_instance_type, expected_instance_state="running", - expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_additional_tag_keys=list( + ec2_instance_custom_tags.keys() | expected_pre_pull_tag_keys + ), + expected_pre_pulled_images=hot_buffer_expected_pre_pulled_images or None, instance_filters=instance_type_filters, ) @@ -2101,6 +2114,8 @@ async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7 ], create_services_batch: Callable[[_ScaleUpParams], Awaitable[list[Service]]], hot_buffer_instance_type: InstanceTypeType, + hot_buffer_has_pre_pull: bool, + hot_buffer_expected_pre_pulled_images: list[DockerGenericTag], spied_cluster_analysis: MockType, instance_type_filters: Sequence[FilterTypeDef], stopped_instance_type_filters: Sequence[FilterTypeDef], @@ -2138,7 +2153,7 @@ async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7 expected_additional_tag_keys=list(ec2_instance_custom_tags), instance_filters=instance_type_filters, ) - # this brings a new analysis + # this brings a new analysis and will start pre-pulling images await auto_scale_cluster( app=initialized_app, auto_scaling_mode=DynamicAutoscalingProvider() ) @@ -2181,14 +2196,21 @@ async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7 return_value=fake_hot_buffer_nodes, ) + # there we are done pre-pulling images await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) + expected_pre_pull_tag_keys = ( + [PRE_PULLED_IMAGES_EC2_TAG_KEY] if hot_buffer_has_pre_pull else [] + ) await assert_autoscaled_dynamic_ec2_instances( ec2_client, expected_num_reservations=1, expected_num_instances=num_hot_buffer, expected_instance_type=hot_buffer_instance_type, expected_instance_state="running", - expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_additional_tag_keys=list( + ec2_instance_custom_tags.keys() | expected_pre_pull_tag_keys + ), + expected_pre_pulled_images=hot_buffer_expected_pre_pulled_images or None, instance_filters=instance_type_filters, ) spied_cluster = assert_cluster_state( @@ -2223,7 +2245,10 @@ async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7 expected_num_instances=num_hot_buffer, expected_instance_type=hot_buffer_instance_type, expected_instance_state="running", - expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_additional_tag_keys=list( + ec2_instance_custom_tags.keys() | expected_pre_pull_tag_keys + ), + expected_pre_pulled_images=hot_buffer_expected_pre_pulled_images or None, instance_filters=instance_type_filters, ) await assert_autoscaled_dynamic_warm_pools_ec2_instances( @@ -2269,7 +2294,10 @@ async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7 expected_num_instances=num_hot_buffer, expected_instance_type=hot_buffer_instance_type, expected_instance_state="running", - expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_additional_tag_keys=list( + ec2_instance_custom_tags.keys() | expected_pre_pull_tag_keys + ), + expected_pre_pulled_images=hot_buffer_expected_pre_pulled_images or None, instance_filters=instance_type_filters, ) await assert_autoscaled_dynamic_warm_pools_ec2_instances( diff --git a/services/autoscaling/tests/unit/test_modules_cluster_scaling_warm_buffer_machine_core.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_warm_buffer_machine_core.py index c1b2cface44c..d8817de9150a 100644 --- a/services/autoscaling/tests/unit/test_modules_cluster_scaling_warm_buffer_machine_core.py +++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_warm_buffer_machine_core.py @@ -314,6 +314,7 @@ class _BufferMachineParams: ], ) async def test_monitor_buffer_machines_terminates_supernumerary_instances( + patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, minimal_configuration: None, fake_pre_pull_images: list[DockerGenericTag], ec2_client: EC2Client, diff --git a/services/autoscaling/tests/unit/test_utils_cluster_scaling.py b/services/autoscaling/tests/unit/test_utils_cluster_scaling.py index 7afa42f770d3..1c325c1f6234 100644 --- a/services/autoscaling/tests/unit/test_utils_cluster_scaling.py +++ b/services/autoscaling/tests/unit/test_utils_cluster_scaling.py @@ -4,7 +4,6 @@ # pylint: disable=unused-variable # pylint: disable=too-many-arguments -import datetime import json import re from collections.abc import Callable @@ -145,11 +144,8 @@ async def test_ec2_startup_script_just_ami( ) ) assert not instance_boot_specific.pre_pull_images - assert instance_boot_specific.pre_pull_images_cron_interval == datetime.timedelta( - minutes=30 - ) startup_script = await ec2_startup_script(instance_boot_specific, app_settings) - assert len(startup_script.split("&&")) == 1 + assert len(startup_script.split("&&")) == 2 assert re.fullmatch( r"^docker swarm join --availability=drain --token .*$", startup_script ) @@ -207,6 +203,7 @@ def disabled_registry(monkeypatch: pytest.MonkeyPatch) -> None: async def test_ec2_startup_script_with_pre_pulling( minimal_configuration: None, ec2_instances_boot_ami_pre_pull: EnvVarsDict, + with_ec2_instances_cold_start_docker_images_pre_pulling: EnvVarsDict, app_settings: ApplicationSettings, ): assert app_settings.AUTOSCALING_EC2_INSTANCES @@ -216,11 +213,10 @@ async def test_ec2_startup_script_with_pre_pulling( ) ) assert instance_boot_specific.pre_pull_images - assert instance_boot_specific.pre_pull_images_cron_interval startup_script = await ec2_startup_script(instance_boot_specific, app_settings) - assert len(startup_script.split("&&")) == 7 + assert len(startup_script.split("&&")) == 6 assert re.fullmatch( - r"^(docker swarm join [^&&]+) && (echo [^\s]+ \| docker login [^&&]+) && (echo [^&&]+) && (echo [^&&]+) && (chmod \+x [^&&]+) && (./docker-pull-script.sh) && (echo .+)$", + r"^(docker swarm join [^&&]+) && (echo [^\s]+ \| docker login [^&&]+) && (echo [^&&]+) && (echo [^&&]+) && (chmod \+x [^&&]+) && (./docker-pull-script.sh)$", startup_script, ), f"{startup_script=}" @@ -238,9 +234,8 @@ async def test_ec2_startup_script_with_custom_scripts( ) ) assert not instance_boot_specific.pre_pull_images - assert instance_boot_specific.pre_pull_images_cron_interval startup_script = await ec2_startup_script(instance_boot_specific, app_settings) - assert len(startup_script.split("&&")) == 1 + len( + assert len(startup_script.split("&&")) == 2 + len( ec2_instances_boot_ami_scripts ) assert re.fullmatch( @@ -262,7 +257,6 @@ async def test_ec2_startup_script_with_pre_pulling_but_no_registry( ) ) assert instance_boot_specific.pre_pull_images - assert instance_boot_specific.pre_pull_images_cron_interval startup_script = await ec2_startup_script(instance_boot_specific, app_settings) assert len(startup_script.split("&&")) == 1 assert re.fullmatch( @@ -299,6 +293,7 @@ def test_sort_drained_nodes( create_fake_node: Callable[..., DockerNode], create_associated_instance: Callable[..., AssociatedInstance], ): + assert app_settings.AUTOSCALING_EC2_INSTANCES machine_buffer_type = get_hot_buffer_type(random_fake_available_instances) _NUM_DRAINED_NODES = 20 _NUM_NODE_WITH_TYPE_BUFFER = ( diff --git a/services/autoscaling/tests/unit/test_utils_docker.py b/services/autoscaling/tests/unit/test_utils_docker.py index 34b64d6861a2..12571b31d08d 100644 --- a/services/autoscaling/tests/unit/test_utils_docker.py +++ b/services/autoscaling/tests/unit/test_utils_docker.py @@ -50,11 +50,11 @@ attach_node, compute_cluster_total_resources, compute_cluster_used_resources, + compute_full_list_of_pre_pulled_images, compute_node_used_resources, compute_tasks_needed_resources, find_node_with_name, get_docker_login_on_start_bash_command, - get_docker_pull_images_crontab, get_docker_pull_images_on_start_bash_command, get_docker_swarm_join_bash_command, get_max_resources_from_docker_task, @@ -1078,34 +1078,6 @@ def test_get_docker_pull_images_on_start_bash_command( assert get_docker_pull_images_on_start_bash_command(images) == expected_cmd -@pytest.mark.parametrize( - "interval, expected_cmd", - [ - ( - datetime.timedelta(minutes=20), - 'echo "*/20 * * * * root /docker-pull-script.sh >> /var/log/docker-pull-cronjob.log 2>&1" >> /etc/crontab', - ), - ( - datetime.timedelta(seconds=20), - 'echo "*/1 * * * * root /docker-pull-script.sh >> /var/log/docker-pull-cronjob.log 2>&1" >> /etc/crontab', - ), - ( - datetime.timedelta(seconds=200), - 'echo "*/3 * * * * root /docker-pull-script.sh >> /var/log/docker-pull-cronjob.log 2>&1" >> /etc/crontab', - ), - ( - datetime.timedelta(days=3), - 'echo "*/4320 * * * * root /docker-pull-script.sh >> /var/log/docker-pull-cronjob.log 2>&1" >> /etc/crontab', - ), - ], - ids=str, -) -def test_get_docker_pull_images_crontab( - interval: datetime.timedelta, expected_cmd: str -): - assert get_docker_pull_images_crontab(interval) == expected_cmd - - def test_is_node_ready_and_available(create_fake_node: Callable[..., Node]): # check not ready state return false for node_status in [ @@ -1313,3 +1285,37 @@ async def test_attach_node( assert is_node_ready_and_available(host_node, availability=Availability.active) # but not osparc ready assert not is_node_osparc_ready(updated_node) + + +def test_compute_full_list_of_pre_pulled_images( + disabled_rabbitmq: None, + disabled_ec2: None, + disabled_ssm: None, + mocked_redis_server: None, + enabled_dynamic_mode: EnvVarsDict, + disable_autoscaling_background_task: None, + with_ec2_instances_cold_start_docker_images_pre_pulling: EnvVarsDict, + app_settings: ApplicationSettings, +): + assert app_settings.AUTOSCALING_EC2_INSTANCES + assert ( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING + ), "this test requires some common docker images" + + for ( + instance_type, + instance_boot_specific, + ) in app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES.items(): + returned_list = compute_full_list_of_pre_pulled_images( + instance_boot_specific, app_settings + ) + assert ( + sorted(returned_list) == returned_list + ), f"the list for {instance_type} should be sorted" + assert len(returned_list) == len( + set(returned_list) + ), f"the list for {instance_type} should not have duplicates" + assert all( + i in returned_list + for i in app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING + ) diff --git a/services/autoscaling/tests/unit/test_utils_warm_buffer_machines.py b/services/autoscaling/tests/unit/test_utils_warm_buffer_machines.py index f9cee912bcf6..c416b954fe2d 100644 --- a/services/autoscaling/tests/unit/test_utils_warm_buffer_machines.py +++ b/services/autoscaling/tests/unit/test_utils_warm_buffer_machines.py @@ -25,6 +25,7 @@ get_activated_warm_buffer_ec2_tags, get_deactivated_warm_buffer_ec2_tags, is_warm_buffer_machine, + list_pre_pulled_images_tag_keys, load_pre_pulled_images_from_tags, ) @@ -169,8 +170,58 @@ def test_dump_load_pre_pulled_images_as_tags( images: list[DockerGenericTag], expected_tags: EC2Tags ): assert dump_pre_pulled_images_as_tags(images) == expected_tags - assert load_pre_pulled_images_from_tags(expected_tags) == images + assert load_pre_pulled_images_from_tags(expected_tags) == sorted(images) def test_load_pre_pulled_images_as_tags_no_tag_present_returns_empty_list(faker: Faker): assert load_pre_pulled_images_from_tags(faker.pydict(allowed_types=(str,))) == [] + + +@pytest.mark.parametrize( + "images, expected_tags", + [ + pytest.param( + [ + "itisfoundation/dynamic-sidecar:latest", + "itisfoundation/agent:latest", + "registry.pytest.com/simcore/services/dynamic/ti-postpro:2.0.34", + "registry.pytest.com/simcore/services/dynamic/ti-simu:1.0.12", + "registry.pytest.com/simcore/services/dynamic/ti-pers:1.0.19", + "registry.pytest.com/simcore/services/dynamic/sim4life-postpro:2.0.106", + "registry.pytest.com/simcore/services/dynamic/s4l-core-postpro:2.0.106", + "registry.pytest.com/simcore/services/dynamic/s4l-core-stream:2.0.106", + "registry.pytest.com/simcore/services/dynamic/sym-server-8-0-0-dy:2.0.106", + "registry.pytest.com/simcore/services/dynamic/sim4life-8-0-0-modeling:3.2.34", + "registry.pytest.com/simcore/services/dynamic/s4l-core-8-0-0-modeling:3.2.34", + "registry.pytest.com/simcore/services/dynamic/s4l-stream-8-0-0-dy:3.2.34", + "registry.pytest.com/simcore/services/dynamic/sym-server-8-0-0-dy:3.2.34", + ], + { + f"{PRE_PULLED_IMAGES_EC2_TAG_KEY}_0": '["itisfoundation/dynamic-sidecar:latest","itisfoundation/agent:latest","registry.pytest.com/simcore/services/dynamic/ti-postpro:2.0.34","registry.pytest.com/simcore/services/dynamic/ti-simu:1.0.12","registry.pytest.com/simcore/services/dynamic/ti-pers:1.0.', + f"{PRE_PULLED_IMAGES_EC2_TAG_KEY}_1": '19","registry.pytest.com/simcore/services/dynamic/sim4life-postpro:2.0.106","registry.pytest.com/simcore/services/dynamic/s4l-core-postpro:2.0.106","registry.pytest.com/simcore/services/dynamic/s4l-core-stream:2.0.106","registry.pytest.com/simcore/services', + f"{PRE_PULLED_IMAGES_EC2_TAG_KEY}_2": '/dynamic/sym-server-8-0-0-dy:2.0.106","registry.pytest.com/simcore/services/dynamic/sim4life-8-0-0-modeling:3.2.34","registry.pytest.com/simcore/services/dynamic/s4l-core-8-0-0-modeling:3.2.34","registry.pytest.com/simcore/services/dynamic/s4l-stream-8-0-0', + f"{PRE_PULLED_IMAGES_EC2_TAG_KEY}_3": '-dy:3.2.34","registry.pytest.com/simcore/services/dynamic/sym-server-8-0-0-dy:3.2.34"]', + }, + id="many images that get chunked to AWS Tag max length", + ), + pytest.param( + ["itisfoundation/dynamic-sidecar:latest", "itisfoundation/agent:latest"], + { + PRE_PULLED_IMAGES_EC2_TAG_KEY: '["itisfoundation/dynamic-sidecar:latest","itisfoundation/agent:latest"]' + }, + id="<256 characters jsonized number of images does not get chunked", + ), + pytest.param( + [], + {PRE_PULLED_IMAGES_EC2_TAG_KEY: "[]"}, + id="empty list", + ), + ], +) +def test_list_pre_pulled_images_tag_keys( + images: list[DockerGenericTag], expected_tags: EC2Tags +): + assert dump_pre_pulled_images_as_tags(images) == expected_tags + assert list_pre_pulled_images_tag_keys(expected_tags) == sorted( + expected_tags.keys() + ) diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py index 463ac51189e8..b56e3d7593b5 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py @@ -13,6 +13,7 @@ VersionTag, ) from models_library.clusters import ClusterAuthentication +from models_library.docker import DockerGenericTag from pydantic import ( AliasChoices, Field, @@ -82,6 +83,13 @@ class WorkersEC2InstancesSettings(BaseCustomSettings): description="Defines which EC2 instances are considered as candidates for new EC2 instance and their respective boot specific parameters", ), ] + WORKERS_EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING: Annotated[ + list[DockerGenericTag], + Field( + description="List of docker images to pre-pull on cold started new EC2 instances", + default_factory=list, + ), + ] = DEFAULT_FACTORY WORKERS_EC2_INSTANCES_KEY_NAME: Annotated[ str, @@ -170,7 +178,6 @@ class PrimaryEC2InstancesSettings(BaseCustomSettings): description="Defines which EC2 instances are considered as candidates for new EC2 instance and their respective boot specific parameters", ), ] - PRIMARY_EC2_INSTANCES_MAX_INSTANCES: Annotated[ int, Field( diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml b/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml index 761d3029c454..dc44dd9ece75 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml @@ -105,6 +105,7 @@ services: DASK_MONITORING_URL: tls://dask-scheduler:8786 DASK_SCHEDULER_AUTH: '{"type":"tls","tls_ca_file":"${DASK_TLS_CA_FILE}","tls_client_cert":"${DASK_TLS_CERT}","tls_client_key":"${DASK_TLS_KEY}"}' EC2_INSTANCES_ALLOWED_TYPES: ${WORKERS_EC2_INSTANCES_ALLOWED_TYPES} + EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING: ${WORKERS_EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING} EC2_INSTANCES_CUSTOM_TAGS: ${WORKERS_EC2_INSTANCES_CUSTOM_TAGS} EC2_INSTANCES_ATTACHED_IAM_PROFILE: "" EC2_INSTANCES_KEY_NAME: ${WORKERS_EC2_INSTANCES_KEY_NAME} diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py index 653cb6f0ecdc..229f60648029 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py @@ -100,6 +100,7 @@ def _convert_to_env_dict(entries: dict[str, Any]) -> str: f"EC2_INSTANCES_NAME_PREFIX={cluster_machines_name_prefix}", f"LOG_LEVEL={app_settings.log_level}", f"WORKERS_EC2_INSTANCES_ALLOWED_TYPES={_convert_to_env_dict(app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_ALLOWED_TYPES)}", + f"WORKERS_EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING={_convert_to_env_list(app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING)}", f"WORKERS_EC2_INSTANCES_CUSTOM_TAGS={_convert_to_env_dict(app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_CUSTOM_TAGS | additional_custom_tags)}", f"WORKERS_EC2_INSTANCES_KEY_NAME={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_KEY_NAME}", f"WORKERS_EC2_INSTANCES_MAX_INSTANCES={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_MAX_INSTANCES}", diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 6d8e1c40c3ce..c326ac596bd4 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -203,17 +203,18 @@ services: AUTOSCALING_EC2_INSTANCES: ${AUTOSCALING_EC2_INSTANCES} # used to enable/disable EC2_INSTANCES_ALLOWED_TYPES: ${EC2_INSTANCES_ALLOWED_TYPES} + EC2_INSTANCES_ATTACHED_IAM_PROFILE: ${EC2_INSTANCES_ATTACHED_IAM_PROFILE} + EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING: ${EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING} + EC2_INSTANCES_CUSTOM_TAGS: ${EC2_INSTANCES_CUSTOM_TAGS} + EC2_INSTANCES_KEY_NAME: ${EC2_INSTANCES_KEY_NAME} EC2_INSTANCES_MACHINES_BUFFER: ${EC2_INSTANCES_MACHINES_BUFFER} EC2_INSTANCES_MAX_INSTANCES: ${EC2_INSTANCES_MAX_INSTANCES} EC2_INSTANCES_MAX_START_TIME: ${EC2_INSTANCES_MAX_START_TIME} EC2_INSTANCES_NAME_PREFIX: ${EC2_INSTANCES_NAME_PREFIX} EC2_INSTANCES_SECURITY_GROUP_IDS: ${EC2_INSTANCES_SECURITY_GROUP_IDS} EC2_INSTANCES_SUBNET_IDS: ${EC2_INSTANCES_SUBNET_IDS} - EC2_INSTANCES_KEY_NAME: ${EC2_INSTANCES_KEY_NAME} EC2_INSTANCES_TIME_BEFORE_DRAINING: ${EC2_INSTANCES_TIME_BEFORE_DRAINING} EC2_INSTANCES_TIME_BEFORE_TERMINATION: ${EC2_INSTANCES_TIME_BEFORE_TERMINATION} - EC2_INSTANCES_CUSTOM_TAGS: ${EC2_INSTANCES_CUSTOM_TAGS} - EC2_INSTANCES_ATTACHED_IAM_PROFILE: ${EC2_INSTANCES_ATTACHED_IAM_PROFILE} AUTOSCALING_NODES_MONITORING: ${AUTOSCALING_NODES_MONITORING} # dyn autoscaling envvar NODES_MONITORING_NODE_LABELS: ${NODES_MONITORING_NODE_LABELS} @@ -327,6 +328,7 @@ services: WORKERS_EC2_INSTANCES_MAX_START_TIME: ${WORKERS_EC2_INSTANCES_MAX_START_TIME} WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS: ${WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS} WORKERS_EC2_INSTANCES_SUBNET_IDS: ${WORKERS_EC2_INSTANCES_SUBNET_IDS} + WORKERS_EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING: ${WORKERS_EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING} WORKERS_EC2_INSTANCES_CUSTOM_TAGS: ${WORKERS_EC2_INSTANCES_CUSTOM_TAGS} CLUSTERS_KEEPER_TRACING: ${CLUSTERS_KEEPER_TRACING} secrets: *dask_tls_secrets