Skip to content

Commit 87ba3a0

Browse files
authored
Merge pull request #457 from Bodo-inc/sahil/strm-out-expl
Basic Streaming Output Implementation
2 parents cb0d510 + 9b41b31 commit 87ba3a0

File tree

5 files changed

+200
-43
lines changed

5 files changed

+200
-43
lines changed

ipyparallel/client/asyncresult.py

Lines changed: 91 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import threading
88
import time
99
from concurrent.futures import Future
10+
from contextlib import contextmanager
1011
from datetime import datetime
12+
from functools import partial
1113
from threading import Event
1214

1315
try:
@@ -134,6 +136,49 @@ def _init_futures(self):
134136
self._output_future.add_done_callback(self._resolve_output)
135137
self.add_done_callback(self._finalize_result)
136138

139+
def _iopub_streaming_output_callback(self, eid, msg):
140+
"""Callback registered during AsyncResult.stream_output()"""
141+
msg_type = msg['header']['msg_type']
142+
if msg_type == 'stream':
143+
msg_content = msg['content']
144+
stream_name = msg_content['name']
145+
stream = getattr(sys, stream_name, sys.stdout)
146+
self._display_stream(
147+
msg_content['text'],
148+
f'[{stream_name}:{eid}] ',
149+
file=stream,
150+
)
151+
152+
if get_ipython() is None:
153+
return
154+
155+
if msg_type == 'display_data':
156+
msg_content = msg['content']
157+
_raw_text('[output:%i]' % eid)
158+
self._republish_displaypub(msg_content, eid)
159+
160+
@contextmanager
161+
def stream_output(self):
162+
"""Stream output for this result as it arrives.
163+
164+
Returns a context manager, during which output is streamed.
165+
"""
166+
167+
# Keep a handle on the futures so we can remove the callback later
168+
future_callbacks = {}
169+
for eid, msg_id in zip(self._targets, self.msg_ids):
170+
callback_func = partial(self._iopub_streaming_output_callback, eid)
171+
f = self._client._futures[msg_id]
172+
future_callbacks[f] = callback_func
173+
f.iopub_callbacks.append(callback_func)
174+
175+
try:
176+
yield
177+
finally:
178+
# Remove the callback
179+
for msg_future, callback in future_callbacks.items():
180+
msg_future.iopub_callbacks.remove(callback)
181+
137182
def __repr__(self):
138183
if self._ready:
139184
return "<%s: %s:finished>" % (self.__class__.__name__, self._fname)
@@ -558,21 +603,23 @@ def _display_stream(self, text, prefix='', file=None):
558603
prefix = prefix + '\n'
559604
print("%s%s" % (prefix, text), file=file, end=end)
560605

561-
def _display_single_result(self):
562-
self._display_stream(self.stdout)
563-
self._display_stream(self.stderr, file=sys.stderr)
606+
def _display_single_result(self, result_only=False):
607+
if not result_only:
608+
self._display_stream(self.stdout)
609+
self._display_stream(self.stderr, file=sys.stderr)
564610
if get_ipython() is None:
565611
# displaypub is meaningless outside IPython
566612
return
567613

568-
for output in self.outputs:
569-
self._republish_displaypub(output, self.engine_id)
614+
if not result_only:
615+
for output in self.outputs:
616+
self._republish_displaypub(output, self.engine_id)
570617

571618
if self.execute_result is not None:
572619
display(self.get())
573620

574621
@check_ready
575-
def display_outputs(self, groupby="type"):
622+
def display_outputs(self, groupby="type", result_only=False):
576623
"""republish the outputs of the computation
577624
578625
Parameters
@@ -598,10 +645,15 @@ def display_outputs(self, groupby="type"):
598645
outputs. This is meant for cases of each command producing
599646
several plots, and you would like to see all of the first
600647
plots together, then all of the second plots, and so on.
648+
649+
result_only: boolean [default: False]
650+
Only display the execution result and skip stdout, stderr and
651+
display-outputs. Usually used when using streaming output
652+
since these outputs would have already been displayed.
601653
"""
602654
self.wait_for_output()
603655
if self._single_result:
604-
self._display_single_result()
656+
self._display_single_result(result_only=result_only)
605657
return
606658

