Skip to content

Commit 43a9780

Browse files
rgbkrkgnestor
authored andcommitted
buffer messages when websocket connection is interrupted (#2871)
* provide some top level comments * implement buffering of messages on last dropped connection - buffer is per-kernel - session_key is stored because only a single session can resume the buffer and we can't be sure - on any new connection to a kernel, buffer is flushed. If session_key matches, it is replayed. Otherwise, it is discarded. - buffer is an unbounded list for now * restore actual zmq channels when resuming connection rather than establishing new connections fixes failure to resume shell channel * hookup restart callbacks in open instead of in `create_stream`, which is not called on reconnect * improve handling of restored connections in js - dismiss 'connection lost' dialog on reconnect - set busy status on reconnect (if not busy, idle will come soon after via kernel_ready)
1 parent 9b4660f commit 43a9780

File tree

3 files changed

+158
-22
lines changed

3 files changed

+158
-22
lines changed

notebook/services/kernels/handlers.py

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ def post(self, kernel_id, action):
9393

9494

9595
class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
96+
'''There is one ZMQChannelsHandler per running kernel and it oversees all
97+
the sessions.
98+
'''
9699

97100
# class-level registry of open sessions
98101
# allows checking for conflict on session-id,
@@ -125,8 +128,6 @@ def create_stream(self):
125128
meth = getattr(km, 'connect_' + channel)
126129
self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
127130
stream.channel = channel
128-
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
129-
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
130131

131132
def request_kernel_info(self):
132133
"""send a request for kernel_info"""
@@ -252,23 +253,41 @@ def _register_session(self):
252253
self.log.warning("Replacing stale connection: %s", self.session_key)
253254
yield stale_handler.close()
254255
self._open_sessions[self.session_key] = self
255-
256+
256257
def open(self, kernel_id):
257258
super(ZMQChannelsHandler, self).open()
258-
self.kernel_manager.notify_connect(kernel_id)
259-
try:
260-
self.create_stream()
261-
except web.HTTPError as e:
262-
self.log.error("Error opening stream: %s", e)
263-
# WebSockets don't response to traditional error codes so we
264-
# close the connection.
265-
for channel, stream in self.channels.items():
266-
if not stream.closed():
267-
stream.close()
268-
self.close()
259+
km = self.kernel_manager
260+
km.notify_connect(kernel_id)
261+
262+
# on new connections, flush the message buffer
263+
buffer_info = km.get_buffer(kernel_id, self.session_key)
264+
if buffer_info and buffer_info['session_key'] == self.session_key:
265+
self.log.info("Restoring connection for %s", self.session_key)
266+
self.channels = buffer_info['channels']
267+
replay_buffer = buffer_info['buffer']
268+
if replay_buffer:
269+
self.log.info("Replaying %s buffered messages", len(replay_buffer))
270+
for channel, msg_list in replay_buffer:
271+
stream = self.channels[channel]
272+
self._on_zmq_reply(stream, msg_list)
269273
else:
270-
for channel, stream in self.channels.items():
271-
stream.on_recv_stream(self._on_zmq_reply)
274+
try:
275+
self.create_stream()
276+
except web.HTTPError as e:
277+
self.log.error("Error opening stream: %s", e)
278+
# WebSockets don't response to traditional error codes so we
279+
# close the connection.
280+
for channel, stream in self.channels.items():
281+
if not stream.closed():
282+
stream.close()
283+
self.close()
284+
return
285+
286+
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
287+
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
288+
289+
for channel, stream in self.channels.items():
290+
stream.on_recv_stream(self._on_zmq_reply)
272291

273292
def on_message(self, msg):
274293
if not self.channels:
@@ -288,7 +307,7 @@ def on_message(self, msg):
288307
return
289308
stream = self.channels[channel]
290309
self.session.send(stream, msg)
291-
310+
292311
def _on_zmq_reply(self, stream, msg_list):
293312
idents, fed_msg_list = self.session.feed_identities(msg_list)
294313
msg = self.session.deserialize(fed_msg_list)
@@ -301,7 +320,6 @@ def write_stderr(error_message):
301320
)
302321
msg['channel'] = 'iopub'
303322
self.write_message(json.dumps(msg, default=date_default))
304-
305323
channel = getattr(stream, 'channel', None)
306324
msg_type = msg['header']['msg_type']
307325

