Skip to content

Commit f51c6ea

Browse files
committed
Fix matplotlib use of GUI toolkit event loops
1 parent b8f5dfc commit f51c6ea

File tree

4 files changed

+68
-136
lines changed

4 files changed

+68
-136
lines changed

ipykernel/eventloops.py

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@ def register_integration(*toolkitnames):
4040
You can provide alternative names for the same toolkit.
4141
4242
The decorated function should take a single argument, the IPython kernel
43-
instance, arrange for the event loop to call ``kernel.do_one_iteration()``
44-
at least every ``kernel._poll_interval`` seconds, and start the event loop.
43+
instance, arrange for the event loop to yield the asyncio loop when a
44+
message is received by the main shell zmq stream or at least every
45+
``kernel._poll_interval`` seconds, and start the event loop.
4546
4647
:mod:`ipykernel.eventloops` provides and registers such functions
4748
for a few common event loops.
@@ -68,6 +69,15 @@ def exit_decorator(exit_func):
6869
return decorator
6970

7071

72+
def get_shell_stream(kernel):
73+
# Return the zmq stream that receives messages for the main shell.
74+
if kernel._supports_kernel_subshells:
75+
manager = kernel.shell_channel_thread.manager
76+
socket_pair = manager.get_shell_channel_to_subshell_pair(None)
77+
return socket_pair.to_stream
78+
return kernel.shell_stream
79+
80+
7181
def _notify_stream_qt(kernel):
7282
import operator
7383
from functools import lru_cache
@@ -87,17 +97,20 @@ def exit_loop():
8797
kernel._qt_notifier.setEnabled(False)
8898
kernel.app.qt_event_loop.quit()
8999

90-
def process_stream_events():
100+
def process_stream_events_wrap(shell_stream):
91101
"""fall back to main loop when there's a socket event"""
92102
# call flush to ensure that the stream doesn't lose events
93103
# due to our consuming of the edge-triggered FD
94104
# flush returns the number of events consumed.
95105
# if there were any, wake it up
96-
if kernel.shell_stream.flush(limit=1):
106+
if shell_stream.flush(limit=1):
97107
exit_loop()
98108

109+
shell_stream = get_shell_stream(kernel)
110+
process_stream_events = partial(process_stream_events_wrap, shell_stream)
111+
99112
if not hasattr(kernel, "_qt_notifier"):
100-
fd = kernel.shell_stream.getsockopt(zmq.FD)
113+
fd = shell_stream.getsockopt(zmq.FD)
101114
kernel._qt_notifier = QtCore.QSocketNotifier(
102115
fd, enum_helper("QtCore.QSocketNotifier.Type").Read, kernel.app.qt_event_loop
103116
)
@@ -177,9 +190,11 @@ def loop_wx(kernel):
177190
# Wx uses milliseconds
178191
poll_interval = int(1000 * kernel._poll_interval)
179192

180-
def wake():
193+
shell_stream = get_shell_stream(kernel)
194+
195+
def wake(shell_stream):
181196
"""wake from wx"""
182-
if kernel.shell_stream.flush(limit=1):
197+
if shell_stream.flush(limit=1):
183198
kernel.app.ExitMainLoop()
184199
return
185200

@@ -201,7 +216,7 @@ def on_timer(self, event):
201216
# wx.Timer to defer back to the tornado event loop.
202217
class IPWxApp(wx.App): # type:ignore[misc]
203218
def OnInit(self):
204-
self.frame = TimerFrame(wake)
219+
self.frame = TimerFrame(partial(wake, shell_stream))
205220
self.frame.Show(False)
206221
return True
207222

@@ -248,14 +263,14 @@ def __init__(self, app):
248263

249264
def exit_loop():
250265
"""fall back to main loop"""
251-
app.tk.deletefilehandler(kernel.shell_stream.getsockopt(zmq.FD))
266+
app.tk.deletefilehandler(shell_stream.getsockopt(zmq.FD))
252267
app.quit()
253268
app.destroy()
254269
del kernel.app_wrapper
255270

