Skip to content

Commit 5bac5e5

Browse files
committed
Percolate a control connection throughout the code.
1 parent 846354e commit 5bac5e5

File tree

6 files changed

+37
-16
lines changed

6 files changed

+37
-16
lines changed

jupyter_client/blocking/client.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,15 @@
3636
TimeoutError = RuntimeError
3737

3838

39-
def reqrep(meth):
39+
def reqrep(meth, channel='shell'):
4040
def wrapped(self, *args, **kwargs):
4141
reply = kwargs.pop('reply', False)
4242
timeout = kwargs.pop('timeout', None)
4343
msg_id = meth(self, *args, **kwargs)
4444
if not reply:
4545
return msg_id
4646

47-
return self._recv_reply(msg_id, timeout=timeout)
47+
return self._recv_reply(msg_id, timeout=timeout, channel=channel)
4848

4949
if not meth.__doc__:
5050
# python -OO removes docstrings,
@@ -135,17 +135,21 @@ def wait_for_ready(self, timeout=None):
135135
iopub_channel_class = Type(ZMQSocketChannel)
136136
stdin_channel_class = Type(ZMQSocketChannel)
137137
hb_channel_class = Type(HBChannel)
138+
control_channel_class = Type(ZMQSocketChannel)
138139

139140

140-
def _recv_reply(self, msg_id, timeout=None):
141+
def _recv_reply(self, msg_id, timeout=None, channel='shell'):
141142
"""Receive and return the reply for a given request"""
142143
if timeout is not None:
143144
deadline = monotonic() + timeout
144145
while True:
145146
if timeout is not None:
146147
timeout = max(0, deadline - monotonic())
147148
try:
148-
reply = self.get_shell_msg(timeout=timeout)
149+
if channel == 'control':
150+
reply = self.get_control_msg(timeout=timeout)
151+
else:
152+
reply = self.get_shell_msg(timeout=timeout)
149153
except Empty:
150154
raise TimeoutError("Timeout waiting for reply")
151155
if reply['parent_header'].get('msg_id') != msg_id:
@@ -154,13 +158,16 @@ def _recv_reply(self, msg_id, timeout=None):
154158
return reply
155159

156160

161+
# replies come on the shell channel
157162
execute = reqrep(KernelClient.execute)
158163
history = reqrep(KernelClient.history)
159164
complete = reqrep(KernelClient.complete)
160165
inspect = reqrep(KernelClient.inspect)
161166
kernel_info = reqrep(KernelClient.kernel_info)
162167
comm_info = reqrep(KernelClient.comm_info)
163-
shutdown = reqrep(KernelClient.shutdown)
168+
169+
# replies come on the control channel
170+
shutdown = reqrep(KernelClient.shutdown, channel='control')
164171

165172

166173
def _stdin_hook_default(self, msg):

jupyter_client/clientabc.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,16 @@ def hb_channel_class(self):
4747
def stdin_channel_class(self):
4848
pass
4949

50+
@abc.abstractproperty
51+
def control_channel_class(self):
52+
pass
53+
5054
#--------------------------------------------------------------------------
5155
# Channel management methods
5256
#--------------------------------------------------------------------------
5357

5458
@abc.abstractmethod
55-
def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
59+
def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=True):
5660
pass
5761

5862
@abc.abstractmethod
@@ -78,3 +82,7 @@ def stdin_channel(self):
7882
@abc.abstractproperty
7983
def hb_channel(self):
8084
pass
85+
86+
@abc.abstractproperty
87+
def control_channel(self):
88+
pass

jupyter_client/connect.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ def find_connection_file(filename='kernel-*.json', path=None, profile=None):
226226
def tunnel_to_kernel(connection_info, sshserver, sshkey=None):
227227
"""tunnel connections to a kernel via ssh
228228
229-
This will open four SSH tunnels from localhost on this machine to the
229+
This will open five SSH tunnels from localhost on this machine to the
230230
ports associated with the kernel. They can be either direct
231231
localhost-localhost tunnels, or if an intermediate server is necessary,
232232
the kernel must be listening on a public IP.
@@ -246,8 +246,8 @@ def tunnel_to_kernel(connection_info, sshserver, sshkey=None):
246246
Returns
247247
-------
248248
249-
(shell, iopub, stdin, hb) : ints
250-
The four ports on localhost that have been forwarded to the kernel.
249+
(shell, iopub, stdin, hb, control) : ints
250+
The five ports on localhost that have been forwarded to the kernel.
251251
"""
252252
from .ssh import tunnel
253253
if isinstance(connection_info, string_types):
@@ -257,8 +257,8 @@ def tunnel_to_kernel(connection_info, sshserver, sshkey=None):
257257

