Skip to content

Commit 9b41b31

Browse files
minrksahil1105
authored andcommitted
review on stream_output
private methods, unregister callbacks, minor cleanup
1 parent 5b6af40 commit 9b41b31

File tree

2 files changed

+27
-25
lines changed

2 files changed

+27
-25
lines changed

ipyparallel/client/asyncresult.py

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from concurrent.futures import Future
1010
from contextlib import contextmanager
1111
from datetime import datetime
12+
from functools import partial
1213
from threading import Event
1314

1415
try:
@@ -135,17 +136,18 @@ def _init_futures(self):
135136
self._output_future.add_done_callback(self._resolve_output)
136137
self.add_done_callback(self._finalize_result)
137138

138-
def iopub_streaming_output_callback(self, eid, msg):
139+
def _iopub_streaming_output_callback(self, eid, msg):
140+
"""Callback registered during AsyncResult.stream_output()"""
139141
msg_type = msg['header']['msg_type']
140142
if msg_type == 'stream':
141143
msg_content = msg['content']
142144
stream_name = msg_content['name']
143-
if stream_name == 'stdout':
144-
self._display_stream(msg_content['text'], '[stdout:%i] ' % eid)
145-
elif stream_name == 'stderr':
146-
self._display_stream(
147-
msg_content['text'], '[stderr:%i] ' % eid, file=sys.stderr
148-
)
145+
stream = getattr(sys, stream_name, sys.stdout)
146+
self._display_stream(
147+
msg_content['text'],
148+
f'[{stream_name}:{eid}] ',
149+
file=stream,
150+
)
149151

150152
if get_ipython() is None:
151153
return
@@ -157,25 +159,25 @@ def iopub_streaming_output_callback(self, eid, msg):
157159

158160
@contextmanager
159161
def stream_output(self):
160-
"""
161-
Context manager that adds a iopub callback to stream stdout/stderr
162-
(instead of displaying it all at the end).
163-
"""
162+
"""Stream output for this result as it arrives.
164163
165-
from functools import partial
164+
Returns a context manager, during which output is streamed.
165+
"""
166166

167167
# Keep a handle on the futures so we can remove the callback later
168-
msg_futures = []
168+
future_callbacks = {}
169169
for eid, msg_id in zip(self._targets, self.msg_ids):
170-
callback_func = partial(self.iopub_streaming_output_callback, eid)
171-
self._client._futures[msg_id].iopub_callbacks.append(callback_func)
172-
msg_futures.append(self._client._futures[msg_id])
173-
174-
yield
170+
callback_func = partial(self._iopub_streaming_output_callback, eid)
171+
f = self._client._futures[msg_id]
172+
future_callbacks[f] = callback_func
173+
f.iopub_callbacks.append(callback_func)
175174

176-
# Remove the callback
177-
for msg_future in msg_futures:
178-
msg_future.iopub_callbacks.pop()
175+
try:
176+
yield
177+
finally:
178+
# Remove the callback
179+
for msg_future, callback in future_callbacks.items():
180+
msg_future.iopub_callbacks.remove(callback)
179181

180182
def __repr__(self):
181183
if self._ready:

ipyparallel/client/magics.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,9 @@ def output_args(f):
178178
choices=['engine', 'order', 'type'],
179179
default='type',
180180
help="""Group the outputs in a particular way.
181-
181+
182182
Choices are:
183-
183+
184184
**type**: group outputs of all engines by type (stdout, stderr, displaypub, etc.).
185185
**engine**: display all output for each engine together.
186186
**order**: like type, but individual displaypub output from each engine is collated.
@@ -345,8 +345,8 @@ def parallel_execute(
345345
with cm:
346346
result.wait_for_output()
347347
result.get()
348-
# Skip stdout/stderr if streaming stdout/stderr
349-
result.display_outputs(groupby, result_only=bool(stream_output))
348+
# Skip stdout/stderr if streaming output
349+
result.display_outputs(groupby, result_only=stream_output)
350350
else:
351351
# return AsyncResult only on non-blocking submission
352352
return result

0 commit comments

Comments
 (0)