Skip to content

Commit 45b5692

Browse files
committed
Merge remote-tracking branch 'upstream/master' into fix/bunchmapnode
* upstream/master: missed skipif fixed multiproc plugin deadlock problem and re-enabled tests && not valid in python... check to make sure keep_temporary_files is defined add auto generated spec test for changed file fix error accessing brainextraction input add new files to ants brain extraction output spec and list outputs
2 parents 134a3d4 + fa866eb commit 45b5692

File tree

10 files changed

+157
-55
lines changed

10 files changed

+157
-55
lines changed

circle.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ test:
4949
- docker run -v /etc/localtime:/etc/localtime:ro -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py35 /usr/bin/run_examples.sh fmri_spm_dartel Linear /root/examples/ l2pipeline :
5050
timeout: 1600
5151
- docker run -v /etc/localtime:/etc/localtime:ro -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py35 /usr/bin/run_examples.sh fmri_fsl_reuse Linear /root/examples/ level1_workflow
52-
# Disabled until https://github.com/nipy/nipype/issues/1692 is resolved
53-
# - docker run -v /etc/localtime:/etc/localtime:ro -e NIPYPE_NUMBER_OF_CPUS=4 -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py27 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /root/examples/ level1
52+
- docker run -v /etc/localtime:/etc/localtime:ro -e NIPYPE_NUMBER_OF_CPUS=4 -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py27 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /root/examples/ level1
5453
- docker run -v /etc/localtime:/etc/localtime:ro -e NIPYPE_NUMBER_OF_CPUS=4 -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py35 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /root/examples/ level1
5554
- docker run -v /etc/localtime:/etc/localtime:ro -e NIPYPE_NUMBER_OF_CPUS=4 -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py35 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /root/examples/ l2pipeline
5655

nipype/interfaces/ants/segmentation.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,22 @@ class BrainExtractionInputSpec(ANTSCommandInputSpec):
655655
class BrainExtractionOutputSpec(TraitedSpec):
656656
BrainExtractionMask = File(exists=True, desc='brain extraction mask')
657657
BrainExtractionBrain = File(exists=True, desc='brain extraction image')
658+
BrainExtractionCSF = File(exists=True, desc='segmentation mask with only CSF')
659+
BrainExtractionGM = File(exists=True, desc='segmentation mask with only grey matter')
660+
BrainExtractionInitialAffine = File(exists=True, desc='')
661+
BrainExtractionInitialAffineFixed = File(exists=True, desc='')
662+
BrainExtractionInitialAffineMoving = File(exists=True, desc='')
663+
BrainExtractionLaplacian = File(exists=True, desc='')
664+
BrainExtractionPrior0GenericAffine = File(exists=True, desc='')
665+
BrainExtractionPrior1InverseWarp = File(exists=True, desc='')
666+
BrainExtractionPrior1Warp = File(exists=True, desc='')
667+
BrainExtractionPriorWarped = File(exists=True, desc='')
668+
BrainExtractionSegmentation = File(exists=True, desc='segmentation mask with CSF, GM, and WM')
669+
BrainExtractionTemplateLaplacian = File(exists=True, desc='')
670+
BrainExtractionTmp = File(exists=True, desc='')
671+
BrainExtractionWM = File(exists=True, desc='segmenration mask with only white matter')
672+
N4Corrected0 = File(exists=True, desc='N4 bias field corrected image')
673+
N4Truncated0 = File(exists=True, desc='')
658674

659675

