Skip to content
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
16e4f96
update Comms and matplotlib while waiting for input
Sep 17, 2019
071e5f6
change import to import Gcf directly
Sep 17, 2019
2d19be4
make matplotlib dependency facultative
Sep 17, 2019
bad10cc
move matplotlib inside function
Sep 17, 2019
b2bd633
move waiting code to method
Sep 19, 2019
ddc6405
Merge branch '_wait_input_request_reply' into Update_comms_matplotlib
Sep 19, 2019
8bc89a5
move import into new method
Sep 19, 2019
0e485a0
use public API
Sep 19, 2019
0ee0cee
Remove do_one_iteration
Sep 22, 2019
05a2959
Merge branch 'repeat_tests' into _wait_input_request_reply
Oct 5, 2019
2915f56
Merge branch '_wait_input_request_reply' into Update_comms_matplotlib
Oct 5, 2019
3487e7d
Update ipykernel/kernelbase.py
impact27 Oct 6, 2019
9397d53
Check if is main thread
Oct 6, 2019
45cedfc
Merge remote-tracking branch 'upstream/master' into Update_comms_matp…
Oct 9, 2019
b90b5d5
Merge remote-tracking branch 'upstream/master' into Update_comms_matp…
Oct 16, 2019
9f33e69
Update all eventloops
Oct 16, 2019
55f44a1
Block while waiting for reply
Oct 23, 2019
188c2a1
add lock to stdin
Oct 24, 2019
c0fc6b8
move code to Kernel
Oct 25, 2019
404c437
remove whitespace
impact27 Oct 25, 2019
922bb55
skip stdin_stream if dummy socket
Oct 26, 2019
f20c034
Merge remote-tracking branch 'upstream/master' into Update_comms_matp…
Nov 25, 2019
94e36a9
Add magic to enable and disable the eventloop while waiting for input
Dec 4, 2019
190e8ba
Merge remote-tracking branch 'upstream/master' into Update_comms_matp…
Dec 5, 2019
99e3d4f
Change PY2 compatibility code
Dec 5, 2019
bf3cb03
Merge remote-tracking branch 'upstream/master' into Update_comms_matp…
Mar 28, 2020
d16d6e4
add io_streams property to kernel
Mar 28, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ipykernel/eventloops.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def loop_qt4(kernel):
kernel.app = get_app_qt4([" "])
kernel.app.setQuitOnLastWindowClosed(False)

for s in kernel.shell_streams:
for s in kernel.shell_streams + [kernel.stdin_stream]:
_notify_stream_qt(kernel, s)

_loop_qt(kernel.app)
Expand Down Expand Up @@ -159,7 +159,7 @@ def loop_wx(kernel):

def wake():
"""wake from wx"""
for stream in kernel.shell_streams:
for stream in kernel.shell_streams + [kernel.stdin_stream]:
if stream.flush(limit=1):
kernel.app.ExitMainLoop()
return
Expand Down Expand Up @@ -225,7 +225,7 @@ def process_stream_events(stream, *a, **kw):
# For Tkinter, we create a Tk object and call its withdraw method.
kernel.app = app = Tk()
kernel.app.withdraw()
for stream in kernel.shell_streams:
for stream in kernel.shell_streams + [kernel.stdin_stream]:
notifier = partial(process_stream_events, stream)
# seems to be needed for tk
notifier.__name__ = 'notifier'
Expand Down Expand Up @@ -296,7 +296,7 @@ def handle_int(etype, value, tb):
# don't let interrupts during mainloop invoke crash_handler:
sys.excepthook = handle_int
mainloop(kernel._poll_interval)
for stream in kernel.shell_streams:
for stream in kernel.shell_streams + [kernel.stdin_stream]:
if stream.flush(limit=1):
# events to process, return control to kernel
return
Expand Down Expand Up @@ -337,7 +337,7 @@ def process_stream_events(stream):
if stream.flush(limit=1):
loop.stop()

for stream in kernel.shell_streams:
for stream in kernel.shell_streams + [kernel.stdin_stream]:
fd = stream.getsockopt(zmq.FD)
notifier = partial(process_stream_events, stream)
loop.add_reader(fd, notifier)
Expand Down
13 changes: 13 additions & 0 deletions ipykernel/kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ def init_kernel(self):
"""Create the Kernel object itself"""
shell_stream = ZMQStream(self.shell_socket)
control_stream = ZMQStream(self.control_socket)
stdin_stream = ZMQStream(self.stdin_socket)

kernel_factory = self.kernel_class.instance

Expand All @@ -446,6 +447,18 @@ def init_kernel(self):
profile_dir=self.profile_dir,
user_ns=self.user_ns,
)
kernel.stdin_stream = stdin_stream

