From ae2f441a2d0903c2394d7aa6e606c58b47f5ab05 Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Mon, 5 Apr 2021 09:17:25 -0700 Subject: [PATCH 1/8] Try to capture stdout and err going directly to 1/2 filedescriptor. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This tries to fix a long-standing issue where stdout and stderr output written directly to the file descriptors is not shown in notebooks. This is annoying when using wrappers around C libraries or calling system commands, as their output will not be seen from within the notebook. Here we redirect and split the file descriptors, watch them in threads, and forward the output both to the original FD (terminal) and to ZMQ (notebook). Thus output sent to fd 1 & 2 will be shown BOTH in the terminal that launched the notebook server and in the notebooks themselves. One of the concerns is that logs and errors internal to ipykernel may now appear in the notebooks themselves, which may confuse users; though these should be limited to error and debug messages, and we can work around this by setting the log handler to not use stdout/stderr. This still seems like a big hack to me, and I don't like threads. I did not manage to make reading the FD non-blocking, so this cannot be put in the io-thread – at least I'm not sure how. So this adds 2 extra threads to the kernel. This might need to be turned off by default for now until further testing. Locally this seems to work with things like: - os.system("echo HELLO WORLD") - c-extensions writing directly to fd 1 and 2 (when properly flushed). I have no clue how file descriptors work on Windows, so this only changes behavior on Linux and Mac. 
--- ipykernel/iostream.py | 73 +++++++++++++++++++++++++++++++++++++++-- ipykernel/kernelbase.py | 6 ++-- 2 files changed, 73 insertions(+), 6 deletions(-) diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index ab6a2f707..716126b78 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -12,6 +12,7 @@ import threading import warnings from weakref import WeakSet +import traceback from io import StringIO, TextIOBase import zmq @@ -285,7 +286,44 @@ class OutStream(TextIOBase): topic = None encoding = 'UTF-8' - def __init__(self, session, pub_thread, name, pipe=None, echo=None): + def _watch_pipe_fd(self): + """ + We've redirected standards steams 0 and 1 into a pipe. + + We need to watch in a thread and redirect them to the right places. + + 1) the ZMQ channels to show in notebook interfaces, + 2) the original stdout/err, to capture errors in terminals. + + We cannot schedule this on the ioloop thread, as this might be blocking. + + """ + + try: + bts = os.read(self._fid, 1000) + while bts and self._should_watch: + self.write(bts.decode()) + os.write(self._original_stdstream_copy, bts) + bts = os.read(self._fid, 1000) + except Exception: + self._exc = sys.exc_info() + + def __init__( + self, session, pub_thread, name, pipe=None, echo=None, *, watchfd=True + ): + """ + Parameters + ---------- + name : str {'stderr', 'stdout'} + the name of the standard stream to replace + watchfd : bool (default, True) + Watch the file descripttor corresponding to the replaced stream. + This is useful if you know some underlying code will write directly + the file descriptor by its number. It will spawn a watching thread, + that will swap the give file descriptor for a pipe, read from the + pipe, and insert this into the current Stream. 
+ + """ if pipe is not None: warnings.warn( "pipe argument to OutStream is deprecated and ignored", @@ -297,8 +335,12 @@ def __init__(self, session, pub_thread, name, pipe=None, echo=None): self.session = session if not isinstance(pub_thread, IOPubThread): # Backward-compat: given socket, not thread. Wrap in a thread. - warnings.warn("OutStream should be created with IOPubThread, not %r" % pub_thread, - DeprecationWarning, stacklevel=2) + warnings.warn( + "Since IPykernel 4.3, OutStream should be created with " + "IOPubThread, not %r" % pub_thread, + DeprecationWarning, + stacklevel=2, + ) pub_thread = IOPubThread(pub_thread) pub_thread.start() self.pub_thread = pub_thread @@ -312,12 +354,31 @@ def __init__(self, session, pub_thread, name, pipe=None, echo=None): self._new_buffer() self.echo = None + if watchfd and ( + sys.platform.startswith("linux") or sys.platform.startswith("darwin") + ): + self._should_watch = True + self._setup_stream_redirects(name) + if echo: if hasattr(echo, 'read') and hasattr(echo, 'write'): self.echo = echo else: raise ValueError("echo argument must be a file like object") + def _setup_stream_redirects(self, name): + pr, pw = os.pipe() + fno = getattr(sys, name).fileno() + self._original_stdstream_copy = os.dup(fno) + os.dup2(pw, fno) + + self._fid = pr + + self._exc = None + self.watch_fd_thread = threading.Thread(target=self._watch_pipe_fd) + self.watch_fd_thread.daemon = True + self.watch_fd_thread.start() + def _is_master_process(self): return os.getpid() == self._master_pid @@ -325,6 +386,12 @@ def set_parent(self, parent): self.parent_header = extract_header(parent) def close(self): + if sys.platform.startswith("linux") or sys.platform.startswith("darwin"): + self._should_watch = False + self.watch_fd_thread.join() + if self._exc: + etype, value, tb = self._exc + traceback.print_exception(etype, value, tb) self.pub_thread = None @property diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 3e99a36a5..e071c6936 
100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -825,9 +825,9 @@ def _send_abort_reply(self, stream, msg, idents): """Send a reply to an aborted request""" self.log.info("Aborting:") self.log.info("%s", msg) - reply_type = msg['header']['msg_type'].rsplit('_', 1)[0] + '_reply' - status = {'status': 'aborted'} - md = {'engine': self.ident} + reply_type = msg["header"]["msg_type"].rsplit("_", 1)[0] + "_reply" + status = {"status": "aborted"} + md = {"engine": self.ident} md.update(status) self.session.send( stream, reply_type, metadata=md, From f1b8a56747f27c7e2bda9f4a175b12b0cc80a2ca Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Mon, 5 Apr 2021 14:05:50 -0700 Subject: [PATCH 2/8] Modify all log handlers to route messages directly screen. Bypass the original fd 2 for stderr, and use the new piped one. --- ipykernel/kernelapp.py | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index ff2fdfd90..a967ccbde 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -10,6 +10,8 @@ import signal import traceback import logging +from io import TextIOWrapper, FileIO +from logging import StreamHandler import tornado from tornado import ioloop @@ -414,9 +416,24 @@ def init_io(self): echo=e_stdout) if sys.stderr is not None: sys.stderr.flush() - sys.stderr = outstream_factory(self.session, self.iopub_thread, - 'stderr', - echo=e_stderr) + sys.stderr = outstream_factory( + self.session, self.iopub_thread, "stderr", echo=e_stderr + ) + self.log.error("this %s", hasattr(sys.stderr, "_original_stdstream_copy")) + if hasattr(sys.stderr, "_original_stdstream_copy"): + + for handler in self.log.handlers: + if isinstance(handler, StreamHandler) and ( + handler.stream.buffer.fileno() == 2 + ): + self.log.debug( + "Seeing logger to stderr, rerouting to raw filedescriptor." 
+ ) + + handler.stream = TextIOWrapper( + FileIO(sys.stderr._original_stdstream_copy, "w") + ) + self.log.error("Redirected to raw FD.") if self.displayhook_class: displayhook_factory = import_item(str(self.displayhook_class)) self.displayhook = displayhook_factory(self.session, self.iopub_socket) From 799722ec83c8d4e1f372ad97ccbc224b9bbbce93 Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Mon, 12 Apr 2021 08:19:52 -0700 Subject: [PATCH 3/8] add tests --- ipykernel/inprocess/tests/test_kernel.py | 21 +++++++++++++++++++++ ipykernel/tests/test_kernel.py | 14 +++++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/ipykernel/inprocess/tests/test_kernel.py b/ipykernel/inprocess/tests/test_kernel.py index 4d42e797b..1b84aa12a 100644 --- a/ipykernel/inprocess/tests/test_kernel.py +++ b/ipykernel/inprocess/tests/test_kernel.py @@ -99,6 +99,27 @@ def test_stdout(self): out, err = assemble_output(kc.get_iopub_msg) assert out == 'bar\n' + @pytest.mark.skipif( + sys.platform == 'win32', + reason="not ment to work on windows" + ) + def test_capfd(self): + """ Does correctly capture fd + """ + kernel = InProcessKernel() + + with capture_output() as io: + kernel.shell.run_cell('print("foo")') + assert io.stdout == 'foo\n' + + kc = BlockingInProcessKernelClient(kernel=kernel, session=kernel.session) + kernel.frontends.append(kc) + kc.execute('import os') + kc.execute('os.system("echo capfd")') + out, err = assemble_output(kc.iopub_channel) + assert out == 'capfd\n' + + def test_getpass_stream(self): "Tests that kernel getpass accept the stream parameter" kernel = InProcessKernel() diff --git a/ipykernel/tests/test_kernel.py b/ipykernel/tests/test_kernel.py index 334748d1a..3d55f683b 100644 --- a/ipykernel/tests/test_kernel.py +++ b/ipykernel/tests/test_kernel.py @@ -28,7 +28,7 @@ def _check_master(kc, expected=True, stream="stdout"): execute(kc=kc, code="import sys") flush_channels(kc) - msg_id, content = execute(kc=kc, code="print 
(sys.%s._is_master_process())" % stream) + msg_id, content = execute(kc=kc, code="print(sys.%s._is_master_process())" % stream) stdout, stderr = assemble_output(kc.get_iopub_msg) assert stdout.strip() == repr(expected) @@ -52,6 +52,18 @@ def test_simple_print(): _check_master(kc, expected=True) +@pytest.mark.skipif(sys.platform == "win32", reason="Not meant to work on windows") +def test_capture_fd(): + """simple print statement in kernel""" + with kernel() as kc: + iopub = kc.iopub_channel + msg_id, content = execute(kc=kc, code="import os; os.system('echo capsys')") + stdout, stderr = assemble_output(iopub) + assert stdout == "capsys\n" + assert stderr == "" + _check_master(kc, expected=True) + + def test_sys_path(): """test that sys.path doesn't get messed up by default""" with kernel() as kc: From 12c8ecefdabda937de183373e6ba32ae39bea0d6 Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Mon, 12 Apr 2021 08:40:25 -0700 Subject: [PATCH 4/8] Make sure replaced streams have fileno. This is important for example for subprocess that will peak at the filedescriptor. --- ipykernel/iostream.py | 10 ++++++++++ ipykernel/kernelapp.py | 2 -- ipykernel/tests/test_kernel.py | 17 ++++++++++++++++- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 716126b78..1948fc994 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -286,6 +286,16 @@ class OutStream(TextIOBase): topic = None encoding = 'UTF-8' + + def fileno(self): + """ + Things like subprocess will peak and write to the fileno() of stderr/stdout. + """ + if getattr(self, '_original_stdstream_copy', None) is not None: + return self._original_stdstream_copy + else: + raise UnsupportedOperation('fileno') + def _watch_pipe_fd(self): """ We've redirected standards steams 0 and 1 into a pipe. 
diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index a967ccbde..78fa0e70c 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -419,7 +419,6 @@ def init_io(self): sys.stderr = outstream_factory( self.session, self.iopub_thread, "stderr", echo=e_stderr ) - self.log.error("this %s", hasattr(sys.stderr, "_original_stdstream_copy")) if hasattr(sys.stderr, "_original_stdstream_copy"): for handler in self.log.handlers: @@ -433,7 +432,6 @@ def init_io(self): handler.stream = TextIOWrapper( FileIO(sys.stderr._original_stdstream_copy, "w") ) - self.log.error("Redirected to raw FD.") if self.displayhook_class: displayhook_factory = import_item(str(self.displayhook_class)) self.displayhook = displayhook_factory(self.session, self.iopub_socket) diff --git a/ipykernel/tests/test_kernel.py b/ipykernel/tests/test_kernel.py index 3d55f683b..4e4b286d2 100644 --- a/ipykernel/tests/test_kernel.py +++ b/ipykernel/tests/test_kernel.py @@ -45,7 +45,7 @@ def test_simple_print(): """simple print statement in kernel""" with kernel() as kc: iopub = kc.iopub_channel - msg_id, content = execute(kc=kc, code="print ('hi')") + msg_id, content = execute(kc=kc, code="print('hi')") stdout, stderr = assemble_output(kc.get_iopub_msg) assert stdout == 'hi\n' assert stderr == '' @@ -64,6 +64,21 @@ def test_capture_fd(): _check_master(kc, expected=True) +@pytest.mark.skipif(sys.platform == "win32", reason="Not meant to work on windows") +def test_subprocess_peek_at_stream_fileno(): + """""" + with kernel() as kc: + iopub = kc.iopub_channel + msg_id, content = execute( + kc=kc, + code="import subprocess, sys; subprocess.run(['python', '-c', 'import os; os.system(\"echo CAP1\"); print(\"CAP2\")'], stderr=sys.stderr)", + ) + stdout, stderr = assemble_output(iopub) + assert stdout == "CAP1\nCAP2\n" + assert stderr == "" + _check_master(kc, expected=True) + + def test_sys_path(): """test that sys.path doesn't get messed up by default""" with kernel() as kc: From 
310f0cd8214c4b305b72098b0e920a9be83c0bfb Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Mon, 12 Apr 2021 09:06:43 -0700 Subject: [PATCH 5/8] dont capture with pytest --- ipykernel/inprocess/tests/test_kernel.py | 5 ++--- ipykernel/iostream.py | 9 +++++++-- ipykernel/tests/test_kernel.py | 4 +++- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/ipykernel/inprocess/tests/test_kernel.py b/ipykernel/inprocess/tests/test_kernel.py index 1b84aa12a..6728cbdb4 100644 --- a/ipykernel/inprocess/tests/test_kernel.py +++ b/ipykernel/inprocess/tests/test_kernel.py @@ -99,9 +99,8 @@ def test_stdout(self): out, err = assemble_output(kc.get_iopub_msg) assert out == 'bar\n' - @pytest.mark.skipif( - sys.platform == 'win32', - reason="not ment to work on windows" + @pytest.mark.skip( + reason="Currently don't capture during test as pytest does its own capturing" ) def test_capfd(self): """ Does correctly capture fd diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 1948fc994..f0766a433 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -14,6 +14,7 @@ from weakref import WeakSet import traceback from io import StringIO, TextIOBase +import os import zmq if zmq.pyzmq_version_info() >= (17, 0): @@ -364,9 +365,13 @@ def __init__( self._new_buffer() self.echo = None - if watchfd and ( - sys.platform.startswith("linux") or sys.platform.startswith("darwin") + if ( + watchfd + and (sys.platform.startswith("linux") or sys.platform.startswith("darwin")) + and ("PYTEST_CURRENT_TEST" not in os.environ) ): + # Pytest set its own capture. Dont redirect from within pytest. 
+ self._should_watch = True self._setup_stream_redirects(name) diff --git a/ipykernel/tests/test_kernel.py b/ipykernel/tests/test_kernel.py index 4e4b286d2..4b9788619 100644 --- a/ipykernel/tests/test_kernel.py +++ b/ipykernel/tests/test_kernel.py @@ -52,7 +52,9 @@ def test_simple_print(): _check_master(kc, expected=True) -@pytest.mark.skipif(sys.platform == "win32", reason="Not meant to work on windows") +@pytest.mark.skip( + reason="Currently don't capture during test as pytest does its own capturing" +) def test_capture_fd(): """simple print statement in kernel""" with kernel() as kc: From 192fc6caddb123ef2c10b1d6f88f574f7d484dfb Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Mon, 12 Apr 2021 09:14:31 -0700 Subject: [PATCH 6/8] fix test --- ipykernel/iostream.py | 3 ++- ipykernel/tests/test_kernel.py | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index f0766a433..063db479a 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -14,6 +14,7 @@ from weakref import WeakSet import traceback from io import StringIO, TextIOBase +import io import os import zmq @@ -295,7 +296,7 @@ def fileno(self): if getattr(self, '_original_stdstream_copy', None) is not None: return self._original_stdstream_copy else: - raise UnsupportedOperation('fileno') + raise io.UnsupportedOperation("fileno") def _watch_pipe_fd(self): """ diff --git a/ipykernel/tests/test_kernel.py b/ipykernel/tests/test_kernel.py index 4b9788619..6c918aa88 100644 --- a/ipykernel/tests/test_kernel.py +++ b/ipykernel/tests/test_kernel.py @@ -66,7 +66,9 @@ def test_capture_fd(): _check_master(kc, expected=True) -@pytest.mark.skipif(sys.platform == "win32", reason="Not meant to work on windows") +@pytest.mark.skip( + reason="Currently don't capture during test as pytest does its own capturing" +) def test_subprocess_peek_at_stream_fileno(): """""" with kernel() as kc: From 79496f0db1ca3a698d596074e341f45d014079c1 Mon Sep 17 
00:00:00 2001 From: Matthias Bussonnier Date: Tue, 13 Apr 2021 16:30:09 -0700 Subject: [PATCH 7/8] reformat --- ipykernel/inprocess/tests/test_kernel.py | 10 ++++------ ipykernel/iostream.py | 4 ++-- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/ipykernel/inprocess/tests/test_kernel.py b/ipykernel/inprocess/tests/test_kernel.py index 6728cbdb4..c2904a006 100644 --- a/ipykernel/inprocess/tests/test_kernel.py +++ b/ipykernel/inprocess/tests/test_kernel.py @@ -103,21 +103,19 @@ def test_stdout(self): reason="Currently don't capture during test as pytest does its own capturing" ) def test_capfd(self): - """ Does correctly capture fd - """ + """Does correctly capture fd""" kernel = InProcessKernel() with capture_output() as io: kernel.shell.run_cell('print("foo")') - assert io.stdout == 'foo\n' + assert io.stdout == "foo\n" kc = BlockingInProcessKernelClient(kernel=kernel, session=kernel.session) kernel.frontends.append(kc) - kc.execute('import os') + kc.execute("import os") kc.execute('os.system("echo capfd")') out, err = assemble_output(kc.iopub_channel) - assert out == 'capfd\n' - + assert out == "capfd\n" def test_getpass_stream(self): "Tests that kernel getpass accept the stream parameter" diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 063db479a..0697c6ef8 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -15,7 +15,6 @@ import traceback from io import StringIO, TextIOBase import io -import os import zmq if zmq.pyzmq_version_info() >= (17, 0): @@ -39,6 +38,7 @@ # IO classes #----------------------------------------------------------------------------- + class IOPubThread(object): """An object for sending IOPub messages in a background thread @@ -293,7 +293,7 @@ def fileno(self): """ Things like subprocess will peak and write to the fileno() of stderr/stdout. 
""" - if getattr(self, '_original_stdstream_copy', None) is not None: + if getattr(self, "_original_stdstream_copy", None) is not None: return self._original_stdstream_copy else: raise io.UnsupportedOperation("fileno") From 8a160a53e298d4cfb3d38a7ac670c5ddd0de5be0 Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Tue, 13 Apr 2021 16:35:27 -0700 Subject: [PATCH 8/8] flake 8 cleanup --- ipykernel/kernelapp.py | 1 - ipykernel/kernelbase.py | 4 +--- ipykernel/tests/test_kernel.py | 6 ------ 3 files changed, 1 insertion(+), 10 deletions(-) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 78fa0e70c..a340befe8 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -17,7 +17,6 @@ from tornado import ioloop import zmq -from zmq.eventloop import ioloop as zmq_ioloop from zmq.eventloop.zmqstream import ZMQStream from IPython.core.application import ( diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index e071c6936..e28baed39 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -231,7 +231,6 @@ def should_handle(self, stream, msg, idents): """ msg_id = msg['header']['msg_id'] if msg_id in self.aborted: - msg_type = msg['header']['msg_type'] # is it safe to assume a msg_id will not be resubmitted? self.aborted.remove(msg_id) self._send_abort_reply(stream, msg, idents) @@ -584,8 +583,7 @@ def complete_request(self, stream, ident, parent): matches = yield gen.maybe_future(self.do_complete(code, cursor_pos)) matches = json_clean(matches) - completion_msg = self.session.send(stream, 'complete_reply', - matches, parent, ident) + self.session.send(stream, "complete_reply", matches, parent, ident) def do_complete(self, code, cursor_pos): """Override in subclasses to find completions. 
diff --git a/ipykernel/tests/test_kernel.py b/ipykernel/tests/test_kernel.py index 6c918aa88..9fe09f43b 100644 --- a/ipykernel/tests/test_kernel.py +++ b/ipykernel/tests/test_kernel.py @@ -44,7 +44,6 @@ def _check_status(content): def test_simple_print(): """simple print statement in kernel""" with kernel() as kc: - iopub = kc.iopub_channel msg_id, content = execute(kc=kc, code="print('hi')") stdout, stderr = assemble_output(kc.get_iopub_msg) assert stdout == 'hi\n' @@ -116,7 +115,6 @@ def test_sys_path_profile_dir(): def test_subprocess_print(): """printing from forked mp.Process""" with new_kernel() as kc: - iopub = kc.iopub_channel _check_master(kc, expected=True) flush_channels(kc) @@ -144,7 +142,6 @@ def test_subprocess_print(): def test_subprocess_noprint(): """mp.Process without print doesn't trigger iostream mp_mode""" with kernel() as kc: - iopub = kc.iopub_channel np = 5 code = '\n'.join([ @@ -171,7 +168,6 @@ def test_subprocess_noprint(): def test_subprocess_error(): """error in mp.Process doesn't crash""" with new_kernel() as kc: - iopub = kc.iopub_channel code = '\n'.join([ "import multiprocessing as mp", @@ -344,8 +340,6 @@ def test_unc_paths(): file_path = os.path.splitdrive(os.path.dirname(drive_file_path))[1] unc_file_path = os.path.join(unc_root, file_path[1:]) - iopub = kc.iopub_channel - kc.execute("cd {0:s}".format(unc_file_path)) reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) assert reply['content']['status'] == 'ok'