256-
def process_stream_events(*a, **kw):
271+
def process_stream_events_wrap(shell_stream, *a, **kw):
257272
"""fall back to main loop when there's a socket event"""
258-
if kernel.shell_stream.flush(limit=1):
273+
if shell_stream.flush(limit=1):
259274
exit_loop()
260275

261276
# allow for scheduling exits from the loop in case a timeout needs to
@@ -268,9 +283,10 @@ def _schedule_exit(delay):
268283

269284
# For Tkinter, we create a Tk object and call its withdraw method.
270285
kernel.app_wrapper = BasicAppWrapper(app)
271-
app.tk.createfilehandler(
272-
kernel.shell_stream.getsockopt(zmq.FD), READABLE, process_stream_events
273-
)
286+
shell_stream = get_shell_stream(kernel)
287+
process_stream_events = partial(process_stream_events_wrap, shell_stream)
288+
289+
app.tk.createfilehandler(shell_stream.getsockopt(zmq.FD), READABLE, process_stream_events)
274290
# schedule initial call after start
275291
app.after(0, process_stream_events)
276292

@@ -283,15 +299,19 @@ def _schedule_exit(delay):
283299

284300
nest_asyncio.apply()
285301

286-
doi = kernel.do_one_iteration
287302
# Tk uses milliseconds
288303
poll_interval = int(1000 * kernel._poll_interval)
289304

305+
shell_stream = get_shell_stream(kernel)
306+
290307
class TimedAppWrapper:
291-
def __init__(self, app, func):
308+
def __init__(self, app, shell_stream):
292309
self.app = app
310+
self.shell_stream = shell_stream
293311
self.app.withdraw()
294-
self.func = func
312+
313+
async def func(self):
314+
self.shell_stream.flush(limit=1)
295315

296316
def on_timer(self):
297317
loop = asyncio.get_event_loop()
@@ -305,16 +325,18 @@ def start(self):
305325
self.on_timer() # Call it once to get things going.
306326
self.app.mainloop()
307327

308-
kernel.app_wrapper = TimedAppWrapper(app, doi)
328+
kernel.app_wrapper = TimedAppWrapper(app, shell_stream)
309329
kernel.app_wrapper.start()
310330

311331

312332
@loop_tk.exit
313333
def loop_tk_exit(kernel):
314334
"""Exit the tk loop."""
315335
try:
336+
kernel.app_wrapper.app.quit()
316337
kernel.app_wrapper.app.destroy()
317338
del kernel.app_wrapper
339+
kernel.eventloop = None
318340
except (RuntimeError, AttributeError):
319341
pass
320342

@@ -359,6 +381,7 @@ def loop_cocoa(kernel):
359381
from ._eventloop_macos import mainloop, stop
360382

361383
real_excepthook = sys.excepthook
384+
shell_stream = get_shell_stream(kernel)
362385

363386
def handle_int(etype, value, tb):
364387
"""don't let KeyboardInterrupts look like crashes"""
@@ -377,7 +400,7 @@ def handle_int(etype, value, tb):
377400
# don't let interrupts during mainloop invoke crash_handler:
378401
sys.excepthook = handle_int
379402
mainloop(kernel._poll_interval)
380-
if kernel.shell_stream.flush(limit=1):
403+
if shell_stream.flush(limit=1):
381404
# events to process, return control to kernel
382405
return
383406
except BaseException:
@@ -415,13 +438,14 @@ def loop_asyncio(kernel):
415438
loop._should_close = False # type:ignore[attr-defined]
416439

417440
# pause eventloop when there's an event on a zmq socket
418-
def process_stream_events(stream):
441+
def process_stream_events(shell_stream):
419442
"""fall back to main loop when there's a socket event"""
420-
if stream.flush(limit=1):
443+
if shell_stream.flush(limit=1):
421444
loop.stop()
422445

423-
notifier = partial(process_stream_events, kernel.shell_stream)
424-
loop.add_reader(kernel.shell_stream.getsockopt(zmq.FD), notifier)
446+
shell_stream = get_shell_stream(kernel)
447+
notifier = partial(process_stream_events, shell_stream)
448+
loop.add_reader(shell_stream.getsockopt(zmq.FD), notifier)
425449
loop.call_soon(notifier)
426450

