Skip to content

Commit b4b295f

Browse files
authored
Merge pull request #585 from SylvainCorlay/control-thread
Run control channel in separate thread
2 parents 29594ff + 208ad24 commit b4b295f

File tree

9 files changed

+136
-124
lines changed

9 files changed

+136
-124
lines changed

ipykernel/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version_info = (5, 6, 0, 'dev0')
1+
version_info = (6, 0, 0, 'dev0')
22
__version__ = '.'.join(map(str, version_info[:3]))
33

44
# pep440 is annoying, beta/alpha/rc should _not_ have dots or pip/setuptools

ipykernel/control.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from threading import Thread
2+
import zmq
3+
if zmq.pyzmq_version_info() >= (17, 0):
4+
from tornado.ioloop import IOLoop
5+
else:
6+
# deprecated since pyzmq 17
7+
from zmq.eventloop.ioloop import IOLoop
8+
9+
10+
class ControlThread(Thread):
11+
12+
def __init__(self, **kwargs):
13+
Thread.__init__(self, **kwargs)
14+
self.io_loop = IOLoop(make_current=False)
15+
16+
def run(self):
17+
self.io_loop.make_current()
18+
self.io_loop.start()
19+
self.io_loop.close(all_fds=True)

ipykernel/eventloops.py

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,7 @@ def loop_qt4(kernel):
114114

115115
kernel.app = get_app_qt4([" "])
116116
kernel.app.setQuitOnLastWindowClosed(False)
117-
118-
# Only register the eventloop for the shell stream because doing
119-
# it for the control stream is generating a bunch of unnecessary
120-
# warnings on Windows.
121-
_notify_stream_qt(kernel, kernel.shell_streams[0])
117+
_notify_stream_qt(kernel, kernel.shell_stream)
122118

123119
_loop_qt(kernel.app)
124120

@@ -160,10 +156,9 @@ def loop_wx(kernel):
160156

161157
def wake():
162158
"""wake from wx"""
163-
for stream in kernel.shell_streams:
164-
if stream.flush(limit=1):
165-
kernel.app.ExitMainLoop()
166-
return
159+
if kernel.shell_stream.flush(limit=1):
160+
kernel.app.ExitMainLoop()
161+
return
167162

168163
# We have to put the wx.Timer in a wx.Frame for it to fire properly.
169164
# We make the Frame hidden when we create it in the main app below.
@@ -237,13 +232,12 @@ def process_stream_events(stream, *a, **kw):
237232
# For Tkinter, we create a Tk object and call its withdraw method.
238233
kernel.app_wrapper = BasicAppWrapper(app)
239234

240-
for stream in kernel.shell_streams:
241-
notifier = partial(process_stream_events, stream)
242-
# seems to be needed for tk
243-
notifier.__name__ = "notifier"
244-
app.tk.createfilehandler(stream.getsockopt(zmq.FD), READABLE, notifier)
245-
# schedule initial call after start
246-
app.after(0, notifier)
235+
notifier = partial(process_stream_events, shell_stream)
236+
# seems to be needed for tk
237+
notifier.__name__ = "notifier"
238+
app.tk.createfilehandler(shell_stream.getsockopt(zmq.FD), READABLE, notifier)
239+
# schedule initial call after start
240+
app.after(0, notifier)
247241

248242
app.mainloop()
249243

@@ -330,10 +324,9 @@ def handle_int(etype, value, tb):
330324
# don't let interrupts during mainloop invoke crash_handler:
331325
sys.excepthook = handle_int
332326
mainloop(kernel._poll_interval)
333-
for stream in kernel.shell_streams:
334-
if stream.flush(limit=1):
335-
# events to process, return control to kernel
336-
return
327+
if kernel_shell_stream.flush(limit=1):
328+
# events to process, return control to kernel
329+
return
337330
except:
338331
raise
339332
except KeyboardInterrupt:
@@ -371,11 +364,9 @@ def process_stream_events(stream):
371364
if stream.flush(limit=1):
372365
loop.stop()
373366

374-
for stream in kernel.shell_streams:
375-
fd = stream.getsockopt(zmq.FD)
376-
notifier = partial(process_stream_events, stream)
377-
loop.add_reader(fd, notifier)
378-
loop.call_soon(notifier)
367+
notifier = partial(process_stream_events, shell_stream)
368+
loop.add_reader(shell_stream.getsockopt(zmq.FD), notifier)
369+
loop.call_soon(notifier)
379370

380371
while True:
381372
error = None

ipykernel/inprocess/client.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
#-----------------------------------------------------------------------------
1313

1414
# IPython imports
15-
from ipykernel.inprocess.socket import DummySocket
1615
from traitlets import Type, Instance, default
1716
from jupyter_client.clientabc import KernelClientABC
1817
from jupyter_client.client import KernelClient
@@ -171,10 +170,10 @@ def _dispatch_to_kernel(self, msg):
171170
if kernel is None:
172171
raise RuntimeError('Cannot send request. No kernel exists.')
173172

174-
stream = DummySocket()
173+
stream = kernel.shell_stream
175174
self.session.send(stream, msg)
176175
msg_parts = stream.recv_multipart()
177-
kernel.dispatch_shell(stream, msg_parts)
176+
kernel.dispatch_shell(msg_parts)
178177

179178
idents, reply_msg = self.session.recv(stream, copy=False)
180179
self.shell_channel.call_handlers_later(reply_msg)

ipykernel/inprocess/ipkernel.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ class InProcessKernel(IPythonKernel):
4949
#-------------------------------------------------------------------------
5050

