1
+ import os
2
+ import multiprocessing
3
+ from tempfile import mkdtemp
4
+ from tempfile import mkstemp
5
+ from shutil import rmtree
6
+
7
+ from inspect import *
8
+
9
+ from nipype .testing import assert_equal , assert_true
10
+ import nipype .pipeline .engine as pe
11
+ from nipype .interfaces .utility import Function
12
+
13
+ class TestInterface ():
14
+
15
+ def testFunction (sum = 0 ):
16
+ '''
17
+ Run a multiprocessing job and spawn child processes.
18
+ '''
19
+
20
+ # need to import here since this is executed as an external process
21
+ import multiprocessing
22
+ import tempfile
23
+ import time
24
+ import os
25
+ import test_multiproc_nondaemon_dummy as d
26
+
27
+ numberOfThreads = 2
28
+
29
+ # list of processes
30
+ t = [None ] * numberOfThreads
31
+
32
+ # list of alive flags
33
+ a = [None ] * numberOfThreads
34
+
35
+ # list of tempFiles
36
+ f = [None ] * numberOfThreads
37
+
38
+ for n in xrange ( numberOfThreads ):
39
+
40
+ # mark thread as alive
41
+ a [n ] = True
42
+
43
+ # create a temp file to use as the data exchange container
44
+ tmpFile = tempfile .mkstemp ('.txt' ,'test_engine_' )[1 ]
45
+ f [n ] = tmpFile # keep track of the temp file
46
+ t [n ] = multiprocessing .Process (target = d .dummyFunction , args = (tmpFile ,))
47
+ # fire up the job
48
+ t [n ].start ()
49
+
50
+
51
+ # block until all processes are done
52
+ allDone = False
53
+ while not allDone :
54
+
55
+ time .sleep (1 )
56
+
57
+ for n in xrange (numberOfThreads ):
58
+
59
+ a [n ] = t [n ].is_alive ()
60
+
61
+ if not any (a ):
62
+ # if no thread is alive
63
+ allDone = True
64
+
65
+ # here, all processes are done
66
+
67
+ # read in all temp files and sum them up
68
+ for file in f :
69
+ with open (file ) as fd :
70
+ sum += int (fd .read ())
71
+ os .remove (file )
72
+
73
+ return sum
74
+
75
+
76
+ def test_run_multiproc_nondaemon_with_flag (nondaemon_flag ):
77
+ '''
78
+ Start a pipe with two nodes using the multiproc plugin and passing the nondaemon_flag.
79
+ '''
80
+
81
+ cur_dir = os .getcwd ()
82
+ temp_dir = mkdtemp (prefix = 'test_engine_' )
83
+ os .chdir (temp_dir )
84
+
85
+ pipe = pe .Workflow (name = 'pipe' )
86
+
87
+ f1 = pe .Node (interface = Function (function = TestInterface .testFunction , input_names = ['sum' ], output_names = ['sum_out' ]), name = 'f1' )
88
+ f2 = pe .Node (interface = Function (function = TestInterface .testFunction , input_names = ['sum' ], output_names = ['sum_out' ]), name = 'f2' )
89
+
90
+ pipe .connect ([(f1 ,f2 ,[('sum_out' ,'sum' )])])
91
+ pipe .base_dir = os .getcwd ()
92
+ f1 .inputs .sum = 0
93
+
94
+ # execute the pipe using the MultiProc plugin with 2 processes and the non_daemon flag
95
+ # to enable child processes which start other multiprocessing jobs
96
+ execgraph = pipe .run (plugin = "MultiProc" , plugin_args = {'n_procs' :2 , 'non_daemon' :nondaemon_flag })
97
+
98
+ names = ['.' .join ((node ._hierarchy ,node .name )) for node in execgraph .nodes ()]
99
+ node = execgraph .nodes ()[names .index ('pipe.f2' )]
100
+ result = node .get_output ('sum_out' )
101
+ yield assert_equal , result , 180 # n_procs (2) * numberOfThreads (2) * 45 == 180
102
+ os .chdir (cur_dir )
103
+ rmtree (temp_dir )
104
+
105
+
106
+ def test_run_multiproc_nondaemon ():
107
+ '''
108
+ This is the entry point for the test. Two times a pipe of several multiprocessing jobs gets
109
+ executed. First, without the nondaemon flag. Second, with the nondaemon flag.
110
+
111
+ Since the processes of the pipe start child processes, the execution only succeeds when the
112
+ non_daemon flag is on.
113
+ '''
114
+ shouldHaveFailed = False
115
+
116
+ try :
117
+ # with nondaemon_flag = False, the execution should fail
118
+ test_run_multiproc_nondaemon_with_flag (False )
119
+ except :
120
+ shouldHaveFailed = True
121
+
122
+ # with nondaemon_flag = True, the execution should succeed
123
+ test_run_multiproc_nondaemon_with_flag (True )
124
+
125
+ yield assert_true , shouldHaveFailed
126
+
0 commit comments