Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions ipykernel/inprocess/tests/test_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,24 @@ def test_stdout(self):
out, err = assemble_output(kc.get_iopub_msg)
assert out == 'bar\n'

@pytest.mark.skip(
reason="Currently don't capture during test as pytest does its own capturing"
)
def test_capfd(self):
"""Does correctly capture fd"""
kernel = InProcessKernel()

with capture_output() as io:
kernel.shell.run_cell('print("foo")')
assert io.stdout == "foo\n"

kc = BlockingInProcessKernelClient(kernel=kernel, session=kernel.session)
kernel.frontends.append(kc)
kc.execute("import os")
kc.execute('os.system("echo capfd")')
out, err = assemble_output(kc.iopub_channel)
assert out == "capfd\n"

def test_getpass_stream(self):
"Tests that kernel getpass accept the stream parameter"
kernel = InProcessKernel()
Expand Down
89 changes: 86 additions & 3 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import threading
import warnings
from weakref import WeakSet
import traceback
from io import StringIO, TextIOBase
import io

import zmq
if zmq.pyzmq_version_info() >= (17, 0):
Expand All @@ -36,6 +38,7 @@
# IO classes
#-----------------------------------------------------------------------------


class IOPubThread(object):
"""An object for sending IOPub messages in a background thread

Expand Down Expand Up @@ -285,7 +288,54 @@ class OutStream(TextIOBase):
topic = None
encoding = 'UTF-8'

def __init__(self, session, pub_thread, name, pipe=None, echo=None):

def fileno(self):
"""
Things like subprocess will peak and write to the fileno() of stderr/stdout.
"""
if getattr(self, "_original_stdstream_copy", None) is not None:
return self._original_stdstream_copy
else:
raise io.UnsupportedOperation("fileno")

def _watch_pipe_fd(self):
"""
We've redirected standards steams 0 and 1 into a pipe.

We need to watch in a thread and redirect them to the right places.

1) the ZMQ channels to show in notebook interfaces,
2) the original stdout/err, to capture errors in terminals.

We cannot schedule this on the ioloop thread, as this might be blocking.

"""

try:
bts = os.read(self._fid, 1000)
while bts and self._should_watch:
self.write(bts.decode())
os.write(self._original_stdstream_copy, bts)
bts = os.read(self._fid, 1000)
except Exception:
self._exc = sys.exc_info()

def __init__(
self, session, pub_thread, name, pipe=None, echo=None, *, watchfd=True
):
"""
Parameters
----------
name : str {'stderr', 'stdout'}
the name of the standard stream to replace
watchfd : bool (default, True)
Watch the file descripttor corresponding to the replaced stream.
This is useful if you know some underlying code will write directly
the file descriptor by its number. It will spawn a watching thread,
that will swap the give file descriptor for a pipe, read from the
pipe, and insert this into the current Stream.

"""
if pipe is not None:
warnings.warn(
"pipe argument to OutStream is deprecated and ignored",
Expand All @@ -297,8 +347,12 @@ def __init__(self, session, pub_thread, name, pipe=None, echo=None):
self.session = session
if not isinstance(pub_thread, IOPubThread):
# Backward-compat: given socket, not thread. Wrap in a thread.
warnings.warn("OutStream should be created with IOPubThread, not %r" % pub_thread,
DeprecationWarning, stacklevel=2)
warnings.warn(
"Since IPykernel 4.3, OutStream should be created with "
"IOPubThread, not %r" % pub_thread,
DeprecationWarning,
stacklevel=2,
)
pub_thread = IOPubThread(pub_thread)
pub_thread.start()
self.pub_thread = pub_thread
Expand All @@ -312,19 +366,48 @@ def __init__(self, session, pub_thread, name, pipe=None, echo=None):
self._new_buffer()
self.echo = None

if (
watchfd
and (sys.platform.startswith("linux") or sys.platform.startswith("darwin"))
and ("PYTEST_CURRENT_TEST" not in os.environ)
):
# Pytest set its own capture. Dont redirect from within pytest.

self._should_watch = True
self._setup_stream_redirects(name)

if echo:
if hasattr(echo, 'read') and hasattr(echo, 'write'):
self.echo = echo
else:
raise ValueError("echo argument must be a file like object")

def _setup_stream_redirects(self, name):
pr, pw = os.pipe()
fno = getattr(sys, name).fileno()
self._original_stdstream_copy = os.dup(fno)
os.dup2(pw, fno)

self._fid = pr

self._exc = None
self.watch_fd_thread = threading.Thread(target=self._watch_pipe_fd)
self.watch_fd_thread.daemon = True
self.watch_fd_thread.start()

def _is_master_process(self):
return os.getpid() == self._master_pid

def set_parent(self, parent):
self.parent_header = extract_header(parent)

def close(self):
if sys.platform.startswith("linux") or sys.platform.startswith("darwin"):
self._should_watch = False
self.watch_fd_thread.join()
if self._exc:
etype, value, tb = self._exc
traceback.print_exception(etype, value, tb)
self.pub_thread = None

@property
Expand Down
22 changes: 18 additions & 4 deletions ipykernel/kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
import signal
import traceback
import logging
from io import TextIOWrapper, FileIO
from logging import StreamHandler

import tornado
from tornado import ioloop

import zmq
from zmq.eventloop import ioloop as zmq_ioloop
from zmq.eventloop.zmqstream import ZMQStream

from IPython.core.application import (
Expand Down Expand Up @@ -414,9 +415,22 @@ def init_io(self):
echo=e_stdout)
if sys.stderr is not None:
sys.stderr.flush()
sys.stderr = outstream_factory(self.session, self.iopub_thread,
'stderr',
echo=e_stderr)
sys.stderr = outstream_factory(
self.session, self.iopub_thread, "stderr", echo=e_stderr
)
if hasattr(sys.stderr, "_original_stdstream_copy"):

for handler in self.log.handlers:
if isinstance(handler, StreamHandler) and (
handler.stream.buffer.fileno() == 2
):
self.log.debug(
"Seeing logger to stderr, rerouting to raw filedescriptor."
)

handler.stream = TextIOWrapper(
FileIO(sys.stderr._original_stdstream_copy, "w")
)
if self.displayhook_class:
displayhook_factory = import_item(str(self.displayhook_class))
self.displayhook = displayhook_factory(self.session, self.iopub_socket)
Expand Down
10 changes: 4 additions & 6 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ def should_handle(self, stream, msg, idents):
"""
msg_id = msg['header']['msg_id']
if msg_id in self.aborted:
msg_type = msg['header']['msg_type']
# is it safe to assume a msg_id will not be resubmitted?
self.aborted.remove(msg_id)
self._send_abort_reply(stream, msg, idents)
Expand Down Expand Up @@ -584,8 +583,7 @@ def complete_request(self, stream, ident, parent):

