Skip to content

Commit 879da9a

Browse files
authored
🐛Autoscaled instances monitoring script: small fixes (#7337)
1 parent d832a46 commit 879da9a

File tree

1 file changed

+6
-7
lines changed
  • scripts/maintenance/computational-clusters/autoscaled_monitor

1 file changed

+6
-7
lines changed

scripts/maintenance/computational-clusters/autoscaled_monitor/dask.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ async def dask_client(
3636
)
3737

3838
try:
39-
4039
async with contextlib.AsyncExitStack() as stack:
4140
if instance.public_ip_address is not None:
4241
url = AnyUrl(f"tls://{instance.public_ip_address}:{_SCHEDULER_PORT}")
@@ -85,7 +84,7 @@ async def trigger_job_cancellation_in_scheduler(
8584
task_id: TaskId,
8685
) -> None:
8786
async with dask_client(state, cluster.primary.ec2_instance) as client:
88-
task_future = distributed.Future(task_id)
87+
task_future = distributed.Future(task_id, client=client)
8988
cancel_event = distributed.Event(
9089
name=TASK_CANCEL_EVENT_NAME_TEMPLATE.format(task_future.key),
9190
client=client,
@@ -112,13 +111,13 @@ def _list_tasks(
112111

113112
return dict(task_state_to_tasks)
114113

115-
list_of_tasks: dict[TaskState, list[TaskId]] = []
114+
list_of_tasks: dict[TaskState, list[TaskId]] = {}
116115
try:
117-
list_of_tasks = await client.run_on_scheduler(
118-
_list_tasks
119-
) # type: ignore
116+
list_of_tasks = await client.run_on_scheduler(_list_tasks) # type: ignore
120117
except TypeError:
121-
rich.print(f"ERROR while recoverring unrunnable tasks using {dask_client=}. Defaulting to empty list of tasks!!")
118+
rich.print(
119+
f"ERROR while recoverring unrunnable tasks using {dask_client=}. Defaulting to empty list of tasks!!"
120+
)
122121
return list_of_tasks
123122

124123

0 commit comments

Comments
 (0)