Skip to content

Commit 7649e39

Browse files
authored
Merge pull request #92 from kevin-bates/port-gateway-from-notebook
Port gateway from notebook
2 parents 2f4b09a + 898ac31 commit 7649e39

File tree

11 files changed

+1361
-59
lines changed

11 files changed

+1361
-59
lines changed

docs/source/public_server.rst

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,35 @@ single-tab mode:
343343
});
344344
345345
346+
Using a gateway server for kernel management
347+
--------------------------------------------
348+
349+
You are now able to redirect the management of your kernels to a Gateway Server
350+
(i.e., `Jupyter Kernel Gateway <https://jupyter-kernel-gateway.readthedocs.io/en/latest/>`_ or
351+
`Jupyter Enterprise Gateway <https://jupyter-enterprise-gateway.readthedocs.io/en/latest/>`_)
352+
simply by specifying a Gateway url via the following command-line option:
353+
354+
.. code-block:: bash
355+
356+
$ jupyter notebook --gateway-url=http://my-gateway-server:8888
357+
358+
the environment:
359+
360+
.. code-block:: bash
361+
362+
JUPYTER_GATEWAY_URL=http://my-gateway-server:8888
363+
364+
or in :file:`jupyter_notebook_config.py`:
365+
366+
.. code-block:: python
367+
368+
c.GatewayClient.url = http://my-gateway-server:8888
369+
370+
When provided, all kernel specifications will be retrieved from the specified Gateway server and all
371+
kernels will be managed by that server. This option enables the ability to target kernel processes
372+
against managed clusters while allowing for the notebook's management to remain local to the Notebook
373+
server.
374+
346375
Known issues
347376
------------
348377

jupyter_server/gateway/__init__.py

Whitespace-only changes.