matches = yield gen.maybe_future(self.do_complete(code, cursor_pos))
matches = json_clean(matches)
completion_msg = self.session.send(stream, 'complete_reply',
matches, parent, ident)
self.session.send(stream, "complete_reply", matches, parent, ident)

def do_complete(self, code, cursor_pos):
"""Override in subclasses to find completions.
Expand Down Expand Up @@ -825,9 +823,9 @@ def _send_abort_reply(self, stream, msg, idents):
"""Send a reply to an aborted request"""
self.log.info("Aborting:")
self.log.info("%s", msg)
reply_type = msg['header']['msg_type'].rsplit('_', 1)[0] + '_reply'
status = {'status': 'aborted'}
md = {'engine': self.ident}
reply_type = msg["header"]["msg_type"].rsplit("_", 1)[0] + "_reply"
status = {"status": "aborted"}
md = {"engine": self.ident}
md.update(status)
self.session.send(
stream, reply_type, metadata=md,
Expand Down
41 changes: 33 additions & 8 deletions ipykernel/tests/test_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
def _check_master(kc, expected=True, stream="stdout"):
execute(kc=kc, code="import sys")
flush_channels(kc)
msg_id, content = execute(kc=kc, code="print (sys.%s._is_master_process())" % stream)
msg_id, content = execute(kc=kc, code="print(sys.%s._is_master_process())" % stream)
stdout, stderr = assemble_output(kc.get_iopub_msg)
assert stdout.strip() == repr(expected)

Expand All @@ -44,14 +44,44 @@ def _check_status(content):
def test_simple_print():
"""simple print statement in kernel"""
with kernel() as kc:
iopub = kc.iopub_channel
msg_id, content = execute(kc=kc, code="print ('hi')")
msg_id, content = execute(kc=kc, code="print('hi')")
stdout, stderr = assemble_output(kc.get_iopub_msg)
assert stdout == 'hi\n'
assert stderr == ''
_check_master(kc, expected=True)


@pytest.mark.skip(
reason="Currently don't capture during test as pytest does its own capturing"
)
def test_capture_fd():
"""simple print statement in kernel"""
with kernel() as kc:
iopub = kc.iopub_channel
msg_id, content = execute(kc=kc, code="import os; os.system('echo capsys')")
stdout, stderr = assemble_output(iopub)
assert stdout == "capsys\n"
assert stderr == ""
_check_master(kc, expected=True)


@pytest.mark.skip(
reason="Currently don't capture during test as pytest does its own capturing"
)
def test_subprocess_peek_at_stream_fileno():
""""""
with kernel() as kc:
iopub = kc.iopub_channel
msg_id, content = execute(
kc=kc,
code="import subprocess, sys; subprocess.run(['python', '-c', 'import os; os.system(\"echo CAP1\"); print(\"CAP2\")'], stderr=sys.stderr)",
)
stdout, stderr = assemble_output(iopub)
assert stdout == "CAP1\nCAP2\n"
assert stderr == ""
_check_master(kc, expected=True)


def test_sys_path():
"""test that sys.path doesn't get messed up by default"""
with kernel() as kc:
Expand Down Expand Up @@ -85,7 +115,6 @@ def test_sys_path_profile_dir():
def test_subprocess_print():
"""printing from forked mp.Process"""
with new_kernel() as kc:
iopub = kc.iopub_channel

_check_master(kc, expected=True)
flush_channels(kc)
Expand Down Expand Up @@ -113,7 +142,6 @@ def test_subprocess_print():
def test_subprocess_noprint():
"""mp.Process without print doesn't trigger iostream mp_mode"""
with kernel() as kc:
iopub = kc.iopub_channel

np = 5
code = '\n'.join([
Expand All @@ -140,7 +168,6 @@ def test_subprocess_noprint():
def test_subprocess_error():
"""error in mp.Process doesn't crash"""
with new_kernel() as kc:
iopub = kc.iopub_channel

code = '\n'.join([
"import multiprocessing as mp",
Expand Down Expand Up @@ -313,8 +340,6 @@ def test_unc_paths():
file_path = os.path.splitdrive(os.path.dirname(drive_file_path))[1]
unc_file_path = os.path.join(unc_root, file_path[1:])

iopub = kc.iopub_channel

kc.execute("cd {0:s}".format(unc_file_path))
reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
assert reply['content']['status'] == 'ok'
Expand Down