660676
class BrainExtraction(ANTSCommand):
@@ -685,6 +701,72 @@ def _list_outputs(self):
685701
self.inputs.out_prefix +
686702
'BrainExtractionBrain.' +
687703
self.inputs.image_suffix)
704+
if isdefined(self.inputs.keep_temporary_files) and self.inputs.keep_temporary_files != 0:
705+
outputs['BrainExtractionCSF'] = os.path.join(
706+
os.getcwd(),
707+
self.inputs.out_prefix + 'BrainExtractionCSF.' + self.inputs.image_suffix
708+
)
709+
outputs['BrainExtractionGM'] = os.path.join(
710+
os.getcwd(),
711+
self.inputs.out_prefix + 'BrainExtractionGM.' + self.inputs.image_suffix
712+
)
713+
outputs['BrainExtractionInitialAffine'] = os.path.join(
714+
os.getcwd(),
715+
self.inputs.out_prefix +'BrainExtractionInitialAffine.mat'
716+
)
717+
outputs['BrainExtractionInitialAffineFixed'] = os.path.join(
718+
os.getcwd(),
719+
self.inputs.out_prefix + 'BrainExtractionInitialAffineFixed.' + self.inputs.image_suffix
720+
)
721+
outputs['BrainExtractionInitialAffineMoving'] = os.path.join(
722+
os.getcwd(),
723+
self.inputs.out_prefix + 'BrainExtractionInitialAffineMoving.' + self.inputs.image_suffix
724+
)
725+
outputs['BrainExtractionLaplacian'] = os.path.join(
726+
os.getcwd(),
727+
self.inputs.out_prefix + 'BrainExtractionLaplacian.' + self.inputs.image_suffix
728+
)
729+
outputs['BrainExtractionPrior0GenericAffine'] = os.path.join(
730+
os.getcwd(),
731+
self.inputs.out_prefix + 'BrainExtractionPrior0GenericAffine.mat'
732+
)
733+
outputs['BrainExtractionPrior1InverseWarp'] = os.path.join(
734+
os.getcwd(),
735+
self.inputs.out_prefix + 'BrainExtractionPrior1InverseWarp.' + self.inputs.image_suffix
736+
)
737+
outputs['BrainExtractionPrior1Warp'] = os.path.join(
738+
os.getcwd(),
739+
self.inputs.out_prefix + 'BrainExtractionPrior1Warp.' + self.inputs.image_suffix
740+
)
741+
outputs['BrainExtractionPriorWarped'] = os.path.join(
742+
os.getcwd(),
743+
self.inputs.out_prefix + 'BrainExtractionPriorWarped.' + self.inputs.image_suffix
744+
)
745+
outputs['BrainExtractionSegmentation'] = os.path.join(
746+
os.getcwd(),
747+
self.inputs.out_prefix + 'BrainExtractionSegmentation.' + self.inputs.image_suffix
748+
)
749+
outputs['BrainExtractionTemplateLaplacian'] = os.path.join(
750+
os.getcwd(),
751+
self.inputs.out_prefix + 'BrainExtractionTemplateLaplacian.' + self.inputs.image_suffix
752+
)
753+
outputs['BrainExtractionTmp'] = os.path.join(
754+
os.getcwd(),
755+
self.inputs.out_prefix + 'BrainExtractionTmp.' + self.inputs.image_suffix
756+
)
757+
outputs['BrainExtractionWM'] = os.path.join(
758+
os.getcwd(),
759+
self.inputs.out_prefix + 'BrainExtractionWM.' + self.inputs.image_suffix
760+
)
761+
outputs['N4Corrected0'] = os.path.join(
762+
os.getcwd(),
763+
self.inputs.out_prefix + 'N4Corrected0.' + self.inputs.image_suffix
764+
)
765+
outputs['N4Truncated0'] = os.path.join(
766+
os.getcwd(),
767+
self.inputs.out_prefix + 'N4Truncated0.' + self.inputs.image_suffix
768+
)
769+
688770
return outputs
689771

690772

nipype/interfaces/ants/tests/test_auto_BrainExtraction.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,23 @@ def test_BrainExtraction_inputs():
5555