def handle_msg(msg):
idents, msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.deserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid Message", exc_info=True)
return
kernel._stdin_msg = msg

kernel.stdin_stream.on_recv(handle_msg, copy=False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all these new lines here should be moved to the Kernel class (present in kernelbase.py). In particular, handle_msg should be part of dispatch_stdin (following what dispatch_control and dispatch_shell do for control_stream and shell_streams, respectively).

However, I don't know what the implications of adding a new stream here are for the Jupyter protocol. Pinging @takluyver for his insights about that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved it. I don't want to process the messages recieved on stdin so I don't think dispatch_stdin is a good idea. We are not dispatching the messages, only saving the most recent. A stream is just a wrapper around a socket. I am not adding a socket, so the protocol is unchanged. The only reason I need a stream around the socket is to get a qt event when a message is recieved.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, AFAIK @impact27 is right - there shouldn't be a difference to the protocol. A pyzmq stream is a wrapper around a socket to tie it to an event loop. The stdin socket already existed, so whether the code wraps it in a stream or not should be an implementation detail.

(I haven't tried to review the PR, just answering the question I was mentioned for)

kernel.record_ports({
name + '_port': port for name, port in self.ports.items()
})
Expand Down
64 changes: 49 additions & 15 deletions ipykernel/kernelbase.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import sys
import time
import uuid
import threading

try:
# jupyter_client >= 5, use tz-aware now
Expand Down Expand Up @@ -41,6 +42,7 @@

from ._version import kernel_protocol_version


CONTROL_PRIORITY = 1
SHELL_PRIORITY = 10
ABORT_PRIORITY = 20
Expand Down Expand Up @@ -170,6 +172,9 @@ def __init__(self, **kwargs):
for msg_type in self.control_msg_types:
self.control_handlers[msg_type] = getattr(self, msg_type)

self._stdin_msg = None
self._stdin_lock = threading.Lock()

@gen.coroutine
def dispatch_control(self, msg):
"""dispatch control requests"""
Expand Down Expand Up @@ -860,6 +865,7 @@ def raw_input(self, prompt=''):
)

def _input_request(self, prompt, ident, parent, password=False):
"""Send an input request to the frontend and wait for the reply."""
# Flush output before making the request.
sys.stderr.flush()
sys.stdout.flush()
Expand All @@ -873,22 +879,15 @@ def _input_request(self, prompt, ident, parent, password=False):
else:
raise

# Send the input request.
content = json_clean(dict(prompt=prompt, password=password))
self.session.send(self.stdin_socket, u'input_request', content, parent,
ident=ident)
with self._stdin_lock:
self._stdin_msg = None
# Send the input request.
content = json_clean(dict(prompt=prompt, password=password))
self.session.send(self.stdin_socket, u'input_request', content, parent,
ident=ident)
# Await a response.
reply = self._wait_input_request_reply()

# Await a response.
while True:
try:
ident, reply = self.session.recv(self.stdin_socket, 0)
except Exception:
self.log.warning("Invalid Message:", exc_info=True)
except KeyboardInterrupt:
# re-raise KeyboardInterrupt, to truncate traceback
raise KeyboardInterrupt
else:
break
try:
value = py3compat.unicode_to_str(reply['content']['value'])
except:
Expand All @@ -899,6 +898,41 @@ def _input_request(self, prompt, ident, parent, password=False):
raise EOFError
return value

def _wait_input_request_reply(self):
"""Wait for an input request reply.

Raises
------
KeyboardInterrupt if a keyboard interrupt is recieved.
"""
# Await a response.
reply = None
while reply is None:
try:
reply = self._input_request_loop_step()
except Exception:
self.log.warning("Invalid Message:", exc_info=True)
except KeyboardInterrupt:
# re-raise KeyboardInterrupt, to truncate traceback
raise KeyboardInterrupt
return reply

def _input_request_loop_step(self):
"""Do one step of the input request loop."""
# Allow GUI event loop to update
if sys.version_info >= (3, 4):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the minimal supported version in the master branch is 3.5 (please check that in setup.py)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doubles as a test for python 2

is_main_thread = (threading.current_thread() is
threading.main_thread())
else:
is_main_thread = isinstance(threading.current_thread(),
threading._MainThread)
if is_main_thread and self.eventloop:
self.eventloop(self)
return self._stdin_msg
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and possibly here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_input_request_loop_step is called from _wait_input_request_reply which is protected by a lock in _input_request, so I don't think a lock here would do anything.

else:
ident, reply = self.session.recv(self.stdin_socket, 0)
return reply

def _at_shutdown(self):
"""Actions taken at shutdown by the kernel, called by python's atexit.
"""
Expand Down