Skip to content

Commit 8c72037

Browse files
committed
added ssm client, use ssm client, on the way to deploying stack via ssm
1 parent a2bc8ce commit 8c72037

File tree

9 files changed

+182
-22
lines changed

9 files changed

+182
-22
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from typing import Final
2+
3+
from aws_library.ec2._models import AWSTagKey
4+
from pydantic import parse_obj_as
5+
6+
# Name passed as `command_name` when the docker stack deploy script is sent through the SSM client.
DOCKER_STACK_DEPLOY_COMMAND_NAME: Final[str] = "private cluster docker deploy"
7+
# EC2 tag key marking instances that already received the deploy command:
# instances without this tag are picked for deployment, and once the command is sent
# the tag is set with the SSM command id as its value.
DOCKER_STACK_DEPLOY_COMMAND_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
8+
AWSTagKey, "io.simcore.clusters-keeper.private_cluster_docker_deploy"
9+
)

services/clusters-keeper/src/simcore_service_clusters_keeper/core/application.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from ..modules.ec2 import setup as setup_ec2
1919
from ..modules.rabbitmq import setup as setup_rabbitmq
2020
from ..modules.redis import setup as setup_redis
21+
from ..modules.ssm import setup as setup_ssm
2122
from ..rpc.rpc_routes import setup_rpc_routes
2223
from .settings import ApplicationSettings
2324

@@ -48,6 +49,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
4849
setup_rabbitmq(app)
4950
setup_rpc_routes(app)
5051
setup_ec2(app)
52+
setup_ssm(app)
5153
setup_redis(app)
5254
setup_clusters_management(app)
5355

services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from settings_library.ec2 import EC2Settings
2626
from settings_library.rabbit import RabbitSettings
2727
from settings_library.redis import RedisSettings
28+
from settings_library.ssm import SSMSettings
2829
from settings_library.utils_logging import MixinLoggingSettings
2930
from types_aiobotocore_ec2.literals import InstanceTypeType
3031

@@ -49,6 +50,19 @@ class Config(EC2Settings.Config):
4950
}
5051

5152

53+
class ClustersKeeperSSMSettings(SSMSettings):
    # SSM settings variant whose configuration uses the clusters-keeper prefix.
    class Config(SSMSettings.Config):
        env_prefix = CLUSTERS_KEEPER_ENV_PREFIX
        # Re-key every example inherited from SSMSettings with the same prefix so
        # that the published examples agree with `env_prefix`.
        prefixed_examples: ClassVar[list[dict[str, Any]]] = [
            {
                f"{CLUSTERS_KEEPER_ENV_PREFIX}{name}": value
                for name, value in base_example.items()
            }
            for base_example in SSMSettings.Config.schema_extra["examples"]
        ]

        schema_extra: ClassVar[dict[str, Any]] = {"examples": prefixed_examples}
64+
65+
5266
class WorkersEC2InstancesSettings(BaseCustomSettings):
5367
WORKERS_EC2_INSTANCES_ALLOWED_TYPES: dict[str, EC2InstanceBootSpecific] = Field(
5468
...,
@@ -182,6 +196,12 @@ class PrimaryEC2InstancesSettings(BaseCustomSettings):
182196
"that take longer than this time will be terminated as sometimes it happens that EC2 machine fail on start.",
183197
)
184198

199+
PRIMARY_EC2_INSTANCES_DOCKER_DEFAULT_ADDRESS_POOL: str = Field(
200+
default="172.20.0.0/14",
201+
description="defines the docker swarm default address pool in CIDR format "
202+
"(see https://docs.docker.com/reference/cli/docker/swarm/init/)",
203+
)
204+
185205
@validator("PRIMARY_EC2_INSTANCES_ALLOWED_TYPES")
186206
@classmethod
187207
def check_valid_instance_names(
@@ -249,6 +269,10 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
249269
auto_default_from_env=True
250270
)
251271

272+
CLUSTERS_KEEPER_SSM_ACCESS: ClustersKeeperSSMSettings | None = Field(
273+
auto_default_from_env=True
274+
)
275+
252276
CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES: PrimaryEC2InstancesSettings | None = Field(
253277
auto_default_from_env=True
254278
)

