Skip to content

Commit 462ff51

Browse files
committed
more
1 parent 317273c commit 462ff51

File tree

1 file changed

+7
-18
lines changed
  • services/autoscaling/src/simcore_service_autoscaling/modules

1 file changed

+7
-18
lines changed

services/autoscaling/src/simcore_service_autoscaling/modules/dask.py

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -208,24 +208,14 @@ async def list_unrunnable_tasks(
208208
DaskSchedulerNotFoundError
209209
"""
210210

211-
def _list_tasks(
212-
dask_scheduler: distributed.Scheduler,
213-
) -> dict[dask.typing.Key, dict[str, float]]:
214-
# NOTE: task.key can be a byte, str, or a tuple
215-
return {
216-
task.key: task.resource_restrictions or {}
217-
for task in dask_scheduler.unrunnable
218-
}
219-
220211
async with _scheduler_client(scheduler_url, authentication) as client:
221212
known_tasks = await _list_cluster_known_tasks(client)
222213
list_of_tasks = known_tasks["unrunnable"]
223214

224215
return [
225216
DaskTask(
226217
task_id=_dask_key_to_dask_task_id(task_id),
227-
required_resources=task_resources
228-
| {DASK_WORKER_THREAD_RESOURCE_NAME: 1},
218+
required_resources=task_resources,
229219
)
230220
for task_id, task_resources in list_of_tasks.items()
231221
]
@@ -291,24 +281,23 @@ async def get_worker_used_resources(
291281
worker_url, _ = _dask_worker_from_ec2_instance(client, ec2_instance)
292282
known_tasks = await _list_cluster_known_tasks(client)
293283
worker_processing_tasks = known_tasks["processing"].get(worker_url, [])
284+
if not worker_processing_tasks:
285+
return Resources.create_as_empty()
294286

295287
total_resources_used: collections.Counter[str] = collections.Counter()
296288
for _, task_resources in worker_processing_tasks:
297289
total_resources_used.update(task_resources)
298290

299291
_logger.debug("found %s for %s", f"{total_resources_used=}", f"{worker_url=}")
300-
worker_used_resources = Resources(
292+
return Resources(
301293
cpus=total_resources_used.get("CPU", 0),
302294
ram=TypeAdapter(ByteSize).validate_python(
303295
total_resources_used.get("RAM", 0)
304296
),
297+
generic_resources={
298+
k: v for k, v in total_resources_used.items() if k not in {"CPU", "RAM"}
299+
},
305300
)
306-
if worker_processing_tasks:
307-
worker_used_resources.generic_resources[
308-
DASK_WORKER_THREAD_RESOURCE_NAME
309-
] = len(worker_processing_tasks)
310-
311-
return worker_used_resources
312301

313302

314303
async def compute_cluster_total_resources(

0 commit comments

Comments
 (0)