Skip to content

Commit 45ae0eb

Browse files
authored
🎨 Maintenance: osparc clusters improvements (#5423)
1 parent b88a995 commit 45ae0eb

File tree

1 file changed

+36
-19
lines changed

1 file changed

+36
-19
lines changed

scripts/maintenance/computational-clusters/osparc_clusters.py

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
_SSH_USER_NAME: Final[str] = "ubuntu"
3131

3232

33-
@dataclass(frozen=True, slots=True, kw_only=True)
33+
@dataclass(slots=True, kw_only=True)
3434
class AutoscaledInstance:
3535
name: str
3636
ec2_instance: Instance
@@ -42,7 +42,7 @@ class InstanceRole(str, Enum):
4242
worker = "worker"
4343

4444

45-
@dataclass(frozen=True, slots=True, kw_only=True)
45+
@dataclass(slots=True, kw_only=True)
4646
class ComputationalInstance(AutoscaledInstance):
4747
role: InstanceRole
4848
user_id: int
@@ -62,7 +62,7 @@ class DynamicService:
6262
containers: list[str]
6363

6464

65-
@dataclass(frozen=True, slots=True, kw_only=True)
65+
@dataclass(slots=True, kw_only=True)
6666
class DynamicInstance(AutoscaledInstance):
6767
running_services: list[DynamicService]
6868

@@ -402,7 +402,6 @@ def _print_computational_clusters(
402402
) -> None:
403403
time_now = arrow.utcnow()
404404
table = Table(
405-
Column(""),
406405
Column("Instance", justify="left", overflow="fold"),
407406
Column("Links", justify="left", overflow="fold"),
408407
Column("Computational details"),
@@ -416,9 +415,9 @@ def _print_computational_clusters(
416415
):
417416
# first print primary machine info
418417
table.add_row(
419-
f"[bold]{_color_encode_with_state('Primary', cluster.primary.ec2_instance)}",
420418
"\n".join(
421419
[
420+
f"[bold]{_color_encode_with_state('Primary', cluster.primary.ec2_instance)}",
422421
f"Name: {cluster.primary.name}",
423422
f"ID: {cluster.primary.ec2_instance.id}",
424423
f"AMI: {cluster.primary.ec2_instance.image_id}({cluster.primary.ec2_instance.image.name})",
@@ -448,19 +447,20 @@ def _print_computational_clusters(
448447
)
449448

450449
# now add the workers
451-
for worker in cluster.workers:
450+
for index, worker in enumerate(cluster.workers):
452451
table.add_row(
453-
f"[italic]{_color_encode_with_state('Worker', worker.ec2_instance)}[/italic]",
454452
"\n".join(
455453
[
454+
f"[italic]{_color_encode_with_state(f'Worker {index+1}', worker.ec2_instance)}[/italic]",
455+
f"Name: {worker.name}",
456456
f"ID: {worker.ec2_instance.id}",
457457
f"AMI: {worker.ec2_instance.image_id}({worker.ec2_instance.image.name})",
458458
f"Type: {worker.ec2_instance.instance_type}",
459459
f"Up: {_timedelta_formatting(time_now - worker.ec2_instance.launch_time, color_code=True)}",
460460
f"ExtIP: {worker.ec2_instance.public_ip_address}",
461461
f"IntIP: {worker.ec2_instance.private_ip_address}",
462-
f"Name: {worker.name}",
463462
f"/mnt/docker(free): {_color_encode_with_threshold(worker.disk_space.human_readable(), worker.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
463+
"",
464464
]
465465
),
466466
"\n".join(
@@ -526,8 +526,6 @@ def _dask_list_tasks(dask_client: distributed.Client) -> dict[TaskState, list[Ta
526526
def _list_tasks(
527527
dask_scheduler: distributed.Scheduler,
528528
) -> dict[TaskId, TaskState]:
529-
from collections import defaultdict
530-
531529
task_state_to_tasks = defaultdict(list)
532530
for task in dask_scheduler.tasks.values():
533531
task_state_to_tasks[task.state].append(task.key)
@@ -556,16 +554,34 @@ def _dask_client(ip_address: str) -> distributed.Client:
556554

557555
def _analyze_computational_instances(
558556
computational_instances: list[ComputationalInstance],
559-
ssh_key_path: Path,
557+
ssh_key_path: Path | None,
560558
) -> list[ComputationalCluster]:
559+
560+
all_disk_spaces = [UNDEFINED_BYTESIZE] * len(computational_instances)
561+
if ssh_key_path is not None:
562+
all_disk_spaces = asyncio.get_event_loop().run_until_complete(
563+
asyncio.gather(
564+
*(
565+
asyncio.get_event_loop().run_in_executor(
566+
None,
567+
_ssh_and_get_available_disk_space,
568+
instance.ec2_instance,
569+
_SSH_USER_NAME,
570+
ssh_key_path,
571+
)
572+
for instance in computational_instances
573+
),
574+
return_exceptions=True,
575+
)
576+
)
577+
561578
computational_clusters = []
562-
for instance in track(
563-
computational_instances, description="Collecting computational clusters data..."
579+
for instance, disk_space in track(
580+
zip(computational_instances, all_disk_spaces, strict=True),
581+
description="Collecting computational clusters data...",
564582
):
565-
docker_disk_space = _ssh_and_get_available_disk_space(
566-
instance.ec2_instance, _SSH_USER_NAME, ssh_key_path
567-
)
568-
upgraded_instance = replace(instance, disk_space=docker_disk_space)
583+
if isinstance(disk_space, ByteSize):
584+
instance.disk_space = disk_space
569585
if instance.role is InstanceRole.manager:
570586
scheduler_info = {}
571587
datasets_on_cluster = ()
@@ -581,9 +597,10 @@ def _analyze_computational_instances(
581597

582598
assert isinstance(datasets_on_cluster, tuple)
583599
assert isinstance(processing_jobs, dict)
600+
584601
computational_clusters.append(
585602
ComputationalCluster(
586-
primary=upgraded_instance,
603+
primary=instance,
587604
workers=[],
588605
scheduler_info=scheduler_info,
589606
datasets=datasets_on_cluster,
@@ -600,7 +617,7 @@ def _analyze_computational_instances(
600617
cluster.primary.user_id == instance.user_id
601618
and cluster.primary.wallet_id == instance.wallet_id
602619
):
603-
cluster.workers.append(upgraded_instance)
620+
cluster.workers.append(instance)
604621

605622
return computational_clusters
606623

0 commit comments

Comments (0)