6
6
import socket
7
7
import typing as t
8
8
import uuid
9
+ import weakref
9
10
10
11
import zmq
11
12
from traitlets import Any
@@ -95,8 +96,6 @@ def create_kernel_manager(*args: Any, **kwargs: Any) -> KernelManager:
95
96
help = "Share a single zmq.Context to talk to all my kernels" ,
96
97
).tag (config = True )
97
98
98
- _created_context = Bool (False )
99
-
100
99
context = Instance ("zmq.Context" )
101
100
102
101
_pending_kernels = Dict ()
@@ -108,20 +107,18 @@ def _starting_kernels(self):
108
107
109
108
@default ("context" ) # type:ignore[misc]
110
109
def _context_default (self ) -> zmq .Context :
111
- self ._created_context = True
112
- return zmq .Context ()
113
-
114
- def __del__ (self ):
115
- if self ._created_context and self .context and not self .context .closed :
116
- if self .log :
117
- self .log .debug ("Destroying zmq context for %s" , self )
118
- self .context .destroy ()
119
- try :
120
- super_del = super ().__del__
121
- except AttributeError :
122
- pass
123
- else :
124
- super_del ()
110
+ context = zmq .Context ()
111
+ # Use a finalizer to destroy the context.
112
+ # self._finalizer = weakref.finalize(self, context.destroy)
113
+
114
+ raise ValueError ('Notes here' )
115
+ """
116
+ The finalizer hangs because the context can't be destroyed.
117
+ There are two other things that are probably contributing:
118
+ - There is an open kernel subprocess at shutdown that is raising a warning on __del__
119
+ - We are not properly canceling the _add_kernel_when_ready task
120
+ """
121
+ return context
125
122
126
123
connection_dir = Unicode ("" )
127
124
@@ -209,15 +206,15 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg
209
206
kwargs ['kernel_id' ] = kernel_id # Make kernel_id available to manager and provisioner
210
207
211
208
starter = ensure_async (km .start_kernel (** kwargs ))
212
- fut = asyncio .ensure_future (self ._add_kernel_when_ready (kernel_id , km , starter ))
213
- self ._pending_kernels [kernel_id ] = fut
209
+ task = asyncio .create_task (self ._add_kernel_when_ready (kernel_id , km , starter ))
210
+ self ._pending_kernels [kernel_id ] = task
214
211
# Handling a Pending Kernel
215
212
if self ._using_pending_kernels ():
216
213
# If using pending kernels, do not block
217
214
# on the kernel start.
218
215
self ._kernels [kernel_id ] = km
219
216
else :
220
- await fut
217
+ await task
221
218
# raise an exception if one occurred during kernel startup.
222
219
if km .ready .exception ():
223
220
raise km .ready .exception () # type: ignore
@@ -246,9 +243,9 @@ async def _async_shutdown_kernel(
246
243
self .log .info ("Kernel shutdown: %s" % kernel_id )
247
244
# If the kernel is still starting, wait for it to be ready.
248
245
if kernel_id in self ._pending_kernels :
249
- kernel = self ._pending_kernels [kernel_id ]
246
+ task = self ._pending_kernels [kernel_id ]
250
247
try :
251
- await kernel
248
+ await task
252
249
km = self .get_kernel (kernel_id )
253
250
await t .cast (asyncio .Future , km .ready )
254
251
except Exception :
@@ -311,7 +308,9 @@ async def _async_shutdown_all(self, now: bool = False) -> None:
311
308
for km in kms :
312
309
try :
313
310
await km .ready
314
- except Exception :
311
+ except asyncio .exceptions .CancelledError :
312
+ self ._pending_kernels [km .kernel_id ].cancel ()
313
+ except Exception as e :
315
314
# Will have been logged in _add_kernel_when_ready
316
315
pass
317
316
0 commit comments