Skip to content

Commit f477778

Browse files
committed
Fix matplotlib use of GUI toolkit event loops
1 parent c7af34c commit f477778

File tree

4 files changed

+68
-136
lines changed

4 files changed

+68
-136
lines changed

ipykernel/eventloops.py

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@ def register_integration(*toolkitnames):
4040
You can provide alternative names for the same toolkit.
4141
4242
The decorated function should take a single argument, the IPython kernel
43-
instance, arrange for the event loop to call ``kernel.do_one_iteration()``
44-
at least every ``kernel._poll_interval`` seconds, and start the event loop.
43+
instance, arrange for the event loop to yield the asyncio loop when a
44+
message is received by the main shell zmq stream or at least every
45+
``kernel._poll_interval`` seconds, and start the event loop.
4546
4647
:mod:`ipykernel.eventloops` provides and registers such functions
4748
for a few common event loops.
@@ -68,6 +69,15 @@ def exit_decorator(exit_func):
6869
return decorator
6970

7071

72+
def get_shell_stream(kernel):
73+
# Return the zmq stream that receives messages for the main shell.
74+
if kernel._supports_kernel_subshells:
75+
manager = kernel.shell_channel_thread.manager
76+
socket_pair = manager.get_shell_channel_to_subshell_pair(None)
77+
return socket_pair.to_stream
78+
return kernel.shell_stream
79+
80+
7181
def _notify_stream_qt(kernel):
7282
import operator
7383
from functools import lru_cache
@@ -87,17 +97,20 @@ def exit_loop():
8797
kernel._qt_notifier.setEnabled(False)
8898
kernel.app.qt_event_loop.quit()
8999

90-
def process_stream_events():
100+
def process_stream_events_wrap(shell_stream):
91101
"""fall back to main loop when there's a socket event"""
92102
# call flush to ensure that the stream doesn't lose events
93103
# due to our consuming of the edge-triggered FD
94104
# flush returns the number of events consumed.
95105
# if there were any, wake it up
96-
if kernel.shell_stream.flush(limit=1):
106+
if shell_stream.flush(limit=1):
97107
exit_loop()
98108

109+
shell_stream = get_shell_stream(kernel)
110+
process_stream_events = partial(process_stream_events_wrap, shell_stream)
111+
99112
if not hasattr(kernel, "_qt_notifier"):
100-
fd = kernel.shell_stream.getsockopt(zmq.FD)
113+
fd = shell_stream.getsockopt(zmq.FD)
101114
kernel._qt_notifier = QtCore.QSocketNotifier(
102115
fd, enum_helper("QtCore.QSocketNotifier.Type").Read, kernel.app.qt_event_loop
103116
)
@@ -177,9 +190,11 @@ def loop_wx(kernel):
177190
# Wx uses milliseconds
178191
poll_interval = int(1000 * kernel._poll_interval)
179192

180-
def wake():
193+
shell_stream = get_shell_stream(kernel)
194+
195+
def wake(shell_stream):
181196
"""wake from wx"""
182-
if kernel.shell_stream.flush(limit=1):
197+
if shell_stream.flush(limit=1):
183198
kernel.app.ExitMainLoop()
184199
return
185200

@@ -201,7 +216,7 @@ def on_timer(self, event):
201216
# wx.Timer to defer back to the tornado event loop.
202217
class IPWxApp(wx.App): # type:ignore[misc]
203218
def OnInit(self):
204-
self.frame = TimerFrame(wake)
219+
self.frame = TimerFrame(partial(wake, shell_stream))
205220
self.frame.Show(False)
206221
return True
207222

@@ -248,14 +263,14 @@ def __init__(self, app):
248263

249264
def exit_loop():
250265
"""fall back to main loop"""
251-
app.tk.deletefilehandler(kernel.shell_stream.getsockopt(zmq.FD))
266+
app.tk.deletefilehandler(shell_stream.getsockopt(zmq.FD))
252267
app.quit()
253268
app.destroy()
254269
del kernel.app_wrapper
255270

