10
10
import logging
11
11
from textwrap import dedent
12
12
13
- from tornado import web
13
+ from tornado import web , gen
14
14
from tornado .concurrent import Future
15
15
from tornado .ioloop import IOLoop
16
16
@@ -122,11 +122,122 @@ def __repr__(self):
122
122
def create_stream (self ):
123
123
km = self .kernel_manager
124
124
identity = self .session .bsession
125
- for channel in ('shell ' , 'control ' , 'iopub ' , 'stdin' ):
125
+ for channel in ('iopub ' , 'shell ' , 'control ' , 'stdin' ):
126
126
meth = getattr (km , 'connect_' + channel )
127
127
self .channels [channel ] = stream = meth (self .kernel_id , identity = identity )
128
128
stream .channel = channel
129
129
130
+ def nudge (self ):
131
+ """Nudge the zmq connections with kernel_info_requests
132
+ Returns a Future that will resolve when we have received
133
+ a shell reply and at least one iopub message,
134
+ ensuring that zmq subscriptions are established,
135
+ sockets are fully connected, and kernel is responsive.
136
+ Keeps retrying kernel_info_request until these are both received.
137
+ """
138
+ kernel = self .kernel_manager .get_kernel (self .kernel_id )
139
+
140
+ # Do not nudge busy kernels as kernel info requests sent to shell are
141
+ # queued behind execution requests.
142
+ # nudging in this case would cause a potentially very long wait
143
+ # before connections are opened,
144
+ # plus it is *very* unlikely that a busy kernel will not finish
145
+ # establishing its zmq subscriptions before processing the next request.
146
+ if getattr (kernel , "execution_state" ) == "busy" :
147
+ self .log .debug ("Nudge: not nudging busy kernel %s" , self .kernel_id )
148
+ f = Future ()
149
+ f .set_result (None )
150
+ return f
151
+
152
+ # Use a transient shell channel to prevent leaking
153
+ # shell responses to the front-end.
154
+ shell_channel = kernel .connect_shell ()
155
+ # The IOPub used by the client, whose subscriptions we are verifying.
156
+ iopub_channel = self .channels ["iopub" ]
157
+
158
+ info_future = Future ()
159
+ iopub_future = Future ()
160
+ both_done = gen .multi ([info_future , iopub_future ])
161
+
162
+ def finish (_ = None ):
163
+ """Ensure all futures are resolved
164
+ which in turn triggers cleanup
165
+ """
166
+ for f in (info_future , iopub_future ):
167
+ if not f .done ():
168
+ f .set_result (None )
169
+
170
+ def cleanup (_ = None ):
171
+ """Common cleanup"""
172
+ loop .remove_timeout (nudge_handle )
173
+ iopub_channel .stop_on_recv ()
174
+ if not shell_channel .closed ():
175
+ shell_channel .close ()
176
+
177
+ # trigger cleanup when both message futures are resolved
178
+ both_done .add_done_callback (cleanup )
179
+
180
+ def on_shell_reply (msg ):
181
+ self .log .debug ("Nudge: shell info reply received: %s" , self .kernel_id )
182
+ if not info_future .done ():
183
+ self .log .debug ("Nudge: resolving shell future: %s" , self .kernel_id )
184
+ info_future .set_result (None )
185
+
186
+ def on_iopub (msg ):
187
+ self .log .debug ("Nudge: IOPub received: %s" , self .kernel_id )
188
+ if not iopub_future .done ():
189
+ iopub_channel .stop_on_recv ()
190
+ self .log .debug ("Nudge: resolving iopub future: %s" , self .kernel_id )
191
+ iopub_future .set_result (None )
192
+
193
+ iopub_channel .on_recv (on_iopub )
194
+ shell_channel .on_recv (on_shell_reply )
195
+ loop = IOLoop .current ()
196
+
197
+ # Nudge the kernel with kernel info requests until we get an IOPub message
198
+ def nudge (count ):
199
+ count += 1
200
+
201
+ # NOTE: this close check appears to never be True during on_open,
202
+ # even when the peer has closed the connection
203
+ if self .ws_connection is None or self .ws_connection .is_closing ():
204
+ self .log .debug (
205
+ "Nudge: cancelling on closed websocket: %s" , self .kernel_id
206
+ )
207
+ finish ()
208
+ return
209
+
210
+ # check for stopped kernel
211
+ if self .kernel_id not in self .kernel_manager :
212
+ self .log .debug (
213
+ "Nudge: cancelling on stopped kernel: %s" , self .kernel_id
214
+ )
215
+ finish ()
216
+ return
217
+
218
+ # check for closed zmq socket
219
+ if shell_channel .closed ():
220
+ self .log .debug (
221
+ "Nudge: cancelling on closed zmq socket: %s" , self .kernel_id
222
+ )
223
+ finish ()
224
+ return
225
+
226
+ if not both_done .done ():
227
+ log = self .log .warning if count % 10 == 0 else self .log .debug
228
+ log ("Nudge: attempt %s on kernel %s" % (count , self .kernel_id ))
229
+ self .session .send (shell_channel , "kernel_info_request" )
230
+ nonlocal nudge_handle
231
+ nudge_handle = loop .call_later (0.5 , nudge , count )
232
+
233
+ nudge_handle = loop .call_later (0 , nudge , count = 0 )
234
+
235
+ # resolve with a timeout if we get no response
236
+ future = gen .with_timeout (loop .time () + self .kernel_info_timeout , both_done )
237
+ # ensure we have no dangling resources or unresolved Futures in case of timeout
238
+ future .add_done_callback (finish )
239
+ return future
240
+
130
241
def request_kernel_info (self ):
131
242
"""send a request for kernel_info"""
132
243
km = self .kernel_manager
@@ -259,15 +370,23 @@ def open(self, kernel_id):
259
370
if buffer_info and buffer_info ['session_key' ] == self .session_key :
260
371
self .log .info ("Restoring connection for %s" , self .session_key )
261
372
self .channels = buffer_info ['channels' ]
262
- replay_buffer = buffer_info ['buffer' ]
263
- if replay_buffer :
264
- self .log .info ("Replaying %s buffered messages" , len (replay_buffer ))
265
- for channel , msg_list in replay_buffer :
266
- stream = self .channels [channel ]
267
- self ._on_zmq_reply (stream , msg_list )
373
+
374
+ connected = self .nudge ()
375
+
376
+ def replay (value ):
377
+ replay_buffer = buffer_info ['buffer' ]
378
+ if replay_buffer :
379
+ self .log .info ("Replaying %s buffered messages" , len (replay_buffer ))
380
+ for channel , msg_list in replay_buffer :
381
+ stream = self .channels [channel ]
382
+ self ._on_zmq_reply (stream , msg_list )
383
+
384
+
385
+ connected .add_done_callback (replay )
268
386
else :
269
387
try :
270
388
self .create_stream ()
389
+ connected = self .nudge ()
271
390
except web .HTTPError as e :
272
391
self .log .error ("Error opening stream: %s" , e )
273
392
# WebSockets don't response to traditional error codes so we
@@ -281,8 +400,14 @@ def open(self, kernel_id):
281
400
km .add_restart_callback (self .kernel_id , self .on_kernel_restarted )
282
401
km .add_restart_callback (self .kernel_id , self .on_restart_failed , 'dead' )
283
402
284
- for channel , stream in self .channels .items ():
285
- stream .on_recv_stream (self ._on_zmq_reply )
403
+ def subscribe (value ):
404
+ for channel , stream in self .channels .items ():
405
+ stream .on_recv_stream (self ._on_zmq_reply )
406
+
407
+ connected .add_done_callback (subscribe )
408
+
409
+ return connected
410
+
286
411
287
412
def on_message (self , msg ):
288
413
if not self .channels :
0 commit comments