Skip to content

Commit cb6ef02

Browse files
committed
fix tests
1 parent a503fc8 commit cb6ef02

File tree

11 files changed

+246
-185
lines changed

11 files changed

+246
-185
lines changed

nipype/interfaces/base.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -765,8 +765,6 @@ def __init__(self, from_file=None, resource_monitor=None, **inputs):
765765
self.__class__.__name__)
766766

767767
self.inputs = self.input_spec(**inputs)
768-
self.estimated_memory_gb = 0.25
769-
self.num_threads = 1
770768

771769
if resource_monitor is not None:
772770
self.resource_monitor = resource_monitor

nipype/pipeline/engine/nodes.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class Node(EngineBase):
7878

7979
def __init__(self, interface, name, iterables=None, itersource=None,
8080
synchronize=False, overwrite=None, needed_outputs=None,
81-
run_without_submitting=False, n_procs=1, mem_gb=None,
81+
run_without_submitting=False, n_procs=1, mem_gb=0.25,
8282
**kwargs):
8383
"""
8484
Parameters
@@ -169,9 +169,8 @@ def __init__(self, interface, name, iterables=None, itersource=None,
169169
self.needed_outputs = []
170170
self.plugin_args = {}
171171

172-
self._interface.num_threads = n_procs
173-
if mem_gb is not None:
174-
self._interface.estimated_memory_gb = mem_gb
172+
self._n_procs = n_procs
173+
self._mem_gb = mem_gb
175174

176175
if needed_outputs:
177176
self.needed_outputs = sorted(needed_outputs)
@@ -270,6 +269,7 @@ def run(self, updatehash=False):
270269
Update the hash stored in the output directory
271270
"""
272271
# check to see if output directory and hash exist
272+
273273
if self.config is None:
274274
self.config = deepcopy(config._sections)
275275
else:
@@ -685,6 +685,24 @@ def _copyfiles_to_wd(self, outdir, execute, linksonly=False):
685685
if execute and linksonly:
686686
rmtree(outdir)
687687

688+
def get_mem_gb(self):
689+
"""Get estimated memory (GB)"""
690+
if hasattr(self._interface, 'estimated_memory_gb'):
691+
self._mem_gb = self._interface.estimated_memory_gb
692+
logger.warning('Setting "estimated_memory_gb" on Interfaces has been '
693+
'deprecated as of nipype 1.0')
694+
del self._interface.estimated_memory_gb
695+
return self._mem_gb
696+
697+
def get_n_procs(self):
698+
"""Get estimated number of processes"""
699+
if hasattr(self._interface, 'num_threads'):
700+
self._n_procs = self._interface.num_threads
701+
logger.warning('Setting "num_threads" on Interfaces has been '
702+
'deprecated as of nipype 1.0')
703+
del self._interface.num_threads
704+
return self._n_procs
705+
688706
def update(self, **opts):
689707
self.inputs.update(**opts)
690708

@@ -1111,8 +1129,8 @@ def _make_nodes(self, cwd=None):
11111129
for i in range(nitems):
11121130
nodename = '_' + self.name + str(i)
11131131
node = Node(deepcopy(self._interface),
1114-
n_procs=self._interface.num_threads,
1115-
mem_gb=self._interface.estimated_memory_gb,
1132+
n_procs=self.get_n_procs(),
1133+
mem_gb=self.get_mem_gb(),
11161134
overwrite=self.overwrite,
11171135
needed_outputs=self.needed_outputs,
11181136
run_without_submitting=self.run_without_submitting,

nipype/pipeline/engine/tests/test_engine.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -513,17 +513,17 @@ def func1(in1):
513513

514514
mapnode = MapNode(Function(function=func1),
515515
iterfield='in1',
516-
name='mapnode')
516+
name='mapnode',
517+
n_procs=2,
518+
mem_gb=2)
517519
mapnode.inputs.in1 = [1, 2]
518-
mapnode.interface.num_threads = 2
519-
mapnode.interface.estimated_memory_gb = 2
520520

521521
for idx, node in mapnode._make_nodes():
522522
for attr in ('overwrite', 'run_without_submitting', 'plugin_args'):
523523
assert getattr(node, attr) == getattr(mapnode, attr)
524-
for attr in ('num_threads', 'estimated_memory_gb'):
525-
assert (getattr(node._interface, attr) ==
526-
getattr(mapnode._interface, attr))
524+
for attr in ('_n_procs', '_mem_gb'):
525+
assert (getattr(node, attr) ==
526+
getattr(mapnode, attr))
527527

528528

529529
def test_node_hash(tmpdir):

nipype/pipeline/plugins/base.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ def __init__(self, plugin_args=None):
8080
self.mapnodesubids = None
8181
self.proc_done = None
8282
self.proc_pending = None
83+
self.pending_tasks = []
8384
self.max_jobs = self.plugin_args.get('max_jobs', np.inf)
8485

8586
def _prerun_check(self, graph):
@@ -95,8 +96,6 @@ def run(self, graph, config, updatehash=False):
9596
self._prerun_check(graph)
9697
# Generate appropriate structures for worker-manager model
9798
self._generate_dependency_list(graph)
98-
self.pending_tasks = []
99-
self.readytorun = []
10099
self.mapnodes = []
101100
self.mapnodesubids = {}
102101
# setup polling - TODO: change to threaded model
@@ -110,6 +109,11 @@ def run(self, graph, config, updatehash=False):
110109
taskid, jobid = self.pending_tasks.pop()
111110
try:
112111
result = self._get_result(taskid)
112+
except Exception:
113+
notrun.append(self._clean_queue(
114+
jobid, graph, result={'result': None,
115+
'traceback': format_exc()}))
116+
else:
113117
if result:
114118
if result['traceback']:
115119
notrun.append(self._clean_queue(jobid, graph,
@@ -120,11 +124,6 @@ def run(self, graph, config, updatehash=False):
120124
self._clear_task(taskid)
121125
else:
122126
toappend.insert(0, (taskid, jobid))
123-
except Exception:
124-
result = {'result': None,
125-
'traceback': format_exc()}
126-
notrun.append(self._clean_queue(jobid, graph,
127-
result=result))
128127

129128
if toappend:
130129
self.pending_tasks.extend(toappend)
@@ -169,12 +168,15 @@ def _clear_task(self, taskid):
169168
raise NotImplementedError
170169

171170
def _clean_queue(self, jobid, graph, result=None):
171+
logger.info('Clearing %d from queue', jobid)
172+
173+
if self._status_callback:
174+
self._status_callback(self.procs[jobid], 'exception')
175+
172176
if str2bool(self._config['execution']['stop_on_first_crash']):
173177
raise RuntimeError("".join(result['traceback']))
174178
crashfile = self._report_crash(self.procs[jobid],
175179
result=result)
176-
if self._status_callback:
177-
self._status_callback(self.procs[jobid], 'exception')
178180
if jobid in self.mapnodesubids:
179181
# remove current jobid
180182
self.proc_pending[jobid] = False

0 commit comments

Comments
 (0)