1
1
import os
2
- import multiprocessing
3
2
from tempfile import mkdtemp
4
- from tempfile import mkstemp
5
3
from shutil import rmtree
6
4
7
- from inspect import *
8
-
9
5
from nipype .testing import assert_equal , assert_true
10
6
import nipype .pipeline .engine as pe
11
7
from nipype .interfaces .utility import Function
12
8
13
- class TestInterface ():
14
-
15
- def testFunction (sum = 0 ):
16
- '''
17
- Run a multiprocessing job and spawn child processes.
18
- '''
19
-
20
- # need to import here since this is executed as an external process
21
- import multiprocessing
22
- import tempfile
23
- import time
24
- import os
25
- import test_multiproc_nondaemon_dummy as d
26
-
27
- numberOfThreads = 2
28
-
29
- # list of processes
30
- t = [None ] * numberOfThreads
31
-
32
- # list of alive flags
33
- a = [None ] * numberOfThreads
34
-
35
- # list of tempFiles
36
- f = [None ] * numberOfThreads
37
-
38
- for n in xrange ( numberOfThreads ):
39
-
40
- # mark thread as alive
41
- a [n ] = True
42
-
43
- # create a temp file to use as the data exchange container
44
- tmpFile = tempfile .mkstemp ('.txt' ,'test_engine_' )[1 ]
45
- f [n ] = tmpFile # keep track of the temp file
46
- t [n ] = multiprocessing .Process (target = d .dummyFunction , args = (tmpFile ,))
47
- # fire up the job
48
- t [n ].start ()
49
-
50
-
51
- # block until all processes are done
52
- allDone = False
53
- while not allDone :
54
-
55
- time .sleep (1 )
56
-
57
- for n in xrange (numberOfThreads ):
58
-
59
- a [n ] = t [n ].is_alive ()
60
-
61
- if not any (a ):
62
- # if no thread is alive
63
- allDone = True
64
-
65
- # here, all processes are done
66
-
67
- # read in all temp files and sum them up
68
- for file in f :
69
- with open (file ) as fd :
70
- sum += int (fd .read ())
71
- os .remove (file )
72
-
73
- return sum
74
-
75
-
76
- def test_run_multiproc_nondaemon_with_flag (nondaemon_flag ):
9
+ def dummyFunction (filename ):
10
+ '''
11
+ This function writes the value 45 to the given filename.
12
+ '''
13
+ j = 0
14
+ for i in range (0 , 10 ):
15
+ j += i
16
+
17
+ # j is now 45 (0+1+2+3+4+5+6+7+8+9)
18
+
19
+ with open (filename , 'w' ) as f :
20
+ f .write (str (j ))
21
+
22
+ def mytestFunction (insum = 0 ):
23
+ '''
24
+ Run a multiprocessing job and spawn child processes.
25
+ '''
26
+
27
+ # need to import here since this is executed as an external process
28
+ import multiprocessing
29
+ import tempfile
30
+ import time
31
+ import os
32
+
33
+ numberOfThreads = 2
34
+
35
+ # list of processes
36
+ t = [None ] * numberOfThreads
37
+
38
+ # list of alive flags
39
+ a = [None ] * numberOfThreads
40
+
41
+ # list of tempFiles
42
+ f = [None ] * numberOfThreads
43
+
44
+ for n in xrange (numberOfThreads ):
45
+
46
+ # mark thread as alive
47
+ a [n ] = True
48
+
49
+ # create a temp file to use as the data exchange container
50
+ tmpFile = tempfile .mkstemp ('.txt' , 'test_engine_' )[1 ]
51
+ f [n ] = tmpFile # keep track of the temp file
52
+ t [n ] = multiprocessing .Process (target = dummyFunction ,
53
+ args = (tmpFile ,))
54
+ # fire up the job
55
+ t [n ].start ()
56
+
57
+
58
+ # block until all processes are done
59
+ allDone = False
60
+ while not allDone :
61
+
62
+ time .sleep (1 )
63
+
64
+ for n in xrange (numberOfThreads ):
65
+
66
+ a [n ] = t [n ].is_alive ()
67
+
68
+ if not any (a ):
69
+ # if no thread is alive
70
+ allDone = True
71
+
72
+ # here, all processes are done
73
+
74
+ # read in all temp files and sum them up
75
+ total = 0
76
+ for file in f :
77
+ with open (file ) as fd :
78
+ total += int (fd .read ())
79
+ os .remove (file )
80
+
81
+ return total
82
+
83
+
84
+ def run_multiproc_nondaemon_with_flag (nondaemon_flag ):
77
85
'''
78
86
Start a pipe with two nodes using the multiproc plugin and passing the nondaemon_flag.
79
87
'''
@@ -84,26 +92,36 @@ def test_run_multiproc_nondaemon_with_flag(nondaemon_flag):
84
92
85
93
pipe = pe .Workflow (name = 'pipe' )
86
94
87
- f1 = pe .Node (interface = Function (function = TestInterface .testFunction , input_names = ['sum' ], output_names = ['sum_out' ]), name = 'f1' )
88
- f2 = pe .Node (interface = Function (function = TestInterface .testFunction , input_names = ['sum' ], output_names = ['sum_out' ]), name = 'f2' )
95
+ f1 = pe .Node (interface = Function (function = mytestFunction ,
96
+ input_names = ['insum' ],
97
+ output_names = ['sum_out' ]),
98
+ name = 'f1' )
99
+ f2 = pe .Node (interface = Function (function = mytestFunction ,
100
+ input_names = ['insum' ],
101
+ output_names = ['sum_out' ]),
102
+ name = 'f2' )
89
103
90
- pipe .connect ([(f1 ,f2 ,[('sum_out' ,'sum ' )])])
104
+ pipe .connect ([(f1 , f2 , [('sum_out' , 'insum ' )])])
91
105
pipe .base_dir = os .getcwd ()
92
- f1 .inputs .sum = 0
93
-
106
+ f1 .inputs .insum = 0
107
+
108
+ pipe .config = {'execution' : {'stop_on_first_error' : True }}
94
109
# execute the pipe using the MultiProc plugin with 2 processes and the non_daemon flag
95
110
# to enable child processes which start other multiprocessing jobs
96
- execgraph = pipe .run (plugin = "MultiProc" , plugin_args = {'n_procs' :2 , 'non_daemon' :nondaemon_flag })
111
+ execgraph = pipe .run (plugin = "MultiProc" ,
112
+ plugin_args = {'n_procs' : 2 ,
113
+ 'non_daemon' : nondaemon_flag })
97
114
98
115
names = ['.' .join ((node ._hierarchy ,node .name )) for node in execgraph .nodes ()]
99
116
node = execgraph .nodes ()[names .index ('pipe.f2' )]
100
117
result = node .get_output ('sum_out' )
101
- yield assert_equal , result , 180 # n_procs (2) * numberOfThreads (2) * 45 == 180
102
118
os .chdir (cur_dir )
103
119
rmtree (temp_dir )
120
+ if nondaemon_flag :
121
+ yield assert_equal , result , 180 # n_procs (2) * numberOfThreads (2) * 45 == 180
122
+
104
123
105
-
106
- def test_run_multiproc_nondaemon ():
124
+ def test_run_multiproc_nondaemon_false ():
107
125
'''
108
126
This is the entry point for the test. Two times a pipe of several multiprocessing jobs gets
109
127
executed. First, without the nondaemon flag. Second, with the nondaemon flag.
@@ -115,12 +133,12 @@ def test_run_multiproc_nondaemon():
115
133
116
134
try :
117
135
# with nondaemon_flag = False, the execution should fail
118
- test_run_multiproc_nondaemon_with_flag (False )
136
+ run_multiproc_nondaemon_with_flag (False )
119
137
except :
120
138
shouldHaveFailed = True
121
-
139
+ yield assert_true , shouldHaveFailed
140
+
141
+ def test_run_multiproc_nondaemon_true ():
122
142
# with nondaemon_flag = True, the execution should succeed
123
- test_run_multiproc_nondaemon_with_flag (True )
143
+ run_multiproc_nondaemon_with_flag (True )
124
144
125
- yield assert_true , shouldHaveFailed
126
-
0 commit comments