Skip to content

Commit 27c767e

Browse files
authored
Merge pull request #464 from minrk/outstanding-race
populate outstanding, history before sending
2 parents ad60d9b + 947665f commit 27c767e

File tree

1 file changed

+24
-25
lines changed

1 file changed

+24
-25
lines changed

ipyparallel/client/client.py

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,6 +1103,7 @@ def _send(
11031103
track=False,
11041104
header=None,
11051105
metadata=None,
1106+
track_outstanding=False,
11061107
):
11071108
"""Send a message in the IO thread
11081109
@@ -1114,6 +1115,22 @@ def _send(
11141115
)
11151116
msg_id = msg['header']['msg_id']
11161117

1118+
if track_outstanding:
1119+
# add to outstanding, history
1120+
self.outstanding.add(msg_id)
1121+
self.history.append(msg_id)
1122+
1123+
if ident:
1124+
# possibly routed to a specific engine
1125+
ident_str = ident
1126+
if isinstance(ident_str, list):
1127+
ident_str = ident_str[-1]
1128+
ident_str = ident_str.decode("utf-8")
1129+
if ident_str in self._engines.values():
1130+
# save for later, in case of engine death
1131+
self._outstanding_dict[ident_str].add(msg_id)
1132+
self.metadata['submitted'] = util.utcnow()
1133+
11171134
futures = self.create_message_futures(
11181135
msg_id,
11191136
async_result=msg_type in {'execute_request', 'apply_request'},
@@ -1599,19 +1616,9 @@ def send_apply_request(
15991616
ident=ident,
16001617
metadata=metadata,
16011618
track=track,
1619+
track_outstanding=True,
16021620
)
1603-
16041621
msg_id = future.msg_id
1605-
self.outstanding.add(msg_id)
1606-
if ident:
1607-
# possibly routed to a specific engine
1608-
if isinstance(ident, list):
1609-
ident = ident[-1]
1610-
ident = ident.decode("utf-8")
1611-
if ident in self._engines.values():
1612-
# save for later, in case of engine death
1613-
self._outstanding_dict[ident].add(msg_id)
1614-
self.history.append(msg_id)
16151622

16161623
return future
16171624

@@ -1637,22 +1644,14 @@ def send_execute_request(
16371644
content = dict(code=code, silent=bool(silent), user_expressions={})
16381645

16391646
future = self._send(
1640-
socket, "execute_request", content=content, ident=ident, metadata=metadata
1647+
socket,
1648+
"execute_request",
1649+
content=content,
1650+
ident=ident,
1651+
metadata=metadata,
1652+
track_outstanding=True,
16411653
)
16421654

1643-
msg_id = future.msg_id
1644-
self.outstanding.add(msg_id)
1645-
if ident:
1646-
# possibly routed to a specific engine
1647-
if isinstance(ident, list):
1648-
ident = ident[-1]
1649-
ident = ident.decode("utf-8")
1650-
if ident in self._engines.values():
1651-
# save for later, in case of engine death
1652-
self._outstanding_dict[ident].add(msg_id)
1653-
self.history.append(msg_id)
1654-
self.metadata[msg_id]['submitted'] = util.utcnow()
1655-
16561655
return future
16571656

16581657
# --------------------------------------------------------------------------

0 commit comments

Comments
 (0)