1515)
1616from distributed .core import Status
1717from models_library .clusters import ClusterAuthentication , TLSAuthentication
18- from pydantic import AnyUrl , ByteSize , TypeAdapter
18+ from pydantic import AnyUrl
1919
2020from ..core .errors import (
2121 DaskNoWorkersError ,
@@ -306,7 +306,7 @@ async def compute_cluster_total_resources(
306306 if not instances :
307307 return Resources .create_as_empty ()
308308 async with _scheduler_client (scheduler_url , authentication ) as client :
309- instance_host_resources_map = {
309+ ec2_instance_resources_map = {
310310 node_ip_from_ec2_private_dns (i ): i .resources for i in instances
311311 }
312312 scheduler_info = client .scheduler_info ()
@@ -315,20 +315,17 @@ async def compute_cluster_total_resources(
315315 workers : dict [str , Any ] = scheduler_info ["workers" ]
316316 cluster_resources = Resources .create_as_empty ()
317317 for worker_details in workers .values ():
318- if worker_details ["host" ] not in instance_host_resources_map :
318+ if worker_details ["host" ] not in ec2_instance_resources_map :
319319 continue
320+ # get dask information about resources
320321 worker_dask_resources = worker_details ["resources" ]
321322 worker_threads = worker_details ["nthreads" ]
322- cluster_resources += Resources (
323- cpus = worker_dask_resources .get (
324- "CPU" , instance_host_resources_map [worker_details ["host" ]].cpus
325- ),
326- ram = TypeAdapter (ByteSize ).validate_python (
327- worker_dask_resources .get (
328- "RAM" , instance_host_resources_map [worker_details ["host" ]].ram
329- )
330- ),
331- generic_resources = {DASK_WORKER_THREAD_RESOURCE_NAME : worker_threads },
323+ worker_dask_resources = {
324+ ** worker_dask_resources ,
325+ DASK_WORKER_THREAD_RESOURCE_NAME : worker_threads ,
326+ }
327+ cluster_resources += Resources .from_flat_dict (
328+ worker_dask_resources .items (), mapping = DASK_TO_RESOURCE_NAME_MAPPING
332329 )
333330
334331 return cluster_resources
0 commit comments