
Commit 36cf5bc

sanderegg authored and mrnicegyu11 committed
🎨 Autoscaling monitor: allow SSH tunneling for DB access, and add return value for CLI usage (ITISFoundation#7329)
1 parent 59d895b commit 36cf5bc

File tree: 7 files changed (+1582 lines, -42 lines)


scripts/maintenance/computational-clusters/autoscaled_monitor/cli.py
Lines changed: 41 additions & 7 deletions

@@ -1,10 +1,13 @@
 import asyncio
+import os
 from pathlib import Path
 from typing import Annotated, Optional

 import parse
 import rich
 import typer
+from ansible.inventory.manager import InventoryManager
+from ansible.parsing.dataloader import DataLoader
 from dotenv import dotenv_values

 from . import core as api
@@ -17,7 +20,7 @@
     wallet_id_spec,
 )
 from .ec2 import autoscaling_ec2_client, cluster_keeper_ec2_client
-from .models import AppState
+from .models import AppState, BastionHost

 state: AppState = AppState(
     dynamic_parser=parse.compile(DEFAULT_DYNAMIC_EC2_FORMAT),
@@ -32,33 +35,57 @@
 app = typer.Typer()


-def _parse_environment(deploy_config: Path) -> dict[str, str | None]:
+def _parse_repo_config(deploy_config: Path) -> dict[str, str | None]:
     repo_config = deploy_config / "repo.config"
     if not repo_config.exists():
         rich.print(
-            f"[red]{repo_config} does not exist! Please run OPS code to generate it[/red]"
+            f"[red]{repo_config} does not exist! Please run `make repo.config` in {deploy_config} to generate it[/red]"
         )
-        raise typer.Exit(1)
+        raise typer.Exit(os.EX_DATAERR)

     environment = dotenv_values(repo_config)

     assert environment
     return environment


+def _parse_inventory(deploy_config: Path) -> BastionHost:
+    inventory_path = deploy_config / "ansible" / "inventory.ini"
+    if not inventory_path.exists():
+        rich.print(
+            f"[red]{inventory_path} does not exist! Please run `make inventory` in {deploy_config} to generate it[/red]"
+        )
+        raise typer.Exit(os.EX_DATAERR)
+
+    loader = DataLoader()
+    inventory = InventoryManager(loader=loader, sources=[f"{inventory_path}"])
+
+    try:
+        return BastionHost(
+            ip=inventory.groups["CAULDRON_UNIX"].get_vars()["bastion_ip"],
+            user_name=inventory.groups["CAULDRON_UNIX"].get_vars()["bastion_user"],
+        )
+    except KeyError as err:
+        rich.print(
+            f"[red]{inventory_path} invalid! Unable to find bastion_ip in the inventory file. TIP: Please run `make inventory` in {deploy_config} to generate it[/red]"
+        )
+        raise typer.Exit(os.EX_DATAERR) from err
+
+
 @app.callback()
 def main(
     deploy_config: Annotated[
         Path, typer.Option(help="path to the deploy configuration")
-    ]
+    ],
 ):
     """Manages external clusters"""

     state.deploy_config = deploy_config.expanduser()
     assert (
         deploy_config.is_dir()
     ), "deploy-config argument is not pointing to a directory!"
-    state.environment = _parse_environment(deploy_config)
+    state.environment = _parse_repo_config(deploy_config)
+    state.main_bastion_host = _parse_inventory(deploy_config)

     # connect to ec2s
     state.ec2_resource_autoscaling = autoscaling_ec2_client(state)
@@ -113,7 +140,8 @@ def summary(

     """

-    asyncio.run(api.summary(state, user_id or None, wallet_id or None))
+    if not asyncio.run(api.summary(state, user_id or None, wallet_id or None)):
+        raise typer.Exit(1)


 @app.command()
@@ -157,5 +185,11 @@ def trigger_cluster_termination(
     asyncio.run(api.trigger_cluster_termination(state, user_id, wallet_id))


+@app.command()
+def check_database_connection() -> None:
+    """this will check the connection to simcore database is ready"""
+    asyncio.run(api.check_database_connection(state))
+
+
 if __name__ == "__main__":
     app()
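
For context, a self-contained sketch of the Ansible Python API that `_parse_inventory` relies on; the inventory contents shown here are hypothetical, not from this commit:

# Standalone illustration, assuming a local inventory.ini such as:
#
#   [CAULDRON_UNIX]
#   some-host ansible_host=203.0.113.10
#
#   [CAULDRON_UNIX:vars]
#   bastion_ip=203.0.113.1
#   bastion_user=ubuntu
#
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader

loader = DataLoader()
inventory = InventoryManager(loader=loader, sources=["inventory.ini"])
# get_vars() returns the group's merged variables as a dict; a missing group
# or variable surfaces as a KeyError, which cli.py above converts into
# typer.Exit(os.EX_DATAERR) with a hint to re-run `make inventory`
group_vars = inventory.groups["CAULDRON_UNIX"].get_vars()
print(group_vars["bastion_ip"], group_vars["bastion_user"])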

scripts/maintenance/computational-clusters/autoscaled_monitor/core.py
Lines changed: 19 additions & 6 deletions

@@ -142,7 +142,7 @@ def _print_dynamic_instances(
                 f"Up: {utils.timedelta_formatting(time_now - instance.ec2_instance.launch_time, color_code=True)}",
                 f"ExtIP: {instance.ec2_instance.public_ip_address}",
                 f"IntIP: {instance.ec2_instance.private_ip_address}",
-                f"/mnt/docker(free): {utils.color_encode_with_threshold(instance.disk_space.human_readable(), instance.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
+                f"/mnt/docker(free): {utils.color_encode_with_threshold(instance.disk_space.human_readable(), instance.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
             ]
         ),
         service_table,
@@ -190,7 +190,7 @@ def _print_computational_clusters(
                 f"UserID: {cluster.primary.user_id}",
                 f"WalletID: {cluster.primary.wallet_id}",
                 f"Heartbeat: {utils.timedelta_formatting(time_now - cluster.primary.last_heartbeat) if cluster.primary.last_heartbeat else 'n/a'}",
-                f"/mnt/docker(free): {utils.color_encode_with_threshold(cluster.primary.disk_space.human_readable(), cluster.primary.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
+                f"/mnt/docker(free): {utils.color_encode_with_threshold(cluster.primary.disk_space.human_readable(), cluster.primary.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
             ]
         ),
         "\n".join(
@@ -223,7 +223,7 @@
         table.add_row(
             "\n".join(
                 [
-                    f"[italic]{utils.color_encode_with_state(f'Worker {index+1}', worker.ec2_instance)}[/italic]",
+                    f"[italic]{utils.color_encode_with_state(f'Worker {index + 1}', worker.ec2_instance)}[/italic]",
                     f"Name: {worker.name}",
                     f"ID: {worker.ec2_instance.id}",
                     f"AMI: {worker.ec2_instance.image_id}",
@@ -232,7 +232,7 @@
                     f"ExtIP: {worker.ec2_instance.public_ip_address}",
                     f"IntIP: {worker.ec2_instance.private_ip_address}",
                     f"DaskWorkerIP: {worker.dask_ip}",
-                    f"/mnt/docker(free): {utils.color_encode_with_threshold(worker.disk_space.human_readable(), worker.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
+                    f"/mnt/docker(free): {utils.color_encode_with_threshold(worker.disk_space.human_readable(), worker.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
                     "",
                 ]
             ),
@@ -301,7 +301,6 @@ async def _analyze_computational_instances(
     computational_instances: list[ComputationalInstance],
     ssh_key_path: Path | None,
 ) -> list[ComputationalCluster]:
-
     all_disk_spaces = [UNDEFINED_BYTESIZE] * len(computational_instances)
     if ssh_key_path is not None:
         all_disk_spaces = await asyncio.gather(
@@ -414,7 +413,7 @@ async def _parse_dynamic_instances(
     return dynamic_instances


-async def summary(state: AppState, user_id: int | None, wallet_id: int | None) -> None:
+async def summary(state: AppState, user_id: int | None, wallet_id: int | None) -> bool:
     # get all the running instances
     assert state.ec2_resource_autoscaling
     dynamic_instances = await ec2.list_dynamic_instances_from_ec2(
@@ -429,6 +428,14 @@ async def summary(state: AppState, user_id: int | None, wallet_id: int | None) -
         state.ec2_resource_autoscaling.meta.client.meta.region_name,
     )

+    time_threshold = arrow.utcnow().shift(minutes=-30).datetime
+
+    dynamic_services_in_error = any(
+        service.needs_manual_intervention and service.created_at < time_threshold
+        for instance in dynamic_autoscaled_instances
+        for service in instance.running_services
+    )
+
     assert state.ec2_resource_clusters_keeper
     computational_instances = await ec2.list_computational_instances_from_ec2(
         state, user_id, wallet_id
@@ -442,6 +449,8 @@ async def summary(state: AppState, user_id: int | None, wallet_id: int | None) -
         state.ec2_resource_clusters_keeper.meta.client.meta.region_name,
     )

+    return not dynamic_services_in_error
+

 def _print_computational_tasks(
     user_id: int,
@@ -638,3 +647,7 @@ async def trigger_cluster_termination(
         )
     else:
         rich.print("not deleting anything")
+
+
+async def check_database_connection(state: AppState) -> None:
+    await db.check_db_connection(state)
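
The exit status that `summary` now feeds back to the CLI hinges on one predicate: a running service trips the check only if it both needs manual intervention and was created more than 30 minutes ago. A runnable sketch of that logic; `FakeService` and `FakeInstance` are stand-ins for illustration, not the repo's models:

from dataclasses import dataclass, field
from datetime import datetime

import arrow


@dataclass
class FakeService:  # stand-in for a running dynamic service
    needs_manual_intervention: bool
    created_at: datetime


@dataclass
class FakeInstance:  # stand-in for a dynamic autoscaled instance
    running_services: list[FakeService] = field(default_factory=list)


time_threshold = arrow.utcnow().shift(minutes=-30).datetime

instances = [
    FakeInstance(
        running_services=[
            # broken but younger than 30 minutes -> tolerated (may still recover)
            FakeService(True, arrow.utcnow().shift(minutes=-5).datetime),
            # broken and stale -> trips the check
            FakeService(True, arrow.utcnow().shift(hours=-2).datetime),
        ]
    )
]

dynamic_services_in_error = any(
    service.needs_manual_intervention and service.created_at < time_threshold
    for instance in instances
    for service in instance.running_services
)
assert dynamic_services_in_error  # summary() returns False -> CLI exits with 1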

scripts/maintenance/computational-clusters/autoscaled_monitor/db.py
Lines changed: 66 additions & 26 deletions

@@ -1,3 +1,4 @@
+import asyncio
 import contextlib
 import uuid
 from collections.abc import AsyncGenerator
@@ -9,37 +10,59 @@
 from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

 from .models import AppState, ComputationalTask, PostgresDB
+from .ssh import ssh_tunnel


 @contextlib.asynccontextmanager
-async def db_engine(state: AppState) -> AsyncGenerator[AsyncEngine, Any]:
-    engine = None
-    try:
-        for env in [
-            "POSTGRES_USER",
-            "POSTGRES_PASSWORD",
-            "POSTGRES_ENDPOINT",
-            "POSTGRES_DB",
-        ]:
-            assert state.environment[env]
-        postgres_db = PostgresDB(
-            dsn=TypeAdapter(PostgresDsn).validate_python(
-                f"postgresql+asyncpg://{state.environment['POSTGRES_USER']}:{state.environment['POSTGRES_PASSWORD']}@{state.environment['POSTGRES_ENDPOINT']}/{state.environment['POSTGRES_DB']}"
+async def db_engine(
+    state: AppState,
+) -> AsyncGenerator[AsyncEngine, Any]:
+    async with contextlib.AsyncExitStack() as stack:
+        assert state.environment["POSTGRES_ENDPOINT"]  # nosec
+        db_endpoint = state.environment["POSTGRES_ENDPOINT"]
+        if state.main_bastion_host:
+            assert state.ssh_key_path  # nosec
+            db_host, db_port = db_endpoint.split(":")
+            tunnel = stack.enter_context(
+                ssh_tunnel(
+                    ssh_host=state.main_bastion_host.ip,
+                    username=state.main_bastion_host.user_name,
+                    private_key_path=state.ssh_key_path,
+                    remote_bind_host=db_host,
+                    remote_bind_port=int(db_port),
+                )
+            )
+            assert tunnel
+            db_endpoint = (
+                f"{tunnel.local_bind_address[0]}:{tunnel.local_bind_address[1]}"
             )
-        )

-        engine = create_async_engine(
-            f"{postgres_db.dsn}",
-            connect_args={
-                "server_settings": {
-                    "application_name": "osparc-clusters-monitoring-script"
-                }
-            },
-        )
-        yield engine
-    finally:
-        if engine:
-            await engine.dispose()
+        engine = None
+        try:
+            for env in [
+                "POSTGRES_USER",
+                "POSTGRES_PASSWORD",
+                "POSTGRES_DB",
+            ]:
+                assert state.environment[env]
+            postgres_db = PostgresDB(
+                dsn=TypeAdapter(PostgresDsn).validate_python(
+                    f"postgresql+asyncpg://{state.environment['POSTGRES_USER']}:{state.environment['POSTGRES_PASSWORD']}@{db_endpoint}/{state.environment['POSTGRES_DB']}"
+                )
+            )
+
+            engine = create_async_engine(
+                f"{postgres_db.dsn}",
+                connect_args={
+                    "server_settings": {
+                        "application_name": "osparc-clusters-monitoring-script"
+                    }
+                },
+            )
+            yield engine
+        finally:
+            if engine:
+                await engine.dispose()


 async def abort_job_in_db(
@@ -57,6 +80,23 @@ async def abort_job_in_db(
     rich.print(f"set comp_tasks for {project_id=}/{node_id=} set to ABORTED")


+async def check_db_connection(state: AppState) -> bool:
+    try:
+        async with contextlib.AsyncExitStack() as stack:
+            engine = await stack.enter_async_context(db_engine(state))
+            async with asyncio.timeout(5):
+                db_connection = await stack.enter_async_context(engine.connect())
+                result = await db_connection.execute(sa.text("SELECT 1"))
+                result.one()
+            rich.print(
+                "[green]Database connection test completed successfully![/green]"
+            )
+            return True
+    except Exception as e:  # pylint: disable=broad-exception-caught
+        rich.print(f"[red]Database connection test failed: {e}[/red]")
+        return False
+
+
 async def list_computational_tasks_from_db(
     state: AppState, user_id: int
 ) -> list[ComputationalTask]:
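
The `ssh_tunnel` helper imported from `.ssh` is not included in this commit's visible diff. A plausible shape for it, sketched on top of the `sshtunnel` package that pyproject.toml already lists; the keyword parameters mirror the call site above, everything else is an assumption:

# Minimal sketch (not the repo's actual .ssh module) of a context-manager
# tunnel helper built on sshtunnel.SSHTunnelForwarder.
import contextlib
from collections.abc import Iterator
from pathlib import Path

from sshtunnel import SSHTunnelForwarder


@contextlib.contextmanager
def ssh_tunnel(
    *,
    ssh_host: str,
    username: str,
    private_key_path: Path,
    remote_bind_host: str,
    remote_bind_port: int,
) -> Iterator[SSHTunnelForwarder]:
    # Forwards a local ephemeral port through the bastion to the DB endpoint;
    # callers read the local end from tunnel.local_bind_address (host, port).
    with SSHTunnelForwarder(
        ssh_host,
        ssh_username=username,
        ssh_pkey=f"{private_key_path}",
        remote_bind_address=(remote_bind_host, remote_bind_port),
    ) as tunnel:
        yield tunnel

Note also that `check_db_connection` uses `asyncio.timeout`, which requires Python 3.11 or newer.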

scripts/maintenance/computational-clusters/autoscaled_monitor/models.py
Lines changed: 7 additions & 0 deletions

@@ -12,6 +12,12 @@
 from pydantic import BaseModel, ByteSize, PostgresDsn


+@dataclass(kw_only=True, frozen=True, slots=True)
+class BastionHost:
+    ip: str
+    user_name: str
+
+
 @dataclass(kw_only=True)
 class AppState:
     environment: dict[str, str | None] = field(default_factory=dict)
@@ -22,6 +28,7 @@ class AppState:
     computational_parser_workers: parse.Parser
     deploy_config: Path | None = None
     ssh_key_path: Path | None = None
+    main_bastion_host: BastionHost | None = None

     computational_bastion: Instance | None = None
     dynamic_bastion: Instance | None = None
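
For reference, the semantics the `BastionHost` dataclass flags buy, shown with hypothetical values:

from dataclasses import dataclass


@dataclass(kw_only=True, frozen=True, slots=True)
class BastionHost:
    ip: str
    user_name: str


host = BastionHost(ip="203.0.113.1", user_name="ubuntu")  # hypothetical values
# kw_only=True: BastionHost("203.0.113.1", "ubuntu") raises TypeError
# frozen=True:  `host.ip = "..."` raises dataclasses.FrozenInstanceError
# slots=True:   no per-instance __dict__, so `host.extra = 1` raises AttributeError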

scripts/maintenance/computational-clusters/pyproject.toml
Lines changed: 3 additions & 2 deletions

@@ -6,8 +6,8 @@ dependencies = [
     "black",
     "boto3",
     # NOTE: these must be in sync with ospar
-    "cloudpickle",
-    "dask[distributed]",
+    "cloudpickle==3.1.0",
+    "dask[distributed]==2024.12.0",
     "mypy_boto3_ec2",
     "types-boto3",
     "parse",
@@ -19,6 +19,7 @@ dependencies = [
     "rich",
     "sqlalchemy[asyncio]",
     "sshtunnel",
+    "ansible>=10.7.0",
 ]
 name = "autoscaled-monitor"
 version = "1.0.0"
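
Since the new pins are meant to track the versions deployed elsewhere, a quick illustrative check that a given environment actually matches them:

# Illustrative only: verify the installed versions against the pins above.
import cloudpickle
import dask

print(cloudpickle.__version__)  # expected: 3.1.0
print(dask.__version__)         # expected: 2024.12.0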
