Skip to content

Commit 6b19469

Browse files
committed
Merge pull request #550 from satra/fix/danmultiproc
Fix/danmultiproc
2 parents 8896ae2 + f3fb558 commit 6b19469

File tree

3 files changed

+174
-3
lines changed

3 files changed

+174
-3
lines changed

nipype/pipeline/plugins/multiproc.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
22
# vi: set ft=python sts=4 ts=4 sw=4 et:
33
"""Parallel workflow execution via multiprocessing
4+
5+
Support for child processes running as non-daemons based on
6+
http://stackoverflow.com/a/8963618/1183453
47
"""
58

6-
from multiprocessing import Pool, cpu_count
9+
from multiprocessing import Process, Pool, cpu_count, pool
710
from traceback import format_exception
811
import sys
912

10-
from .base import (DistributedPluginBase, logger, report_crash)
13+
from .base import (DistributedPluginBase, report_crash)
1114

1215
def run_node(node, updatehash):
1316
result = dict(result=None, traceback=None)
@@ -19,25 +22,49 @@ def run_node(node, updatehash):
1922
result['result'] = node.result
2023
return result
2124

25+
class NonDaemonProcess(Process):
26+
"""A non-daemon process to support internal multiprocessing.
27+
"""
28+
def _get_daemon(self):
29+
return False
30+
31+
def _set_daemon(self, value):
32+
pass
33+
34+
daemon = property(_get_daemon, _set_daemon)
35+
36+
class NonDaemonPool(pool.Pool):
37+
"""A process pool with non-daemon processes.
38+
"""
39+
Process = NonDaemonProcess
40+
2241
class MultiProcPlugin(DistributedPluginBase):
2342
"""Execute workflow with multiprocessing
2443
2544
The plugin_args input to run can be used to control the multiprocessing
2645
execution. Currently supported options are:
2746
2847
- n_procs : number of processes to use
48+
- non_daemon : boolean flag to execute as non-daemon processes
2949
3050
"""
3151

3252
def __init__(self, plugin_args=None):
3353
super(MultiProcPlugin, self).__init__(plugin_args=plugin_args)
3454
self._taskresult = {}
3555
self._taskid = 0
56+
non_daemon = True
3657
n_procs = cpu_count()
3758
if plugin_args:
3859
if 'n_procs' in plugin_args:
3960
n_procs = plugin_args['n_procs']
40-
self.pool = Pool(processes=n_procs)
61+
if 'non_daemon' in plugin_args:
62+
non_daemon = plugin_args['non_daemon']
63+
if non_daemon:
64+
# run the execution using the non-daemon pool subclass
65+
self.pool = NonDaemonPool(processes=n_procs)
66+
else:
67+
self.pool = Pool(processes=n_procs)
4168