427451
while True:

ipykernel/kernelbase.py

Lines changed: 4 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import asyncio
88
import inspect
9-
import itertools
109
import logging
1110
import os
1211
import socket
@@ -42,7 +41,6 @@
4241
from IPython.core.error import StdinNotImplementedError
4342
from jupyter_client.session import Session
4443
from tornado import ioloop
45-
from tornado.queues import Queue, QueueEmpty
4644
from traitlets.config.configurable import SingletonConfigurable
4745
from traitlets.traitlets import (
4846
Any,
@@ -507,11 +505,6 @@ async def advance_eventloop():
507505
if self.eventloop is not eventloop:
508506
self.log.info("exiting eventloop %s", eventloop)
509507
return
510-
if self.msg_queue.qsize():
511-
self.log.debug("Delaying eventloop due to waiting messages")
512-
# still messages to process, make the eventloop wait
513-
schedule_next()
514-
return
515508
self.log.debug("Advancing eventloop %s", eventloop)
516509
try:
517510
eventloop(self)
@@ -530,98 +523,18 @@ def schedule_next():
530523
# already consumed from the queue by process_one and the queue is
531524
# technically empty.
532525
self.log.debug("Scheduling eventloop advance")
533-
self.io_loop.call_later(0.001, partial(self.schedule_dispatch, advance_eventloop))
526+
self.io_loop.call_later(0.001, advance_eventloop)
534527

535528
# begin polling the eventloop
536529
schedule_next()
537530

538-
async def do_one_iteration(self):
539-
"""Process a single shell message
540-
541-
Any pending control messages will be flushed as well
542-
543-
.. versionchanged:: 5
544-
This is now a coroutine
545-
"""
546-
# flush messages off of shell stream into the message queue
547-
if self.shell_stream and not self._supports_kernel_subshells:
548-
self.shell_stream.flush()
549-
# process at most one shell message per iteration
550-
await self.process_one(wait=False)
551-
552-
async def process_one(self, wait=True):
553-
"""Process one request
554-
555-
Returns None if no message was handled.
556-
"""
557-
if wait:
558-
t, dispatch, args = await self.msg_queue.get()
559-
else:
560-
try:
561-
t, dispatch, args = self.msg_queue.get_nowait()
562-
except (asyncio.QueueEmpty, QueueEmpty):
563-
return
564-
565-
if self.control_thread is None and self.control_stream is not None:
566-
# If there isn't a separate control thread then this main thread handles both shell
567-
# and control messages. Before processing a shell message we need to flush all control
568-
# messages and allow them all to be processed.
569-
await asyncio.sleep(0)
570-
self.control_stream.flush()
571-
572-
socket = self.control_stream.socket
573-
while socket.poll(1):
574-
await asyncio.sleep(0)
575-
self.control_stream.flush()
576-
577-
await dispatch(*args)
578-
579-
async def dispatch_queue(self):
580-
"""Coroutine to preserve order of message handling
581-
582-
Ensures that only one message is processing at a time,
583-
even when the handler is async
584-
"""
585-
586-
while True:
587-
try:
588-
await self.process_one()
589-
except Exception:
590-
self.log.exception("Error in message handler")
591-
592-
_message_counter = Any(
593-
help="""Monotonic counter of messages
594-
""",
595-
)
596-
597-
@default("_message_counter")
598-
def _message_counter_default(self):
599-
return itertools.count()
600-
601-
def schedule_dispatch(self, dispatch, *args):
602-
"""schedule a message for dispatch"""
603-
idx = next(self._message_counter)
604-
605-
self.msg_queue.put_nowait(
606-
(
607-
idx,
608-
dispatch,
609-
args,
610-
)
611-
)
612-
# ensure the eventloop wakes up
613-
self.io_loop.add_callback(lambda: None)
614-
615531
async def _create_control_lock(self):
616532
# This can be removed when minimum python increases to 3.10
617533
self._control_lock = asyncio.Lock()
618534

619535
def start(self):
620536
"""register dispatchers for streams"""
621537
self.io_loop = ioloop.IOLoop.current()
622-
self.msg_queue: Queue[t.Any] = Queue()
623-
if not self.shell_channel_thread:
624-
self.io_loop.add_callback(self.dispatch_queue)
625538

626539
if self.control_stream:
627540
self.control_stream.on_recv(self.dispatch_control, copy=False)
@@ -640,10 +553,7 @@ def start(self):
640553
self.shell_stream.on_recv(self.shell_channel_thread_main, copy=False)
641554
else:
642555
self.shell_stream.on_recv(
643-
partial(
644-
self.schedule_dispatch,
645-
self.dispatch_shell,
646-
),
556+
partial(self.shell_main, None),
647557
copy=False,
648558
)
649559

@@ -1391,23 +1301,18 @@ async def stop_aborting():
13911301
self.log.info("Finishing abort")
13921302
self._aborting = False
13931303

1394-
# put the stop-aborting event on the message queue
1395-
# so that all messages already waiting in the queue are aborted
1396-
# before we reset the flag
1397-
schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting)
1398-
13991304
if self.stop_on_error_timeout:
14001305
# if we have a delay, give messages this long to arrive on the queue
14011306
# before we stop aborting requests
1402-
self.io_loop.call_later(self.stop_on_error_timeout, schedule_stop_aborting)
1307+
self.io_loop.call_later(self.stop_on_error_timeout, stop_aborting)
14031308
# If we have an eventloop, it may interfere with the call_later above.
14041309
# If the loop has a _schedule_exit method, we call that so the loop exits
14051310
# after stop_on_error_timeout, returning to the main io_loop and letting
14061311
# the call_later fire.
14071312
if self.eventloop is not None and hasattr(self.eventloop, "_schedule_exit"):
14081313
self.eventloop._schedule_exit(self.stop_on_error_timeout + 0.01)
14091314
else:
1410-
schedule_stop_aborting()
1315+
self.io_loop.add_callback(stop_aborting)
14111316

