Skip to content

Commit 01a66f4

Browse files
committed
hopefully fixed issue with raising errors from cf worker
1 parent 6b1feb5 commit 01a66f4

File tree

2 files changed

+10
-8
lines changed

2 files changed

+10
-8
lines changed

pydra/engine/tests/test_task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,7 +1344,7 @@ def FunError(x):
13441344
assert "in FunError" in error_tb[-2]
13451345

13461346

1347-
def test_traceback_wf(tmpdir):
1347+
def test_traceback_wf(plugin_parallel: str, tmp_path: Path):
13481348
"""checking if the error raised in a function is properly returned by a workflow;
13491349
checking if there is an error filename in the error message that contains
13501350
full traceback including the line in the python function
@@ -1361,7 +1361,7 @@ def Workflow(x_list):
13611361

13621362
wf = Workflow(x_list=[3, 4])
13631363
with pytest.raises(RuntimeError, match="Task 'fun_error' failed.*") as exinfo:
1364-
with Submitter(worker="cf", cache_dir=tmpdir) as sub:
1364+
with Submitter(worker=plugin_parallel, cache_dir=tmp_path) as sub:
13651365
sub(wf, raise_errors=True)
13661366

13671367
# getting error file from the error message

pydra/engine/workers.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class Worker(metaclass=abc.ABCMeta):
3636
"""A base class for execution of tasks."""
3737

3838
plugin_name: str
39+
loop: asyncio.AbstractEventLoop
3940

4041
def __init__(self, loop=None):
4142
"""Initialize the worker."""
@@ -48,10 +49,11 @@ def run(self, task: "Task[DefType]", rerun: bool = False) -> "Result":
4849
pass
4950

5051
async def run_async(self, task: "Task[DefType]", rerun: bool = False) -> "Result":
51-
if task.is_async:
52+
if task.is_async: # only for workflows at this stage and the foreseeable
53+
# These jobs are run in the primary process but farm out the workflows jobs
5254
return await task.run_async(rerun=rerun)
5355
else:
54-
return task.run(rerun=rerun)
56+
return self.run(task=task, rerun=rerun)
5557

5658
def close(self):
5759
"""Close this worker."""
@@ -175,7 +177,8 @@ class ConcurrentFuturesWorker(Worker):
175177
plugin_name = "cf"
176178

177179
n_procs: int
178-
loop: cf.ProcessPoolExecutor
180+
loop: asyncio.AbstractEventLoop
181+
pool: cf.ProcessPoolExecutor
179182

180183
def __init__(self, n_procs: int | None = None):
181184
"""Initialize Worker."""
@@ -193,13 +196,12 @@ async def run(
193196
) -> "Result":
194197
"""Run a task."""
195198
assert self.loop, "No event loop available to submit tasks"
196-
task_pkl = cp.dumps(task)
197199
return await self.loop.run_in_executor(
198-
self.pool, self.unpickle_and_run, task_pkl, rerun
200+
self.pool, self.uncloudpickle_and_run, cp.dumps(task), rerun
199201
)
200202

201203
@classmethod
202-
def unpickle_and_run(cls, task_pkl: Path, rerun: bool) -> "Result":
204+
def uncloudpickle_and_run(cls, task_pkl: bytes, rerun: bool) -> "Result":
203205
"""Unpickle and run a task."""
204206
task: Task[DefType] = cp.loads(task_pkl)
205207
return task.run(rerun=rerun)

0 commit comments

Comments
 (0)