4269
def _get_result(self, taskid):
4370
if taskid not in self._taskresult:
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
import os
2+
from tempfile import mkdtemp
3+
from shutil import rmtree
4+
5+
from nipype.testing import assert_equal, assert_true
6+
import nipype.pipeline.engine as pe
7+
from nipype.interfaces.utility import Function
8+
9+
10+
def mytestFunction(insum=0):
11+
'''
12+
Run a multiprocessing job and spawn child processes.
13+
'''
14+
15+
# need to import here since this is executed as an external process
16+
import multiprocessing
17+
import tempfile
18+
import time
19+
import os
20+
21+
numberOfThreads = 2
22+
23+
# list of processes
24+
t = [None] * numberOfThreads
25+
26+
# list of alive flags
27+
a = [None] * numberOfThreads
28+
29+
# list of tempFiles
30+
f = [None] * numberOfThreads
31+
32+
def dummyFunction(filename):
33+
'''
34+
This function writes the value 45 to the given filename.
35+
'''
36+
j = 0
37+
for i in range(0, 10):
38+
j += i
39+
40+
# j is now 45 (0+1+2+3+4+5+6+7+8+9)
41+
42+
with open(filename, 'w') as f:
43+
f.write(str(j))
44+
45+
for n in xrange(numberOfThreads):
46+
47+
# mark thread as alive
48+
a[n] = True
49+
50+
# create a temp file to use as the data exchange container
51+
tmpFile = tempfile.mkstemp('.txt', 'test_engine_')[1]
52+
f[n] = tmpFile # keep track of the temp file
53+
t[n] = multiprocessing.Process(target=dummyFunction,
54+
args=(tmpFile,))
55+
# fire up the job
56+
t[n].start()
57+
58+
59+
# block until all processes are done
60+
allDone = False
61+
while not allDone:
62+
63+
time.sleep(1)
64+
65+
for n in xrange(numberOfThreads):
66+
67+
a[n] = t[n].is_alive()
68+
69+
if not any(a):
70+
# if no thread is alive
71+
allDone = True
72+
73+
# here, all processes are done
74+
75+
# read in all temp files and sum them up
76+
total = insum
77+
for file in f:
78+
with open(file) as fd:
79+
total += int(fd.read())
80+
os.remove(file)
81+
82+
return total
83+
84+
85+
def run_multiproc_nondaemon_with_flag(nondaemon_flag):
86+
'''
87+
Start a pipe with two nodes using the multiproc plugin and passing the nondaemon_flag.
88+
'''
89+
90+
cur_dir = os.getcwd()
91+
temp_dir = mkdtemp(prefix='test_engine_')
92+
os.chdir(temp_dir)
93+
94+
pipe = pe.Workflow(name='pipe')
95+
96+
f1 = pe.Node(interface=Function(function=mytestFunction,
97+
input_names=['insum'],
98+
output_names=['sum_out']),
99+
name='f1')
100+
f2 = pe.Node(interface=Function(function=mytestFunction,
101+
input_names=['insum'],
102+
output_names=['sum_out']),
103+
name='f2')
104+
105+
pipe.connect([(f1, f2, [('sum_out', 'insum')])])
106+
pipe.base_dir = os.getcwd()
107+
f1.inputs.insum = 0
108+
109+
pipe.config = {'execution': {'stop_on_first_crash': True}}
110+
# execute the pipe using the MultiProc plugin with 2 processes and the non_daemon flag
111+
# to enable child processes which start other multiprocessing jobs
112+
execgraph = pipe.run(plugin="MultiProc",
113+
plugin_args={'n_procs': 2,
114+
'non_daemon': nondaemon_flag})
115+
116+
names = ['.'.join((node._hierarchy,node.name)) for node in execgraph.nodes()]
117+
node = execgraph.nodes()[names.index('pipe.f2')]
118+
result = node.get_output('sum_out')
119+
os.chdir(cur_dir)
120+
rmtree(temp_dir)
121+
return result
122+
123+
124+
def test_run_multiproc_nondaemon_false():
125+
'''
126+
This is the entry point for the test. Two times a pipe of several multiprocessing jobs gets
127+
executed. First, without the nondaemon flag. Second, with the nondaemon flag.
128+
129+
Since the processes of the pipe start child processes, the execution only succeeds when the
130+
non_daemon flag is on.
131+
'''
132+
shouldHaveFailed = False
133+
try:
134+
# with nondaemon_flag = False, the execution should fail
135+
run_multiproc_nondaemon_with_flag(False)
136+
except:
137+
shouldHaveFailed = True
138+
yield assert_true, shouldHaveFailed
139+
140+
def test_run_multiproc_nondaemon_true():
141+
# with nondaemon_flag = True, the execution should succeed
142+
result = run_multiproc_nondaemon_with_flag(True)
143+
yield assert_equal, result, 180 # n_procs (2) * numberOfThreads (2) * 45 == 180
144+

nipype/testing/data/Template_1_IXI550_MNI152.nii

Whitespace-only changes.

0 commit comments

Comments
 (0)