services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,7 @@ async def create_cluster(
7272
tags=creation_ec2_tags(app_settings, user_id=user_id, wallet_id=wallet_id),
7373
startup_script=create_startup_script(
7474
app_settings,
75-
cluster_machines_name_prefix=get_cluster_name(
76-
app_settings, user_id=user_id, wallet_id=wallet_id, is_manager=False
77-
),
7875
ec2_boot_specific=ec2_instance_boot_specs,
79-
additional_custom_tags={
80-
AWSTagKey("user_id"): AWSTagValue(f"{user_id}"),
81-
AWSTagKey("wallet_id"): AWSTagValue(f"{wallet_id}"),
82-
AWSTagKey("role"): AWSTagValue("worker"),
83-
},
8476
),
8577
ami_id=ec2_instance_boot_specs.ami_id,
8678
key_name=app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_KEY_NAME,

services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_core.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,31 @@
55

66
import arrow
77
from aws_library.ec2 import AWSTagKey, EC2InstanceData
8+
from aws_library.ec2._models import AWSTagValue
89
from fastapi import FastAPI
910
from models_library.users import UserID
1011
from models_library.wallets import WalletID
1112
from pydantic import parse_obj_as
1213
from servicelib.logging_utils import log_catch
14+
from servicelib.utils import limited_gather
1315

16+
from ..constants import (
17+
DOCKER_STACK_DEPLOY_COMMAND_EC2_TAG_KEY,
18+
DOCKER_STACK_DEPLOY_COMMAND_NAME,
19+
)
1420
from ..core.settings import get_application_settings
1521
from ..modules.clusters import (
1622
delete_clusters,
1723
get_all_clusters,
1824
get_cluster_workers,
1925
set_instance_heartbeat,
2026
)
27+
from ..utils.clusters import create_deploy_cluster_stack_script
2128
from ..utils.dask import get_scheduler_auth, get_scheduler_url
22-
from ..utils.ec2 import HEARTBEAT_TAG_KEY
29+
from ..utils.ec2 import HEARTBEAT_TAG_KEY, get_cluster_name
2330
from .dask import is_scheduler_busy, ping_scheduler
31+
from .ec2 import get_ec2_client
32+
from .ssm import get_ssm_client
2433

2534
_logger = logging.getLogger(__name__)
2635

@@ -157,6 +166,62 @@ async def check_clusters(app: FastAPI) -> None:
157166
# we send a command that contains:
158167
# the docker-compose file in binary,
159168
# the call to init the docker swarm and the call to deploy the stack
169+
instances_in_need_of_deployment = {
170+
i
171+
for i in starting_instances - terminateable_instances
172+
if DOCKER_STACK_DEPLOY_COMMAND_EC2_TAG_KEY not in i.tags
173+
}
174+
175+
if instances_in_need_of_deployment:
176+
app_settings = get_application_settings(app)
177+
ssm_client = get_ssm_client(app)
178+
ec2_client = get_ec2_client(app)
179+
instances_in_need_of_deployment_ssm_connection_state = await limited_gather(
180+
*[
181+
ssm_client.is_instance_connected_to_ssm_server(i.id)
182+
for i in instances_in_need_of_deployment
183+
],
184+
reraise=False,
185+
log=_logger,
186+
limit=20,
187+
)
188+
ec2_connected_to_ssm_server = [
189+
i
190+
for i, c in zip(
191+
instances_in_need_of_deployment,
192+
instances_in_need_of_deployment_ssm_connection_state,
193+
strict=True,
194+
)
195+
if c is True
196+
]
197+
started_instances_ready_for_command = ec2_connected_to_ssm_server
198+
if started_instances_ready_for_command:
199+
ssm_command = await ssm_client.send_command(
200+
[i.id for i in started_instances_ready_for_command],
201+
command=create_deploy_cluster_stack_script(
202+
app_settings,
203+
cluster_machines_name_prefix=get_cluster_name(
204+
app_settings,
205+
user_id=user_id,
206+
wallet_id=wallet_id,
207+
is_manager=False,
208+
),
209+
additional_custom_tags={
210+
AWSTagKey("user_id"): AWSTagValue(f"{user_id}"),
211+
AWSTagKey("wallet_id"): AWSTagValue(f"{wallet_id}"),
212+
AWSTagKey("role"): AWSTagValue("worker"),
213+
},
214+
),
215+
command_name=DOCKER_STACK_DEPLOY_COMMAND_NAME,
216+
)
217+
await ec2_client.set_instances_tags(
218+
started_instances_ready_for_command,
219+
tags={
220+
DOCKER_STACK_DEPLOY_COMMAND_EC2_TAG_KEY: AWSTagValue(
221+
ssm_command.command_id
222+
),
223+
},
224+
)
160225

161226
# the remaining instances are broken (they were at some point connected but now not anymore)
162227
broken_instances = disconnected_instances - starting_instances

services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def setup(app: FastAPI):
4747
for s in [
4848
app_settings.CLUSTERS_KEEPER_EC2_ACCESS,
4949
app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES,
50+
app_settings.CLUSTERS_KEEPER_SSM_ACCESS,
5051
]
5152
):
5253
logger.warning(
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import logging
2+
from typing import cast
3+
4+
from aws_library.ssm import SimcoreSSMAPI
5+
from aws_library.ssm._errors import SSMNotConnectedError
6+
from fastapi import FastAPI
7+
from settings_library.ssm import SSMSettings
8+
from tenacity.asyncio import AsyncRetrying
9+
from tenacity.before_sleep import before_sleep_log
10+
from tenacity.stop import stop_after_delay
11+
from tenacity.wait import wait_random_exponential
12+
13+
from ..core.errors import ConfigurationError
14+
from ..core.settings import get_application_settings
15+
16+
_logger = logging.getLogger(__name__)
17+
18+
19+
def setup(app: FastAPI) -> None:
    """Register the startup/shutdown handlers managing the SSM client in ``app.state``.

    On startup, ``app.state.ssm_client`` is first reset to ``None``. When
    ``CLUSTERS_KEEPER_SSM_ACCESS`` is not set, a warning is logged and the client
    stays ``None``. Otherwise the client is created and pinged, retrying for up to
    120 seconds; the last error is re-raised if it never connects.

    On shutdown, an existing client is closed.
    """

    async def on_startup() -> None:
        app.state.ssm_client = None
        # NOTE: the clusters-keeper settings declare this field as
        # CLUSTERS_KEEPER_SSM_ACCESS; there is no AUTOSCALING_SSM_ACCESS here
        settings: SSMSettings | None = get_application_settings(
            app
        ).CLUSTERS_KEEPER_SSM_ACCESS

        if not settings:
            _logger.warning("SSM client is de-activated in the settings")
            return

        app.state.ssm_client = client = await SimcoreSSMAPI.create(settings)

        # wait until the SSM endpoint answers, backing off randomly between attempts
        async for attempt in AsyncRetrying(
            reraise=True,
            stop=stop_after_delay(120),
            wait=wait_random_exponential(max=30),
            before_sleep=before_sleep_log(_logger, logging.WARNING),
        ):
            with attempt:
                connected = await client.ping()
                if not connected:
                    raise SSMNotConnectedError  # pragma: no cover

    async def on_shutdown() -> None:
        # nothing to close when the client was de-activated at startup
        if app.state.ssm_client:
            await cast(SimcoreSSMAPI, app.state.ssm_client).close()

    app.add_event_handler("startup", on_startup)
    app.add_event_handler("shutdown", on_shutdown)
49+
50+
51+
def get_ssm_client(app: FastAPI) -> SimcoreSSMAPI:
    """Return the SSM client stored in ``app.state``.

    Raises:
        ConfigurationError: if no client is stored (``app.state.ssm_client`` is falsy).
    """
    client = app.state.ssm_client
    if client:
        return cast(SimcoreSSMAPI, client)
    raise ConfigurationError(
        msg="SSM client is not available. Please check the configuration."
    )

services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import arrow
99
import yaml
1010
from aws_library.ec2 import EC2InstanceBootSpecific, EC2InstanceData, EC2Tags
11+
from aws_library.ec2._models import CommandStr
1112
from fastapi.encoders import jsonable_encoder
1213
from models_library.api_schemas_clusters_keeper.clusters import (
1314
ClusterState,
@@ -107,47 +108,55 @@ def _convert_to_env_dict(entries: dict[str, Any]) -> str:
107108
def create_startup_script(
108109
app_settings: ApplicationSettings,
109110
*,
110-
cluster_machines_name_prefix: str,
111111
ec2_boot_specific: EC2InstanceBootSpecific,
112-
additional_custom_tags: EC2Tags,
113112
) -> str:
114113
assert app_settings.CLUSTERS_KEEPER_EC2_ACCESS # nosec
115114
assert app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES # nosec
116115

117-
environment_variables = _prepare_environment_variables(
118-
app_settings,
119-
cluster_machines_name_prefix=cluster_machines_name_prefix,
120-
additional_custom_tags=additional_custom_tags,
121-
)
122-
123116
startup_commands = ec2_boot_specific.custom_boot_scripts.copy()
117+
return "\n".join(startup_commands)
118+
119+
120+
def create_deploy_cluster_stack_script(
121+
app_settings: ApplicationSettings,
122+
*,
123+
cluster_machines_name_prefix: str,
124+
additional_custom_tags: EC2Tags,
125+
) -> str:
126+
deploy_script: list[CommandStr] = []
124127
assert app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES # nosec
125128
if isinstance(
126129
app_settings.CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH,
127130
TLSAuthentication,
128131
):
129-
132+
# get the dask certificates
130133
download_certificates_commands = [
131134
f"mkdir --parents {_HOST_CERTIFICATES_BASE_PATH}",
132135
f'aws ssm get-parameter --name "{app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_SSM_TLS_DASK_CA}" --region us-east-1 --with-decryption --query "Parameter.Value" --output text > {_HOST_TLS_CA_FILE_PATH}',
133136
f'aws ssm get-parameter --name "{app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_SSM_TLS_DASK_CERT}" --region us-east-1 --with-decryption --query "Parameter.Value" --output text > {_HOST_TLS_CERT_FILE_PATH}',
134137
f'aws ssm get-parameter --name "{app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_SSM_TLS_DASK_KEY}" --region us-east-1 --with-decryption --query "Parameter.Value" --output text > {_HOST_TLS_KEY_FILE_PATH}',
135138
]
136-
startup_commands.extend(download_certificates_commands)
139+
deploy_script.extend(download_certificates_commands)
140+
141+
environment_variables = _prepare_environment_variables(
142+
app_settings,
143+
cluster_machines_name_prefix=cluster_machines_name_prefix,
144+
additional_custom_tags=additional_custom_tags,
145+
)
137146

138-
startup_commands.extend(
147+
deploy_script.extend(
139148
[
140149
# NOTE: https://stackoverflow.com/questions/41203492/solving-redis-warnings-on-overcommit-memory-and-transparent-huge-pages-for-ubunt
141150
"sysctl vm.overcommit_memory=1",
142151
f"echo '{_docker_compose_yml_base64_encoded()}' | base64 -d > {_HOST_DOCKER_COMPOSE_PATH}",
143152
f"echo '{_prometheus_yml_base64_encoded()}' | base64 -d > {_HOST_PROMETHEUS_PATH}",
144153
f"echo '{_prometheus_basic_auth_yml_base64_encoded(app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_PROMETHEUS_USERNAME, app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_PROMETHEUS_PASSWORD.get_secret_value())}' | base64 -d > {_HOST_PROMETHEUS_WEB_PATH}",
145154
# NOTE: --default-addr-pool is necessary in order to prevent conflicts with AWS node IPs
146-
"docker swarm init --default-addr-pool 172.20.0.0/14",
155+
f"docker swarm init --default-addr-pool {app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_DOCKER_DEFAULT_ADDRESS_POOL}",
147156
f"{' '.join(environment_variables)} docker stack deploy --with-registry-auth --compose-file={_HOST_DOCKER_COMPOSE_PATH} dask_stack",
148157
]
149158
)
150-
return "\n".join(startup_commands)
159+
return "\n".join(deploy_script)
151160

152161

153162
def _convert_ec2_state_to_cluster_state(

services/docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ services:
198198
PRIMARY_EC2_INSTANCES_SSM_TLS_DASK_KEY: ${PRIMARY_EC2_INSTANCES_SSM_TLS_DASK_KEY}
199199
PRIMARY_EC2_INSTANCES_PROMETHEUS_USERNAME: ${PRIMARY_EC2_INSTANCES_PROMETHEUS_USERNAME}
200200
PRIMARY_EC2_INSTANCES_PROMETHEUS_PASSWORD: ${PRIMARY_EC2_INSTANCES_PROMETHEUS_PASSWORD}
201+
PRIMARY_EC2_INSTANCES_MAX_START_TIME: ${PRIMARY_EC2_INSTANCES_MAX_START_TIME}
202+
PRIMARY_EC2_INSTANCES_DOCKER_DEFAULT_ADDRESS_POOL: ${PRIMARY_EC2_INSTANCES_DOCKER_DEFAULT_ADDRESS_POOL}
201203
RABBIT_HOST: ${RABBIT_HOST}
202204
RABBIT_PASSWORD: ${RABBIT_PASSWORD}
203205
RABBIT_PORT: ${RABBIT_PORT}

0 commit comments

Comments
 (0)