607659
stdouts = self.stdout
@@ -616,53 +668,57 @@ def display_outputs(self, groupby="type"):
616668
for eid, stdout, stderr, outputs, r, execute_result in zip(
617669
targets, stdouts, stderrs, output_lists, results, execute_results
618670
):
619-
self._display_stream(stdout, '[stdout:%i] ' % eid)
620-
self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
671+
if not result_only:
672+
self._display_stream(stdout, '[stdout:%i] ' % eid)
673+
self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
621674

622675
if get_ipython() is None:
623676
# displaypub is meaningless outside IPython
624677
continue
625678

626-
if outputs or execute_result is not None:
679+
if (outputs and not result_only) or execute_result is not None:
627680
_raw_text('[output:%i]' % eid)
628681

629-
for output in outputs:
630-
self._republish_displaypub(output, eid)
682+
if not result_only:
683+
for output in outputs:
684+
self._republish_displaypub(output, eid)
631685

632686
if execute_result is not None:
633687
display(r)
634688

635689
elif groupby in ('type', 'order'):
636-
# republish stdout:
637-
for eid, stdout in zip(targets, stdouts):
638-
self._display_stream(stdout, '[stdout:%i] ' % eid)
690+
if not result_only:
691+
# republish stdout:
692+
for eid, stdout in zip(targets, stdouts):
693+
self._display_stream(stdout, '[stdout:%i] ' % eid)
639694

640-
# republish stderr:
641-
for eid, stderr in zip(targets, stderrs):
642-
self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
695+
# republish stderr:
696+
for eid, stderr in zip(targets, stderrs):
697+
self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
643698

644699
if get_ipython() is None:
645700
# displaypub is meaningless outside IPython
646701
return
647702

648-
if groupby == 'order':
649-
output_dict = dict(
650-
(eid, outputs) for eid, outputs in zip(targets, output_lists)
651-
)
652-
N = max(len(outputs) for outputs in output_lists)
653-
for i in range(N):
654-
for eid in targets:
655-
outputs = output_dict[eid]
656-
if len(outputs) >= N:
703+
if not result_only:
704+
if groupby == 'order':
705+
output_dict = dict(
706+
(eid, outputs) for eid, outputs in zip(targets, output_lists)
707+
)
708+
N = max(len(outputs) for outputs in output_lists)
709+
for i in range(N):
710+
for eid in targets:
711+
outputs = output_dict[eid]
712+
if len(outputs) >= N:
713+
_raw_text('[output:%i]' % eid)
714+
self._republish_displaypub(outputs[i], eid)
715+
else:
716+
# republish displaypub output
717+
for eid, outputs in zip(targets, output_lists):
718+
if outputs:
657719
_raw_text('[output:%i]' % eid)
658-
self._republish_displaypub(outputs[i], eid)
659-
else:
660-
# republish displaypub output
661-
for eid, outputs in zip(targets, output_lists):
662-
if outputs:
663-
_raw_text('[output:%i]' % eid)
664-
for output in outputs:
665-
self._republish_displaypub(output, eid)
720+
for output in outputs:
721+
self._republish_displaypub(output, eid)
666722

667723
# finally, add execute_result:
668724
for eid, r, execute_result in zip(targets, results, execute_results):

ipyparallel/client/client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,6 +1071,12 @@ def _dispatch_iopub(self, msg):
10711071
# unhandled msg_type (status, etc.)
10721072
pass
10731073

1074+
msg_future = self._futures.get(msg_id, None)
1075+
if msg_future:
1076+
# Run any callback functions
1077+
for callback in msg_future.iopub_callbacks:
1078+
callback(msg)
1079+
10741080
def create_message_futures(self, msg_id, async_result=False, track=False):
10751081
msg_future = MessageFuture(msg_id, track=track)
10761082
futures = [msg_future]

