Skip to content

Commit 7b4171c

Browse files
Merge pull request #658 from minrk/flush-control-thread
flush control queue prior to handling shell messages
2 parents 31ec597 + f1748ae commit 7b4171c

File tree

4 files changed

+85
-3
lines changed

4 files changed

+85
-3
lines changed

ipykernel/inprocess/ipkernel.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ async def _abort_queues(self):
8787
""" The in-process kernel doesn't abort requests. """
8888
pass
8989

90+
async def _flush_control_queue(self):
91+
"""No need to flush control queues for in-process"""
92+
pass
93+
9094
def _input_request(self, prompt, ident, parent, password=False):
9195
# Flush output before making the request.
9296
self.raw_input_str = None

ipykernel/kernelbase.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# Distributed under the terms of the Modified BSD License.
55

66
import asyncio
7+
import concurrent.futures
78
from datetime import datetime
89
from functools import partial
910
import itertools
@@ -240,8 +241,34 @@ def dispatch_control(self, msg):
240241
async def poll_control_queue(self):
241242
while True:
242243
msg = await self.control_queue.get()
244+
# handle tracers from _flush_control_queue
245+
if isinstance(msg, (concurrent.futures.Future, asyncio.Future)):
246+
msg.set_result(None)
247+
continue
243248
await self.process_control(msg)
244249

250+
async def _flush_control_queue(self):
251+
"""Flush the control queue, wait for processing of any pending messages"""
252+
if self.control_thread:
253+
control_loop = self.control_thread.io_loop
254+
# concurrent.futures.Futures are threadsafe
255+
# and can be used to await across threads
256+
tracer_future = concurrent.futures.Future()
257+
awaitable_future = asyncio.wrap_future(tracer_future)
258+
else:
259+
control_loop = self.io_loop
260+
tracer_future = awaitable_future = asyncio.Future()
261+
262+
def _flush():
263+
# control_stream.flush puts messages on the queue
264+
self.control_stream.flush()
265+
# put Future on the queue after all of those,
266+
# so we can wait for all queued messages to be processed
267+
self.control_queue.put(tracer_future)
268+
269+
control_loop.add_callback(_flush)
270+
return awaitable_future
271+
245272
async def process_control(self, msg):
246273
"""dispatch control requests"""
247274
idents, msg = self.session.feed_identities(msg, copy=False)
@@ -292,6 +319,10 @@ def should_handle(self, stream, msg, idents):
292319

293320
async def dispatch_shell(self, msg):
294321
"""dispatch shell requests"""
322+
323+
# flush control queue before handling shell requests
324+
await self._flush_control_queue()
325+
295326
idents, msg = self.session.feed_identities(msg, copy=False)
296327
try:
297328
msg = self.session.deserialize(msg, content=True, copy=False)

ipykernel/tests/test_kernel.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,3 +417,48 @@ def test_interrupt_during_pdb_set_trace():
417417
# If we failed to interrupt interrupt, this will timeout:
418418
reply = get_reply(kc, msg_id2, TIMEOUT)
419419
validate_message(reply, 'execute_reply', msg_id2)
420+
421+
422+
def test_control_thread_priority():
423+
424+
N = 5
425+
with new_kernel() as kc:
426+
msg_id = kc.execute("pass")
427+
get_reply(kc, msg_id)
428+
429+
sleep_msg_id = kc.execute("import asyncio; await asyncio.sleep(2)")
430+
431+
# submit N shell messages
432+
shell_msg_ids = []
433+
for i in range(N):
434+
shell_msg_ids.append(kc.execute(f"i = {i}"))
435+
436+
# ensure all shell messages have arrived at the kernel before any control messages
437+
time.sleep(0.5)
438+
# at this point, shell messages should be waiting in msg_queue,
439+
# rather than zmq while the kernel is still in the middle of processing
440+
# the first execution
441+
442+
# now send N control messages
443+
control_msg_ids = []
444+
for i in range(N):
445+
msg = kc.session.msg("kernel_info_request", {})
446+
kc.control_channel.send(msg)
447+
control_msg_ids.append(msg["header"]["msg_id"])
448+
449+
# finally, collect the replies on both channels for comparison
450+
sleep_reply = get_reply(kc, sleep_msg_id)
451+
shell_replies = []
452+
for msg_id in shell_msg_ids:
453+
shell_replies.append(get_reply(kc, msg_id))
454+
455+
control_replies = []
456+
for msg_id in control_msg_ids:
457+
control_replies.append(get_reply(kc, msg_id, channel="control"))
458+
459+
# verify that all control messages were handled before all shell messages
460+
shell_dates = [msg["header"]["date"] for msg in shell_replies]
461+
control_dates = [msg["header"]["date"] for msg in control_replies]
462+
# comparing first to last ought to be enough, since queues preserve order
463+
# use <= in case of very-fast handling and/or low resolution timers
464+
assert control_dates[-1] <= shell_dates[0]

ipykernel/tests/utils.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,15 @@ def flush_channels(kc=None):
5353
validate_message(msg)
5454

5555

56-
def get_reply(kc, msg_id, timeout):
57-
timeout = TIMEOUT
56+
def get_reply(kc, msg_id, timeout=TIMEOUT, channel='shell'):
5857
t0 = time()
5958
while True:
60-
reply = kc.get_shell_msg(timeout=timeout)
59+
get_msg = getattr(kc, f'get_{channel}_msg')
60+
reply = get_msg(timeout=timeout)
6161
if reply['parent_header']['msg_id'] == msg_id:
6262
break
63+
# Allow debugging ignored replies
64+
print(f"Ignoring reply not to {msg_id}: {reply}")
6365
t1 = time()
6466
timeout -= t1 - t0
6567
t0 = t1

0 commit comments

Comments
 (0)