@@ -131,13 +131,27 @@ def create_stream(self):
131
131
def nudge (self ):
132
132
"""Nudge the zmq connections with kernel_info_requests
133
133
Returns a Future that will resolve when we have received
134
- a control reply and at least one iopub message,
134
+ a shell or control reply and at least one iopub message,
135
135
ensuring that zmq subscriptions are established,
136
136
sockets are fully connected, and kernel is responsive.
137
137
Keeps retrying kernel_info_request until these are both received.
138
138
"""
139
139
kernel = self .kernel_manager .get_kernel (self .kernel_id )
140
140
141
+ # Do not nudge busy kernels as kernel info requests sent to shell are
142
+ # queued behind execution requests.
143
+ # nudging in this case would cause a potentially very long wait
144
+ # before connections are opened,
145
+ # plus it is *very* unlikely that a busy kernel will not finish
146
+ # establishing its zmq subscriptions before processing the next request.
147
+ if getattr (kernel , "execution_state" ) == "busy" :
148
+ self .log .debug ("Nudge: not nudging busy kernel %s" , self .kernel_id )
149
+ f = Future ()
150
+ f .set_result (None )
151
+ return f
152
+ # Use a transient shell channel to prevent leaking
153
+ # shell responses to the front-end.
154
+ shell_channel = kernel .connect_shell ()
141
155
# Use a transient control channel to prevent leaking
142
156
# control responses to the front-end.
143
157
control_channel = kernel .connect_control ()
@@ -160,18 +174,26 @@ def cleanup(_=None):
160
174
"""Common cleanup"""
161
175
loop .remove_timeout (nudge_handle )
162
176
iopub_channel .stop_on_recv ()
177
+ if not shell_channel .closed ():
178
+ shell_channel .close ()
163
179
if not control_channel .closed ():
164
180
control_channel .close ()
165
181
166
182
# trigger cleanup when both message futures are resolved
167
183
both_done .add_done_callback (cleanup )
168
184
169
- def on_control_reply (msg ):
185
+ def on_shell_reply (msg ):
170
186
self .log .debug ("Nudge: shell info reply received: %s" , self .kernel_id )
171
187
if not info_future .done ():
172
188
self .log .debug ("Nudge: resolving shell future: %s" , self .kernel_id )
173
189
info_future .set_result (None )
174
190
191
+ def on_control_reply (msg ):
192
+ self .log .debug ("Nudge: control info reply received: %s" , self .kernel_id )
193
+ if not info_future .done ():
194
+ self .log .debug ("Nudge: resolving control future: %s" , self .kernel_id )
195
+ info_future .set_result (None )
196
+
175
197
def on_iopub (msg ):
176
198
self .log .debug ("Nudge: IOPub received: %s" , self .kernel_id )
177
199
if not iopub_future .done ():
@@ -180,6 +202,7 @@ def on_iopub(msg):
180
202
iopub_future .set_result (None )
181
203
182
204
iopub_channel .on_recv (on_iopub )
205
+ shell_channel .on_recv (on_shell_reply )
183
206
control_channel .on_recv (on_control_reply )
184
207
loop = IOLoop .current ()
185
208
@@ -200,6 +223,12 @@ def nudge(count):
200
223
finish ()
201
224
return
202
225
226
+ # check for closed zmq socket
227
+ if shell_channel .closed ():
228
+ self .log .debug ("Nudge: cancelling on closed zmq socket: %s" , self .kernel_id )
229
+ finish ()
230
+ return
231
+
203
232
# check for closed zmq socket
204
233
if control_channel .closed ():
205
234
self .log .debug ("Nudge: cancelling on closed zmq socket: %s" , self .kernel_id )
@@ -209,6 +238,7 @@ def nudge(count):
209
238
if not both_done .done ():
210
239
log = self .log .warning if count % 10 == 0 else self .log .debug
211
240
log ("Nudge: attempt %s on kernel %s" % (count , self .kernel_id ))
241
+ self .session .send (shell_channel , "kernel_info_request" )
212
242
self .session .send (control_channel , "kernel_info_request" )
213
243
nonlocal nudge_handle
214
244
nudge_handle = loop .call_later (0.5 , nudge , count )
0 commit comments