@@ -123,11 +123,125 @@ def __repr__(self):
def create_stream(self):
    """Connect one stream per kernel channel and register it on the handler.

    For each of the iopub, shell, control and stdin channels (in that
    order), the matching ``connect_<channel>`` method of the kernel manager
    is called with this handler's kernel id and the session's ``bsession``
    as identity.  The resulting stream is stored in ``self.channels`` under
    the channel name and tagged with a ``channel`` attribute.
    """
    manager = self.kernel_manager
    session_identity = self.session.bsession
    for name in ("iopub", "shell", "control", "stdin"):
        # Resolve connect_iopub / connect_shell / ... dynamically by name.
        connect = getattr(manager, "connect_" + name)
        opened = connect(self.kernel_id, identity=session_identity)
        self.channels[name] = opened
        # Tag the stream so later code can tell which channel it belongs to.
        opened.channel = name
130130
def nudge(self):
    """Nudge the zmq connections with kernel_info_requests.

    Returns a Future that will resolve when we have received
    a shell reply and at least one iopub message,
    ensuring that zmq subscriptions are established,
    sockets are fully connected, and kernel is responsive.

    Keeps retrying kernel_info_request (every 0.5 seconds) until these are
    both received.  Retrying also stops, and the pending futures are
    resolved, when the websocket is closing, the kernel is no longer known
    to the kernel manager, or the transient shell socket is closed.  The
    overall wait is bounded by ``self.kernel_info_timeout`` through
    ``gen.with_timeout``.

    If the kernel reports an ``execution_state`` of "busy", no request is
    sent and an already-resolved Future is returned.
    """
    kernel = self.kernel_manager.get_kernel(self.kernel_id)

    # Do not nudge busy kernels as kernel info requests sent to shell are
    # queued behind execution requests.
    # nudging in this case would cause a potentially very long wait
    # before connections are opened,
    # plus it is *very* unlikely that a busy kernel will not finish
    # establishing its zmq subscriptions before processing the next request.
    #
    # The None default keeps kernel objects that do not expose
    # execution_state on the normal nudge path instead of raising
    # AttributeError here.
    if getattr(kernel, "execution_state", None) == "busy":
        self.log.debug("Nudge: not nudging busy kernel %s", self.kernel_id)
        f = Future()
        f.set_result(None)
        return f

    # Use a transient shell channel to prevent leaking
    # shell responses to the front-end.
    shell_channel = kernel.connect_shell()
    # The IOPub used by the client, whose subscriptions we are verifying.
    iopub_channel = self.channels["iopub"]

    info_future = Future()
    iopub_future = Future()
    both_done = gen.multi([info_future, iopub_future])

    loop = IOLoop.current()

    def finish(_future=None):
        """Ensure all futures are resolved

        which in turn triggers cleanup.  Accepts (and ignores) one
        argument so it can be used directly as a done-callback.
        """
        for pending in (info_future, iopub_future):
            if not pending.done():
                pending.set_result(None)

    def cleanup(_future=None):
        """Common cleanup"""
        # nudge_handle is bound further down, before any callback can fire.
        loop.remove_timeout(nudge_handle)
        iopub_channel.stop_on_recv()
        if not shell_channel.closed():
            shell_channel.close()

    # trigger cleanup when both message futures are resolved
    both_done.add_done_callback(cleanup)

    def on_shell_reply(msg):
        self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
        if not info_future.done():
            self.log.debug("Nudge: resolving shell future: %s", self.kernel_id)
            info_future.set_result(None)

    def on_iopub(msg):
        self.log.debug("Nudge: IOPub received: %s", self.kernel_id)
        if not iopub_future.done():
            # One IOPub message is enough; stop listening right away.
            iopub_channel.stop_on_recv()
            self.log.debug("Nudge: resolving iopub future: %s", self.kernel_id)
            iopub_future.set_result(None)

    iopub_channel.on_recv(on_iopub)
    shell_channel.on_recv(on_shell_reply)

    # Nudge the kernel with kernel info requests until we get an IOPub message
    def send_nudge(count):
        nonlocal nudge_handle
        count += 1

        # NOTE: this close check appears to never be True during on_open,
        # even when the peer has closed the connection
        if self.ws_connection is None or self.ws_connection.is_closing():
            self.log.debug(
                "Nudge: cancelling on closed websocket: %s", self.kernel_id
            )
            finish()
            return

        # check for stopped kernel
        if self.kernel_id not in self.kernel_manager:
            self.log.debug(
                "Nudge: cancelling on stopped kernel: %s", self.kernel_id
            )
            finish()
            return

        # check for closed zmq socket
        if shell_channel.closed():
            self.log.debug(
                "Nudge: cancelling on closed zmq socket: %s", self.kernel_id
            )
            finish()
            return

        if not both_done.done():
            # Escalate every tenth attempt to a warning so a stuck kernel
            # is visible without flooding the log.
            log = self.log.warning if count % 10 == 0 else self.log.debug
            log("Nudge: attempt %s on kernel %s", count, self.kernel_id)
            self.session.send(shell_channel, "kernel_info_request")
            nudge_handle = loop.call_later(0.5, send_nudge, count)

    nudge_handle = loop.call_later(0, send_nudge, count=0)

    # resolve with a timeout if we get no response
    future = gen.with_timeout(loop.time() + self.kernel_info_timeout, both_done)
    # ensure we have no dangling resources or unresolved Futures in case of timeout
    future.add_done_callback(finish)
    return future