5656
def test_BrainExtraction_outputs():
5757
output_map = dict(BrainExtractionBrain=dict(),
58+
BrainExtractionCSF=dict(),
59+
BrainExtractionGM=dict(),
60+
BrainExtractionInitialAffine=dict(),
61+
BrainExtractionInitialAffineFixed=dict(),
62+
BrainExtractionInitialAffineMoving=dict(),
63+
BrainExtractionLaplacian=dict(),
5864
BrainExtractionMask=dict(),
65+
BrainExtractionPrior0GenericAffine=dict(),
66+
BrainExtractionPrior1InverseWarp=dict(),
67+
BrainExtractionPrior1Warp=dict(),
68+
BrainExtractionPriorWarped=dict(),
69+
BrainExtractionSegmentation=dict(),
70+
BrainExtractionTemplateLaplacian=dict(),
71+
BrainExtractionTmp=dict(),
72+
BrainExtractionWM=dict(),
73+
N4Corrected0=dict(),
74+
N4Truncated0=dict(),
5975
)
6076
outputs = BrainExtraction.output_spec()
6177

nipype/interfaces/tests/test_runtime_profiler.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,6 @@ def _use_gb_ram(num_gb):
119119

120120

121121
# Test case for the run function
122-
@pytest.mark.skipif(sys.version_info < (3, 0),
123-
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
124122
class TestRuntimeProfiler():
125123
'''
126124
This class is a test case for the runtime profiler

nipype/pipeline/engine/tests/test_engine.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -626,8 +626,6 @@ def func1(in1):
626626
assert not error_raised
627627

628628

629-
@pytest.mark.skipif(sys.version_info < (3, 0),
630-
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
631629
def test_serial_input(tmpdir):
632630
wd = str(tmpdir)
633631
os.chdir(wd)

nipype/pipeline/plugins/base.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,11 +272,17 @@ def run(self, graph, config, updatehash=False):
272272
self._remove_node_dirs()
273273
report_nodes_not_run(notrun)
274274

275-
275+
# close any open resources
276+
self._close()
276277

277278
def _wait(self):
278279
sleep(float(self._config['execution']['poll_sleep_duration']))
279280

281+
def _close(self):
282+
# close any open resources, this could raise NotImplementedError
283+
# but I didn't want to break other plugins
284+
return True
285+
280286
def _get_result(self, taskid):
281287
raise NotImplementedError
282288

nipype/pipeline/plugins/multiproc.py

Lines changed: 51 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
# Import packages
1313
from multiprocessing import Process, Pool, cpu_count, pool
14+
import threading
1415
from traceback import format_exception
1516
import sys
1617

@@ -20,14 +21,13 @@
2021
from ... import logging, config
2122
from ...utils.misc import str2bool
2223
from ..engine import MapNode
23-
from ..plugins import semaphore_singleton
2424
from .base import (DistributedPluginBase, report_crash)
2525

2626
# Init logger
2727
logger = logging.getLogger('workflow')
2828

2929
# Run node
30-
def run_node(node, updatehash):
30+
def run_node(node, updatehash, taskid):
3131
"""Function to execute node.run(), catch and log any errors and
3232
return the result dictionary
3333
@@ -45,7 +45,7 @@ def run_node(node, updatehash):
4545
"""
4646

4747
# Init variables
48-
result = dict(result=None, traceback=None)
48+
result = dict(result=None, traceback=None, taskid=taskid)
4949

5050
# Try and execute the node via node.run()
5151
try:
@@ -77,10 +77,6 @@ class NonDaemonPool(pool.Pool):
7777
Process = NonDaemonProcess
7878

7979

80-
def release_lock(args):
81-
semaphore_singleton.semaphore.release()
82-
83-
8480
# Get total system RAM
8581
def get_system_total_memory_gb():
8682
"""Function to get the total RAM of the running system in GB
@@ -136,12 +132,18 @@ def __init__(self, plugin_args=None):
136132
# Init variables and instance attributes
137133
super(MultiProcPlugin, self).__init__(plugin_args=plugin_args)
138134
self._taskresult = {}
135+
self._task_obj = {}
139136
self._taskid = 0
140137
non_daemon = True
141138
self.plugin_args = plugin_args
142139
self.processors = cpu_count()
143140
self.memory_gb = get_system_total_memory_gb()*0.9 # 90% of system memory
144141

142+
self._timeout=2.0
143+
self._event = threading.Event()
144+
145+
146+
145147
# Check plugin args
146148
if self.plugin_args:
147149
if 'non_daemon' in self.plugin_args:
@@ -150,6 +152,9 @@ def __init__(self, plugin_args=None):
150152
self.processors = self.plugin_args['n_procs']
151153
if 'memory_gb' in self.plugin_args:
152154
self.memory_gb = self.plugin_args['memory_gb']
155+
156+
logger.debug("MultiProcPlugin starting %d threads in pool"%(self.processors))
157+
153158
# Instantiate different thread pools for non-daemon processes
154159
if non_daemon:
155160
# run the execution using the non-daemon pool subclass
@@ -159,14 +164,23 @@ def __init__(self, plugin_args=None):
159164

160165
def _wait(self):
161166
if len(self.pending_tasks) > 0:
162-
semaphore_singleton.semaphore.acquire()
167+
if self._config['execution']['poll_sleep_duration']:
168+
self._timeout = float(self._config['execution']['poll_sleep_duration'])
169+
sig_received=self._event.wait(self._timeout)
170+
if not sig_received:
171+
logger.debug('MultiProcPlugin timeout before signal received. Deadlock averted??')
172+
self._event.clear()
173+
174+
def _async_callback(self, args):
175+
self._taskresult[args['taskid']]=args
176+
self._event.set()
163177

164178
def _get_result(self, taskid):
165179
if taskid not in self._taskresult:
166-
raise RuntimeError('Multiproc task %d not found' % taskid)
167-
if not self._taskresult[taskid].ready():
168-
return None
169-
return self._taskresult[taskid].get()
180+
result=None
181+
else:
182+
result=self._taskresult[taskid]
183+
return result
170184

171185
def _report_crash(self, node, result=None):
172186
if result and result['traceback']:
@@ -178,36 +192,50 @@ def _report_crash(self, node, result=None):
178192
return report_crash(node)
179193

180194
def _clear_task(self, taskid):
181-
del self._taskresult[taskid]
195+
del self._task_obj[taskid]
182196

183197
def _submit_job(self, node, updatehash=False):
184198
self._taskid += 1
185199
if hasattr(node.inputs, 'terminal_output'):
186200
if node.inputs.terminal_output == 'stream':
187201
node.inputs.terminal_output = 'allatonce'
188202

189-
self._taskresult[self._taskid] = \
203+
self._task_obj[self._taskid] = \
190204
self.pool.apply_async(run_node,
191-
(node, updatehash),
192-
callback=release_lock)
205+
(node, updatehash, self._taskid),
206+
callback=self._async_callback)
193207
return self._taskid
194208

209+
def _close(self):
210+
self.pool.close()
211+
return True
212+
195213
def _send_procs_to_workers(self, updatehash=False, graph=None):
196214
""" Sends jobs to workers when system resources are available.
197215
Check memory (gb) and cores usage before running jobs.
198216
"""
199217
executing_now = []
200218

201219
# Check to see if a job is available
202-
jobids = np.flatnonzero((self.proc_pending == True) & \
220+
currently_running_jobids = np.flatnonzero((self.proc_pending == True) & \
203221
(self.depidx.sum(axis=0) == 0).__array__())
204222

205223
# Check available system resources by summing all threads and memory used
206224
busy_memory_gb = 0
207225
busy_processors = 0
208-
for jobid in jobids:
209-
busy_memory_gb += self.procs[jobid]._interface.estimated_memory_gb
210-
busy_processors += self.procs[jobid]._interface.num_threads
226+
for jobid in currently_running_jobids:
227+
if self.procs[jobid]._interface.estimated_memory_gb <= self.memory_gb and \
228+
self.procs[jobid]._interface.num_threads <= self.processors:
229+
230+
busy_memory_gb += self.procs[jobid]._interface.estimated_memory_gb
231+
busy_processors += self.procs[jobid]._interface.num_threads
232+
233+
else:
234+
raise ValueError("Resources required by jobid %d (%f GB, %d threads)"
235+
"exceed what is available on the system (%f GB, %d threads)"%(jobid,
236+
self.procs[jobid].__interface.estimated_memory_gb,
237+
self.procs[jobid].__interface.num_threads,
238+
self.memory_gb,self.processors))
211239

212240
free_memory_gb = self.memory_gb - busy_memory_gb
213241
free_processors = self.processors - busy_processors
@@ -271,8 +299,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
271299
hash_exists, _, _, _ = self.procs[
272300
jobid].hash_exists()
273301
logger.debug('Hash exists %s' % str(hash_exists))
274-
if (hash_exists and (self.procs[jobid].overwrite == False or \
275-
(self.procs[jobid].overwrite == None and \
302+
if (hash_exists and (self.procs[jobid].overwrite == False or
303+
(self.procs[jobid].overwrite == None and
276304
not self.procs[jobid]._interface.always_run))):
277305
self._task_finished_cb(jobid)
278306
self._remove_node_dirs()
@@ -299,7 +327,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
299327
self._remove_node_dirs()
300328

301329
else:
302-
logger.debug('submitting %s' % str(jobid))
330+
logger.debug('MultiProcPlugin submitting %s' % str(jobid))
303331
tid = self._submit_job(deepcopy(self.procs[jobid]),
304332
updatehash=updatehash)
305333
if tid is None:

nipype/pipeline/plugins/tests/test_callback.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,6 @@ def test_callback_exception(tmpdir):
6464
assert so.statuses[0][1] == 'start'
6565
assert so.statuses[1][1] == 'exception'
6666

67-
68-
@pytest.mark.skipif(sys.version_info < (3, 0),
69-
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
7067
def test_callback_multiproc_normal(tmpdir):
7168
so = Status()
7269
wf = pe.Workflow(name='test', base_dir=str(tmpdir))
@@ -83,9 +80,6 @@ def test_callback_multiproc_normal(tmpdir):
8380
assert so.statuses[0][1] == 'start'
8481
assert so.statuses[1][1] == 'end'
8582

86-
87-
@pytest.mark.skipif(sys.version_info < (3, 0),
88-
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
8983
def test_callback_multiproc_exception(tmpdir):
9084
so = Status()
9185
wf = pe.Workflow(name='test', base_dir=str(tmpdir))

nipype/pipeline/plugins/tests/test_multiproc.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,6 @@ def _list_outputs(self):
3232
outputs['output1'] = [1, self.inputs.input1]
3333
return outputs
3434

35-
36-
@pytest.mark.skipif(sys.version_info < (3, 0),
37-
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
3835
def test_run_multiproc(tmpdir):
3936
os.chdir(str(tmpdir))
4037

@@ -118,9 +115,6 @@ def find_metrics(nodes, last_node):
118115

119116
return total_memory, total_threads
120117

121-
122-
@pytest.mark.skipif(sys.version_info < (3, 0),
123-
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
124118
def test_no_more_memory_than_specified():
125119
LOG_FILENAME = 'callback.log'
126120
my_logger = logging.getLogger('callback')
@@ -179,10 +173,6 @@ def test_no_more_memory_than_specified():
179173

180174
os.remove(LOG_FILENAME)
181175

182-
183-
@pytest.mark.skipif(sys.version_info < (3, 0),
184-
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
185-
@pytest.mark.skipif(nib.runtime_profile == False, reason="runtime_profile=False")
186176
def test_no_more_threads_than_specified():
187177
LOG_FILENAME = 'callback.log'
188178
my_logger = logging.getLogger('callback')

0 commit comments

Comments
 (0)