diff --git a/ipykernel/inprocess/tests/test_kernel.py b/ipykernel/inprocess/tests/test_kernel.py index 4d42e797b..c2904a006 100644 --- a/ipykernel/inprocess/tests/test_kernel.py +++ b/ipykernel/inprocess/tests/test_kernel.py @@ -99,6 +99,24 @@ def test_stdout(self): out, err = assemble_output(kc.get_iopub_msg) assert out == 'bar\n' + @pytest.mark.skip( + reason="Currently don't capture during test as pytest does its own capturing" + ) + def test_capfd(self): + """Does correctly capture fd""" + kernel = InProcessKernel() + + with capture_output() as io: + kernel.shell.run_cell('print("foo")') + assert io.stdout == "foo\n" + + kc = BlockingInProcessKernelClient(kernel=kernel, session=kernel.session) + kernel.frontends.append(kc) + kc.execute("import os") + kc.execute('os.system("echo capfd")') + out, err = assemble_output(kc.iopub_channel) + assert out == "capfd\n" + def test_getpass_stream(self): "Tests that kernel getpass accept the stream parameter" kernel = InProcessKernel() diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index ab6a2f707..0697c6ef8 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -12,7 +12,9 @@ import threading import warnings from weakref import WeakSet +import traceback from io import StringIO, TextIOBase +import io import zmq if zmq.pyzmq_version_info() >= (17, 0): @@ -36,6 +38,7 @@ # IO classes #----------------------------------------------------------------------------- + class IOPubThread(object): """An object for sending IOPub messages in a background thread @@ -285,7 +288,54 @@ class OutStream(TextIOBase): topic = None encoding = 'UTF-8' - def __init__(self, session, pub_thread, name, pipe=None, echo=None): + + def fileno(self): + """ + Things like subprocess will peak and write to the fileno() of stderr/stdout. + """ + if getattr(self, "_original_stdstream_copy", None) is not None: + return self._original_stdstream_copy + else: + raise io.UnsupportedOperation("fileno") + + def _watch_pipe_fd(self): + """ + We've redirected standards steams 0 and 1 into a pipe. + + We need to watch in a thread and redirect them to the right places. + + 1) the ZMQ channels to show in notebook interfaces, + 2) the original stdout/err, to capture errors in terminals. + + We cannot schedule this on the ioloop thread, as this might be blocking. + + """ + + try: + bts = os.read(self._fid, 1000) + while bts and self._should_watch: + self.write(bts.decode()) + os.write(self._original_stdstream_copy, bts) + bts = os.read(self._fid, 1000) + except Exception: + self._exc = sys.exc_info() + + def __init__( + self, session, pub_thread, name, pipe=None, echo=None, *, watchfd=True + ): + """ + Parameters + ---------- + name : str {'stderr', 'stdout'} + the name of the standard stream to replace + watchfd : bool (default, True) + Watch the file descripttor corresponding to the replaced stream. + This is useful if you know some underlying code will write directly + the file descriptor by its number. It will spawn a watching thread, + that will swap the give file descriptor for a pipe, read from the + pipe, and insert this into the current Stream. + + """ if pipe is not None: warnings.warn( "pipe argument to OutStream is deprecated and ignored", @@ -297,8 +347,12 @@ def __init__(self, session, pub_thread, name, pipe=None, echo=None): self.session = session if not isinstance(pub_thread, IOPubThread): # Backward-compat: given socket, not thread. Wrap in a thread. - warnings.warn("OutStream should be created with IOPubThread, not %r" % pub_thread, - DeprecationWarning, stacklevel=2) + warnings.warn( + "Since IPykernel 4.3, OutStream should be created with " + "IOPubThread, not %r" % pub_thread, + DeprecationWarning, + stacklevel=2, + ) pub_thread = IOPubThread(pub_thread) pub_thread.start() self.pub_thread = pub_thread @@ -312,12 +366,35 @@ def __init__(self, session, pub_thread, name, pipe=None, echo=None): self._new_buffer() self.echo = None + if ( + watchfd + and (sys.platform.startswith("linux") or sys.platform.startswith("darwin")) + and ("PYTEST_CURRENT_TEST" not in os.environ) + ): + # Pytest set its own capture. Dont redirect from within pytest. + + self._should_watch = True + self._setup_stream_redirects(name) + if echo: if hasattr(echo, 'read') and hasattr(echo, 'write'): self.echo = echo else: raise ValueError("echo argument must be a file like object") + def _setup_stream_redirects(self, name): + pr, pw = os.pipe() + fno = getattr(sys, name).fileno() + self._original_stdstream_copy = os.dup(fno) + os.dup2(pw, fno) + + self._fid = pr + + self._exc = None + self.watch_fd_thread = threading.Thread(target=self._watch_pipe_fd) + self.watch_fd_thread.daemon = True + self.watch_fd_thread.start() + def _is_master_process(self): return os.getpid() == self._master_pid @@ -325,6 +402,12 @@ def set_parent(self, parent): self.parent_header = extract_header(parent) def close(self): + if sys.platform.startswith("linux") or sys.platform.startswith("darwin"): + self._should_watch = False + self.watch_fd_thread.join() + if self._exc: + etype, value, tb = self._exc + traceback.print_exception(etype, value, tb) self.pub_thread = None @property diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index ff2fdfd90..a340befe8 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -10,12 +10,13 @@ import signal import traceback import logging +from io import TextIOWrapper, FileIO +from logging import StreamHandler import tornado from tornado import ioloop import zmq -from zmq.eventloop import ioloop as zmq_ioloop from zmq.eventloop.zmqstream import ZMQStream from IPython.core.application import ( @@ -414,9 +415,22 @@ def init_io(self): echo=e_stdout) if sys.stderr is not None: sys.stderr.flush() - sys.stderr = outstream_factory(self.session, self.iopub_thread, - 'stderr', - echo=e_stderr) + sys.stderr = outstream_factory( + self.session, self.iopub_thread, "stderr", echo=e_stderr + ) + if hasattr(sys.stderr, "_original_stdstream_copy"): + + for handler in self.log.handlers: + if isinstance(handler, StreamHandler) and ( + handler.stream.buffer.fileno() == 2 + ): + self.log.debug( + "Seeing logger to stderr, rerouting to raw filedescriptor." + ) + + handler.stream = TextIOWrapper( + FileIO(sys.stderr._original_stdstream_copy, "w") + ) if self.displayhook_class: displayhook_factory = import_item(str(self.displayhook_class)) self.displayhook = displayhook_factory(self.session, self.iopub_socket) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 3e99a36a5..e28baed39 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -231,7 +231,6 @@ def should_handle(self, stream, msg, idents): """ msg_id = msg['header']['msg_id'] if msg_id in self.aborted: - msg_type = msg['header']['msg_type'] # is it safe to assume a msg_id will not be resubmitted? self.aborted.remove(msg_id) self._send_abort_reply(stream, msg, idents) @@ -584,8 +583,7 @@ def complete_request(self, stream, ident, parent): matches = yield gen.maybe_future(self.do_complete(code, cursor_pos)) matches = json_clean(matches) - completion_msg = self.session.send(stream, 'complete_reply', - matches, parent, ident) + self.session.send(stream, "complete_reply", matches, parent, ident) def do_complete(self, code, cursor_pos): """Override in subclasses to find completions. @@ -825,9 +823,9 @@ def _send_abort_reply(self, stream, msg, idents): """Send a reply to an aborted request""" self.log.info("Aborting:") self.log.info("%s", msg) - reply_type = msg['header']['msg_type'].rsplit('_', 1)[0] + '_reply' - status = {'status': 'aborted'} - md = {'engine': self.ident} + reply_type = msg["header"]["msg_type"].rsplit("_", 1)[0] + "_reply" + status = {"status": "aborted"} + md = {"engine": self.ident} md.update(status) self.session.send( stream, reply_type, metadata=md, diff --git a/ipykernel/tests/test_kernel.py b/ipykernel/tests/test_kernel.py index 334748d1a..9fe09f43b 100644 --- a/ipykernel/tests/test_kernel.py +++ b/ipykernel/tests/test_kernel.py @@ -28,7 +28,7 @@ def _check_master(kc, expected=True, stream="stdout"): execute(kc=kc, code="import sys") flush_channels(kc) - msg_id, content = execute(kc=kc, code="print (sys.%s._is_master_process())" % stream) + msg_id, content = execute(kc=kc, code="print(sys.%s._is_master_process())" % stream) stdout, stderr = assemble_output(kc.get_iopub_msg) assert stdout.strip() == repr(expected) @@ -44,14 +44,44 @@ def _check_status(content): def test_simple_print(): """simple print statement in kernel""" with kernel() as kc: - iopub = kc.iopub_channel - msg_id, content = execute(kc=kc, code="print ('hi')") + msg_id, content = execute(kc=kc, code="print('hi')") stdout, stderr = assemble_output(kc.get_iopub_msg) assert stdout == 'hi\n' assert stderr == '' _check_master(kc, expected=True) +@pytest.mark.skip( + reason="Currently don't capture during test as pytest does its own capturing" +) +def test_capture_fd(): + """simple print statement in kernel""" + with kernel() as kc: + iopub = kc.iopub_channel + msg_id, content = execute(kc=kc, code="import os; os.system('echo capsys')") + stdout, stderr = assemble_output(iopub) + assert stdout == "capsys\n" + assert stderr == "" + _check_master(kc, expected=True) + + +@pytest.mark.skip( + reason="Currently don't capture during test as pytest does its own capturing" +) +def test_subprocess_peek_at_stream_fileno(): + """""" + with kernel() as kc: + iopub = kc.iopub_channel + msg_id, content = execute( + kc=kc, + code="import subprocess, sys; subprocess.run(['python', '-c', 'import os; os.system(\"echo CAP1\"); print(\"CAP2\")'], stderr=sys.stderr)", + ) + stdout, stderr = assemble_output(iopub) + assert stdout == "CAP1\nCAP2\n" + assert stderr == "" + _check_master(kc, expected=True) + + def test_sys_path(): """test that sys.path doesn't get messed up by default""" with kernel() as kc: @@ -85,7 +115,6 @@ def test_sys_path_profile_dir(): def test_subprocess_print(): """printing from forked mp.Process""" with new_kernel() as kc: - iopub = kc.iopub_channel _check_master(kc, expected=True) flush_channels(kc) @@ -113,7 +142,6 @@ def test_subprocess_print(): def test_subprocess_noprint(): """mp.Process without print doesn't trigger iostream mp_mode""" with kernel() as kc: - iopub = kc.iopub_channel np = 5 code = '\n'.join([ @@ -140,7 +168,6 @@ def test_subprocess_noprint(): def test_subprocess_error(): """error in mp.Process doesn't crash""" with new_kernel() as kc: - iopub = kc.iopub_channel code = '\n'.join([ "import multiprocessing as mp", @@ -313,8 +340,6 @@ def test_unc_paths(): file_path = os.path.splitdrive(os.path.dirname(drive_file_path))[1] unc_file_path = os.path.join(unc_root, file_path[1:]) - iopub = kc.iopub_channel - kc.execute("cd {0:s}".format(unc_file_path)) reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) assert reply['content']['status'] == 'ok'