Skip to content

Commit 43c0d56

Browse files
committed
remove MultiProcPlugin; ResourceMultiProcPlugin is now the default multiprocessing plugin
1 parent a3c9be7 commit 43c0d56

File tree

9 files changed

+46
-82
lines changed

9 files changed

+46
-82
lines changed

nipype/interfaces/ants/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# -Using -1 gives primary responsibility to ITKv4 to do the correct
1313
# thread limitings.
1414
# -Using 1 takes a very conservative approach to avoid overloading
15-
# the computer (when running MultiProc) by forcing everything to
15+
# the computer (when running ResourceMultiProc) by forcing everything to
1616
# single threaded. This can be a severe penalty for registration
1717
# performance.
1818
LOCAL_DEFAULT_NUMBER_OF_THREADS = 1

nipype/pipeline/engine/tests/test_engine.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -714,16 +714,15 @@ def func1(in1):
714714
# set local check
715715
w1.config['execution'] = {'stop_on_first_crash': 'true',
716716
'local_hash_check': 'true',
717-
'crashdump_dir': wd,
718-
'poll_sleep_duration': 2}
717+
'crashdump_dir': wd}
719718

720719
# test output of num_subnodes method when serial is default (False)
721720
yield assert_equal, n1.num_subnodes(), len(n1.inputs.in1)
722721

723722
# test running the workflow on default conditions
724723
error_raised = False
725724
try:
726-
w1.run(plugin='MultiProc')
725+
w1.run(plugin='ResourceMultiProc')
727726
except Exception as e:
728727
from nipype.pipeline.engine.base import logger
729728
logger.info('Exception: %s' % str(e))
@@ -737,7 +736,7 @@ def func1(in1):
737736
# test running the workflow on serial conditions
738737
error_raised = False
739738
try:
740-
w1.run(plugin='MultiProc')
739+
w1.run(plugin='ResourceMultiProc')
741740
except Exception as e:
742741
from nipype.pipeline.engine.base import logger
743742
logger.info('Exception: %s' % str(e))

nipype/pipeline/engine/tests/test_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ def test_function3(arg):
214214

215215
out_dir = mkdtemp()
216216

