Skip to content

Commit 4a02e27

Browse files
committed
simplify
1 parent b8663e5 commit 4a02e27

File tree

2 files changed

+9
-15
lines changed

2 files changed

+9
-15
lines changed

services/autoscaling/src/simcore_service_autoscaling/modules/dask.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,11 @@ def _list_processing_tasks(
202202
for task_key, task_state in dask_scheduler.tasks.items():
203203
if task_state.processing_on:
204204
worker_to_processing_tasks[task_state.processing_on.address].append(
205-
(task_key, task_state.resource_restrictions or {})
205+
(
206+
task_key,
207+
(task_state.resource_restrictions or {})
208+
| {DASK_WORKER_THREAD_RESOURCE_NAME: 1},
209+
)
206210
)
207211
return worker_to_processing_tasks
208212

@@ -270,19 +274,6 @@ async def get_worker_used_resources(
270274
DaskNoWorkersError
271275
"""
272276

273-
def _list_processing_tasks_on_worker(
274-
dask_scheduler: distributed.Scheduler, *, worker_url: str
275-
) -> list[tuple[dask.typing.Key, DaskTaskResources]]:
276-
processing_tasks = []
277-
for task_key, task_state in dask_scheduler.tasks.items():
278-
if task_state.processing_on and (
279-
task_state.processing_on.address == worker_url
280-
):
281-
processing_tasks.append(
282-
(task_key, task_state.resource_restrictions or {})
283-
)
284-
return processing_tasks
285-
286277
async with _scheduler_client(scheduler_url, authentication) as client:
287278
worker_url, _ = _dask_worker_from_ec2_instance(client, ec2_instance)
288279
worker_to_tasks = await _list_cluster_processing_tasks(client)

services/autoscaling/tests/unit/test_modules_dask.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,10 @@ def _add_fct(x: int, y: int) -> int:
166166
scheduler_url, scheduler_authentication
167167
) == {
168168
next(iter(dask_spec_cluster_client.scheduler_info()["workers"])): [
169-
DaskTask(task_id=DaskTaskId(future_queued_task.key), required_resources={})
169+
DaskTask(
170+
task_id=DaskTaskId(future_queued_task.key),
171+
required_resources={DASK_WORKER_THREAD_RESOURCE_NAME: 1},
172+
)
170173
]
171174
}
172175

0 commit comments

Comments
 (0)