Skip to content

Commit 3936e65

Browse files
committed
properly closing the dask Client, seems to solve issue with too many files (and perhaps some others)
1 parent a0778c8 commit 3936e65

File tree

1 file changed

+10
-12
lines changed

1 file changed

+10
-12
lines changed

pydra/engine/workers.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -875,18 +875,16 @@ def run_el(self, runnable, rerun=False, **kwargs):
875875

876876
async def exec_dask(self, runnable, rerun=False):
877877
"""Run a task (coroutine wrapper)."""
878-
if self.client is None:
879-
from dask.distributed import Client
880-
881-
self.client = await Client(**self.client_args, asynchronous=True)
882-
883-
if isinstance(runnable, TaskBase):
884-
future = self.client.submit(runnable._run, rerun)
885-
result = await future
886-
else: # it could be tuple that includes pickle files with tasks and inputs
887-
ind, task_main_pkl, task_orig = runnable
888-
future = self.client.submit(load_and_run, task_main_pkl, ind, rerun)
889-
result = await future
878+
from dask.distributed import Client
879+
880+
async with Client(**self.client_args, asynchronous=True) as client:
881+
if isinstance(runnable, TaskBase):
882+
future = client.submit(runnable._run, rerun)
883+
result = await future
884+
else: # it could be tuple that includes pickle files with tasks and inputs
885+
ind, task_main_pkl, task_orig = runnable
886+
future = client.submit(load_and_run, task_main_pkl, ind, rerun)
887+
result = await future
890888
return result
891889

892890
def close(self):

0 commit comments

Comments
 (0)