@@ -671,7 +671,7 @@ async def execute_request(self, stream, ident, parent):
671
671
self .log .debug ("%s" , reply_msg )
672
672
673
673
if not silent and reply_msg ['content' ]['status' ] == 'error' and stop_on_error :
674
- await self ._abort_queues ()
674
+ self ._abort_queues ()
675
675
676
676
def do_execute (self , code , silent , store_history = True ,
677
677
user_expressions = None , allow_stdin = False ):
@@ -974,13 +974,31 @@ def _topic(self, topic):
974
974
975
975
_aborting = Bool (False )
976
976
977
- async def _abort_queues (self ):
978
- self .shell_stream .flush ()
977
+ def _abort_queues (self ):
978
+ # while this flag is true,
979
+ # execute requests will be aborted
979
980
self ._aborting = True
981
+ self .log .info ("Aborting queue" )
982
+
983
+ # flush streams, so all currently waiting messages
984
+ # are added to the queue
985
+ self .shell_stream .flush ()
986
+
987
+ # Callback to signal that we are done aborting
980
988
def stop_aborting ():
981
989
self .log .info ("Finishing abort" )
982
990
self ._aborting = False
983
- asyncio .get_event_loop ().call_later (self .stop_on_error_timeout , stop_aborting )
991
+
992
+ # put the stop-aborting event on the message queue
993
+ # so that all messages already waiting in the queue are aborted
994
+ # before we reset the flag
995
+ schedule_stop_aborting = partial (self .schedule_dispatch , stop_aborting )
996
+
997
+ # if we have a delay, give messages this long to arrive on the queue
998
+ # before we stop aborting requests
999
+ asyncio .get_event_loop ().call_later (
1000
+ self .stop_on_error_timeout , schedule_stop_aborting
1001
+ )
984
1002
985
1003
def _send_abort_reply (self , stream , msg , idents ):
986
1004
"""Send a reply to an aborted request"""
0 commit comments