Skip to content

Commit a9ee18e

Browse files
committed
implemented compute cluster total resources
1 parent 6059775 commit a9ee18e

File tree

3 files changed

+41
-9
lines changed

3 files changed

+41
-9
lines changed

services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_computational.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,9 @@ async def compute_cluster_total_resources(
155155
assert self # nosec
156156
try:
157157
return await dask.compute_cluster_total_resources(
158-
_scheduler_url(app), _scheduler_auth(app), instances
158+
_scheduler_url(app),
159+
_scheduler_auth(app),
160+
[i.ec2_instance for i in instances],
159161
)
160162
except DaskNoWorkersError:
161163
return Resources.create_as_empty()

services/autoscaling/src/simcore_service_autoscaling/modules/dask.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
DaskWorkerNotFoundError,
2222
)
2323
from ..core.settings import DaskMonitoringSettings
24-
from ..models import AssociatedInstance, DaskTask, DaskTaskId
24+
from ..models import DaskTask, DaskTaskId
2525
from ..utils.utils_ec2 import (
2626
node_host_name_from_ec2_private_dns,
2727
node_ip_from_ec2_private_dns,
@@ -306,23 +306,31 @@ def _list_processing_tasks_on_worker(
306306
async def compute_cluster_total_resources(
307307
scheduler_url: AnyUrl,
308308
authentication: ClusterAuthentication,
309-
instances: list[AssociatedInstance],
309+
instances: list[EC2InstanceData],
310310
) -> Resources:
311311
if not instances:
312312
return Resources.create_as_empty()
313313
async with _scheduler_client(scheduler_url, authentication) as client:
314-
instance_hosts = (
315-
node_ip_from_ec2_private_dns(i.ec2_instance) for i in instances
316-
)
314+
instance_host_resources_map = {
315+
node_ip_from_ec2_private_dns(i): i.resources for i in instances
316+
}
317317
scheduler_info = client.scheduler_info()
318318
if "workers" not in scheduler_info or not scheduler_info["workers"]:
319319
raise DaskNoWorkersError(url=scheduler_url)
320320
workers: dict[str, Any] = scheduler_info["workers"]
321+
cluster_resources = Resources.create_as_empty()
321322
for worker_details in workers.values():
322-
if worker_details["host"] not in instance_hosts:
323+
if worker_details["host"] not in instance_host_resources_map:
323324
continue
325+
worker_ram = worker_details["memory_limit"]
326+
worker_threads = worker_details["nthreads"]
327+
cluster_resources += Resources(
328+
cpus=instance_host_resources_map[worker_details["host"]].cpus,
329+
ram=TypeAdapter(ByteSize).validate_python(worker_ram),
330+
generic_resources={DASK_WORKER_THREAD_RESOURCE_NAME: worker_threads},
331+
)
324332

325-
return Resources.create_as_empty()
333+
return cluster_resources
326334

327335

328336
async def try_retire_nodes(

services/autoscaling/tests/unit/test_modules_dask.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,8 +380,11 @@ def _add_fct(x: int, y: int) -> int:
380380

381381

382382
async def test_compute_cluster_total_resources(
383+
dask_spec_local_cluster: distributed.SpecCluster,
383384
scheduler_url: AnyUrl,
384385
scheduler_authentication: ClusterAuthentication,
386+
fake_ec2_instance_data: Callable[..., EC2InstanceData],
387+
fake_localhost_ec2_instance_data: EC2InstanceData,
385388
):
386389
# asking for resources of empty cluster returns empty resources
387390
assert (
@@ -390,6 +393,26 @@ async def test_compute_cluster_total_resources(
390393
)
391394
== Resources.create_as_empty()
392395
)
396+
ec2_instance_data = fake_ec2_instance_data()
397+
assert ec2_instance_data.resources.cpus > 0
398+
assert ec2_instance_data.resources.ram > 0
399+
assert ec2_instance_data.resources.generic_resources == {}
400+
assert (
401+
await compute_cluster_total_resources(
402+
scheduler_url, scheduler_authentication, [ec2_instance_data]
403+
)
404+
== Resources.create_as_empty()
405+
), "this instance is not connected and should not be accounted for"
406+
407+
cluster_total_resources = await compute_cluster_total_resources(
408+
scheduler_url, scheduler_authentication, [fake_localhost_ec2_instance_data]
409+
)
410+
assert cluster_total_resources.cpus > 0
411+
assert cluster_total_resources.ram > 0
412+
assert DASK_WORKER_THREAD_RESOURCE_NAME in cluster_total_resources.generic_resources
413+
assert (
414+
cluster_total_resources.generic_resources[DASK_WORKER_THREAD_RESOURCE_NAME] == 2
415+
)
393416

394417

395418
@pytest.mark.parametrize(
@@ -454,7 +477,6 @@ async def test_is_worker_connected(
454477

455478

456479
async def test_is_worker_retired(
457-
dask_spec_local_cluster: distributed.SpecCluster,
458480
scheduler_url: AnyUrl,
459481
scheduler_authentication: ClusterAuthentication,
460482
fake_ec2_instance_data: Callable[..., EC2InstanceData],

0 commit comments

Comments
 (0)