14121317
def _send_abort_reply(self, stream, msg, idents):
14131318
"""Send a reply to an aborted request"""

tests/test_ipkernel_direct.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -165,29 +165,37 @@ def test_dispatch_debugpy(ipkernel: IPythonKernel) -> None:
165165
async def test_start(ipkernel: IPythonKernel) -> None:
166166
shell_future: asyncio.Future = asyncio.Future()
167167

168-
async def fake_dispatch_queue():
169-
shell_future.set_result(None)
168+
def fake_publish_status(status, channel):
169+
if status == "starting" and channel == "shell":
170+
shell_future.set_result(None)
170171

171-
ipkernel.dispatch_queue = fake_dispatch_queue # type:ignore
172+
ipkernel._publish_status = fake_publish_status # type:ignore
172173
ipkernel.start()
173-
ipkernel.debugpy_stream = None
174-
ipkernel.start()
175-
await ipkernel.process_one(False)
174+
176175
await shell_future
177176

177+
shell_stream = ipkernel.shell_stream
178+
assert shell_stream is not None
179+
assert not shell_stream.closed()
180+
178181

179182
async def test_start_no_debugpy(ipkernel: IPythonKernel) -> None:
180183
shell_future: asyncio.Future = asyncio.Future()
181184

182-
async def fake_dispatch_queue():
183-
shell_future.set_result(None)
185+
def fake_publish_status(status, channel):
186+
if status == "starting" and channel == "shell":
187+
shell_future.set_result(None)
184188

185-
ipkernel.dispatch_queue = fake_dispatch_queue # type:ignore
189+
ipkernel._publish_status = fake_publish_status # type:ignore
186190
ipkernel.debugpy_stream = None
187191
ipkernel.start()
188192

189193
await shell_future
190194

195+
shell_stream = ipkernel.shell_stream
196+
assert shell_stream is not None
197+
assert not shell_stream.closed()
198+
191199

192200
def test_create_comm():
193201
assert isinstance(_create_comm(), BaseComm)

0 commit comments

Comments
 (0)