5151
shell_class = Type(allow_none=True)
52-
shell_streams = List()
53-
control_stream = Any()
5452
_underlying_iopub_socket = Instance(DummySocket, ())
5553
iopub_thread = Instance(IOPubThread)
5654

55+
shell_stream = Instance(DummySocket, ())
56+
5757
@default('iopub_thread')
5858
def _default_iopub_thread(self):
5959
thread = IOPubThread(self._underlying_iopub_socket)

ipykernel/iostream.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import sys
1212
import threading
1313
import warnings
14+
from weakref import WeakSet
1415
from io import StringIO, TextIOBase
1516

1617
import zmq
@@ -66,6 +67,7 @@ def __init__(self, socket, pipe=False):
6667
self._setup_pipe_in()
6768
self._local = threading.local()
6869
self._events = deque()
70+
self._event_pipes = WeakSet()
6971
self._setup_event_pipe()
7072
self.thread = threading.Thread(target=self._thread_main)
7173
self.thread.daemon = True
@@ -100,6 +102,9 @@ def _event_pipe(self):
100102
event_pipe.linger = 0
101103
event_pipe.connect(self._event_interface)
102104
self._local.event_pipe = event_pipe
105+
# WeakSet so that event pipes will be closed by garbage collection
106+
# when their threads are terminated
107+
self._event_pipes.add(event_pipe)
103108
return event_pipe
104109

105110
def _handle_event(self, msg):
@@ -179,8 +184,11 @@ def stop(self):
179184
return
180185
self.io_loop.add_callback(self.io_loop.stop)
181186
self.thread.join()
182-
if hasattr(self._local, 'event_pipe'):
183-
self._local.event_pipe.close()
187+
# close *all* event pipes, created in any thread
188+
# event pipes can only be used from other threads while self.thread.is_alive()
189+
# so after thread.join, this should be safe
190+
for event_pipe in self._event_pipes:
191+
event_pipe.close()
184192

185193
def close(self):
186194
if self.closed:

ipykernel/ipkernel.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,11 @@ def start(self):
145145
self.shell.exit_now = False
146146
super(IPythonKernel, self).start()
147147

148-
def set_parent(self, ident, parent):
148+
def set_parent(self, ident, parent, channel='shell'):
149149
"""Overridden from parent to tell the display hook and output streams
150150
about the parent message.
151151
"""
152-
super(IPythonKernel, self).set_parent(ident, parent)
152+
super(IPythonKernel, self).set_parent(ident, parent, channel)
153153
self.shell.set_parent(parent)
154154

155155
def init_metadata(self, parent):
@@ -509,7 +509,7 @@ def do_apply(self, content, bufs, msg_id, reply_metadata):
509509
reply_content['engine_info'] = e_info
510510

511511
self.send_response(self.iopub_socket, 'error', reply_content,
512-
ident=self._topic('error'))
512+
ident=self._topic('error'), channel='shell')
513513
self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
514514
result_buf = []
515515
reply_content['status'] = 'error'

ipykernel/kernelapp.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
# local imports
3838
from .iostream import IOPubThread
39+
from .control import ControlThread
3940
from .heartbeat import Heartbeat
4041
from .ipkernel import IPythonKernel
4142
from .parentpoller import ParentPollerUnix, ParentPollerWindows
@@ -124,6 +125,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp,
124125
stdin_socket = Any()
125126
iopub_socket = Any()
126127
iopub_thread = Any()
128+
control_thread = Any()
127129

128130
ports = Dict()
129131

@@ -276,6 +278,17 @@ def init_sockets(self):
276278
self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
277279
self.log.debug("stdin ROUTER Channel on port: %i" % self.stdin_port)
278280

281+
if hasattr(zmq, 'ROUTER_HANDOVER'):
282+
# set router-handover to workaround zeromq reconnect problems
283+
# in certain rare circumstances
284+
# see ipython/ipykernel#270 and zeromq/libzmq#2892
285+
self.shell_socket.router_handover = \
286+
self.stdin_socket.router_handover = 1
287+
288+
self.init_control(context)
289+
self.init_iopub(context)
290+
291+
def init_control(self, context):
279292
self.control_socket = context.socket(zmq.ROUTER)
280293
self.control_socket.linger = 1000
281294
self.control_port = self._bind_socket(self.control_socket, self.control_port)
@@ -285,11 +298,9 @@ def init_sockets(self):
285298
# set router-handover to workaround zeromq reconnect problems
286299
# in certain rare circumstances
287300
# see ipython/ipykernel#270 and zeromq/libzmq#2892
288-
self.shell_socket.router_handover = \
289-
self.control_socket.router_handover = \
290-
self.stdin_socket.router_handover = 1
301+
self.control_socket.router_handover = 1
291302

292-
self.init_iopub(context)
303+
self.control_thread = ControlThread(daemon=True)
293304

294305
def init_iopub(self, context):
295306
self.iopub_socket = context.socket(zmq.PUB)
@@ -437,13 +448,13 @@ def init_signal(self):
437448
def init_kernel(self):
438449
"""Create the Kernel object itself"""
439450
shell_stream = ZMQStream(self.shell_socket)
440-
control_stream = ZMQStream(self.control_socket)
441-
451+
control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop)
452+
self.control_thread.start()
442453
kernel_factory = self.kernel_class.instance
443454

444455
kernel = kernel_factory(parent=self, session=self.session,
445456
control_stream=control_stream,
446-
shell_streams=[shell_stream, control_stream],
457+
shell_stream=shell_stream,
447458
iopub_thread=self.iopub_thread,
448459
iopub_socket=self.iopub_socket,
449460
stdin_socket=self.stdin_socket,

0 commit comments

Comments
 (0)