@@ -408,6 +426,7 @@ def on_close(self):
408426
# unregister myself as an open session (only if it's really me)
409427
if self._open_sessions.get(self.session_key) is self:
410428
self._open_sessions.pop(self.session_key)
429+
411430
km = self.kernel_manager
412431
if self.kernel_id in km:
413432
km.notify_disconnect(self.kernel_id)
@@ -417,6 +436,13 @@ def on_close(self):
417436
km.remove_restart_callback(
418437
self.kernel_id, self.on_restart_failed, 'dead',
419438
)
439+
440+
# start buffering instead of closing if this was the last connection
441+
if km._kernel_connections[self.kernel_id] == 0:
442+
km.start_buffering(self.kernel_id, self.session_key, self.channels)
443+
self._close_future.set_result(None)
444+
return
445+
420446
# This method can be called twice, once by self.kernel_died and once
421447
# from the WebSocket close event. If the WebSocket connection is
422448
# closed before the ZMQ streams are setup, they could be None.

notebook/services/kernels/kernelmanager.py

Lines changed: 98 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
# Copyright (c) Jupyter Development Team.
88
# Distributed under the terms of the Modified BSD License.
99

10+
from collections import defaultdict
11+
from functools import partial
1012
import os
1113

1214
from tornado import gen, web
@@ -15,13 +17,13 @@
1517

1618
from jupyter_client.session import Session
1719
from jupyter_client.multikernelmanager import MultiKernelManager
18-
from traitlets import Bool, Dict, List, Unicode, TraitError, Integer, default, validate
20+
from traitlets import Any, Bool, Dict, List, Unicode, TraitError, Integer, default, validate
1921

2022
from notebook.utils import to_os_path, exists
2123
from notebook._tz import utcnow, isoformat
2224
from ipython_genutils.py3compat import getcwd
2325

24-
from datetime import datetime, timedelta
26+
from datetime import timedelta
2527

2628