ipyparallel/client/futures.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def __init__(self, msg_id, track=False):
1818
self.track = track
1919
self._tracker = None
2020
self.tracker = Future()
21+
self.iopub_callbacks = []
2122
if not track:
2223
self.tracker.set_result(None)
2324
self.add_done_callback(lambda f: self._evt.set())

ipyparallel/client/magics.py

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@
2929
from __future__ import print_function
3030

3131
import ast
32+
from contextlib import contextmanager
33+
34+
# Python 3.6 doesn't have nullcontext, so we define our own
35+
@contextmanager
36+
def nullcontext():
37+
yield
38+
3239

3340
# -----------------------------------------------------------------------------
3441
# Copyright (C) 2008 The IPython Development Team
@@ -97,6 +104,20 @@ def exec_args(f):
97104
dest='block',
98105
help="use non-blocking (async) execution",
99106
),
107+
magic_arguments.argument(
108+
'--stream',
109+
action="store_const",
110+
const=True,
111+
dest='stream',
112+
help="stream stdout/stderr in real-time (only valid when using blocking execution)",
113+
),
114+
magic_arguments.argument(
115+
'--no-stream',
116+
action="store_const",
117+
const=False,
118+
dest='stream',
119+
help="do not stream stdout/stderr in real-time",
120+
),
100121
magic_arguments.argument(
101122
'-t',
102123
'--targets',
@@ -157,9 +178,9 @@ def output_args(f):
157178
choices=['engine', 'order', 'type'],
158179
default='type',
159180
help="""Group the outputs in a particular way.
160-
181+
161182
Choices are:
162-
183+
163184
**type**: group outputs of all engines by type (stdout, stderr, displaypub, etc.).
164185
**engine**: display all output for each engine together.
165186
**order**: like type, but individual displaypub output from each engine is collated.
@@ -199,6 +220,8 @@ class ParallelMagics(Magics):
199220
last_result = None
200221
# verbose flag
201222
verbose = False
223+
# streaming output flag
224+
stream_ouput = True
202225

203226
def __init__(self, shell, view, suffix=''):
204227
self.view = view
@@ -242,6 +265,8 @@ def pxconfig(self, line):
242265
self.view.block = args.block
243266
if args.set_verbose is not None:
244267
self.verbose = args.set_verbose
268+
if args.stream is not None:
269+
self.stream_ouput = args.stream
245270

246271
@magic_arguments.magic_arguments()
247272
@output_args
@@ -290,11 +315,14 @@ def px(self, line=''):
290315
"""
291316
return self.parallel_execute(line)
292317

293-
def parallel_execute(self, cell, block=None, groupby='type', save_name=None):
318+
def parallel_execute(
319+
self, cell, block=None, groupby='type', save_name=None, stream_output=None
320+
):
294321
"""implementation used by %px and %%parallel"""
295322

296323
# defaults:
297324
block = self.view.block if block is None else block
325+
stream_output = self.stream_ouput if stream_output is None else stream_output
298326

299327
base = "Parallel" if block else "Async parallel"
300328

@@ -313,8 +341,12 @@ def parallel_execute(self, cell, block=None, groupby='type', save_name=None):
313341
self.shell.user_ns[save_name] = result
314342

315343
if block:
316-
result.get()
317-
result.display_outputs(groupby)
344+
cm = result.stream_output() if stream_output else nullcontext()
345+
with cm:
346+
result.wait_for_output()
347+
result.get()
348+
# Skip stdout/stderr if streaming output
349+
result.display_outputs(groupby, result_only=stream_output)
318350
else:
319351
# return AsyncResult only on non-blocking submission
320352
return result
@@ -354,6 +386,7 @@ def cell_px(self, line='', cell=None):
354386
block=block,
355387
groupby=args.groupby,
356388
save_name=args.save_name,
389+
stream_output=args.stream,
357390
)
358391
finally:
359392
if args.targets:

0 commit comments

Comments
 (0)