diff --git a/scripts/maintenance/computational-clusters/autoscaled_monitor/dask.py b/scripts/maintenance/computational-clusters/autoscaled_monitor/dask.py index 750ef816bc86..d6ca45ba7a3d 100644 --- a/scripts/maintenance/computational-clusters/autoscaled_monitor/dask.py +++ b/scripts/maintenance/computational-clusters/autoscaled_monitor/dask.py @@ -36,7 +36,6 @@ async def dask_client( ) try: - async with contextlib.AsyncExitStack() as stack: if instance.public_ip_address is not None: url = AnyUrl(f"tls://{instance.public_ip_address}:{_SCHEDULER_PORT}") @@ -85,7 +84,7 @@ async def trigger_job_cancellation_in_scheduler( task_id: TaskId, ) -> None: async with dask_client(state, cluster.primary.ec2_instance) as client: - task_future = distributed.Future(task_id) + task_future = distributed.Future(task_id, client=client) cancel_event = distributed.Event( name=TASK_CANCEL_EVENT_NAME_TEMPLATE.format(task_future.key), client=client, @@ -112,13 +111,13 @@ def _list_tasks( return dict(task_state_to_tasks) - list_of_tasks: dict[TaskState, list[TaskId]] = [] + list_of_tasks: dict[TaskState, list[TaskId]] = {} try: - list_of_tasks = await client.run_on_scheduler( - _list_tasks - ) # type: ignore + list_of_tasks = await client.run_on_scheduler(_list_tasks) # type: ignore except TypeError: - rich.print(f"ERROR while recoverring unrunnable tasks using {dask_client=}. Defaulting to empty list of tasks!!") + rich.print( + f"ERROR while recoverring unrunnable tasks using {dask_client=}. Defaulting to empty list of tasks!!" + ) return list_of_tasks