Skip to content

Commit db69061

Browse files
committed
Merge remote-tracking branch 'upstream/main' into kernelspec-cache
2 parents 1610d4b + 35ffe5d commit db69061

File tree

11 files changed

+373
-30
lines changed

11 files changed

+373
-30
lines changed

docs/source/api/jupyter_server.gateway.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ Submodules
55
----------
66

77

8+
.. automodule:: jupyter_server.gateway.connections
9+
:members:
10+
:undoc-members:
11+
:show-inheritance:
12+
13+
814
.. automodule:: jupyter_server.gateway.gateway_client
915
:members:
1016
:undoc-members:

jupyter_server/base/handlers.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,13 @@ def wrapper(self, *args, **kwargs):
946946

947947

948948
class FileFindHandler(JupyterHandler, web.StaticFileHandler):
949-
"""subclass of StaticFileHandler for serving files from a search path"""
949+
"""subclass of StaticFileHandler for serving files from a search path
950+
951+
The setting "static_immutable_cache" can be set up to serve some static
952+
file as immutable (e.g. file name containing a hash). The setting is a
953+
list of base URL, every static file URL starting with one of those will
954+
be immutable.
955+
"""
950956

951957
# cache search results, don't search for files more than once
952958
_static_paths: dict = {}
@@ -955,8 +961,15 @@ class FileFindHandler(JupyterHandler, web.StaticFileHandler):
955961
def set_headers(self):
956962
"""Set the headers."""
957963
super().set_headers()
964+
965+
immutable_paths = self.settings.get("static_immutable_cache", [])
966+
967+
# allow immutable cache for files
968+
if any(self.request.path.startswith(path) for path in immutable_paths):
969+
self.set_header("Cache-Control", "public, max-age=31536000, immutable")
970+
958971
# disable browser caching, rely on 304 replies for savings
959-
if "v" not in self.request.arguments or any(
972+
elif "v" not in self.request.arguments or any(
960973
self.request.path.startswith(path) for path in self.no_cache_paths
961974
):
962975
self.set_header("Cache-Control", "no-cache")
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
"""Gateway connection classes."""
2+
# Copyright (c) Jupyter Development Team.
3+
# Distributed under the terms of the Modified BSD License.
4+
5+
import asyncio
6+
import logging
7+
import random
8+
from typing import Any, cast
9+
10+
import tornado.websocket as tornado_websocket
11+
from tornado.concurrent import Future
12+
from tornado.escape import json_decode, url_escape, utf8
13+
from tornado.httpclient import HTTPRequest
14+
from tornado.ioloop import IOLoop
15+
from traitlets import Bool, Instance, Int
16+
17+
from ..services.kernels.connection.base import BaseKernelWebsocketConnection
18+
from ..utils import url_path_join
19+
from .managers import GatewayClient
20+
21+
22+
class GatewayWebSocketConnection(BaseKernelWebsocketConnection):
23+
"""Web socket connection that proxies to a kernel/enterprise gateway."""
24+
25+
ws = Instance(klass=tornado_websocket.WebSocketClientConnection, allow_none=True)
26+
27+
ws_future = Instance(default_value=Future(), klass=Future)
28+
29+
disconnected = Bool(False)
30+
31+
retry = Int(0)
32+
33+
async def connect(self):
34+
"""Connect to the socket."""
35+
# websocket is initialized before connection
36+
self.ws = None
37+
ws_url = url_path_join(
38+
GatewayClient.instance().ws_url,
39+
GatewayClient.instance().kernels_endpoint,
40+
url_escape(self.kernel_id),
41+
"channels",
42+
)
43+
self.log.info(f"Connecting to {ws_url}")
44+
kwargs: dict = {}
45+
kwargs = GatewayClient.instance().load_connection_args(**kwargs)
46+
47+
request = HTTPRequest(ws_url, **kwargs)
48+
self.ws_future = cast(Future, tornado_websocket.websocket_connect(request))
49+
self.ws_future.add_done_callback(self._connection_done)
50+
51+
loop = IOLoop.current()
52+
loop.add_future(self.ws_future, lambda future: self._read_messages())
53+
54+
def _connection_done(self, fut):
55+
"""Handle a finished connection."""
56+
if (
57+
not self.disconnected and fut.exception() is None
58+
): # prevent concurrent.futures._base.CancelledError
59+
self.ws = fut.result()
60+
self.retry = 0
61+
self.log.debug(f"Connection is ready: ws: {self.ws}")
62+
else:
63+
self.log.warning(
64+
"Websocket connection has been closed via client disconnect or due to error. "
65+
"Kernel with ID '{}' may not be terminated on GatewayClient: {}".format(
66+
self.kernel_id, GatewayClient.instance().url
67+
)
68+
)
69+
70+
def disconnect(self):
71+
"""Handle a disconnect."""
72+
self.disconnected = True
73+
if self.ws is not None:
74+
# Close connection
75+
self.ws.close()
76+
elif not self.ws_future.done():
77+
# Cancel pending connection. Since future.cancel() is a noop on tornado, we'll track cancellation locally
78+
self.ws_future.cancel()
79+
self.log.debug(f"_disconnect: future cancelled, disconnected: {self.disconnected}")
80+
81+
async def _read_messages(self):
82+
"""Read messages from gateway server."""
83+
while self.ws is not None:
84+
message = None
85+
if not self.disconnected:
86+
try:
87+
message = await self.ws.read_message()
88+
except Exception as e:
89+
self.log.error(
90+
f"Exception reading message from websocket: {e}"
91+
) # , exc_info=True)
92+
if message is None:
93+
if not self.disconnected:
94+
self.log.warning(f"Lost connection to Gateway: {self.kernel_id}")
95+
break
96+
self.handle_outgoing_message(
97+
message
98+
) # pass back to notebook client (see self.on_open and WebSocketChannelsHandler.open)
99+
else: # ws cancelled - stop reading
100+
break
101+
102+
# NOTE(esevan): if websocket is not disconnected by client, try to reconnect.
103+
if not self.disconnected and self.retry < GatewayClient.instance().gateway_retry_max:
104+
jitter = random.randint(10, 100) * 0.01 # noqa
105+
retry_interval = (
106+
min(
107+
GatewayClient.instance().gateway_retry_interval * (2**self.retry),
108+
GatewayClient.instance().gateway_retry_interval_max,
109+
)
110+
+ jitter
111+
)
112+
self.retry += 1
113+
self.log.info(
114+
"Attempting to re-establish the connection to Gateway in %s secs (%s/%s): %s",
115+
retry_interval,
116+
self.retry,
117+
GatewayClient.instance().gateway_retry_max,
118+
self.kernel_id,
119+
)
120+
await asyncio.sleep(retry_interval)
121+
loop = IOLoop.current()
122+
loop.spawn_callback(self.connect)
123+
124+
def handle_outgoing_message(self, incoming_msg: str, *args: Any) -> None:
125+
"""Send message to the notebook client."""
126+
try:
127+
self.websocket_handler.write_message(incoming_msg)
128+
except tornado_websocket.WebSocketClosedError:
129+
if self.log.isEnabledFor(logging.DEBUG):
130+
msg_summary = GatewayWebSocketConnection._get_message_summary(
131+
json_decode(utf8(incoming_msg))
132+
)
133+
self.log.debug(
134+
"Notebook client closed websocket connection - message dropped: {}".format(
135+
msg_summary
136+
)
137+
)
138+
139+
def handle_incoming_message(self, message: str) -> None:
140+
"""Send message to gateway server."""
141+
if self.ws is None:
142+
loop = IOLoop.current()
143+
loop.add_future(self.ws_future, lambda future: self.handle_incoming_message(message))
144+
else:
145+
self._write_message(message)
146+
147+
def _write_message(self, message):
148+
"""Send message to gateway server."""
149+
try:
150+
if not self.disconnected and self.ws is not None:
151+
self.ws.write_message(message)
152+
except Exception as e:
153+
self.log.error(f"Exception writing message to websocket: {e}") # , exc_info=True)
154+
155+
@staticmethod
156+
def _get_message_summary(message):
157+
"""Get a summary of a message."""
158+
summary = []
159+
message_type = message["msg_type"]
160+
summary.append(f"type: {message_type}")
161+
162+
if message_type == "status":
163+
summary.append(", state: {}".format(message["content"]["execution_state"]))
164+
elif message_type == "error":
165+
summary.append(
166+
", {}:{}:{}".format(
167+
message["content"]["ename"],
168+
message["content"]["evalue"],
169+
message["content"]["traceback"],
170+
)
171+
)
172+
else:
173+
summary.append(", ...") # don't display potentially sensitive data
174+
175+
return "".join(summary)

jupyter_server/gateway/handlers.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import mimetypes
77
import os
88
import random
9+
import warnings
910
from typing import Optional, cast
1011

1112
from jupyter_client.session import Session
@@ -21,6 +22,13 @@
2122
from ..utils import url_path_join
2223
from .managers import GatewayClient
2324

25+
warnings.warn(
26+
"The jupyter_server.gateway.handlers module is deprecated and will not be supported in Jupyter Server 3.0",
27+
DeprecationWarning,
28+
stacklevel=2,
29+
)
30+
31+
2432
# Keepalive ping interval (default: 30 seconds)
2533
GATEWAY_WS_PING_INTERVAL_SECS = int(os.getenv("GATEWAY_WS_PING_INTERVAL_SECS", "30"))
2634

jupyter_server/kernelspecs/handlers.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
"""Kernelspecs API Handlers."""
2+
import mimetypes
3+
24
from jupyter_core.utils import ensure_async
35
from tornado import web
46

@@ -27,6 +29,26 @@ async def get(self, kernel_name, path, include_body=True):
2729
ksc = self.kernel_spec_cache
2830
if path.lower().endswith(".png"):
2931
self.set_header("Cache-Control", f"max-age={60*60*24*30}")
32+
ksm = self.kernel_spec_manager
33+
if hasattr(ksm, "get_kernel_spec_resource"):
34+
# If the kernel spec manager defines a method to get kernelspec resources,
35+
# then use that instead of trying to read from disk.
36+
kernel_spec_res = await ksm.get_kernel_spec_resource(kernel_name, path)
37+
if kernel_spec_res is not None:
38+
# We have to explicitly specify the `absolute_path` attribute so that
39+
# the underlying StaticFileHandler methods can calculate an etag.
40+
self.absolute_path = path
41+
mimetype: str = mimetypes.guess_type(path)[0] or "text/plain"
42+
self.set_header("Content-Type", mimetype)
43+
self.finish(kernel_spec_res)
44+
return
45+
else:
46+
self.log.warning(
47+
"Kernelspec resource '{}' for '{}' not found. Kernel spec manager may"
48+
" not support resource serving. Falling back to reading from disk".format(
49+
path, kernel_name
50+
)
51+
)
3052
try:
3153
kspec = await ensure_async(ksc.get_kernel_spec(kernel_name))
3254
self.root = kspec.resource_dir

jupyter_server/serverapp.py

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
from jupyter_server.extension.config import ExtensionConfigManager
8888
from jupyter_server.extension.manager import ExtensionManager
8989
from jupyter_server.extension.serverextension import ServerExtensionApp
90+
from jupyter_server.gateway.connections import GatewayWebSocketConnection
9091
from jupyter_server.gateway.managers import (
9192
GatewayClient,
9293
GatewayKernelSpecManager,
@@ -438,17 +439,6 @@ def init_handlers(self, default_services, settings):
438439
# And from identity provider
439440
handlers.extend(settings["identity_provider"].get_handlers())
440441

441-
# If gateway mode is enabled, replace appropriate handlers to perform redirection
442-
if GatewayClient.instance().gateway_enabled:
443-
# for each handler required for gateway, locate its pattern
444-
# in the current list and replace that entry...
445-
gateway_handlers = load_handlers("jupyter_server.gateway.handlers")
446-
for _, gwh in enumerate(gateway_handlers):
447-
for j, h in enumerate(handlers):
448-
if gwh[0] == h[0]:
449-
handlers[j] = (gwh[0], gwh[1])
450-
break
451-
452442
# register base handlers last
453443
handlers.extend(load_handlers("jupyter_server.base.handlers"))
454444

@@ -801,6 +791,7 @@ class ServerApp(JupyterApp):
801791
GatewayMappingKernelManager,
802792
GatewayKernelSpecManager,
803793
GatewaySessionManager,
794+
GatewayWebSocketConnection,
804795
GatewayClient,
805796
Authorizer,
806797
EventLogger,
@@ -1520,12 +1511,17 @@ def _default_session_manager_class(self):
15201511
)
15211512

15221513
kernel_websocket_connection_class = Type(
1523-
default_value=ZMQChannelsWebsocketConnection,
15241514
klass=BaseKernelWebsocketConnection,
15251515
config=True,
15261516
help=_i18n("The kernel websocket connection class to use."),
15271517
)
15281518

1519+
@default("kernel_websocket_connection_class")
1520+
def _default_kernel_websocket_connection_class(self):
1521+
if self.gateway_config.gateway_enabled:
1522+
return "jupyter_server.gateway.connections.GatewayWebSocketConnection"
1523+
return ZMQChannelsWebsocketConnection
1524+
15291525
config_manager_class = Type(
15301526
default_value=ConfigManager,
15311527
config=True,
@@ -1827,6 +1823,17 @@ def _default_terminals_enabled(self):
18271823
config=True,
18281824
)
18291825

1826+
static_immutable_cache = List(
1827+
Unicode(),
1828+
help="""
1829+
Paths to set up static files as immutable.
1830+
1831+
This allow setting up the cache control of static files as immutable.
1832+
It should be used for static file named with a hash for instance.
1833+
""",
1834+
config=True,
1835+
)
1836+
18301837
_starter_app = Instance(
18311838
default_value=None,
18321839
allow_none=True,
@@ -2010,6 +2017,9 @@ def init_webapp(self):
20102017
] = self.identity_provider.get_secure_cookie_kwargs
20112018
self.tornado_settings["token"] = self.identity_provider.token
20122019

2020+
if self.static_immutable_cache:
2021+
self.tornado_settings["static_immutable_cache"] = self.static_immutable_cache
2022+
20132023
# ensure default_url starts with base_url
20142024
if not self.default_url.startswith(self.base_url):
20152025
self.default_url = url_path_join(self.base_url, self.default_url)
@@ -2883,7 +2893,19 @@ async def _cleanup(self):
28832893
self.remove_browser_open_files()
28842894
await self.cleanup_extensions()
28852895
await self.cleanup_kernels()
2886-
await self.kernel_websocket_connection_class.close_all()
2896+
try:
2897+
await self.kernel_websocket_connection_class.close_all()
2898+
except AttributeError:
2899+
# This can happen in two different scenarios:
2900+
#
2901+
# 1. During tests, where the _cleanup method is invoked without
2902+
# the corresponding initialize method having been invoked.
2903+
# 2. If the provided `kernel_websocket_connection_class` does not
2904+
# implement the `close_all` class method.
2905+
#
2906+
# In either case, we don't need to do anything and just want to treat
2907+
# the raised error as a no-op.
2908+
pass
28872909
if getattr(self, "kernel_manager", None):
28882910
self.kernel_manager.__del__()
28892911
if getattr(self, "session_manager", None):

0 commit comments

Comments
 (0)