@@ -123,80 +123,123 @@ def __repr__(self):
123123 def create_stream (self ):
124124 km = self .kernel_manager
125125 identity = self .session .bsession
126- for channel in (' shell' , ' control' , 'iopub' , ' stdin' ):
127- meth = getattr (km , ' connect_' + channel )
126+ for channel in ("iopub" , " shell" , " control" , " stdin" ):
127+ meth = getattr (km , " connect_" + channel )
128128 self .channels [channel ] = stream = meth (self .kernel_id , identity = identity )
129129 stream .channel = channel
130130
131- def nudge (self ):
131+ def nudge (self ):
132+ """Nudge the zmq connections with kernel_info_requests
133+
134+ Returns a Future that will resolve when we have received
135+ a shell reply and at least one iopub message,
136+ ensuring that zmq subscriptions are established,
137+ sockets are fully connected, and kernel is responsive.
138+
139+ Keeps retrying kernel_info_request until these are both received.
140+ """
141+ kernel = self .kernel_manager .get_kernel (self .kernel_id )
142+
143+ # Do not nudge busy kernels as kernel info requests sent to shell are
144+ # queued behind execution requests.
145+ # nudging in this case would cause a potentially very long wait
146+ # before connections are opened,
147+ # plus it is *very* unlikely that a busy kernel will not finish
148+ # establishing its zmq subscriptions before processing the next request.
149+ if getattr (kernel , "execution_state" ) == "busy" :
150+ self .log .debug ("Nudge: not nudging busy kernel %s" , self .kernel_id )
151+ f = Future ()
152+ f .set_result (None )
153+ return f
154+
132155 # Use a transient shell channel to prevent leaking
133156 # shell responses to the front-end.
134- kernel = self .kernel_manager .get_kernel (self .kernel_id )
135157 shell_channel = kernel .connect_shell ()
158+ # The IOPub used by the client, whose subscriptions we are verifying.
159+ iopub_channel = self .channels ["iopub" ]
136160
137- # The IOPub used by the client.
138- iopub_channel = self .channels ['iopub' ]
139-
140- future = Future ()
141161 info_future = Future ()
142162 iopub_future = Future ()
163+ both_done = gen .multi ([info_future , iopub_future ])
164+
165+ def finish (f = None ):
166+ """Ensure all futures are resolved
143167
144- def finish ():
168+ which in turn triggers cleanup
169+ """
170+ for f in (info_future , iopub_future ):
171+ if not f .done ():
172+ f .set_result (None )
173+
174+ def cleanup (f = None ):
145175 """Common cleanup"""
146- loop .remove_timeout (timeout )
147176 loop .remove_timeout (nudge_handle )
148177 iopub_channel .stop_on_recv ()
149178 if not shell_channel .closed ():
150179 shell_channel .close ()
151180
181+ # trigger cleanup when both message futures are resolved
182+ both_done .add_done_callback (cleanup )
183+
152184 def on_shell_reply (msg ):
185+ self .log .debug ("Nudge: shell info reply received: %s" , self .kernel_id )
153186 if not info_future .done ():
154- self .log .debug ("Nudge: shell info reply received: %s" , self .kernel_id )
155- if not shell_channel .closed ():
156- shell_channel .close ()
157- self .log .debug ("Nudge: resolving shell future" )
187+ self .log .debug ("Nudge: resolving shell future: %s" , self .kernel_id )
158188 info_future .set_result (None )
159- if iopub_future .done ():
160- finish ()
161- self .log .debug ("Nudge: resolving main future in shell handler" )
162- future .set_result (None )
163189
164190 def on_iopub (msg ):
191+ self .log .debug ("Nudge: IOPub received: %s" , self .kernel_id )
165192 if not iopub_future .done ():
166- self .log .debug ("Nudge: first IOPub received: %s" , self .kernel_id )
167193 iopub_channel .stop_on_recv ()
168- self .log .debug ("Nudge: resolving iopub future" )
194+ self .log .debug ("Nudge: resolving iopub future: %s" , self . kernel_id )
169195 iopub_future .set_result (None )
170- if info_future .done ():
171- finish ()
172- self .log .debug ("Nudge: resolving main future in iopub handler" )
173- future .set_result (None )
174-
175- def on_timeout ():
176- self .log .warning ("Nudge: Timeout waiting for kernel_info_reply: %s" , self .kernel_id )
177- finish ()
178- if not future .done ():
179- future .set_exception (TimeoutError ("Timeout waiting for nudge" ))
180196
181197 iopub_channel .on_recv (on_iopub )
182198 shell_channel .on_recv (on_shell_reply )
183199 loop = IOLoop .current ()
184200
185201 # Nudge the kernel with kernel info requests until we get an IOPub message
186202 def nudge (count ):
187- # Do not nudge busy kernels as kernel info requests sent to shell are
188- # queued behind execution requests.
189- if kernel .execution_state == 'busy' :
190- future .set_result (None )
191- if not future .done ():
203+ count += 1
204+
205+ # NOTE: this close check appears to never be True during on_open,
206+ # even when the peer has closed the connection
207+ if self .ws_connection is None or self .ws_connection .is_closing ():
208+ self .log .debug (
209+ "Nudge: cancelling on closed websocket: %s" , self .kernel_id
210+ )
211+ finish ()
212+ return
213+
214+ # check for stopped kernel
215+ if self .kernel_id not in self .kernel_manager :
216+ self .log .debug (
217+ "Nudge: cancelling on stopped kernel: %s" , self .kernel_id
218+ )
219+ finish ()
220+ return
221+
222+ # check for closed zmq socket
223+ if shell_channel .closed ():
224+ self .log .debug (
225+ "Nudge: cancelling on closed zmq socket: %s" , self .kernel_id
226+ )
227+ finish ()
228+ return
229+
230+ if not both_done .done ():
192231 log = self .log .warning if count % 10 == 0 else self .log .debug
193- log ("Nudging attempt %s on kernel %s" % (count , self .kernel_id ))
232+ log ("Nudge: attempt %s on kernel %s" % (count , self .kernel_id ))
194233 self .session .send (shell_channel , "kernel_info_request" )
195234 nonlocal nudge_handle
196235 nudge_handle = loop .call_later (0.5 , nudge , count )
197236
198237 nudge_handle = loop .call_later (0 , nudge , count = 0 )
199- timeout = loop .add_timeout (loop .time () + self .kernel_info_timeout , on_timeout )
238+
239+ # resolve with a timeout if we get no response
240+ future = gen .with_timeout (loop .time () + self .kernel_info_timeout , both_done )
241+ # ensure we have no dangling resources or unresolved Futures in case of timeout
242+ future .add_done_callback (finish )
200243 return future
201244
202245 def request_kernel_info (self ):
@@ -221,7 +264,7 @@ def request_kernel_info(self):
221264 self .log .debug ("Waiting for pending kernel_info request" )
222265 future .add_done_callback (lambda f : self ._finish_kernel_info (f .result ()))
223266 return self ._kernel_info_future
224-
267+
225268 def _handle_kernel_info_reply (self , msg ):
226269 """process the kernel_info_reply
227270
0 commit comments