jupyter_server/gateway/handlers.py

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
# Copyright (c) Jupyter Development Team.
2+
# Distributed under the terms of the Modified BSD License.
3+
4+
import os
5+
import logging
6+
import mimetypes
7+
8+
from ..base.handlers import APIHandler, JupyterHandler
9+
from ..utils import url_path_join
10+
11+
from tornado import gen, web
12+
from tornado.concurrent import Future
13+
from tornado.ioloop import IOLoop, PeriodicCallback
14+
from tornado.websocket import WebSocketHandler, websocket_connect
15+
from tornado.httpclient import HTTPRequest
16+
from tornado.escape import url_escape, json_decode, utf8
17+
18+
from ipython_genutils.py3compat import cast_unicode
19+
from jupyter_client.session import Session
20+
from traitlets.config.configurable import LoggingConfigurable
21+
22+
from .managers import GatewayClient
23+
24+
# Keepalive ping interval (default: 30 seconds)
25+
GATEWAY_WS_PING_INTERVAL_SECS = int(os.getenv('GATEWAY_WS_PING_INTERVAL_SECS', 30))
26+
27+
28+
class WebSocketChannelsHandler(WebSocketHandler, JupyterHandler):
29+
30+
session = None
31+
gateway = None
32+
kernel_id = None
33+
ping_callback = None
34+
35+
def set_default_headers(self):
36+
"""Undo the set_default_headers in IPythonHandler which doesn't make sense for websockets"""
37+
pass
38+
39+
def get_compression_options(self):
40+
# use deflate compress websocket
41+
return {}
42+
43+
def authenticate(self):
44+
"""Run before finishing the GET request
45+
46+
Extend this method to add logic that should fire before
47+
the websocket finishes completing.
48+
"""
49+
# authenticate the request before opening the websocket
50+
if self.get_current_user() is None:
51+
self.log.warning("Couldn't authenticate WebSocket connection")
52+
raise web.HTTPError(403)
53+
54+
if self.get_argument('session_id', False):
55+
self.session.session = cast_unicode(self.get_argument('session_id'))
56+
else:
57+
self.log.warning("No session ID specified")
58+
59+
def initialize(self):
60+
self.log.debug("Initializing websocket connection %s", self.request.path)
61+
self.session = Session(config=self.config)
62+
self.gateway = GatewayWebSocketClient(gateway_url=GatewayClient.instance().url)
63+
64+
@gen.coroutine
65+
def get(self, kernel_id, *args, **kwargs):
66+
self.authenticate()
67+
self.kernel_id = cast_unicode(kernel_id, 'ascii')
68+
yield super(WebSocketChannelsHandler, self).get(kernel_id=kernel_id, *args, **kwargs)
69+
70+
def send_ping(self):
71+
if self.ws_connection is None and self.ping_callback is not None:
72+
self.ping_callback.stop()
73+
return
74+
75+
self.ping(b'')
76+
77+
def open(self, kernel_id, *args, **kwargs):
78+
"""Handle web socket connection open to notebook server and delegate to gateway web socket handler """
79+
self.ping_callback = PeriodicCallback(self.send_ping, GATEWAY_WS_PING_INTERVAL_SECS * 1000)
80+
self.ping_callback.start()
81+
82+
self.gateway.on_open(
83+
kernel_id=kernel_id,
84+
message_callback=self.write_message,
85+
compression_options=self.get_compression_options()
86+
)
87+
88+
def on_message(self, message):
89+
"""Forward message to gateway web socket handler."""
90+
self.gateway.on_message(message)
91+
92+
def write_message(self, message, binary=False):
93+
"""Send message back to notebook client. This is called via callback from self.gateway._read_messages."""
94+
if self.ws_connection: # prevent WebSocketClosedError
95+
if isinstance(message, bytes):
96+
binary = True
97+
super(WebSocketChannelsHandler, self).write_message(message, binary=binary)
98+
elif self.log.isEnabledFor(logging.DEBUG):
99+
msg_summary = WebSocketChannelsHandler._get_message_summary(json_decode(utf8(message)))
100+
self.log.debug("Notebook client closed websocket connection - message dropped: {}".format(msg_summary))
101+
102+
def on_close(self):
103+
self.log.debug("Closing websocket connection %s", self.request.path)
104+
self.gateway.on_close()
105+
super(WebSocketChannelsHandler, self).on_close()
106+
107+
@staticmethod
108+
def _get_message_summary(message):
109+
summary = []
110+
message_type = message['msg_type']
111+
summary.append('type: {}'.format(message_type))
112+
113+
if message_type == 'status':
114+
summary.append(', state: {}'.format(message['content']['execution_state']))
115+
elif message_type == 'error':
116+
summary.append(', {}:{}:{}'.format(message['content']['ename'],
117+
message['content']['evalue'],
118+
message['content']['traceback']))
119+
else:
120+
summary.append(', ...') # don't display potentially sensitive data
121+
122+
return ''.join(summary)
123+
124+
125+
class GatewayWebSocketClient(LoggingConfigurable):
126+
"""Proxy web socket connection to a kernel/enterprise gateway."""
127+
128+
def __init__(self, **kwargs):
129+
super(GatewayWebSocketClient, self).__init__(**kwargs)
130+
self.kernel_id = None
131+
self.ws = None
132+
self.ws_future = Future()
133+
self.disconnected = False
134+
135+
@gen.coroutine
136+
def _connect(self, kernel_id):
137+
# websocket is initialized before connection
138+
self.ws = None
139+
self.kernel_id = kernel_id
140+
ws_url = url_path_join(
141+
GatewayClient.instance().ws_url,
142+
GatewayClient.instance().kernels_endpoint, url_escape(kernel_id), 'channels'
143+
)
144+
self.log.info('Connecting to {}'.format(ws_url))
145+
kwargs = {}
146+
kwargs = GatewayClient.instance().load_connection_args(**kwargs)
147+
148+
request = HTTPRequest(ws_url, **kwargs)
149+
self.ws_future = websocket_connect(request)
150+
self.ws_future.add_done_callback(self._connection_done)
151+
152+
def _connection_done(self, fut):
153+
if not self.disconnected and fut.exception() is None: # prevent concurrent.futures._base.CancelledError
154+
self.ws = fut.result()
155+
self.log.debug("Connection is ready: ws: {}".format(self.ws))
156+
else:
157+
self.log.warning("Websocket connection has been closed via client disconnect or due to error. "
158+
"Kernel with ID '{}' may not be terminated on GatewayClient: {}".
159+
format(self.kernel_id, GatewayClient.instance().url))
160+
161+
def _disconnect(self):
162+
self.disconnected = True
163+
if self.ws is not None:
164+
# Close connection
165+
self.ws.close()
166+
elif not self.ws_future.done():
167+
# Cancel pending connection. Since future.cancel() is a noop on tornado, we'll track cancellation locally
168+
self.ws_future.cancel()
169+
self.log.debug("_disconnect: future cancelled, disconnected: {}".format(self.disconnected))
170+
171+
@gen.coroutine
172+
def _read_messages(self, callback):
173+
"""Read messages from gateway server."""
174+
while self.ws is not None:
175+
message = None
176+
if not self.disconnected:
177+
try:
178+
message = yield self.ws.read_message()
179+
except Exception as e:
180+
self.log.error("Exception reading message from websocket: {}".format(e)) # , exc_info=True)
181+
if message is None:
182+
if not self.disconnected:
183+
self.log.warning("Lost connection to Gateway: {}".format(self.kernel_id))
184+
break
185+
callback(message) # pass back to notebook client (see self.on_open and WebSocketChannelsHandler.open)
186+
else: # ws cancelled - stop reading
187+
break
188+
189+
if not self.disconnected: # if websocket is not disconnected by client, attept to reconnect to Gateway
190+
self.log.info("Attempting to re-establish the connection to Gateway: {}".format(self.kernel_id))
191+
self._connect(self.kernel_id)
192+
loop = IOLoop.current()
193+
loop.add_future(self.ws_future, lambda future: self._read_messages(callback))
194+
195+
def on_open(self, kernel_id, message_callback, **kwargs):
196+
"""Web socket connection open against gateway server."""
197+
self._connect(kernel_id)
198+
loop = IOLoop.current()
199+
loop.add_future(
200+
self.ws_future,
201+
lambda future: self._read_messages(message_callback)
202+
)
203+
204+
def on_message(self, message):
205+
"""Send message to gateway server."""
206+
if self.ws is None:
207+
loop = IOLoop.current()
208+
loop.add_future(
209+
self.ws_future,
210+
lambda future: self._write_message(message)
211+
)
212+
else:
213+
self._write_message(message)
214+
215+
def _write_message(self, message):
216+
"""Send message to gateway server."""
217+
try:
218+
if not self.disconnected and self.ws is not None:
219+
self.ws.write_message(message)
220+
except Exception as e:
221+
self.log.error("Exception writing message to websocket: {}".format(e)) # , exc_info=True)
222+
223+
def on_close(self):
224+
"""Web socket closed event."""
225+
self._disconnect()
226+
227+
228+
class GatewayResourceHandler(APIHandler):
229+
"""Retrieves resources for specific kernelspec definitions from kernel/enterprise gateway."""
230+
231+
@web.authenticated
232+
@gen.coroutine
233+
def get(self, kernel_name, path, include_body=True):
234+
ksm = self.kernel_spec_manager
235+
kernel_spec_res = yield ksm.get_kernel_spec_resource(kernel_name, path)
236+
if kernel_spec_res is None:
237+
self.log.warning("Kernelspec resource '{}' for '{}' not found. Gateway may not support"
238+
" resource serving.".format(path, kernel_name))
239+
else:
240+
self.set_header("Content-Type", mimetypes.guess_type(path)[0])
241+
self.finish(kernel_spec_res)
242+
243+
244+
from ..services.kernels.handlers import _kernel_id_regex
245+
from ..services.kernelspecs.handlers import kernel_name_regex
246+
247+
default_handlers = [
248+
(r"/api/kernels/%s/channels" % _kernel_id_regex, WebSocketChannelsHandler),
249+
(r"/kernelspecs/%s/(?P<path>.*)" % kernel_name_regex, GatewayResourceHandler),
250+
]

0 commit comments

Comments
 (0)