@@ -295,12 +295,18 @@ def _list_processing_tasks_on_worker(
295295 total_resources_used .update (task_resources )
296296
297297 _logger .debug ("found %s for %s" , f"{ total_resources_used = } " , f"{ worker_url = } " )
298- return Resources (
298+ worker_used_resources = Resources (
299299 cpus = total_resources_used .get ("CPU" , 0 ),
300300 ram = TypeAdapter (ByteSize ).validate_python (
301301 total_resources_used .get ("RAM" , 0 )
302302 ),
303303 )
304+ if worker_processing_tasks :
305+ worker_used_resources .generic_resources [
306+ DASK_WORKER_THREAD_RESOURCE_NAME
307+ ] = len (worker_processing_tasks )
308+
309+ return worker_used_resources
304310
305311
306312async def compute_cluster_total_resources (
@@ -322,11 +328,17 @@ async def compute_cluster_total_resources(
322328 for worker_details in workers .values ():
323329 if worker_details ["host" ] not in instance_host_resources_map :
324330 continue
325- worker_ram = worker_details ["memory_limit " ]
331+ worker_dask_resources = worker_details ["resources " ]
326332 worker_threads = worker_details ["nthreads" ]
327333 cluster_resources += Resources (
328- cpus = instance_host_resources_map [worker_details ["host" ]].cpus ,
329- ram = TypeAdapter (ByteSize ).validate_python (worker_ram ),
334+ cpus = worker_dask_resources .get (
335+ "CPU" , instance_host_resources_map [worker_details ["host" ]].cpus
336+ ),
337+ ram = TypeAdapter (ByteSize ).validate_python (
338+ worker_dask_resources .get (
339+ "RAM" , instance_host_resources_map [worker_details ["host" ]].ram
340+ )
341+ ),
330342 generic_resources = {DASK_WORKER_THREAD_RESOURCE_NAME : worker_threads },
331343 )
332344
0 commit comments