@@ -1,10 +1,13 @@
import asyncio
import os
from pathlib import Path
from typing import Annotated, Optional

import parse
import rich
import typer
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from dotenv import dotenv_values

from . import core as api
@@ -17,7 +20,7 @@
wallet_id_spec,
)
from .ec2 import autoscaling_ec2_client, cluster_keeper_ec2_client
from .models import AppState
from .models import AppState, BastionHost

state: AppState = AppState(
dynamic_parser=parse.compile(DEFAULT_DYNAMIC_EC2_FORMAT),
@@ -32,33 +35,57 @@
app = typer.Typer()


def _parse_environment(deploy_config: Path) -> dict[str, str | None]:
def _parse_repo_config(deploy_config: Path) -> dict[str, str | None]:
repo_config = deploy_config / "repo.config"
if not repo_config.exists():
rich.print(
f"[red]{repo_config} does not exist! Please run OPS code to generate it[/red]"
f"[red]{repo_config} does not exist! Please run `make repo.config` in {deploy_config} to generate it[/red]"
)
raise typer.Exit(1)
raise typer.Exit(os.EX_DATAERR)

environment = dotenv_values(repo_config)

assert environment
return environment


def _parse_inventory(deploy_config: Path) -> BastionHost:
inventory_path = deploy_config / "ansible" / "inventory.ini"
if not inventory_path.exists():
rich.print(
f"[red]{inventory_path} does not exist! Please run `make inventory` in {deploy_config} to generate it[/red]"
)
raise typer.Exit(os.EX_DATAERR)

loader = DataLoader()
inventory = InventoryManager(loader=loader, sources=[f"{inventory_path}"])

try:
return BastionHost(
ip=inventory.groups["CAULDRON_UNIX"].get_vars()["bastion_ip"],
user_name=inventory.groups["CAULDRON_UNIX"].get_vars()["bastion_user"],
)
except KeyError as err:
rich.print(
f"[red]{inventory_path} invalid! Unable to find bastion_ip in the inventory file. TIP: Please run `make inventory` in {deploy_config} to generate it[/red]"
)
raise typer.Exit(os.EX_DATAERR) from err


@app.callback()
def main(
deploy_config: Annotated[
Path, typer.Option(help="path to the deploy configuration")
]
],
):
"""Manages external clusters"""

state.deploy_config = deploy_config.expanduser()
assert (
deploy_config.is_dir()
), "deploy-config argument is not pointing to a directory!"
state.environment = _parse_environment(deploy_config)
state.environment = _parse_repo_config(deploy_config)
state.main_bastion_host = _parse_inventory(deploy_config)

# connect to ec2s
state.ec2_resource_autoscaling = autoscaling_ec2_client(state)
@@ -113,7 +140,8 @@ def summary(

"""

asyncio.run(api.summary(state, user_id or None, wallet_id or None))
if not asyncio.run(api.summary(state, user_id or None, wallet_id or None)):
raise typer.Exit(1)


@app.command()
@@ -157,5 +185,11 @@ def trigger_cluster_termination(
asyncio.run(api.trigger_cluster_termination(state, user_id, wallet_id))


@app.command()
def check_database_connection() -> None:
"""this will check the connection to simcore database is ready"""
asyncio.run(api.check_database_connection(state))


if __name__ == "__main__":
app()
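
For reference, here is a minimal sketch of the kind of ansible/inventory.ini that the new _parse_inventory helper expects. The CAULDRON_UNIX group name and the bastion_ip/bastion_user variables are taken from the diff above; the host name and values are invented for illustration, and the snippet assumes ansible is installed (this PR adds it to pyproject.toml below).

# Hypothetical sketch: build a minimal inventory.ini and read the bastion vars
# back the same way _parse_inventory does. Host name and values are invented.
from pathlib import Path

from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader

inventory_path = Path("inventory.ini")
inventory_path.write_text(
    "[CAULDRON_UNIX]\n"
    "primary-host ansible_host=10.0.0.10\n"
    "\n"
    "[CAULDRON_UNIX:vars]\n"
    "bastion_ip=203.0.113.7\n"
    "bastion_user=ubuntu\n"
)

inventory = InventoryManager(loader=DataLoader(), sources=[str(inventory_path)])
group_vars = inventory.groups["CAULDRON_UNIX"].get_vars()
print(group_vars["bastion_ip"], group_vars["bastion_user"])  # 203.0.113.7 ubuntu
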
@@ -142,7 +142,7 @@ def _print_dynamic_instances(
f"Up: {utils.timedelta_formatting(time_now - instance.ec2_instance.launch_time, color_code=True)}",
f"ExtIP: {instance.ec2_instance.public_ip_address}",
f"IntIP: {instance.ec2_instance.private_ip_address}",
f"/mnt/docker(free): {utils.color_encode_with_threshold(instance.disk_space.human_readable(), instance.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
f"/mnt/docker(free): {utils.color_encode_with_threshold(instance.disk_space.human_readable(), instance.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
]
),
service_table,
@@ -190,7 +190,7 @@ def _print_computational_clusters(
f"UserID: {cluster.primary.user_id}",
f"WalletID: {cluster.primary.wallet_id}",
f"Heartbeat: {utils.timedelta_formatting(time_now - cluster.primary.last_heartbeat) if cluster.primary.last_heartbeat else 'n/a'}",
f"/mnt/docker(free): {utils.color_encode_with_threshold(cluster.primary.disk_space.human_readable(), cluster.primary.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
f"/mnt/docker(free): {utils.color_encode_with_threshold(cluster.primary.disk_space.human_readable(), cluster.primary.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
]
),
"\n".join(
@@ -223,7 +223,7 @@ def _print_computational_clusters(
table.add_row(
"\n".join(
[
f"[italic]{utils.color_encode_with_state(f'Worker {index+1}', worker.ec2_instance)}[/italic]",
f"[italic]{utils.color_encode_with_state(f'Worker {index + 1}', worker.ec2_instance)}[/italic]",
f"Name: {worker.name}",
f"ID: {worker.ec2_instance.id}",
f"AMI: {worker.ec2_instance.image_id}",
@@ -232,7 +232,7 @@
f"ExtIP: {worker.ec2_instance.public_ip_address}",
f"IntIP: {worker.ec2_instance.private_ip_address}",
f"DaskWorkerIP: {worker.dask_ip}",
f"/mnt/docker(free): {utils.color_encode_with_threshold(worker.disk_space.human_readable(), worker.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
f"/mnt/docker(free): {utils.color_encode_with_threshold(worker.disk_space.human_readable(), worker.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
"",
]
),
@@ -301,7 +301,6 @@ async def _analyze_computational_instances(
computational_instances: list[ComputationalInstance],
ssh_key_path: Path | None,
) -> list[ComputationalCluster]:

all_disk_spaces = [UNDEFINED_BYTESIZE] * len(computational_instances)
if ssh_key_path is not None:
all_disk_spaces = await asyncio.gather(
@@ -414,7 +413,7 @@ async def _parse_dynamic_instances(
return dynamic_instances


async def summary(state: AppState, user_id: int | None, wallet_id: int | None) -> None:
async def summary(state: AppState, user_id: int | None, wallet_id: int | None) -> bool:
# get all the running instances
assert state.ec2_resource_autoscaling
dynamic_instances = await ec2.list_dynamic_instances_from_ec2(
@@ -429,6 +428,14 @@ async def summary(state: AppState, user_id: int | None, wallet_id: int | None) -
state.ec2_resource_autoscaling.meta.client.meta.region_name,
)

time_threshold = arrow.utcnow().shift(minutes=-30).datetime

dynamic_services_in_error = any(
service.needs_manual_intervention and service.created_at < time_threshold
for instance in dynamic_autoscaled_instances
for service in instance.running_services
)

assert state.ec2_resource_clusters_keeper
computational_instances = await ec2.list_computational_instances_from_ec2(
state, user_id, wallet_id
@@ -442,6 +449,8 @@ async def summary(state: AppState, user_id: int | None, wallet_id: int | None) -
state.ec2_resource_clusters_keeper.meta.client.meta.region_name,
)

return not dynamic_services_in_error


def _print_computational_tasks(
user_id: int,
@@ -638,3 +647,7 @@ async def trigger_cluster_termination(
)
else:
rich.print("not deleting anything")


async def check_database_connection(state: AppState) -> None:
await db.check_db_connection(state)
92 changes: 66 additions & 26 deletions scripts/maintenance/computational-clusters/autoscaled_monitor/db.py
@@ -1,3 +1,4 @@
import asyncio
import contextlib
import uuid
from collections.abc import AsyncGenerator
@@ -9,37 +10,59 @@
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from .models import AppState, ComputationalTask, PostgresDB
from .ssh import ssh_tunnel


@contextlib.asynccontextmanager
async def db_engine(state: AppState) -> AsyncGenerator[AsyncEngine, Any]:
engine = None
try:
for env in [
"POSTGRES_USER",
"POSTGRES_PASSWORD",
"POSTGRES_ENDPOINT",
"POSTGRES_DB",
]:
assert state.environment[env]
postgres_db = PostgresDB(
dsn=TypeAdapter(PostgresDsn).validate_python(
f"postgresql+asyncpg://{state.environment['POSTGRES_USER']}:{state.environment['POSTGRES_PASSWORD']}@{state.environment['POSTGRES_ENDPOINT']}/{state.environment['POSTGRES_DB']}"
async def db_engine(
state: AppState,
) -> AsyncGenerator[AsyncEngine, Any]:
async with contextlib.AsyncExitStack() as stack:
assert state.environment["POSTGRES_ENDPOINT"] # nosec
db_endpoint = state.environment["POSTGRES_ENDPOINT"]
if state.main_bastion_host:
assert state.ssh_key_path # nosec
db_host, db_port = db_endpoint.split(":")
tunnel = stack.enter_context(
ssh_tunnel(
ssh_host=state.main_bastion_host.ip,
username=state.main_bastion_host.user_name,
private_key_path=state.ssh_key_path,
remote_bind_host=db_host,
remote_bind_port=int(db_port),
)
)
assert tunnel
db_endpoint = (
f"{tunnel.local_bind_address[0]}:{tunnel.local_bind_address[1]}"
)
)

engine = create_async_engine(
f"{postgres_db.dsn}",
connect_args={
"server_settings": {
"application_name": "osparc-clusters-monitoring-script"
}
},
)
yield engine
finally:
if engine:
await engine.dispose()
engine = None
try:
for env in [
"POSTGRES_USER",
"POSTGRES_PASSWORD",
"POSTGRES_DB",
]:
assert state.environment[env]
postgres_db = PostgresDB(
dsn=TypeAdapter(PostgresDsn).validate_python(
f"postgresql+asyncpg://{state.environment['POSTGRES_USER']}:{state.environment['POSTGRES_PASSWORD']}@{db_endpoint}/{state.environment['POSTGRES_DB']}"
)
)

engine = create_async_engine(
f"{postgres_db.dsn}",
connect_args={
"server_settings": {
"application_name": "osparc-clusters-monitoring-script"
}
},
)
yield engine
finally:
if engine:
await engine.dispose()


async def abort_job_in_db(
@@ -57,6 +80,23 @@ async def abort_job_in_db(
rich.print(f"set comp_tasks for {project_id=}/{node_id=} set to ABORTED")


async def check_db_connection(state: AppState) -> bool:
try:
async with contextlib.AsyncExitStack() as stack:
engine = await stack.enter_async_context(db_engine(state))
async with asyncio.timeout(5):
db_connection = await stack.enter_async_context(engine.connect())
result = await db_connection.execute(sa.text("SELECT 1"))
result.one()
rich.print(
"[green]Database connection test completed successfully![/green]"
)
return True
except Exception as e: # pylint: disable=broad-exception-caught
rich.print(f"[red]Database connection test failed: {e}[/red]")
return False


async def list_computational_tasks_from_db(
state: AppState, user_id: int
) -> list[ComputationalTask]:
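
The ssh_tunnel helper imported from .ssh is not shown in this diff. As background, here is a minimal sketch of how such a context manager could be built on top of the sshtunnel dependency declared in pyproject.toml; the keyword arguments mirror the call site in db_engine above, but this is an assumption for illustration, not the PR's actual implementation.

# Hypothetical sketch of an ssh_tunnel context manager (assumed, not from the PR).
# It forwards a remote host:port (e.g. the Postgres endpoint) through the bastion
# host and exposes the local bind address that db_engine uses to build its DSN.
import contextlib
from collections.abc import Iterator
from pathlib import Path

from sshtunnel import SSHTunnelForwarder


@contextlib.contextmanager
def ssh_tunnel(
    *,
    ssh_host: str,
    username: str,
    private_key_path: Path,
    remote_bind_host: str,
    remote_bind_port: int,
) -> Iterator[SSHTunnelForwarder]:
    forwarder = SSHTunnelForwarder(
        ssh_address_or_host=(ssh_host, 22),
        ssh_username=username,
        ssh_pkey=str(private_key_path),
        remote_bind_address=(remote_bind_host, remote_bind_port),
    )
    forwarder.start()
    try:
        yield forwarder  # callers read forwarder.local_bind_address
    finally:
        forwarder.stop()
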
@@ -12,6 +12,12 @@
from pydantic import BaseModel, ByteSize, PostgresDsn


@dataclass(kw_only=True, frozen=True, slots=True)
class BastionHost:
ip: str
user_name: str


@dataclass(kw_only=True)
class AppState:
environment: dict[str, str | None] = field(default_factory=dict)
@@ -22,6 +28,7 @@ class AppState:
computational_parser_workers: parse.Parser
deploy_config: Path | None = None
ssh_key_path: Path | None = None
main_bastion_host: BastionHost | None = None

computational_bastion: Instance | None = None
dynamic_bastion: Instance | None = None
5 changes: 3 additions & 2 deletions scripts/maintenance/computational-clusters/pyproject.toml
@@ -6,8 +6,8 @@ dependencies = [
"black",
"boto3",
# NOTE: these must be in sync with osparc
"cloudpickle",
"dask[distributed]",
"cloudpickle==3.1.0",
"dask[distributed]==2024.12.0",
"mypy_boto3_ec2",
"types-boto3",
"parse",
@@ -19,6 +19,7 @@ dependencies = [
"rich",
"sqlalchemy[asyncio]",
"sshtunnel",
"ansible>=10.7.0",
]
name = "autoscaled-monitor"
version = "1.0.0"