@@ -97,7 +97,12 @@ def create_kernel_manager(*args, **kwargs) -> KernelManager:
97
97
98
98
context = Instance ("zmq.Context" )
99
99
100
- _starting_kernels = Dict ()
100
+ _pending_kernels = Dict ()
101
+
102
+ @property
103
+ def _starting_kernels (self ):
104
+ """A shim for backwards compatibility."""
105
+ return self ._pending_kernels
101
106
102
107
@default ("context" )
103
108
def _context_default (self ) -> zmq .Context :
@@ -165,7 +170,22 @@ async def _add_kernel_when_ready(
165
170
await kernel_awaitable
166
171
self ._kernels [kernel_id ] = km
167
172
finally :
168
- self ._starting_kernels .pop (kernel_id , None )
173
+ self ._pending_kernels .pop (kernel_id , None )
174
+
175
+ async def _remove_kernel_when_ready (
176
+ self , kernel_id : str , kernel_awaitable : t .Awaitable
177
+ ) -> None :
178
+ try :
179
+ await kernel_awaitable
180
+ self .remove_kernel (kernel_id )
181
+ finally :
182
+ self ._pending_kernels .pop (kernel_id , None )
183
+
184
+ def _using_pending_kernels (self ):
185
+ """Returns a boolean; a clearer method for determining if
186
+ this multikernelmanager is using pending kernels or not
187
+ """
188
+ return getattr (self , 'use_pending_kernels' , False )
169
189
170
190
async def _async_start_kernel (self , kernel_name : t .Optional [str ] = None , ** kwargs ) -> str :
171
191
"""Start a new kernel.
@@ -186,12 +206,18 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg
186
206
187
207
starter = ensure_async (km .start_kernel (** kwargs ))
188
208
fut = asyncio .ensure_future (self ._add_kernel_when_ready (kernel_id , km , starter ))
189
- self ._starting_kernels [kernel_id ] = fut
209
+ self ._pending_kernels [kernel_id ] = fut
190
210
191
- if getattr (self , 'use_pending_kernels' , False ):
211
+ # Handling a Pending Kernel
212
+ if self ._using_pending_kernels ():
213
+ # If using pending kernels, do not block
214
+ # on the kernel start.
192
215
self ._kernels [kernel_id ] = km
193
216
else :
194
217
await fut
218
+ # raise an exception if one occurred during kernel startup.
219
+ if km .ready .exception ():
220
+ raise km .ready .exception ()
195
221
196
222
return kernel_id
197
223
@@ -215,23 +241,28 @@ async def _async_shutdown_kernel(
215
241
Will the kernel be restarted?
216
242
"""
217
243
kernel = self .get_kernel (kernel_id )
218
- if getattr ( self , 'use_pending_kernels' , False ):
244
+ if self . _using_pending_kernels ( ):
219
245
# Make sure the previous kernel started before trying to shutdown.
220
246
if not kernel .ready .done ():
221
247
raise RuntimeError ("Kernel is in a pending state. Cannot shutdown." )
222
248
# If the kernel didn't start properly, no need to shutdown.
223
249
elif kernel .ready .exception ():
250
+ self .remove_kernel (kernel_id )
224
251
return
225
252
self .log .info ("Kernel shutdown: %s" % kernel_id )
226
- if kernel_id in self ._starting_kernels :
253
+ if kernel_id in self ._pending_kernels :
227
254
try :
228
- await self ._starting_kernels [kernel_id ]
255
+ await self ._pending_kernels [kernel_id ]
229
256
except Exception :
230
257
self .remove_kernel (kernel_id )
231
258
return
232
259
km = self .get_kernel (kernel_id )
233
- await ensure_async (km .shutdown_kernel (now , restart ))
234
- self .remove_kernel (kernel_id )
260
+ stopper = ensure_async (km .shutdown_kernel (now , restart ))
261
+ fut = asyncio .ensure_future (self ._remove_kernel_when_ready (kernel_id , stopper ))
262
+ self ._pending_kernels [kernel_id ] = fut
263
+ # Await the kernel if not using pending kernels.
264
+ if not self ._using_pending_kernels ():
265
+ await fut
235
266
236
267
shutdown_kernel = run_sync (_async_shutdown_kernel )
237
268
@@ -266,13 +297,13 @@ def remove_kernel(self, kernel_id: str) -> KernelManager:
266
297
async def _async_shutdown_all (self , now : bool = False ) -> None :
267
298
"""Shutdown all kernels."""
268
299
kids = self .list_kernel_ids ()
269
- kids += list (self ._starting_kernels )
300
+ kids += list (self ._pending_kernels )
270
301
futs = [ensure_async (self .shutdown_kernel (kid , now = now )) for kid in set (kids )]
271
302
await asyncio .gather (* futs )
272
303
273
304
shutdown_all = run_sync (_async_shutdown_all )
274
305
275
- @kernel_method
306
+ # @kernel_method
276
307
def interrupt_kernel (self , kernel_id : str ) -> None :
277
308
"""Interrupt (SIGINT) the kernel by its uuid.
278
309
@@ -281,7 +312,12 @@ def interrupt_kernel(self, kernel_id: str) -> None:
281
312
kernel_id : uuid
282
313
The id of the kernel to interrupt.
283
314
"""
315
+ kernel = self .get_kernel (kernel_id )
316
+ if not kernel .ready .done ():
317
+ raise RuntimeError ("Kernel is in a pending state. Cannot interrupt." )
318
+ out = kernel .interrupt_kernel ()
284
319
self .log .info ("Kernel interrupted: %s" % kernel_id )
320
+ return out
285
321
286
322
@kernel_method
287
323
def signal_kernel (self , kernel_id : str , signum : int ) -> None :
@@ -315,7 +351,7 @@ async def _async_restart_kernel(self, kernel_id: str, now: bool = False) -> None
315
351
it is given a chance to perform a clean shutdown or not.
316
352
"""
317
353
kernel = self .get_kernel (kernel_id )
318
- if getattr ( self , 'use_pending_kernels' , False ):
354
+ if self . _using_pending_kernels ( ):
319
355
if not kernel .ready .done ():
320
356
raise RuntimeError ("Kernel is in a pending state. Cannot restart." )
321
357
out = await ensure_async (kernel .restart_kernel (now = now ))
0 commit comments