244+
131245 def request_kernel_info (self ):
132246 """send a request for kernel_info"""
133247 km = self .kernel_manager
@@ -150,7 +264,7 @@ def request_kernel_info(self):
150264 self .log .debug ("Waiting for pending kernel_info request" )
151265 future .add_done_callback (lambda f : self ._finish_kernel_info (f .result ()))
152266 return self ._kernel_info_future
153-
267+
154268 def _handle_kernel_info_reply (self , msg ):
155269 """process the kernel_info_reply
156270
@@ -263,15 +377,21 @@ def open(self, kernel_id):
263377 if buffer_info and buffer_info ['session_key' ] == self .session_key :
264378 self .log .info ("Restoring connection for %s" , self .session_key )
265379 self .channels = buffer_info ['channels' ]
266- replay_buffer = buffer_info ['buffer' ]
267- if replay_buffer :
268- self .log .info ("Replaying %s buffered messages" , len (replay_buffer ))
269- for channel , msg_list in replay_buffer :
270- stream = self .channels [channel ]
271- self ._on_zmq_reply (stream , msg_list )
380+ connected = self .nudge ()
381+
382+ def replay (value ):
383+ replay_buffer = buffer_info ['buffer' ]
384+ if replay_buffer :
385+ self .log .info ("Replaying %s buffered messages" , len (replay_buffer ))
386+ for channel , msg_list in replay_buffer :
387+ stream = self .channels [channel ]
388+ self ._on_zmq_reply (stream , msg_list )
389+
390+ connected .add_done_callback (replay )
272391 else :
273392 try :
274393 self .create_stream ()
394+ connected = self .nudge ()
275395 except web .HTTPError as e :
276396 self .log .error ("Error opening stream: %s" , e )
277397 # WebSockets don't response to traditional error codes so we
@@ -285,8 +405,13 @@ def open(self, kernel_id):
285405 km .add_restart_callback (self .kernel_id , self .on_kernel_restarted )
286406 km .add_restart_callback (self .kernel_id , self .on_restart_failed , 'dead' )
287407
288- for channel , stream in self .channels .items ():
289- stream .on_recv_stream (self ._on_zmq_reply )
408+ def subscribe (value ):
409+ for channel , stream in self .channels .items ():
410+ stream .on_recv_stream (self ._on_zmq_reply )
411+
412+ connected .add_done_callback (subscribe )
413+
414+ return connected
290415
291416 def on_message (self , msg ):
292417 if not self .channels :
0 commit comments