256-
def process_stream_events(*a, **kw):
271+
def process_stream_events_wrap(shell_stream, *a, **kw):
257272
"""fall back to main loop when there's a socket event"""
258-
if kernel.shell_stream.flush(limit=1):
273+
if shell_stream.flush(limit=1):
259274
exit_loop()
260275

261276
# allow for scheduling exits from the loop in case a timeout needs to
@@ -268,9 +283,10 @@ def _schedule_exit(delay):
268283

269284
# For Tkinter, we create a Tk object and call its withdraw method.
270285
kernel.app_wrapper = BasicAppWrapper(app)
271-
app.tk.createfilehandler(
272-
kernel.shell_stream.getsockopt(zmq.FD), READABLE, process_stream_events
273-
)
286+
shell_stream = get_shell_stream(kernel)
287+
process_stream_events = partial(process_stream_events_wrap, shell_stream)
288+
289+
app.tk.createfilehandler(shell_stream.getsockopt(zmq.FD), READABLE, process_stream_events)
274290
# schedule initial call after start
275291
app.after(0, process_stream_events)
276292

@@ -283,15 +299,19 @@ def _schedule_exit(delay):
283299

284300
nest_asyncio.apply()
285301

286-
doi = kernel.do_one_iteration
287302
# Tk uses milliseconds
288303
poll_interval = int(1000 * kernel._poll_interval)
289304

305+
shell_stream = get_shell_stream(kernel)
306+
290307
class TimedAppWrapper:
291-
def __init__(self, app, func):
308+
def __init__(self, app, shell_stream):
292309
self.app = app
310+
self.shell_stream = shell_stream
293311
self.app.withdraw()
294-
self.func = func
312+
313+
async def func(self):
314+
self.shell_stream.flush(limit=1)
295315

296316
def on_timer(self):
297317
loop = asyncio.get_event_loop()
@@ -305,16 +325,18 @@ def start(self):
305325
self.on_timer() # Call it once to get things going.
306326
self.app.mainloop()
307327

308-
kernel.app_wrapper = TimedAppWrapper(app, doi)
328+
kernel.app_wrapper = TimedAppWrapper(app, shell_stream)
309329
kernel.app_wrapper.start()
310330

311331

312332
@loop_tk.exit
313333
def loop_tk_exit(kernel):
314334
"""Exit the tk loop."""
315335
try:
336+
kernel.app_wrapper.app.quit()
316337
kernel.app_wrapper.app.destroy()
317338
del kernel.app_wrapper
339+
kernel.eventloop = None
318340
except (RuntimeError, AttributeError):
319341
pass
320342

@@ -359,6 +381,7 @@ def loop_cocoa(kernel):
359381
from ._eventloop_macos import mainloop, stop
360382

361383
real_excepthook = sys.excepthook
384+
shell_stream = get_shell_stream(kernel)
362385

363386
def handle_int(etype, value, tb):
364387
"""don't let KeyboardInterrupts look like crashes"""
@@ -377,7 +400,7 @@ def handle_int(etype, value, tb):
377400
# don't let interrupts during mainloop invoke crash_handler:
378401
sys.excepthook = handle_int
379402
mainloop(kernel._poll_interval)
380-
if kernel.shell_stream.flush(limit=1):
403+
if shell_stream.flush(limit=1):
381404
# events to process, return control to kernel
382405
return
383406
except BaseException:
@@ -415,13 +438,14 @@ def loop_asyncio(kernel):
415438
loop._should_close = False # type:ignore[attr-defined]
416439

417440
# pause eventloop when there's an event on a zmq socket
418-
def process_stream_events(stream):
441+
def process_stream_events(shell_stream):
419442
"""fall back to main loop when there's a socket event"""
420-
if stream.flush(limit=1):
443+
if shell_stream.flush(limit=1):
421444
loop.stop()
422445

423-
notifier = partial(process_stream_events, kernel.shell_stream)
424-
loop.add_reader(kernel.shell_stream.getsockopt(zmq.FD), notifier)
446+
shell_stream = get_shell_stream(kernel)
447+
notifier = partial(process_stream_events, shell_stream)
448+
loop.add_reader(shell_stream.getsockopt(zmq.FD), notifier)
425449
loop.call_soon(notifier)
426450