258258
cf = connection_info
259259

260-
lports = tunnel.select_random_ports(4)
261-
rports = cf['shell_port'], cf['iopub_port'], cf['stdin_port'], cf['hb_port']
260+
lports = tunnel.select_random_ports(5)
261+
rports = cf['shell_port'], cf['iopub_port'], cf['stdin_port'], cf['hb_port'], cf['control_port']
262262

263263
remote_ip = cf['ip']
264264

jupyter_client/consoleapp.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
shell = 'JupyterConsoleApp.shell_port',
7373
iopub = 'JupyterConsoleApp.iopub_port',
7474
stdin = 'JupyterConsoleApp.stdin_port',
75+
control = 'JupyterConsoleApp.control_port',
7576
existing = 'JupyterConsoleApp.existing',
7677
f = 'JupyterConsoleApp.connection_file',
7778

@@ -222,7 +223,8 @@ def init_ssh(self):
222223
shell_port=self.shell_port,
223224
iopub_port=self.iopub_port,
224225
stdin_port=self.stdin_port,
225-
hb_port=self.hb_port
226+
hb_port=self.hb_port,
227+
control_port=self.control_port
226228
)
227229

228230
self.log.info("Forwarding connections to %s via %s"%(ip, self.sshserver))
@@ -236,7 +238,7 @@ def init_ssh(self):
236238
self.log.error("Could not setup tunnels", exc_info=True)
237239
self.exit(1)
238240

239-
self.shell_port, self.iopub_port, self.stdin_port, self.hb_port = newports
241+
self.shell_port, self.iopub_port, self.stdin_port, self.hb_port, self.control_port = newports
240242

241243
cf = self.connection_file
242244
root, ext = os.path.splitext(cf)
@@ -275,6 +277,7 @@ def init_kernel_manager(self):
275277
iopub_port=self.iopub_port,
276278
stdin_port=self.stdin_port,
277279
hb_port=self.hb_port,
280+
control_port=self.control_port,
278281
connection_file=self.connection_file,
279282
kernel_name=self.kernel_name,
280283
parent=self,
@@ -302,6 +305,7 @@ def init_kernel_manager(self):
302305
self.iopub_port=km.iopub_port
303306
self.stdin_port=km.stdin_port
304307
self.hb_port=km.hb_port
308+
self.control_port=km.control_port
305309
self.connection_file = km.connection_file
306310

307311
atexit.register(self.kernel_manager.cleanup_connection_file)
@@ -318,6 +322,7 @@ def init_kernel_client(self):
318322
iopub_port=self.iopub_port,
319323
stdin_port=self.stdin_port,
320324
hb_port=self.hb_port,
325+
control_port=self.control_port,
321326
connection_file=self.connection_file,
322327
parent=self,
323328
)

jupyter_client/manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ def shutdown_kernel(self, now=False, restart=False):
297297
298298
This attempts to shutdown the kernels cleanly by:
299299
300-
1. Sending it a shutdown message over the shell channel.
300+
1. Sending it a shutdown message over the control channel.
301301
2. If that fails, the kernel is shutdown forcibly by sending it
302302
a signal.
303303

jupyter_client/threaded.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,14 @@ def ioloop(self):
230230

231231
ioloop_thread = Instance(IOLoopThread, allow_none=True)
232232

233-
def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
233+
def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=True):
234234
self.ioloop_thread = IOLoopThread()
235235
self.ioloop_thread.start()
236236

237237
if shell:
238238
self.shell_channel._inspect = self._check_kernel_info_reply
239239

240-
super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb)
240+
super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb, control)
241241

242242
def _check_kernel_info_reply(self, msg):
243243
"""This is run in the ioloop thread when the kernel info reply is received
@@ -255,3 +255,4 @@ def stop_channels(self):
255255
shell_channel_class = Type(ThreadedZMQSocketChannel)
256256
stdin_channel_class = Type(ThreadedZMQSocketChannel)
257257
hb_channel_class = Type(HBChannel)
258+
control_channel_class = Type(ThreadedZMQSocketChannel)

0 commit comments

Comments
 (0)