Skip to content

Commit d04f303

Browse files
authored
✨Autoscaling monitoring tool: make compatible with non-billable deploys (#6285)
1 parent 4058241 commit d04f303

File tree

5 files changed

+87
-28
lines changed

5 files changed

+87
-28
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ repos:
2828
args:
2929
- "--py311-plus"
3030
name: upgrade code
31+
exclude: ^scripts/maintenance/computational-clusters/autoscaled_monitor/cli\.py$ # Optional get replaced and typer does not like it
3132
- repo: https://github.com/hadialqattan/pycln
3233
rev: v2.1.4
3334
hooks:

scripts/maintenance/computational-clusters/autoscaled_monitor/cli.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22
from pathlib import Path
3-
from typing import Annotated
3+
from typing import Annotated, Optional
44

55
import parse
66
import rich
@@ -10,15 +10,22 @@
1010
from . import core as api
1111
from .constants import (
1212
DEFAULT_COMPUTATIONAL_EC2_FORMAT,
13+
DEFAULT_COMPUTATIONAL_EC2_FORMAT_WORKERS,
1314
DEFAULT_DYNAMIC_EC2_FORMAT,
1415
DEPLOY_SSH_KEY_PARSER,
16+
wallet_id_spec,
1517
)
1618
from .ec2 import autoscaling_ec2_client, cluster_keeper_ec2_client
1719
from .models import AppState
1820

1921
state: AppState = AppState(
2022
dynamic_parser=parse.compile(DEFAULT_DYNAMIC_EC2_FORMAT),
21-
computational_parser=parse.compile(DEFAULT_COMPUTATIONAL_EC2_FORMAT),
23+
computational_parser_primary=parse.compile(
24+
DEFAULT_COMPUTATIONAL_EC2_FORMAT, {"wallet_id_spec": wallet_id_spec}
25+
),
26+
computational_parser_workers=parse.compile(
27+
DEFAULT_COMPUTATIONAL_EC2_FORMAT_WORKERS, {"wallet_id_spec": wallet_id_spec}
28+
),
2229
)
2330

2431
app = typer.Typer()
@@ -68,13 +75,24 @@ def main(
6875
state.ec2_resource_clusters_keeper = cluster_keeper_ec2_client(state)
6976

7077
assert state.environment["EC2_INSTANCES_KEY_NAME"]
71-
state.dynamic_parser = parse.compile(
72-
f"{state.environment['EC2_INSTANCES_NAME_PREFIX']}-{{key_name}}"
73-
)
78+
dynamic_pattern = f"{state.environment['EC2_INSTANCES_NAME_PREFIX']}-{{key_name}}"
79+
state.dynamic_parser = parse.compile(dynamic_pattern)
80+
rich.print(f"using dynamic-naming-regex: {dynamic_pattern}")
7481
if state.environment["CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX"]:
75-
state.computational_parser = parse.compile(
76-
f"{state.environment['CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX']}-{DEFAULT_COMPUTATIONAL_EC2_FORMAT}"
82+
state.computational_parser_primary = parse.compile(
83+
f"{state.environment['CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX'].strip('-')}-{DEFAULT_COMPUTATIONAL_EC2_FORMAT}",
84+
{"wallet_id_spec", wallet_id_spec},
85+
)
86+
state.computational_parser_workers = parse.compile(
87+
f"{state.environment['CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX'].strip('-')}-{DEFAULT_COMPUTATIONAL_EC2_FORMAT_WORKERS}",
88+
{"wallet_id_spec", wallet_id_spec},
7789
)
90+
rich.print(
91+
f"compuational-primary-naming-regex: {state.computational_parser_primary._expression}" # noqa: SLF001
92+
)
93+
rich.print(
94+
f"compuational-workers-naming-regex: {state.computational_parser_workers._expression}" # noqa: SLF001
95+
)
7896

7997
# locate ssh key path
8098
for file_path in deploy_config.glob("**/*.pem"):
@@ -117,7 +135,10 @@ def summary(
117135
@app.command()
118136
def cancel_jobs(
119137
user_id: Annotated[int, typer.Option(help="the user ID")],
120-
wallet_id: Annotated[int, typer.Option(help="the wallet ID")],
138+
wallet_id: Annotated[
139+
Optional[int | None], # noqa: UP007 # typer does not understand | syntax
140+
typer.Option(help="the wallet ID"),
141+
] = None,
121142
*,
122143
force: Annotated[
123144
bool,

scripts/maintenance/computational-clusters/autoscaled_monitor/constants.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,20 @@
44
import parse
55
from pydantic import ByteSize
66

7+
8+
@parse.with_pattern(r"None|\d+")
9+
def wallet_id_spec(text) -> None | int:
10+
if text == "None":
11+
return None
12+
return int(text)
13+
14+
715
DEFAULT_COMPUTATIONAL_EC2_FORMAT: Final[
816
str
9-
] = r"osparc-computational-cluster-{role}-{swarm_stack_name}-user_id:{user_id:d}-wallet_id:{wallet_id:d}"
17+
] = r"osparc-computational-cluster-{role}-{swarm_stack_name}-user_id:{user_id:d}-wallet_id:{wallet_id:wallet_id_spec}"
18+
DEFAULT_COMPUTATIONAL_EC2_FORMAT_WORKERS: Final[
19+
str
20+
] = r"osparc-computational-cluster-{role}-{swarm_stack_name}-user_id:{user_id:d}-wallet_id:{wallet_id:wallet_id_spec}-{key_name}"
1021
DEFAULT_DYNAMIC_EC2_FORMAT: Final[str] = r"osparc-dynamic-autoscaled-worker-{key_name}"
1122
DEPLOY_SSH_KEY_PARSER: Final[parse.Parser] = parse.compile(r"osparc-{random_name}.pem")
1223

scripts/maintenance/computational-clusters/autoscaled_monitor/core.py

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,14 @@ def _parse_computational(
3838
state: AppState, instance: Instance
3939
) -> ComputationalInstance | None:
4040
name = utils.get_instance_name(instance)
41-
if result := state.computational_parser.search(name):
41+
if result := (
42+
state.computational_parser_workers.parse(name)
43+
or state.computational_parser_primary.parse(name)
44+
):
4245
assert isinstance(result, parse.Result)
46+
# special handling for optional wallet
47+
rich.print(result.named)
48+
4349
last_heartbeat = utils.get_last_heartbeat(instance)
4450
return ComputationalInstance(
4551
role=InstanceRole(result["role"]),
@@ -538,12 +544,13 @@ async def cancel_jobs( # noqa: C901, PLR0912
538544
rich.print(the_cluster.datasets)
539545
try:
540546
if response := typer.prompt(
541-
"[yellow]Which dataset to cancel? all for all of them.[/yellow]",
547+
"Which dataset to cancel? (all: will cancel everything, 1-5: will cancel jobs 1-5, or 4: will cancel job #4)",
542548
default="none",
543549
):
544550
if response == "none":
545551
rich.print("[yellow]not cancelling anything[/yellow]")
546552
elif response == "all":
553+
rich.print("cancelling all tasks")
547554
for comp_task, dask_task in task_to_dask_job:
548555
if dask_task is not None and dask_task.state != "unknown":
549556
await dask.trigger_job_cancellation_in_scheduler(
@@ -565,25 +572,43 @@ async def cancel_jobs( # noqa: C901, PLR0912
565572

566573
rich.print("cancelled all tasks")
567574
else:
568-
selected_index = TypeAdapter(int).validate_python(response)
569-
comp_task, dask_task = task_to_dask_job[selected_index]
570-
if dask_task is not None and dask_task.state != "unknown":
571-
await dask.trigger_job_cancellation_in_scheduler(
572-
state, the_cluster, dask_task.job_id
573-
)
574-
if comp_task is None:
575-
# we need to clear it of the cluster
576-
await dask.remove_job_from_scheduler(
577-
state, the_cluster, dask_task.job_id
578-
)
575+
try:
576+
# Split the response and handle ranges
577+
indices = response.split("-")
578+
if len(indices) == 2:
579+
start_index, end_index = map(int, indices)
580+
selected_indices = range(start_index, end_index + 1)
581+
else:
582+
selected_indices = [int(indices[0])]
583+
584+
for selected_index in selected_indices:
585+
comp_task, dask_task = task_to_dask_job[selected_index]
586+
if dask_task is not None and dask_task.state != "unknown":
587+
await dask.trigger_job_cancellation_in_scheduler(
588+
state, the_cluster, dask_task.job_id
589+
)
590+
if comp_task is None:
591+
# we need to clear it of the cluster
592+
await dask.remove_job_from_scheduler(
593+
state, the_cluster, dask_task.job_id
594+
)
595+
596+
if comp_task is not None and force:
597+
await db.abort_job_in_db(
598+
state, comp_task.project_id, comp_task.node_id
599+
)
600+
rich.print(f"Cancelled selected tasks: {response}")
579601

580-
if comp_task is not None and force:
581-
await db.abort_job_in_db(
582-
state, comp_task.project_id, comp_task.node_id
602+
except ValidationError:
603+
rich.print(
604+
"[yellow]wrong index format, not cancelling anything[/yellow]"
605+
)
606+
except IndexError:
607+
rich.print(
608+
"[yellow]index out of range, not cancelling anything[/yellow]"
583609
)
584-
585610
except ValidationError:
586-
rich.print("[yellow]wrong index, not cancelling anything[/yellow]")
611+
rich.print("[yellow]wrong input, not cancelling anything[/yellow]")
587612

588613

589614
async def trigger_cluster_termination(

scripts/maintenance/computational-clusters/autoscaled_monitor/models.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ class AppState:
1818
ec2_resource_autoscaling: EC2ServiceResource | None = None
1919
ec2_resource_clusters_keeper: EC2ServiceResource | None = None
2020
dynamic_parser: parse.Parser
21-
computational_parser: parse.Parser
21+
computational_parser_primary: parse.Parser
22+
computational_parser_workers: parse.Parser
2223
deploy_config: Path | None = None
2324
ssh_key_path: Path | None = None
2425

0 commit comments

Comments
 (0)