427451
while True:

ipykernel/kernelbase.py

Lines changed: 4 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import asyncio
88
import inspect
9-
import itertools
109
import logging
1110
import os
1211
import socket
@@ -42,7 +41,6 @@
4241
from IPython.core.error import StdinNotImplementedError
4342
from jupyter_client.session import Session
4443
from tornado import ioloop
45-
from tornado.queues import Queue, QueueEmpty
4644
from traitlets.config.configurable import SingletonConfigurable
4745
from traitlets.traitlets import (
4846
Any,
@@ -511,11 +509,6 @@ async def advance_eventloop():
511509
if self.eventloop is not eventloop:
512510
self.log.info("exiting eventloop %s", eventloop)
513511
return
514-
if self.msg_queue.qsize():
515-
self.log.debug("Delaying eventloop due to waiting messages")
516-
# still messages to process, make the eventloop wait
517-
schedule_next()
518-
return
519512
self.log.debug("Advancing eventloop %s", eventloop)
520513
try:
521514
eventloop(self)
@@ -534,98 +527,18 @@ def schedule_next():
534527
# already consumed from the queue by process_one and the queue is
535528
# technically empty.
536529
self.log.debug("Scheduling eventloop advance")
537-
self.io_loop.call_later(0.001, partial(self.schedule_dispatch, advance_eventloop))
530+
self.io_loop.call_later(0.001, advance_eventloop)
538531

539532
# begin polling the eventloop
540533
schedule_next()
541534

542-
async def do_one_iteration(self):
543-
"""Process a single shell message
544-
545-
Any pending control messages will be flushed as well
546-
547-
.. versionchanged:: 5
548-
This is now a coroutine
549-
"""
550-
# flush messages off of shell stream into the message queue
551-
if self.shell_stream and not self._supports_kernel_subshells:
552-
self.shell_stream.flush()
553-
# process at most one shell message per iteration
554-
await self.process_one(wait=False)
555-
556-
async def process_one(self, wait=True):
557-
"""Process one request
558-
559-
Returns None if no message was handled.
560-
"""
561-
if wait:
562-
t, dispatch, args = await self.msg_queue.get()
563-
else:
564-
try:
565-
t, dispatch, args = self.msg_queue.get_nowait()
566-
except (asyncio.QueueEmpty, QueueEmpty):
567-
return
568-
569-
if self.control_thread is None and self.control_stream is not None:
570-
# If there isn't a separate control thread then this main thread handles both shell
571-
# and control messages. Before processing a shell message we need to flush all control
572-
# messages and allow them all to be processed.
573-
await asyncio.sleep(0)
574-
self.control_stream.flush()
575-
576-
socket = self.control_stream.socket
577-
while socket.poll(1):
578-
await asyncio.sleep(0)
579-
self.control_stream.flush()
580-
581-
await dispatch(*args)
582-
583-
async def dispatch_queue(self):
584-
"""Coroutine to preserve order of message handling
585-
586-
Ensures that only one message is processing at a time,
587-
even when the handler is async
588-
"""
589-
590-
while True:
591-
try:
592-
await self.process_one()
593-
except Exception:
594-
self.log.exception("Error in message handler")
595-
596-
_message_counter = Any(
597-
help="""Monotonic counter of messages
598-
""",
599-
)
600-
601-
@default("_message_counter")
602-
def _message_counter_default(self):
603-
return itertools.count()
604-
605-
def schedule_dispatch(self, dispatch, *args):
606-
"""schedule a message for dispatch"""
607-
idx = next(self._message_counter)
608-
609-
self.msg_queue.put_nowait(
610-
(
611-
idx,
612-
dispatch,
613-
args,
614-
)
615-
)
616-
# ensure the eventloop wakes up
617-
self.io_loop.add_callback(lambda: None)
618-
619535
async def _create_control_lock(self):
620536
# This can be removed when minimum python increases to 3.10
621537
self._control_lock = asyncio.Lock()
622538

623539
def start(self):
624540
"""register dispatchers for streams"""
625541
self.io_loop = ioloop.IOLoop.current()
626-
self.msg_queue: Queue[t.Any] = Queue()
627-
if not self.shell_channel_thread:
628-
self.io_loop.add_callback(self.dispatch_queue)
629542

630543
if self.control_stream:
631544
self.control_stream.on_recv(self.dispatch_control, copy=False)
@@ -644,10 +557,7 @@ def start(self):
644557
self.shell_stream.on_recv(self.shell_channel_thread_main, copy=False)
645558
else:
646559
self.shell_stream.on_recv(
647-
partial(
648-
self.schedule_dispatch,
649-
self.dispatch_shell,
650-
),
560+
partial(self.shell_main, None),
651561
copy=False,
652562
)
653563

@@ -1410,23 +1320,18 @@ async def stop_aborting():
14101320
self.log.info("Finishing abort")
14111321
self._aborting = False
14121322

1413-
# put the stop-aborting event on the message queue
1414-
# so that all messages already waiting in the queue are aborted
1415-
# before we reset the flag
1416-
schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting)
1417-
14181323
if self.stop_on_error_timeout:
14191324
# if we have a delay, give messages this long to arrive on the queue
14201325
# before we stop aborting requests
1421-
self.io_loop.call_later(self.stop_on_error_timeout, schedule_stop_aborting)
1326+
self.io_loop.call_later(self.stop_on_error_timeout, stop_aborting)
14221327
# If we have an eventloop, it may interfere with the call_later above.
14231328
# If the loop has a _schedule_exit method, we call that so the loop exits
14241329
# after stop_on_error_timeout, returning to the main io_loop and letting
14251330
# the call_later fire.
14261331
if self.eventloop is not None and hasattr(self.eventloop, "_schedule_exit"):
14271332
self.eventloop._schedule_exit(self.stop_on_error_timeout + 0.01)
14281333
else:
1429-
schedule_stop_aborting()
1334+
self.io_loop.add_callback(stop_aborting)
14301335

