@@ -113,24 +113,27 @@ class OutputThread(threading.Thread):
113
113
stdout/stderr buffers fill up.
114
114
"""
115
115
116
- def __init__ (self ):
116
+ def __init__ (self , event , logger ):
117
117
super (OutputThread , self ).__init__ ()
118
118
self .read_fd , self .write_fd = os .pipe ()
119
119
self .pipe_fobj = os .fdopen (self .read_fd )
120
120
self .out = []
121
+ self .event = event
122
+ self .logger = logger
121
123
self .start ()
122
124
123
125
def run (self ):
124
126
"""
125
127
It might happen that after the process is gone, the thread
126
- still has data to read from the pipe. Should probably introduce
127
- a boolean and set it to True under the 'if not line' block
128
- below and make the caller wait for it to become True.
128
+ still has data to read from the pipe. Hence, event is used
129
+ to synchronize with the caller.
129
130
"""
130
131
while True :
131
132
line = self .pipe_fobj .readline ()
132
133
if not line :
134
+ self .logger .debug ("end of output" )
133
135
self .pipe_fobj .close ()
136
+ self .event .set ()
134
137
return
135
138
136
139
self .out .append (line )
@@ -142,6 +145,7 @@ def fileno(self):
142
145
return self .write_fd
143
146
144
147
def close (self ):
148
+ self .logger .debug ("closed" )
145
149
os .close (self .write_fd )
146
150
147
151
orig_work_dir = None
@@ -163,7 +167,8 @@ def close(self):
163
167
return
164
168
165
169
timeout_thread = None
166
- output_thread = OutputThread ()
170
+ event = threading .Event ()
171
+ output_thread = OutputThread (event , self .logger )
167
172
try :
168
173
start_time = time .time ()
169
174
try :
@@ -183,18 +188,20 @@ def close(self):
183
188
self .pid = p .pid
184
189
185
190
if self .timeout :
186
- condition = threading .Condition ()
191
+ time_condition = threading .Condition ()
187
192
self .logger .debug ("Setting timeout to {}" .format (self .timeout ))
188
193
timeout_thread = TimeoutThread (self .logger , self .timeout ,
189
- condition , p )
194
+ time_condition , p )
190
195
191
196
self .logger .debug ("Waiting for process with PID {}" .format (p .pid ))
192
197
p .wait ()
198
+ self .logger .debug ("done waiting" )
193
199
194
200
if self .timeout :
195
201
e = timeout_thread .get_exception ()
196
202
if e :
197
203
raise e
204
+
198
205
except KeyboardInterrupt as e :
199
206
self .logger .info ("Got KeyboardException while processing " ,
200
207
exc_info = True )
@@ -211,9 +218,16 @@ def close(self):
211
218
self .logger .debug ("{} -> {}" .format (self .cmd , self .getretcode ()))
212
219
finally :
213
220
if self .timeout != 0 and timeout_thread :
214
- with condition :
215
- condition .notifyAll ()
221
+ with time_condition :
222
+ time_condition .notifyAll ()
223
+
224
+ # The subprocess module does not close the write pipe descriptor
225
+ # it fetched via OutputThread's fileno() so in order to gracefully
226
+ # exit the read loop we have to close it here ourselves.
216
227
output_thread .close ()
228
+ self .logger .debug ("Waiting on output thread to finish reading" )
229
+ event .wait ()
230
+
217
231
self .out = output_thread .getoutput ()
218
232
elapsed_time = time .time () - start_time
219
233
self .logger .debug ("Command {} took {} seconds" .
0 commit comments