Skip to content

Commit 7b1a2f4

Browse files
committed
First attempt of a SGE test
1 parent 75ce23d commit 7b1a2f4

File tree

1 file changed

+42
-0
lines changed

1 file changed

+42
-0
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from nipype.pipeline.plugins.base import SGELikeBatchManagerBase
2+
from nipype.interfaces.utility import Function
3+
import nipype.pipeline.engine as pe
4+
from os.path import join
5+
import os
6+
from glob import glob
7+
import pytest
8+
from mock import patch
9+
from tempfile import TemporaryDirectory
10+
import subprocess
11+
12+
def crasher():
13+
raise ValueError()
14+
15+
16+
def submit_batchtask(self, scriptfile, node):
17+
self._pending[1] = node.output_dir()
18+
subprocess.call(["bash", scriptfile])
19+
return 1
20+
21+
def is_pending(taskid):
22+
return False
23+
24+
25+
@patch.object(SGELikeBatchManagerBase, '_submit_batchtask', new=submit_batchtask)
26+
@patch.object(SGELikeBatchManagerBase, '_is_pending', new=is_pending)
27+
def test_crashfile_creation():
28+
cur_dir = os.getcwd()
29+
with TemporaryDirectory(prefix="test_engine_", dir=cur_dir) as tmpdirname:
30+
pipe = pe.Workflow(name="pipe", base_dir=tmpdirname)
31+
pipe.config["execution"]["crashdump_dir"] = tmpdirname
32+
pipe.add_nodes([pe.Node(interface=Function(function=crasher),
33+
name="crasher")])
34+
sgelike_plugin = SGELikeBatchManagerBase("")
35+
with pytest.raises(RuntimeError) as e:
36+
assert pipe.run(plugin=sgelike_plugin)
37+
assert (str(e.value) ==
38+
"Workflow did not execute cleanly. Check log for details")
39+
40+
crashfiles = glob(join(tmpdirname,"crash*crasher*.pklz"))
41+
assert len(crashfiles) == 1
42+
os.chdir(cur_dir)

0 commit comments

Comments
 (0)