1212import threading
1313import warnings
1414from weakref import WeakSet
15+ import traceback
1516from io import StringIO , TextIOBase
17+ import io
1618
1719import zmq
1820if zmq .pyzmq_version_info () >= (17 , 0 ):
3638# IO classes
3739#-----------------------------------------------------------------------------
3840
41+
3942class IOPubThread (object ):
4043 """An object for sending IOPub messages in a background thread
4144
@@ -284,7 +287,54 @@ class OutStream(TextIOBase):
284287 topic = None
285288 encoding = 'UTF-8'
286289
287- def __init__ (self , session , pub_thread , name , pipe = None , echo = None ):
290+
291+ def fileno (self ):
292+ """
293+ Things like subprocess will peak and write to the fileno() of stderr/stdout.
294+ """
295+ if getattr (self , "_original_stdstream_copy" , None ) is not None :
296+ return self ._original_stdstream_copy
297+ else :
298+ raise io .UnsupportedOperation ("fileno" )
299+
300+ def _watch_pipe_fd (self ):
301+ """
302+ We've redirected standards steams 0 and 1 into a pipe.
303+
304+ We need to watch in a thread and redirect them to the right places.
305+
306+ 1) the ZMQ channels to show in notebook interfaces,
307+ 2) the original stdout/err, to capture errors in terminals.
308+
309+ We cannot schedule this on the ioloop thread, as this might be blocking.
310+
311+ """
312+
313+ try :
314+ bts = os .read (self ._fid , 1000 )
315+ while bts and self ._should_watch :
316+ self .write (bts .decode ())
317+ os .write (self ._original_stdstream_copy , bts )
318+ bts = os .read (self ._fid , 1000 )
319+ except Exception :
320+ self ._exc = sys .exc_info ()
321+
322+ def __init__ (
323+ self , session , pub_thread , name , pipe = None , echo = None , * , watchfd = True
324+ ):
325+ """
326+ Parameters
327+ ----------
328+ name : str {'stderr', 'stdout'}
329+ the name of the standard stream to replace
330+ watchfd : bool (default, True)
331+ Watch the file descripttor corresponding to the replaced stream.
332+ This is useful if you know some underlying code will write directly
333+ the file descriptor by its number. It will spawn a watching thread,
334+ that will swap the give file descriptor for a pipe, read from the
335+ pipe, and insert this into the current Stream.
336+
337+ """
288338 if pipe is not None :
289339 warnings .warn (
290340 "pipe argument to OutStream is deprecated and ignored" ,
@@ -296,8 +346,12 @@ def __init__(self, session, pub_thread, name, pipe=None, echo=None):
296346 self .session = session
297347 if not isinstance (pub_thread , IOPubThread ):
298348 # Backward-compat: given socket, not thread. Wrap in a thread.
299- warnings .warn ("OutStream should be created with IOPubThread, not %r" % pub_thread ,
300- DeprecationWarning , stacklevel = 2 )
349+ warnings .warn (
350+ "Since IPykernel 4.3, OutStream should be created with "
351+ "IOPubThread, not %r" % pub_thread ,
352+ DeprecationWarning ,
353+ stacklevel = 2 ,
354+ )
301355 pub_thread = IOPubThread (pub_thread )
302356 pub_thread .start ()
303357 self .pub_thread = pub_thread
@@ -311,19 +365,48 @@ def __init__(self, session, pub_thread, name, pipe=None, echo=None):
311365 self ._new_buffer ()
312366 self .echo = None
313367
368+ if (
369+ watchfd
370+ and (sys .platform .startswith ("linux" ) or sys .platform .startswith ("darwin" ))
371+ and ("PYTEST_CURRENT_TEST" not in os .environ )
372+ ):
373+ # Pytest set its own capture. Dont redirect from within pytest.
374+
375+ self ._should_watch = True
376+ self ._setup_stream_redirects (name )
377+
314378 if echo :
315379 if hasattr (echo , 'read' ) and hasattr (echo , 'write' ):
316380 self .echo = echo
317381 else :
318382 raise ValueError ("echo argument must be a file like object" )
319383
384+ def _setup_stream_redirects (self , name ):
385+ pr , pw = os .pipe ()
386+ fno = getattr (sys , name ).fileno ()
387+ self ._original_stdstream_copy = os .dup (fno )
388+ os .dup2 (pw , fno )
389+
390+ self ._fid = pr
391+
392+ self ._exc = None
393+ self .watch_fd_thread = threading .Thread (target = self ._watch_pipe_fd )
394+ self .watch_fd_thread .daemon = True
395+ self .watch_fd_thread .start ()
396+
320397 def _is_master_process (self ):
321398 return os .getpid () == self ._master_pid
322399
323400 def set_parent (self , parent ):
324401 self .parent_header = extract_header (parent )
325402
326403 def close (self ):
404+ if sys .platform .startswith ("linux" ) or sys .platform .startswith ("darwin" ):
405+ self ._should_watch = False
406+ self .watch_fd_thread .join ()
407+ if self ._exc :
408+ etype , value , tb = self ._exc
409+ traceback .print_exception (etype , value , tb )
327410 self .pub_thread = None
328411
329412 @property
0 commit comments