@@ -1103,6 +1103,7 @@ def _send(
1103
1103
track = False ,
1104
1104
header = None ,
1105
1105
metadata = None ,
1106
+ track_outstanding = False ,
1106
1107
):
1107
1108
"""Send a message in the IO thread
1108
1109
@@ -1114,6 +1115,22 @@ def _send(
1114
1115
)
1115
1116
msg_id = msg ['header' ]['msg_id' ]
1116
1117
1118
+ if track_outstanding :
1119
+ # add to outstanding, history
1120
+ self .outstanding .add (msg_id )
1121
+ self .history .append (msg_id )
1122
+
1123
+ if ident :
1124
+ # possibly routed to a specific engine
1125
+ ident_str = ident
1126
+ if isinstance (ident_str , list ):
1127
+ ident_str = ident_str [- 1 ]
1128
+ ident_str = ident_str .decode ("utf-8" )
1129
+ if ident_str in self ._engines .values ():
1130
+ # save for later, in case of engine death
1131
+ self ._outstanding_dict [ident_str ].add (msg_id )
1132
+ self .metadata ['submitted' ] = util .utcnow ()
1133
+
1117
1134
futures = self .create_message_futures (
1118
1135
msg_id ,
1119
1136
async_result = msg_type in {'execute_request' , 'apply_request' },
@@ -1599,19 +1616,9 @@ def send_apply_request(
1599
1616
ident = ident ,
1600
1617
metadata = metadata ,
1601
1618
track = track ,
1619
+ track_outstanding = True ,
1602
1620
)
1603
-
1604
1621
msg_id = future .msg_id
1605
- self .outstanding .add (msg_id )
1606
- if ident :
1607
- # possibly routed to a specific engine
1608
- if isinstance (ident , list ):
1609
- ident = ident [- 1 ]
1610
- ident = ident .decode ("utf-8" )
1611
- if ident in self ._engines .values ():
1612
- # save for later, in case of engine death
1613
- self ._outstanding_dict [ident ].add (msg_id )
1614
- self .history .append (msg_id )
1615
1622
1616
1623
return future
1617
1624
@@ -1637,22 +1644,14 @@ def send_execute_request(
1637
1644
content = dict (code = code , silent = bool (silent ), user_expressions = {})
1638
1645
1639
1646
future = self ._send (
1640
- socket , "execute_request" , content = content , ident = ident , metadata = metadata
1647
+ socket ,
1648
+ "execute_request" ,
1649
+ content = content ,
1650
+ ident = ident ,
1651
+ metadata = metadata ,
1652
+ track_outstanding = True ,
1641
1653
)
1642
1654
1643
- msg_id = future .msg_id
1644
- self .outstanding .add (msg_id )
1645
- if ident :
1646
- # possibly routed to a specific engine
1647
- if isinstance (ident , list ):
1648
- ident = ident [- 1 ]
1649
- ident = ident .decode ("utf-8" )
1650
- if ident in self ._engines .values ():
1651
- # save for later, in case of engine death
1652
- self ._outstanding_dict [ident ].add (msg_id )
1653
- self .history .append (msg_id )
1654
- self .metadata [msg_id ]['submitted' ] = util .utcnow ()
1655
-
1656
1655
return future
1657
1656
1658
1657
# --------------------------------------------------------------------------
0 commit comments