217-
for plugin in ('Linear',): # , 'MultiProc'):
217+
for plugin in ('Linear',): # , 'ResourceMultiProc'):
218218
n1 = pe.Node(niu.Function(input_names=['arg1'],
219219
output_names=['out_file1', 'out_file2', 'dir'],
220220
function=test_function),

nipype/pipeline/plugins/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from .sge import SGEPlugin
1010
from .condor import CondorPlugin
1111
from .dagman import CondorDAGManPlugin
12-
from .multiproc import MultiProcPlugin
1312
from .multiproc import ResourceMultiProcPlugin
1413
from .ipython import IPythonPlugin
1514
from .somaflow import SomaFlowPlugin

nipype/pipeline/plugins/multiproc.py

Lines changed: 34 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -76,68 +76,6 @@ class NonDaemonPool(pool.Pool):
7676
"""
7777
Process = NonDaemonProcess
7878

79-
80-
class MultiProcPlugin(DistributedPluginBase):
81-
"""Execute workflow with multiprocessing
82-
83-
The plugin_args input to run can be used to control the multiprocessing
84-
execution. Currently supported options are:
85-
86-
- n_procs : number of processes to use
87-
- non_daemon : boolean flag to execute as non-daemon processes
88-
89-
"""
90-
91-
def __init__(self, plugin_args=None):
92-
super(MultiProcPlugin, self).__init__(plugin_args=plugin_args)
93-
self._taskresult = {}
94-
self._taskid = 0
95-
non_daemon = True
96-
n_procs = cpu_count()
97-
if plugin_args:
98-
if 'n_procs' in plugin_args:
99-
n_procs = plugin_args['n_procs']
100-
if 'non_daemon' in plugin_args:
101-
non_daemon = plugin_args['non_daemon']
102-
if non_daemon:
103-
# run the execution using the non-daemon pool subclass
104-
self.pool = NonDaemonPool(processes=n_procs)
105-
else:
106-
self.pool = Pool(processes=n_procs)
107-
108-
109-
def _get_result(self, taskid):
110-
if taskid not in self._taskresult:
111-
raise RuntimeError('Multiproc task %d not found' % taskid)
112-
if not self._taskresult[taskid].ready():
113-
return None
114-
return self._taskresult[taskid].get()
115-
116-
def _submit_job(self, node, updatehash=False):
117-
self._taskid += 1
118-
try:
119-
if node.inputs.terminal_output == 'stream':
120-
node.inputs.terminal_output = 'allatonce'
121-
except:
122-
pass
123-
self._taskresult[self._taskid] = self.pool.apply_async(run_node, (node,
124-
updatehash,))
125-
return self._taskid
126-
127-
def _report_crash(self, node, result=None):
128-
if result and result['traceback']:
129-
node._result = result['result']
130-
node._traceback = result['traceback']
131-
return report_crash(node,
132-
traceback=result['traceback'])
133-
else:
134-
return report_crash(node)
135-
136-
def _clear_task(self, taskid):
137-
del self._taskresult[taskid]
138-
139-
140-
14179
import numpy as np
14280
from copy import deepcopy
14381
from ..engine import (MapNode, str2bool)
@@ -150,8 +88,8 @@ def _clear_task(self, taskid):
15088
def release_lock(args):
15189
semaphore_singleton.semaphore.release()
15290

153-
class ResourceMultiProcPlugin(MultiProcPlugin):
154-
"""Execute workflow with multiprocessing not sending more jobs at once
91+
class ResourceMultiProcPlugin(DistributedPluginBase):
92+
"""Execute workflow with multiprocessing, not sending more jobs at once
15593
than the system can support.
15694
15795
The plugin_args input to run can be used to control the multiprocessing
@@ -167,29 +105,61 @@ class ResourceMultiProcPlugin(MultiProcPlugin):
167105
168106
Currently supported options are:
169107
108+
- non_daemon : boolean flag to execute as non-daemon processes
170109
- num_threads: maximum number of threads to be executed in parallel
171110
- estimated_memory: maximum memory that can be used at once.
172111
173112
"""
174113

175114
def __init__(self, plugin_args=None):
176115
super(ResourceMultiProcPlugin, self).__init__(plugin_args=plugin_args)
116+
self._taskresult = {}
117+
self._taskid = 0
118+
non_daemon = True
177119
self.plugin_args = plugin_args
178120
self.processors = cpu_count()
179121
memory = psutil.virtual_memory()
180122
self.memory = memory.total / (1024*1024*1024)
181123
if self.plugin_args:
124+
if 'non_daemon' in self.plugin_args:
125+
non_daemon = plugin_args['non_daemon']
182126
if 'n_procs' in self.plugin_args:
183127
self.processors = self.plugin_args['n_procs']
184128
if 'memory' in self.plugin_args:
185129
self.memory = self.plugin_args['memory']
186130

131+
if non_daemon:
132+
# run the execution using the non-daemon pool subclass
133+
self.pool = NonDaemonPool(processes=n_procs)
134+
else:
135+
self.pool = Pool(processes=n_procs)
136+
187137
def _wait(self):
188138
if len(self.pending_tasks) > 0:
189139
semaphore_singleton.semaphore.acquire()
190140
semaphore_singleton.semaphore.release()
191141

192142

143+
def _get_result(self, taskid):
144+
if taskid not in self._taskresult:
145+
raise RuntimeError('Multiproc task %d not found' % taskid)
146+
if not self._taskresult[taskid].ready():
147+
return None
148+
return self._taskresult[taskid].get()
149+
150+
151+
def _report_crash(self, node, result=None):
152+
if result and result['traceback']:
153+
node._result = result['result']
154+
node._traceback = result['traceback']
155+
return report_crash(node,
156+
traceback=result['traceback'])
157+
else:
158+
return report_crash(node)
159+
160+
def _clear_task(self, taskid):
161+
del self._taskresult[taskid]
162+
193163
def _submit_job(self, node, updatehash=False):
194164
self._taskid += 1
195165
try:

nipype/pipeline/plugins/tests/test_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,5 @@ def func(arg1):
3838
wf.add_nodes([funkynode])
3939
wf.base_dir = '/tmp'
4040
41-
wf.run(plugin='MultiProc')
41+
wf.run(plugin='ResourceMultiProc')
4242
'''

nipype/pipeline/plugins/tests/test_callback.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,7 @@ def test_callback_multiproc_normal():
7575
name='f_node')
7676
wf.add_nodes([f_node])
7777
wf.config['execution']['crashdump_dir'] = wf.base_dir
78-
wf.config['execution']['poll_sleep_duration'] = 2
79-
wf.run(plugin='MultiProc', plugin_args={'status_callback': so.callback})
78+
wf.run(plugin='ResourceMultiProc', plugin_args={'status_callback': so.callback})
8079
assert_equal(len(so.statuses), 2)
8180
for (n, s) in so.statuses:
8281
yield assert_equal, n.name, 'f_node'
@@ -93,9 +92,8 @@ def test_callback_multiproc_exception():
9392
name='f_node')
9493
wf.add_nodes([f_node])
9594
wf.config['execution']['crashdump_dir'] = wf.base_dir
96-
wf.config['execution']['poll_sleep_duration'] = 2
9795
try:
98-
wf.run(plugin='MultiProc',
96+
wf.run(plugin='ResourceMultiProc',
9997
plugin_args={'status_callback': so.callback})
10098
except:
10199
pass

nipype/pipeline/plugins/tests/test_multiproc.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ def test_run_multiproc():
4343
pipe.connect([(mod1, mod2, [('output1', 'input1')])])
4444
pipe.base_dir = os.getcwd()
4545
mod1.inputs.input1 = 1
46-
pipe.config['execution']['poll_sleep_duration'] = 2
47-
execgraph = pipe.run(plugin="MultiProc")
46+
execgraph = pipe.run(plugin="ResourceMultiProc")
4847
names = ['.'.join((node._hierarchy, node.name)) for node in execgraph.nodes()]
4948
node = execgraph.nodes()[names.index('pipe.mod1')]
5049
result = node.get_output('output1')

nipype/pipeline/plugins/tests/test_multiproc_nondaemon.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def dummyFunction(filename):
8484

8585
def run_multiproc_nondaemon_with_flag(nondaemon_flag):
8686
'''
87-
Start a pipe with two nodes using the multiproc plugin and passing the nondaemon_flag.
87+
Start a pipe with two nodes using the resource multiproc plugin and passing the nondaemon_flag.
8888
'''
8989

9090
cur_dir = os.getcwd()
@@ -107,11 +107,10 @@ def run_multiproc_nondaemon_with_flag(nondaemon_flag):
107107
f1.inputs.insum = 0
108108

109109
pipe.config['execution']['stop_on_first_crash'] = True
110-
pipe.config['execution']['poll_sleep_duration'] = 2
111110

112-
# execute the pipe using the MultiProc plugin with 2 processes and the non_daemon flag
111+
# execute the pipe using the ResourceMultiProc plugin with 2 processes and the non_daemon flag
113112
# to enable child processes which start other multiprocessing jobs
114-
execgraph = pipe.run(plugin="MultiProc",
113+
execgraph = pipe.run(plugin="ResourceMultiProc",
115114
plugin_args={'n_procs': 2,
116115
'non_daemon': nondaemon_flag})
117116

0 commit comments

Comments (0)