Skip to content

Commit 64aca4d

Browse files
committed
threadsafety in IOLoopThread
- avoid instantiating an IOLoop outside the thread in which it will be used, which sometimes causes problems. - ensure asyncio eventloop is defined in the thread, if asyncio might be in use
1 parent 7b45731 commit 64aca4d

File tree

1 file changed

+35
-12
lines changed

1 file changed

+35
-12
lines changed

jupyter_client/threaded.py

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
from __future__ import absolute_import
44
import atexit
55
import errno
6-
from threading import Thread
6+
import sys
7+
from threading import Thread, Event
78
import time
89

910
# import ZMQError in top-level namespace, to avoid ugly attribute-error messages
@@ -41,9 +42,15 @@ def __init__(self, socket, session, loop):
4142
self.socket = socket
4243
self.session = session
4344
self.ioloop = loop
45+
evt = Event()
4446

45-
self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
46-
self.stream.on_recv(self._handle_recv)
47+
def setup_stream():
48+
self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
49+
self.stream.on_recv(self._handle_recv)
50+
evt.set()
51+
52+
self.ioloop.add_callback(setup_stream)
53+
evt.wait()
4754

4855
_is_alive = False
4956
def is_alive(self):
@@ -142,11 +149,11 @@ class IOLoopThread(Thread):
142149
"""Run a pyzmq ioloop in a thread to send and receive messages
143150
"""
144151
_exiting = False
152+
ioloop = None
145153

146-
def __init__(self, loop):
154+
def __init__(self):
147155
super(IOLoopThread, self).__init__()
148156
self.daemon = True
149-
self.ioloop = loop or ioloop.IOLoop()
150157

151158
@staticmethod
152159
@atexit.register
@@ -156,8 +163,26 @@ def _notice_exit():
156163
if IOLoopThread is not None:
157164
IOLoopThread._exiting = True
158165

166+
def start(self):
167+
"""Start the IOLoop thread
168+
169+
Don't return until self.ioloop is defined,
170+
which is created in the thread
171+
"""
172+
self._start_event = Event()
173+
Thread.start(self)
174+
self._start_event.wait()
175+
159176
def run(self):
160177
"""Run my loop, ignoring EINTR events in the poller"""
178+
if 'asyncio' in sys.modules:
179+
# tornado may be using asyncio,
180+
# ensure an eventloop exists for this thread
181+
import asyncio
182+
asyncio.set_event_loop(asyncio.new_event_loop())
183+
self.ioloop = ioloop.IOLoop()
184+
# signal that self.ioloop is defined
185+
self._start_event.set()
161186
while True:
162187
try:
163188
self.ioloop.start()
@@ -185,6 +210,7 @@ def stop(self):
185210
self.ioloop.add_callback(self.ioloop.stop)
186211
self.join()
187212
self.close()
213+
self.ioloop = None
188214

189215
def close(self):
190216
if self.ioloop is not None:
@@ -198,22 +224,19 @@ class ThreadedKernelClient(KernelClient):
198224
""" A KernelClient that provides thread-safe sockets with async callbacks on message replies.
199225
"""
200226

201-
_ioloop = None
202227
@property
203228
def ioloop(self):
204-
if self._ioloop is None:
205-
self._ioloop = ioloop.IOLoop()
206-
return self._ioloop
229+
return self.ioloop_thread.ioloop
207230

208231
ioloop_thread = Instance(IOLoopThread, allow_none=True)
209232

210233
def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
234+
self.ioloop_thread = IOLoopThread()
235+
self.ioloop_thread.start()
236+
211237
if shell:
212238
self.shell_channel._inspect = self._check_kernel_info_reply
213239

214-
self.ioloop_thread = IOLoopThread(self.ioloop)
215-
self.ioloop_thread.start()
216-
217240
super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb)
218241

219242
def _check_kernel_info_reply(self, msg):

0 commit comments

Comments
 (0)