3
3
from __future__ import absolute_import
4
4
import atexit
5
5
import errno
6
- from threading import Thread
6
+ import sys
7
+ from threading import Thread , Event
7
8
import time
8
9
9
10
# import ZMQError in top-level namespace, to avoid ugly attribute-error messages
@@ -41,9 +42,15 @@ def __init__(self, socket, session, loop):
41
42
self .socket = socket
42
43
self .session = session
43
44
self .ioloop = loop
45
+ evt = Event ()
44
46
45
- self .stream = zmqstream .ZMQStream (self .socket , self .ioloop )
46
- self .stream .on_recv (self ._handle_recv )
47
+ def setup_stream ():
48
+ self .stream = zmqstream .ZMQStream (self .socket , self .ioloop )
49
+ self .stream .on_recv (self ._handle_recv )
50
+ evt .set ()
51
+
52
+ self .ioloop .add_callback (setup_stream )
53
+ evt .wait ()
47
54
48
55
_is_alive = False
49
56
def is_alive (self ):
@@ -142,11 +149,11 @@ class IOLoopThread(Thread):
142
149
"""Run a pyzmq ioloop in a thread to send and receive messages
143
150
"""
144
151
_exiting = False
152
+ ioloop = None
145
153
146
- def __init__ (self , loop ):
154
+ def __init__ (self ):
147
155
super (IOLoopThread , self ).__init__ ()
148
156
self .daemon = True
149
- self .ioloop = loop or ioloop .IOLoop ()
150
157
151
158
@staticmethod
152
159
@atexit .register
@@ -156,8 +163,26 @@ def _notice_exit():
156
163
if IOLoopThread is not None :
157
164
IOLoopThread ._exiting = True
158
165
166
+ def start (self ):
167
+ """Start the IOLoop thread
168
+
169
+ Don't return until self.ioloop is defined,
170
+ which is created in the thread
171
+ """
172
+ self ._start_event = Event ()
173
+ Thread .start (self )
174
+ self ._start_event .wait ()
175
+
159
176
def run (self ):
160
177
"""Run my loop, ignoring EINTR events in the poller"""
178
+ if 'asyncio' in sys .modules :
179
+ # tornado may be using asyncio,
180
+ # ensure an eventloop exists for this thread
181
+ import asyncio
182
+ asyncio .set_event_loop (asyncio .new_event_loop ())
183
+ self .ioloop = ioloop .IOLoop ()
184
+ # signal that self.ioloop is defined
185
+ self ._start_event .set ()
161
186
while True :
162
187
try :
163
188
self .ioloop .start ()
@@ -182,9 +207,10 @@ def stop(self):
182
207
:meth:`~threading.Thread.start` is called again.
183
208
"""
184
209
if self .ioloop is not None :
185
- self .ioloop .stop ( )
210
+ self .ioloop .add_callback ( self . ioloop . stop )
186
211
self .join ()
187
212
self .close ()
213
+ self .ioloop = None
188
214
189
215
def close (self ):
190
216
if self .ioloop is not None :
@@ -198,22 +224,19 @@ class ThreadedKernelClient(KernelClient):
198
224
""" A KernelClient that provides thread-safe sockets with async callbacks on message replies.
199
225
"""
200
226
201
- _ioloop = None
202
227
@property
203
228
def ioloop (self ):
204
- if self ._ioloop is None :
205
- self ._ioloop = ioloop .IOLoop ()
206
- return self ._ioloop
229
+ return self .ioloop_thread .ioloop
207
230
208
231
ioloop_thread = Instance (IOLoopThread , allow_none = True )
209
232
210
233
def start_channels (self , shell = True , iopub = True , stdin = True , hb = True ):
234
+ self .ioloop_thread = IOLoopThread ()
235
+ self .ioloop_thread .start ()
236
+
211
237
if shell :
212
238
self .shell_channel ._inspect = self ._check_kernel_info_reply
213
239
214
- self .ioloop_thread = IOLoopThread (self .ioloop )
215
- self .ioloop_thread .start ()
216
-
217
240
super (ThreadedKernelClient , self ).start_channels (shell , iopub , stdin , hb )
218
241
219
242
def _check_kernel_info_reply (self , msg ):
0 commit comments