Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ async def dask_client(
)

try:

async with contextlib.AsyncExitStack() as stack:
if instance.public_ip_address is not None:
url = AnyUrl(f"tls://{instance.public_ip_address}:{_SCHEDULER_PORT}")
Expand Down Expand Up @@ -85,7 +84,7 @@ async def trigger_job_cancellation_in_scheduler(
task_id: TaskId,
) -> None:
async with dask_client(state, cluster.primary.ec2_instance) as client:
task_future = distributed.Future(task_id)
task_future = distributed.Future(task_id, client=client)
cancel_event = distributed.Event(
name=TASK_CANCEL_EVENT_NAME_TEMPLATE.format(task_future.key),
client=client,
Expand All @@ -112,13 +111,13 @@ def _list_tasks(

return dict(task_state_to_tasks)

list_of_tasks: dict[TaskState, list[TaskId]] = []
list_of_tasks: dict[TaskState, list[TaskId]] = {}
try:
list_of_tasks = await client.run_on_scheduler(
_list_tasks
) # type: ignore
list_of_tasks = await client.run_on_scheduler(_list_tasks) # type: ignore
except TypeError:
rich.print(f"ERROR while recoverring unrunnable tasks using {dask_client=}. Defaulting to empty list of tasks!!")
rich.print(
f"ERROR while recoverring unrunnable tasks using {dask_client=}. Defaulting to empty list of tasks!!"
)
return list_of_tasks


Expand Down