File tree Expand file tree Collapse file tree 1 file changed +6
-7
lines changed
scripts/maintenance/computational-clusters/autoscaled_monitor Expand file tree Collapse file tree 1 file changed +6
-7
lines changed Original file line number Diff line number Diff line change @@ -36,7 +36,6 @@ async def dask_client(
3636 )
3737
3838 try :
39-
4039 async with contextlib .AsyncExitStack () as stack :
4140 if instance .public_ip_address is not None :
4241 url = AnyUrl (f"tls://{ instance .public_ip_address } :{ _SCHEDULER_PORT } " )
@@ -85,7 +84,7 @@ async def trigger_job_cancellation_in_scheduler(
8584 task_id : TaskId ,
8685) -> None :
8786 async with dask_client (state , cluster .primary .ec2_instance ) as client :
88- task_future = distributed .Future (task_id )
87+ task_future = distributed .Future (task_id , client = client )
8988 cancel_event = distributed .Event (
9089 name = TASK_CANCEL_EVENT_NAME_TEMPLATE .format (task_future .key ),
9190 client = client ,
@@ -112,13 +111,13 @@ def _list_tasks(
112111
113112 return dict (task_state_to_tasks )
114113
115- list_of_tasks : dict [TaskState , list [TaskId ]] = []
114+ list_of_tasks : dict [TaskState , list [TaskId ]] = {}
116115 try :
117- list_of_tasks = await client .run_on_scheduler (
118- _list_tasks
119- ) # type: ignore
116+ list_of_tasks = await client .run_on_scheduler (_list_tasks ) # type: ignore
120117 except TypeError :
121- rich .print (f"ERROR while recoverring unrunnable tasks using { dask_client = } . Defaulting to empty list of tasks!!" )
118+ rich .print (
119+ f"ERROR while recoverring unrunnable tasks using { dask_client = } . Defaulting to empty list of tasks!!"
120+ )
122121 return list_of_tasks
123122
124123
You canāt perform that action at this time.
0 commit comments