1+ import builtins
12import dis
23import inspect
4+ import io
5+ import sys
6+ import threading
37import weakref
8+ from functools import partial
49
510import innerscope
611from dask import distributed
712
813from . import reprs
914
15+
16+ def _supports_async_output ():
17+ if reprs .is_kernel () and not reprs .in_terminal ():
18+ try :
19+ import ipywidgets # noqa
20+ except ImportError :
21+ return False
22+ return True
23+ return False
24+
25+
1026_errors_to_locations = {}
1127try :
1228 remotely
@@ -134,11 +150,13 @@ def __enter__(self):
134150 # Try to fine the source if we are in %%time or %%timeit magic
135151 if (
136152 self ._frame .f_code .co_filename in {"<timed exec>" , "<magic-timeit>" }
137- and reprs .in_ipython ()
153+ and reprs .is_kernel ()
138154 ):
139- import IPython
155+ from IPython import get_ipython
140156
141- ip = IPython .get_ipython ()
157+ ip = get_ipython ()
158+ if ip is None :
159+ raise
142160 cell = ip .history_manager ._i00 # The current cell!
143161 lines = cell .splitlines (keepends = True )
144162 # strip the magic
@@ -230,6 +248,7 @@ def _exit(self, exc_type, exc_value, exc_traceback):
230248 self .context_body = get_body (self ._lines [self ._body_start : endline ])
231249 self ._magic_func , names , futures = abracadabra (self )
232250 display_expr = self ._magic_func ._display_expr
251+ has_print = "print" in self ._magic_func ._scoped .builtin_names
233252
234253 if self ._where == "remotely" :
235254 if client is None :
@@ -265,12 +284,19 @@ def _exit(self, exc_type, exc_value, exc_traceback):
265284 weak_futures .add (remote_dict )
266285 magic_func .release () # Let go ASAP
267286 if display_expr :
268- repr_val = client .submit (
287+ repr_future = client .submit (
269288 reprs .repr_afar ,
270289 client .submit (get_afar , remote_dict , "_afar_return_value_" ),
271290 self ._magic_func ._repr_methods ,
272291 )
273- weak_futures .add (repr_val )
292+ weak_futures .add (repr_future )
293+ else :
294+ repr_future = None
295+ if display_expr or has_print or _supports_async_output ():
296+ stdout_future = client .submit (get_afar , remote_dict , "_afar_stdout_" )
297+ weak_futures .add (stdout_future )
298+ stderr_future = client .submit (get_afar , remote_dict , "_afar_stderr_" )
299+ weak_futures .add (stderr_future )
274300 if self ._gather_data :
275301 futures_to_name = {
276302 client .submit (get_afar , remote_dict , name , ** submit_kwargs ): name
@@ -286,16 +312,42 @@ def _exit(self, exc_type, exc_value, exc_traceback):
286312 weak_futures .add (future )
287313 self .data [name ] = future
288314 remote_dict .release () # Let go ASAP
289- if display_expr :
290- reprs .display_repr (repr_val .result ()) # This blocks!
291- repr_val .release ()
315+
316+ if _supports_async_output ():
317+ # Display in `out` cell when data is ready: non-blocking
318+ from IPython .display import display
319+ from ipywidgets import Output
320+
321+ out = Output ()
322+ display (out )
323+ # Can we show `distributed.progress` right here?
324+ stdout_future .add_done_callback (
325+ partial (_display_outputs , out , stderr_future , repr_future )
326+ )
327+ elif display_expr or has_print :
328+ # blocks!
329+ stdout_val = stdout_future .result ()
330+ stdout_future .release ()
331+ if stdout_val :
332+ print (stdout_val , end = "" )
333+ stderr_val = stderr_future .result ()
334+ stderr_future .release ()
335+ if stderr_val :
336+ print (stderr_val , end = "" , file = sys .stderr )
337+ if display_expr :
338+ repr_val = repr_future .result ()
339+ repr_future .release ()
340+ if repr_val is not None :
341+ reprs .display_repr (repr_val )
292342 elif self ._where == "locally" :
293343 # Run locally. This is handy for testing and debugging.
294344 results = self ._magic_func ()
295345 for name in names :
296346 self .data [name ] = results [name ]
297347 if display_expr :
298- reprs .IPython .display .display (results .return_value )
348+ from IPython .dislpay import display
349+
350+ display (results .return_value )
299351 elif self ._where == "later" :
300352 return True
301353 else :
@@ -325,11 +377,28 @@ class Get(Run):
325377 _gather_data = True
326378
327379
380+ def _display_outputs (out , stderr_future , repr_future , stdout_future ):
381+ stdout_val = stdout_future .result ()
382+ stderr_val = stderr_future .result ()
383+ if repr_future is not None :
384+ repr_val = repr_future .result ()
385+ else :
386+ repr_val = None
387+ if stdout_val or stderr_val or repr_val is not None :
388+ with out :
389+ if stdout_val :
390+ print (stdout_val , end = "" )
391+ if stderr_val :
392+ print (stderr_val , end = "" , file = sys .stderr )
393+ if repr_val is not None :
394+ reprs .display_repr (repr_val )
395+
396+
328397def abracadabra (runner ):
329398 # Create a new function from the code block of the context.
330399 # For now, we require that the source code is available.
331400 source = "def _afar_magic_():\n " + "" .join (runner .context_body )
332- func , display_expr = create_func (source , runner ._frame .f_globals , reprs .in_ipython ())
401+ func , display_expr = create_func (source , runner ._frame .f_globals , reprs .is_kernel ())
333402
334403 # If no variable names were given, only get the last assignment
335404 names = runner .names
@@ -405,12 +474,59 @@ def __setstate__(self, state):
405474 self ._scoped = innerscope .scoped_function (func , outer_scope )
406475
407476
477+ # Here's the plan: we'll capture all print statements to stdout and stderr
478+ # on the current thread. But, we need to leave the other threads alone!
479+ # So, use `threading.local` and a lock for some ugly capturing.
480+ class LocalPrint (threading .local ):
481+ printer = None
482+
483+ def __call__ (self , * args , ** kwargs ):
484+ return self .printer (* args , ** kwargs )
485+
486+
487+ class RecordPrint :
488+ n = 0
489+ local_print = LocalPrint ()
490+ print_lock = threading .Lock ()
491+
492+ def __init__ (self ):
493+ self .stdout = io .StringIO ()
494+ self .stderr = io .StringIO ()
495+
496+ def __enter__ (self ):
497+ with self .print_lock :
498+ if RecordPrint .n == 0 :
499+ LocalPrint .printer = builtins .print
500+ builtins .print = self .local_print
501+ RecordPrint .n += 1
502+ self .local_print .printer = self
503+ return self
504+
505+ def __exit__ (self , exc_type , exc_value , exc_traceback ):
506+ with self .print_lock :
507+ RecordPrint .n -= 1
508+ if RecordPrint .n == 0 :
509+ builtins .print = LocalPrint .printer
510+ self .local_print .printer = LocalPrint .printer
511+ return False
512+
513+ def __call__ (self , * args , file = None , ** kwargs ):
514+ if file is None or file is sys .stdout :
515+ file = self .stdout
516+ elif file is sys .stderr :
517+ file = self .stderr
518+ LocalPrint .printer (* args , ** kwargs , file = file )
519+
520+
408521def run_afar (magic_func , names , futures ):
409522 sfunc = magic_func ._scoped .bind (futures )
410- results = sfunc ()
523+ with RecordPrint () as rec :
524+ results = sfunc ()
411525 rv = {key : results [key ] for key in names }
412526 if magic_func ._display_expr :
413527 rv ["_afar_return_value_" ] = results .return_value
528+ rv ["_afar_stdout_" ] = rec .stdout .getvalue ()
529+ rv ["_afar_stderr_" ] = rec .stderr .getvalue ()
414530 return rv
415531
416532
0 commit comments