@@ -57,6 +57,7 @@ def __init__(self, socket, pipe=False):
57
57
piped from subprocesses.
58
58
"""
59
59
self .socket = socket
60
+ self ._stopped = False
60
61
self .background_socket = BackgroundSocket (self )
61
62
self ._master_pid = os .getpid ()
62
63
self ._pipe_flag = pipe
@@ -83,13 +84,21 @@ def _start_event_gc():
83
84
self ._event_pipe_gc_task = asyncio .ensure_future (self ._run_event_pipe_gc ())
84
85
85
86
self .io_loop .run_sync (_start_event_gc )
86
- self .io_loop .start ()
87
+
88
+ if not self ._stopped :
89
+ # avoid race if stop called before start thread gets here
90
+ # probably only comes up in tests
91
+ self .io_loop .start ()
92
+
87
93
if self ._event_pipe_gc_task is not None :
88
94
# cancel gc task to avoid pending task warnings
89
95
async def _cancel ():
90
96
self ._event_pipe_gc_task .cancel () # type:ignore
91
97
92
- self .io_loop .run_sync (_cancel )
98
+ try :
99
+ self .io_loop .run_sync (_cancel )
100
+ except TimeoutError :
101
+ pass
93
102
self .io_loop .close (all_fds = True )
94
103
95
104
def _setup_event_pipe (self ):
@@ -219,10 +228,16 @@ def start(self):
219
228
220
229
def stop (self ):
221
230
"""Stop the IOPub thread"""
231
+ self ._stopped = True
222
232
if not self .thread .is_alive ():
223
233
return
224
234
self .io_loop .add_callback (self .io_loop .stop )
225
- self .thread .join ()
235
+
236
+ self .thread .join (timeout = 30 )
237
+ if self .thread .is_alive ():
238
+ # avoid infinite hang if stop fails
239
+ msg = "IOPub thread did not terminate in 30 seconds"
240
+ raise TimeoutError (msg )
226
241
# close *all* event pipes, created in any thread
227
242
# event pipes can only be used from other threads while self.thread.is_alive()
228
243
# so after thread.join, this should be safe
0 commit comments