Skip to content

Commit 58c90c4

Browse files
committed
enh: better error handling
1 parent 7244d9c commit 58c90c4

File tree

7 files changed

+37
-20
lines changed

7 files changed

+37
-20
lines changed

nipype/interfaces/utility/tests/test_wrappers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def should_fail(tmp):
7373

7474

7575
def test_should_fail(tmpdir):
76-
with pytest.raises(NameError):
76+
with pytest.raises(pe.nodes.NodeExecutionError):
7777
should_fail(tmpdir)
7878

7979

nipype/pipeline/engine/nodes.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@
6464
logger = logging.getLogger("nipype.workflow")
6565

6666

67+
class NodeExecutionError(RuntimeError):
68+
"""A nipype-specific name for exceptions when executing a Node."""
69+
70+
6771
class Node(EngineBase):
6872
"""
6973
Wraps interface objects for use in pipeline
@@ -582,7 +586,7 @@ def _get_inputs(self):
582586
logger.critical("%s", e)
583587

584588
if outputs is None:
585-
raise RuntimeError(
589+
raise NodeExecutionError(
586590
"""\
587591
Error populating the inputs of node "%s": the results file of the source node \
588592
(%s) does not contain any outputs."""
@@ -717,7 +721,7 @@ def _run_command(self, execute, copyfiles=True):
717721
# Write out command line as it happened
718722
(outdir / "command.txt").write_text(f"{result.runtime.cmdline}\n")
719723

720-
exc_tb = getattr(result, "traceback", None)
724+
exc_tb = getattr(result.runtime, "traceback", None)
721725

722726
if not exc_tb:
723727
# Clean working directory if no errors
@@ -743,7 +747,7 @@ def _run_command(self, execute, copyfiles=True):
743747
)
744748

745749
if exc_tb:
746-
raise RuntimeError(
750+
raise NodeExecutionError(
747751
f"Exception raised while executing Node {self.name}.\n\n{result.runtime.traceback}"
748752
)
749753

@@ -1267,7 +1271,7 @@ def _collate_results(self, nodes):
12671271
if code is not None:
12681272
msg += ["Subnode %d failed" % i]
12691273
msg += ["Error: %s" % str(code)]
1270-
raise Exception(
1274+
raise NodeExecutionError(
12711275
"Subnodes of node: %s failed:\n%s" % (self.name, "\n".join(msg))
12721276
)
12731277

nipype/pipeline/engine/tests/test_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ def test_mapnode_crash(tmpdir):
183183
node.config = deepcopy(config._sections)
184184
node.config["execution"]["stop_on_first_crash"] = True
185185
node.base_dir = tmpdir.strpath
186-
with pytest.raises(TypeError):
186+
with pytest.raises(pe.nodes.NodeExecutionError):
187187
node.run()
188188
os.chdir(cwd)
189189

nipype/pipeline/plugins/base.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ def run(self, graph, config, updatehash=False):
123123
self.mapnodesubids = {}
124124
# setup polling - TODO: change to threaded model
125125
notrun = []
126+
errors = []
126127

127128
old_progress_stats = None
128129
old_presub_stats = None
@@ -155,14 +156,16 @@ def run(self, graph, config, updatehash=False):
155156
taskid, jobid = self.pending_tasks.pop()
156157
try:
157158
result = self._get_result(taskid)
158-
except Exception:
159+
except Exception as exc:
159160
notrun.append(self._clean_queue(jobid, graph))
161+
errors.append(exc)
160162
else:
161163
if result:
162164
if result["traceback"]:
163165
notrun.append(
164166
self._clean_queue(jobid, graph, result=result)
165167
)
168+
errors.append("".join(result["traceback"]))
166169
else:
167170
self._task_finished_cb(jobid)
168171
self._remove_node_dirs()
@@ -194,6 +197,13 @@ def run(self, graph, config, updatehash=False):
194197
# close any open resources
195198
self._postrun_check()
196199

200+
if errors:
201+
# If one or more nodes failed, re-rise first of them
202+
if isinstance(errors[0], str):
203+
raise RuntimeError(errors[0])
204+
205+
raise errors[0]
206+
197207
def _get_result(self, taskid):
198208
raise NotImplementedError
199209

nipype/pipeline/plugins/linear.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ def run(self, graph, config, updatehash=False):
3434
old_wd = os.getcwd()
3535
notrun = []
3636
donotrun = []
37+
stop_on_first_crash = str2bool(config["execution"]["stop_on_first_crash"])
38+
errors = []
3739
nodes, _ = topological_sort(graph)
3840
for node in nodes:
3941
endstatus = "end"
@@ -43,27 +45,28 @@ def run(self, graph, config, updatehash=False):
4345
if self._status_callback:
4446
self._status_callback(node, "start")
4547
node.run(updatehash=updatehash)
46-
except:
48+
except Exception as exc:
4749
endstatus = "exception"
4850
# bare except, but i really don't know where a
4951
# node might fail
5052
crashfile = report_crash(node)
51-
if str2bool(config["execution"]["stop_on_first_crash"]):
52-
raise
5353
# remove dependencies from queue
5454
subnodes = [s for s in dfs_preorder(graph, node)]
5555
notrun.append(
5656
{"node": node, "dependents": subnodes, "crashfile": crashfile}
5757
)
5858
donotrun.extend(subnodes)
5959
# Delay raising the crash until we cleaned the house
60-
if str2bool(config["execution"]["stop_on_first_crash"]):
61-
os.chdir(old_wd) # Return wherever we were before
62-
report_nodes_not_run(notrun) # report before raising
63-
raise
60+
errors.append(exc)
61+
62+
if stop_on_first_crash:
63+
break
6464
finally:
6565
if self._status_callback:
6666
self._status_callback(node, endstatus)
6767

6868
os.chdir(old_wd) # Return wherever we were before
6969
report_nodes_not_run(notrun)
70+
if errors:
71+
# Re-raise exception of first failed node
72+
raise errors[0]

nipype/pipeline/plugins/tests/test_sgelike.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ def test_crashfile_creation(tmp_path):
2929
sgelike_plugin = SGELikeBatchManagerBase("")
3030
with pytest.raises(RuntimeError) as e:
3131
assert pipe.run(plugin=sgelike_plugin)
32-
assert str(e.value) == "Workflow did not execute cleanly. Check log for details"
32+
assert str(e.value) == "Workflow did not execute cleanly. Check log for details"
3333

34-
crashfiles = tmp_path.glob("crash*crasher*.pklz")
35-
assert len(list(crashfiles)) == 1
34+
crashfiles = (
35+
list(tmp_path.glob("crash*crasher*.pklz"))
36+
+ list(tmp_path.glob("crash*crasher*.txt"))
37+
)
38+
assert len(crashfiles) == 1

nipype/pipeline/plugins/tools.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,6 @@ def report_nodes_not_run(notrun):
9393
for subnode in info["dependents"]:
9494
logger.debug(subnode._id)
9595
logger.info("***********************************")
96-
raise RuntimeError(
97-
("Workflow did not execute cleanly. " "Check log for details")
98-
)
9996

10097

10198
def create_pyscript(node, updatehash=False, store_exception=True):

0 commit comments

Comments
 (0)