2729
class MappingKernelManager(MultiKernelManager):
@@ -81,6 +83,11 @@ def _update_root_dir(self, proposal):
8183
Only effective if cull_idle_timeout is not 0."""
8284
)
8385

86+
_kernel_buffers = Any()
87+
@default('_kernel_buffers')
88+
def _default_kernel_buffers(self):
89+
return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}})
90+
8491
#-------------------------------------------------------------------------
8592
# Methods for managing kernels and sessions
8693
#-------------------------------------------------------------------------
@@ -142,10 +149,97 @@ def start_kernel(self, kernel_id=None, path=None, **kwargs):
142149
# py2-compat
143150
raise gen.Return(kernel_id)
144151

152+
def start_buffering(self, kernel_id, session_key, channels):
153+
"""Start buffering messages for a kernel
154+
155+
Parameters
156+
----------
157+
kernel_id : str
158+
The id of the kernel to stop buffering.
159+
session_key: str
160+
The session_key, if any, that should get the buffer.
161+
If the session_key matches the current buffered session_key,
162+
the buffer will be returned.
163+
channels: dict({'channel': ZMQStream})
164+
The zmq channels whose messages should be buffered.
165+
"""
166+
self.log.info("Starting buffering for %s", session_key)
167+
self._check_kernel_id(kernel_id)
168+
# clear previous buffering state
169+
self.stop_buffering(kernel_id)
170+
buffer_info = self._kernel_buffers[kernel_id]
171+
# record the session key because only one session can buffer
172+
buffer_info['session_key'] = session_key
173+
# TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple
174+
buffer_info['buffer'] = []
175+
buffer_info['channels'] = channels
176+
177+
# forward any future messages to the internal buffer
178+
def buffer_msg(channel, msg_parts):
179+
self.log.debug("Buffering msg on %s:%s", kernel_id, channel)
180+
buffer_info['buffer'].append((channel, msg_parts))
181+
182+
for channel, stream in channels.items():
183+
stream.on_recv(partial(buffer_msg, channel))
184+
185+
186+
def get_buffer(self, kernel_id, session_key):
187+
"""Get the buffer for a given kernel
188+
189+
Parameters
190+
----------
191+
kernel_id : str
192+
The id of the kernel to stop buffering.
193+
session_key: str, optional
194+
The session_key, if any, that should get the buffer.
195+
If the session_key matches the current buffered session_key,
196+
the buffer will be returned.
197+
"""
198+
self.log.debug("Getting buffer for %s", kernel_id)
199+
if kernel_id not in self._kernel_buffers:
200+
return
201+
202+
buffer_info = self._kernel_buffers[kernel_id]
203+
if buffer_info['session_key'] == session_key:
204+
# remove buffer
205+
self._kernel_buffers.pop(kernel_id)
206+
# only return buffer_info if it's a match
207+
return buffer_info
208+
else:
209+
self.stop_buffering(kernel_id)
210+
211+
def stop_buffering(self, kernel_id):
212+
"""Stop buffering kernel messages
213+
214+
Parameters
215+
----------
216+
kernel_id : str
217+
The id of the kernel to stop buffering.
218+
"""
219+
self.log.debug("Clearing buffer for %s", kernel_id)
220+
self._check_kernel_id(kernel_id)
221+
222+
if kernel_id not in self._kernel_buffers:
223+
return
224+
buffer_info = self._kernel_buffers.pop(kernel_id)
225+
# close buffering streams
226+
for stream in buffer_info['channels'].values():
227+
if not stream.closed():
228+
stream.on_recv(None)
229+
stream.socket.close()
230+
stream.close()
231+
232+
msg_buffer = buffer_info['buffer']
233+
if msg_buffer:
234+
self.log.info("Discarding %s buffered messages for %s",
235+
len(msg_buffer), buffer_info['session_key'])
236+
145237
def shutdown_kernel(self, kernel_id, now=False):
146238
"""Shutdown a kernel by kernel_id"""
147239
self._check_kernel_id(kernel_id)
148-
self._kernels[kernel_id]._activity_stream.close()
240+
kernel = self._kernels[kernel_id]
241+
kernel._activity_stream.close()
242+
self.stop_buffering(kernel_id)
149243
self._kernel_connections.pop(kernel_id, None)
150244
return super(MappingKernelManager, self).shutdown_kernel(kernel_id, now=now)
151245

@@ -256,6 +350,7 @@ def record_activity(msg_list):
256350

257351
idents, fed_msg_list = session.feed_identities(msg_list)
258352
msg = session.deserialize(fed_msg_list)
353+
259354
msg_type = msg['header']['msg_type']
260355
self.log.debug("activity on %s: %s", kernel_id, msg_type)
261356
if msg_type == 'status':

notebook/static/notebook/js/notificationarea.js

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ define([
107107

108108
this.events.on('kernel_connected.Kernel', function () {
109109
knw.info("Connected", 500);
110+
// trigger busy in the status to clear broken-link state immediately
111+
// a kernel_ready event will come when the kernel becomes responsive.
112+
$kernel_ind_icon
113+
.attr('class', 'kernel_busy_icon')
114+
.attr('title', i18n.msg._('Kernel Connected'));
110115
});
111116

112117
this.events.on('kernel_restarting.Kernel', function () {
@@ -161,7 +166,7 @@ define([
161166
" The notebook will continue trying to reconnect. Check your" +
162167
" network connection or notebook server configuration.");
163168

164-
dialog.kernel_modal({
169+
var the_dialog = dialog.kernel_modal({
165170
title: i18n.msg._("Connection failed"),
166171
body: msg,
167172
keyboard_manager: that.keyboard_manager,
@@ -170,6 +175,16 @@ define([
170175
"OK": {}
171176
}
172177
});
178+
179+
// hide the dialog on reconnect if it's still visible
180+
var dismiss = function () {
181+
the_dialog.modal('hide');
182+
}
183+
that.events.on("kernel_connected.Kernel", dismiss);
184+
the_dialog.on("hidden.bs.modal", function () {
185+
// clear handler on dismiss
186+
that.events.off("kernel_connected.Kernel", dismiss);
187+
});
173188
}
174189
});
175190

0 commit comments

Comments
 (0)