14311336
def _send_abort_reply(self, stream, msg, idents):
14321337
"""Send a reply to an aborted request"""

tests/test_ipkernel_direct.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -165,29 +165,37 @@ def test_dispatch_debugpy(ipkernel: IPythonKernel) -> None:
165165
async def test_start(ipkernel: IPythonKernel) -> None:
166166
shell_future: asyncio.Future = asyncio.Future()
167167

168-
async def fake_dispatch_queue():
169-
shell_future.set_result(None)
168+
def fake_publish_status(status, channel):
169+
if status == "starting" and channel == "shell":
170+
shell_future.set_result(None)
170171

171-
ipkernel.dispatch_queue = fake_dispatch_queue # type:ignore
172+
ipkernel._publish_status = fake_publish_status # type:ignore
172173
ipkernel.start()
173-
ipkernel.debugpy_stream = None
174-
ipkernel.start()
175-
await ipkernel.process_one(False)
174+
176175
await shell_future
177176

177+
shell_stream = ipkernel.shell_stream
178+
assert shell_stream is not None
179+
assert not shell_stream.closed()
180+
178181

179182
async def test_start_no_debugpy(ipkernel: IPythonKernel) -> None:
180183
shell_future: asyncio.Future = asyncio.Future()
181184

182-
async def fake_dispatch_queue():
183-
shell_future.set_result(None)
185+
def fake_publish_status(status, channel):
186+
if status == "starting" and channel == "shell":
187+
shell_future.set_result(None)
184188

185-
ipkernel.dispatch_queue = fake_dispatch_queue # type:ignore
189+
ipkernel._publish_status = fake_publish_status # type:ignore
186190
ipkernel.debugpy_stream = None
187191
ipkernel.start()
188192

189193
await shell_future
190194

195+
shell_stream = ipkernel.shell_stream
196+
assert shell_stream is not None
197+
assert not shell_stream.closed()
198+
191199

192200
def test_create_comm():
193201
assert isinstance(_create_comm(), BaseComm)

0 commit comments

Comments
 (0)