@@ -417,3 +417,48 @@ def test_interrupt_during_pdb_set_trace():
417417 # If we failed to interrupt interrupt, this will timeout:
418418 reply = get_reply (kc , msg_id2 , TIMEOUT )
419419 validate_message (reply , 'execute_reply' , msg_id2 )
420+
421+
422+ def test_control_thread_priority ():
423+
424+ N = 5
425+ with new_kernel () as kc :
426+ msg_id = kc .execute ("pass" )
427+ get_reply (kc , msg_id )
428+
429+ sleep_msg_id = kc .execute ("import asyncio; await asyncio.sleep(2)" )
430+
431+ # submit N shell messages
432+ shell_msg_ids = []
433+ for i in range (N ):
434+ shell_msg_ids .append (kc .execute (f"i = { i } " ))
435+
436+ # ensure all shell messages have arrived at the kernel before any control messages
437+ time .sleep (0.5 )
438+ # at this point, shell messages should be waiting in msg_queue,
439+ # rather than zmq while the kernel is still in the middle of processing
440+ # the first execution
441+
442+ # now send N control messages
443+ control_msg_ids = []
444+ for i in range (N ):
445+ msg = kc .session .msg ("kernel_info_request" , {})
446+ kc .control_channel .send (msg )
447+ control_msg_ids .append (msg ["header" ]["msg_id" ])
448+
449+ # finally, collect the replies on both channels for comparison
450+ sleep_reply = get_reply (kc , sleep_msg_id )
451+ shell_replies = []
452+ for msg_id in shell_msg_ids :
453+ shell_replies .append (get_reply (kc , msg_id ))
454+
455+ control_replies = []
456+ for msg_id in control_msg_ids :
457+ control_replies .append (get_reply (kc , msg_id , channel = "control" ))
458+
459+ # verify that all control messages were handled before all shell messages
460+ shell_dates = [msg ["header" ]["date" ] for msg in shell_replies ]
461+ control_dates = [msg ["header" ]["date" ] for msg in control_replies ]
462+ # comparing first to last ought to be enough, since queues preserve order
463+ # use <= in case of very-fast handling and/or low resolution timers
464+ assert control_dates [- 1 ] <= shell_dates [0 ]
0 commit comments