diff --git a/docs/messaging.rst b/docs/messaging.rst index 7c533a7de..7eb6435a6 100644 --- a/docs/messaging.rst +++ b/docs/messaging.rst @@ -122,9 +122,28 @@ A message is defined by the following four-dictionary structure:: 'buffers': list, } +.. note:: + + The ``session`` id in a message header identifies a unique entity with state, + such as a kernel process or client process. + + A client session id, in message headers from a client, should be unique among + all clients connected to a kernel. When a client reconnects to a kernel, it + should use the same client session id in its message headers. When a client + restarts, it should generate a new client session id. + + A kernel session id, in message headers from a kernel, should identify a + particular kernel process. If a kernel is restarted, the kernel session id + should be regenerated. + + The session id in a message header can be used to identify the sending entity. + For example, if a client disconnects and reconnects to a kernel, and messages + from the kernel have a different kernel session id than prior to the disconnect, + the client should assume that the kernel was restarted. + .. versionchanged:: 5.0 - ``version`` key added to the header. + ``version`` key added to the header. .. versionchanged:: 5.1 @@ -139,8 +158,9 @@ Compatibility ============= Kernels must implement the :ref:`execute ` and :ref:`kernel info -` messages in order to be usable. All other message types -are optional, although we recommend implementing :ref:`completion +` messages, along with the associated busy and idle +:ref:`status` messages. All other message types are +optional, although we recommend implementing :ref:`completion ` if possible. Kernels do not need to send any reply for messages they don't handle, and frontends should provide sensible behaviour if no reply arrives (except for the required execution and kernel info messages). @@ -934,8 +954,7 @@ multiple cases: The client sends a shutdown request to the kernel, and once it receives the reply message (which is otherwise empty), it can assume that the kernel has -completed shutdown safely. The request can be sent on either the `control` or -`shell` channels. +completed shutdown safely. The request is sent on the `control` channel. Upon their own shutdown, client applications will typically execute a last minute sanity check and forcefully terminate any kernel that is still alive, to @@ -959,6 +978,12 @@ Message type: ``shutdown_reply``:: socket, they simply send a forceful process termination signal, since a dead process is unlikely to respond in any useful way to messages. +.. versionchanged:: 5.4 + + Sending a ``shutdown_request`` message on the ``shell`` channel is deprecated. + + + .. _msging_interrupt: Kernel interrupt @@ -1191,6 +1216,8 @@ Message type: ``error``:: ``pyerr`` renamed to ``error`` +.. _status: + Kernel status ------------- @@ -1229,14 +1256,6 @@ between the busy and idle status messages associated with a given request. Busy and idle messages should be sent before/after handling every request, not just execution. -.. note:: - - Extra status messages are added between the notebook webserver and websocket clients - that are not sent by the kernel. These are: - - - restarting (kernel has died, but will be automatically restarted) - - dead (kernel has died, restarting has failed) - Clear output ------------ diff --git a/jupyter_client/blocking/client.py b/jupyter_client/blocking/client.py index c0196ba36..87e0e769e 100644 --- a/jupyter_client/blocking/client.py +++ b/jupyter_client/blocking/client.py @@ -36,7 +36,7 @@ TimeoutError = RuntimeError -def reqrep(meth): +def reqrep(meth, channel='shell'): def wrapped(self, *args, **kwargs): reply = kwargs.pop('reply', False) timeout = kwargs.pop('timeout', None) @@ -44,7 +44,7 @@ def wrapped(self, *args, **kwargs): if not reply: return msg_id - return self._recv_reply(msg_id, timeout=timeout) + return self._recv_reply(msg_id, timeout=timeout, channel=channel) if not meth.__doc__: # python -OO removes docstrings, @@ -135,9 +135,10 @@ def wait_for_ready(self, timeout=None): iopub_channel_class = Type(ZMQSocketChannel) stdin_channel_class = Type(ZMQSocketChannel) hb_channel_class = Type(HBChannel) + control_channel_class = Type(ZMQSocketChannel) - def _recv_reply(self, msg_id, timeout=None): + def _recv_reply(self, msg_id, timeout=None, channel='shell'): """Receive and return the reply for a given request""" if timeout is not None: deadline = monotonic() + timeout @@ -145,7 +146,10 @@ def _recv_reply(self, msg_id, timeout=None): if timeout is not None: timeout = max(0, deadline - monotonic()) try: - reply = self.get_shell_msg(timeout=timeout) + if channel == 'control': + reply = self.get_control_msg(timeout=timeout) + else: + reply = self.get_shell_msg(timeout=timeout) except Empty: raise TimeoutError("Timeout waiting for reply") if reply['parent_header'].get('msg_id') != msg_id: @@ -154,13 +158,16 @@ def _recv_reply(self, msg_id, timeout=None): return reply + # replies come on the shell channel execute = reqrep(KernelClient.execute) history = reqrep(KernelClient.history) complete = reqrep(KernelClient.complete) inspect = reqrep(KernelClient.inspect) kernel_info = reqrep(KernelClient.kernel_info) comm_info = reqrep(KernelClient.comm_info) - shutdown = reqrep(KernelClient.shutdown) + + # replies come on the control channel + shutdown = reqrep(KernelClient.shutdown, channel='control') def _stdin_hook_default(self, msg): diff --git a/jupyter_client/client.py b/jupyter_client/client.py index 763af85a7..01a558f55 100644 --- a/jupyter_client/client.py +++ b/jupyter_client/client.py @@ -35,12 +35,13 @@ def validate_string_dict(dct): class KernelClient(ConnectionFileMixin): """Communicates with a single kernel on any host via zmq channels. - There are four channels associated with each kernel: + There are five channels associated with each kernel: * shell: for request/reply calls to the kernel. * iopub: for the kernel to publish results to frontends. * hb: for monitoring the kernel's heartbeat. * stdin: for frontends to reply to raw_input calls in the kernel. + * control: for kernel management calls to the kernel. The messages that can be sent on these channels are exposed as methods of the client (KernelClient.execute, complete, history, etc.). These methods only @@ -58,12 +59,14 @@ def _context_default(self): iopub_channel_class = Type(ChannelABC) stdin_channel_class = Type(ChannelABC) hb_channel_class = Type(HBChannelABC) + control_channel_class = Type(ChannelABC) # Protected traits _shell_channel = Any() _iopub_channel = Any() _stdin_channel = Any() _hb_channel = Any() + _control_channel = Any() # flag for whether execute requests should be allowed to call raw_input: allow_stdin = True @@ -84,11 +87,15 @@ def get_stdin_msg(self, *args, **kwargs): """Get a message from the stdin channel""" return self.stdin_channel.get_msg(*args, **kwargs) + def get_control_msg(self, *args, **kwargs): + """Get a message from the control channel""" + return self.control_channel.get_msg(*args, **kwargs) + #-------------------------------------------------------------------------- # Channel management methods #-------------------------------------------------------------------------- - def start_channels(self, shell=True, iopub=True, stdin=True, hb=True): + def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=True): """Starts the channels for this kernel. This will create the channels if they do not exist and then start @@ -109,6 +116,8 @@ def start_channels(self, shell=True, iopub=True, stdin=True, hb=True): self.allow_stdin = False if hb: self.hb_channel.start() + if control: + self.control_channel.start() def stop_channels(self): """Stops all the running channels for this kernel. @@ -123,12 +132,15 @@ def stop_channels(self): self.stdin_channel.stop() if self.hb_channel.is_alive(): self.hb_channel.stop() + if self.control_channel.is_alive(): + self.control_channel.stop() @property def channels_running(self): """Are any of the channels created and running?""" return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or - self.stdin_channel.is_alive() or self.hb_channel.is_alive()) + self.stdin_channel.is_alive() or self.hb_channel.is_alive() or + self.control_channel.is_alive()) ioloop = None # Overridden in subclasses that use pyzmq event loop @@ -179,6 +191,18 @@ def hb_channel(self): ) return self._hb_channel + @property + def control_channel(self): + """Get the control channel object for this kernel.""" + if self._control_channel is None: + url = self._make_url('control') + self.log.debug("connecting control channel to %s", url) + socket = self.connect_control(identity=self.session.bsession) + self._control_channel = self.control_channel_class( + socket, self.session, self.ioloop + ) + return self._control_channel + def is_alive(self): """Is the kernel process still running?""" from .manager import KernelManager @@ -383,8 +407,24 @@ def _handle_kernel_info_reply(self, msg): if adapt_version != major_protocol_version: self.session.adapt_version = adapt_version + def is_complete(self, code): + """Ask the kernel whether some code is complete and ready to execute.""" + msg = self.session.msg('is_complete_request', {'code': code}) + self.shell_channel.send(msg) + return msg['header']['msg_id'] + + def input(self, string): + """Send a string of raw input to the kernel. + + This should only be called in response to the kernel sending an + ``input_request`` message on the stdin channel. + """ + content = dict(value=string) + msg = self.session.msg('input_reply', content) + self.stdin_channel.send(msg) + def shutdown(self, restart=False): - """Request an immediate kernel shutdown. + """Request an immediate kernel shutdown on the control channel. Upon receipt of the (empty) reply, client code can safely assume that the kernel has shut down and it's safe to forcefully terminate it if @@ -401,24 +441,7 @@ def shutdown(self, restart=False): # Send quit message to kernel. Once we implement kernel-side setattr, # this should probably be done that way, but for now this will do. msg = self.session.msg('shutdown_request', {'restart':restart}) - self.shell_channel.send(msg) - return msg['header']['msg_id'] - - def is_complete(self, code): - """Ask the kernel whether some code is complete and ready to execute.""" - msg = self.session.msg('is_complete_request', {'code': code}) - self.shell_channel.send(msg) + self.control_channel.send(msg) return msg['header']['msg_id'] - def input(self, string): - """Send a string of raw input to the kernel. - - This should only be called in response to the kernel sending an - ``input_request`` message on the stdin channel. - """ - content = dict(value=string) - msg = self.session.msg('input_reply', content) - self.stdin_channel.send(msg) - - KernelClientABC.register(KernelClient) diff --git a/jupyter_client/clientabc.py b/jupyter_client/clientabc.py index 7a718284a..9a47d2fcb 100644 --- a/jupyter_client/clientabc.py +++ b/jupyter_client/clientabc.py @@ -47,12 +47,16 @@ def hb_channel_class(self): def stdin_channel_class(self): pass + @abc.abstractproperty + def control_channel_class(self): + pass + #-------------------------------------------------------------------------- # Channel management methods #-------------------------------------------------------------------------- @abc.abstractmethod - def start_channels(self, shell=True, iopub=True, stdin=True, hb=True): + def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=True): pass @abc.abstractmethod @@ -78,3 +82,7 @@ def stdin_channel(self): @abc.abstractproperty def hb_channel(self): pass + + @abc.abstractproperty + def control_channel(self): + pass diff --git a/jupyter_client/connect.py b/jupyter_client/connect.py index 2cc89de35..81d6d852a 100644 --- a/jupyter_client/connect.py +++ b/jupyter_client/connect.py @@ -226,7 +226,7 @@ def find_connection_file(filename='kernel-*.json', path=None, profile=None): def tunnel_to_kernel(connection_info, sshserver, sshkey=None): """tunnel connections to a kernel via ssh - This will open four SSH tunnels from localhost on this machine to the + This will open five SSH tunnels from localhost on this machine to the ports associated with the kernel. They can be either direct localhost-localhost tunnels, or if an intermediate server is necessary, the kernel must be listening on a public IP. @@ -246,8 +246,8 @@ def tunnel_to_kernel(connection_info, sshserver, sshkey=None): Returns ------- - (shell, iopub, stdin, hb) : ints - The four ports on localhost that have been forwarded to the kernel. + (shell, iopub, stdin, hb, control) : ints + The five ports on localhost that have been forwarded to the kernel. """ from .ssh import tunnel if isinstance(connection_info, string_types): @@ -257,8 +257,8 @@ def tunnel_to_kernel(connection_info, sshserver, sshkey=None): cf = connection_info - lports = tunnel.select_random_ports(4) - rports = cf['shell_port'], cf['iopub_port'], cf['stdin_port'], cf['hb_port'] + lports = tunnel.select_random_ports(5) + rports = cf['shell_port'], cf['iopub_port'], cf['stdin_port'], cf['hb_port'], cf['control_port'] remote_ip = cf['ip'] diff --git a/jupyter_client/consoleapp.py b/jupyter_client/consoleapp.py index 2e27a2918..e8c8e769e 100644 --- a/jupyter_client/consoleapp.py +++ b/jupyter_client/consoleapp.py @@ -72,6 +72,7 @@ shell = 'JupyterConsoleApp.shell_port', iopub = 'JupyterConsoleApp.iopub_port', stdin = 'JupyterConsoleApp.stdin_port', + control = 'JupyterConsoleApp.control_port', existing = 'JupyterConsoleApp.existing', f = 'JupyterConsoleApp.connection_file', @@ -222,7 +223,8 @@ def init_ssh(self): shell_port=self.shell_port, iopub_port=self.iopub_port, stdin_port=self.stdin_port, - hb_port=self.hb_port + hb_port=self.hb_port, + control_port=self.control_port ) self.log.info("Forwarding connections to %s via %s"%(ip, self.sshserver)) @@ -236,7 +238,7 @@ def init_ssh(self): self.log.error("Could not setup tunnels", exc_info=True) self.exit(1) - self.shell_port, self.iopub_port, self.stdin_port, self.hb_port = newports + self.shell_port, self.iopub_port, self.stdin_port, self.hb_port, self.control_port = newports cf = self.connection_file root, ext = os.path.splitext(cf) @@ -275,6 +277,7 @@ def init_kernel_manager(self): iopub_port=self.iopub_port, stdin_port=self.stdin_port, hb_port=self.hb_port, + control_port=self.control_port, connection_file=self.connection_file, kernel_name=self.kernel_name, parent=self, @@ -302,6 +305,7 @@ def init_kernel_manager(self): self.iopub_port=km.iopub_port self.stdin_port=km.stdin_port self.hb_port=km.hb_port + self.control_port=km.control_port self.connection_file = km.connection_file atexit.register(self.kernel_manager.cleanup_connection_file) @@ -318,6 +322,7 @@ def init_kernel_client(self): iopub_port=self.iopub_port, stdin_port=self.stdin_port, hb_port=self.hb_port, + control_port=self.control_port, connection_file=self.connection_file, parent=self, ) diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index bf94ad002..05b1bb53d 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -297,7 +297,7 @@ def shutdown_kernel(self, now=False, restart=False): This attempts to shutdown the kernels cleanly by: - 1. Sending it a shutdown message over the shell channel. + 1. Sending it a shutdown message over the control channel. 2. If that fails, the kernel is shutdown forcibly by sending it a signal. diff --git a/jupyter_client/threaded.py b/jupyter_client/threaded.py index 83a6ad0eb..801ac7acc 100644 --- a/jupyter_client/threaded.py +++ b/jupyter_client/threaded.py @@ -230,14 +230,14 @@ def ioloop(self): ioloop_thread = Instance(IOLoopThread, allow_none=True) - def start_channels(self, shell=True, iopub=True, stdin=True, hb=True): + def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=True): self.ioloop_thread = IOLoopThread() self.ioloop_thread.start() if shell: self.shell_channel._inspect = self._check_kernel_info_reply - super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb) + super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb, control) def _check_kernel_info_reply(self, msg): """This is run in the ioloop thread when the kernel info reply is received @@ -255,3 +255,4 @@ def stop_channels(self): shell_channel_class = Type(ThreadedZMQSocketChannel) stdin_channel_class = Type(ThreadedZMQSocketChannel) hb_channel_class = Type(HBChannel) + control_channel_class = Type(ThreadedZMQSocketChannel)