Skip to content

Commit 6f241d2

Browse files
committed
flush control queue prior to handling shell messages
preserves control channel priority over shell channel - when control thread is running, resolves message-processing races - when control thread is not running, ensures control messages are processed before shell requests
1 parent 317159f commit 6f241d2

File tree

1 file changed

+33
-2
lines changed

1 file changed

+33
-2
lines changed

ipykernel/kernelbase.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# Distributed under the terms of the Modified BSD License.
55

66
import asyncio
7+
import concurrent.futures
78
from datetime import datetime
89
from functools import partial
910
import itertools
@@ -213,8 +214,34 @@ def dispatch_control(self, msg):
213214
async def poll_control_queue(self):
214215
while True:
215216
msg = await self.control_queue.get()
217+
# handle tracers from _flush_control_queue
218+
if isinstance(msg, (concurrent.futures.Future, asyncio.Future)):
219+
msg.set_result(None)
220+
continue
216221
await self.process_control(msg)
217222

223+
async def _flush_control_queue(self):
224+
"""Flush the control queue, wait for processing of any pending messages"""
225+
if self.control_thread:
226+
control_loop = self.control_thread.io_loop
227+
# concurrent.futures.Futures are threadsafe
228+
# and can be used to await across threads
229+
tracer_future = concurrent.futures.Future()
230+
awaitable_future = asyncio.wrap_future(tracer_future)
231+
else:
232+
control_loop = self.io_loop
233+
tracer_future = awaitable_future = asyncio.Future()
234+
235+
def _flush():
236+
# control_stream.flush puts messages on the queue
237+
self.control_stream.flush()
238+
# put Future on the queue after all of those,
239+
# so we can wait for all queued messages to be processed
240+
self.control_queue.put(tracer_future)
241+
242+
control_loop.add_callback(_flush)
243+
return awaitable_future
244+
218245
async def process_control(self, msg):
219246
"""dispatch control requests"""
220247
idents, msg = self.session.feed_identities(msg, copy=False)
@@ -265,6 +292,10 @@ def should_handle(self, stream, msg, idents):
265292

266293
async def dispatch_shell(self, msg):
267294
"""dispatch shell requests"""
295+
296+
# flush control queue before handling shell requests
297+
await self._flush_control_queue()
298+
268299
idents, msg = self.session.feed_identities(msg, copy=False)
269300
try:
270301
msg = self.session.deserialize(msg, content=True, copy=False)
@@ -630,7 +661,7 @@ async def inspect_request(self, stream, ident, parent):
630661
content.get('detail_level', 0),
631662
)
632663
if inspect.isawaitable(reply_content):
633-
reply_content = await reply_content
664+
reply_content = await reply_content
634665

635666
# Before we send this object over, we scrub it for JSON usage
636667
reply_content = json_clean(reply_content)
@@ -944,7 +975,7 @@ def _input_request(self, prompt, ident, parent, password=False):
944975
raise KeyboardInterrupt("Interrupted by user") from None
945976
except Exception as e:
946977
self.log.warning("Invalid Message:", exc_info=True)
947-
978+
948979
try:
949980
value = reply["content"]["value"]
950981
except Exception:

0 commit comments

Comments
 (0)