@@ -57,34 +57,37 @@ class _ShutdownStatus(Enum):
57
57
F = t .TypeVar ('F' , bound = t .Callable [..., t .Any ])
58
58
59
59
60
- def in_pending_state (method : F ) -> F :
61
- """Sets the kernel to a pending state by
62
- creating a fresh Future for the KernelManager's `ready`
63
- attribute. Once the method is finished, set the Future's results.
64
- """
60
+ def in_pending_state (prefix : str = '' ) -> t .Callable [[F ], F ]:
61
+ def decorator (method : F ) -> F :
62
+ """Sets the kernel to a pending state by
63
+ creating a fresh Future for the KernelManager's `ready`
64
+ attribute. Once the method is finished, set the Future's results.
65
+ """
65
66
66
- @t .no_type_check
67
- @functools .wraps (method )
68
- async def wrapper (self , * args , ** kwargs ):
69
- # Create a future for the decorated method
70
- try :
71
- self ._ready = Future ()
72
- except RuntimeError :
73
- # No event loop running, use concurrent future
74
- self ._ready = CFuture ()
75
- try :
76
- # call wrapped method, await, and set the result or exception.
77
- out = await method (self , * args , ** kwargs )
78
- # Add a small sleep to ensure tests can capture the state before done
79
- await asyncio .sleep (0.01 )
80
- self ._ready .set_result (None )
81
- return out
82
- except Exception as e :
83
- self ._ready .set_exception (e )
84
- self .log .exception (self ._ready .exception ())
85
- raise e
67
+ @t .no_type_check
68
+ @functools .wraps (method )
69
+ async def wrapper (self , * args , ** kwargs ):
70
+ # Create a future for the decorated method
71
+ name = f"{ prefix } _ready"
72
+ future = getattr (self , name )
73
+ if not future or future .done ():
74
+ future = self ._future_factory ()
75
+ setattr (self , name , future )
76
+ try :
77
+ # call wrapped method, await, and set the result or exception.
78
+ out = await method (self , * args , ** kwargs )
79
+ # Add a small sleep to ensure tests can capture the state before done
80
+ await asyncio .sleep (0.01 )
81
+ future .set_result (None )
82
+ return out
83
+ except Exception as e :
84
+ future .set_exception (e )
85
+ self .log .exception (future .exception ())
86
+ raise e
87
+
88
+ return t .cast (F , wrapper )
86
89
87
- return t . cast ( F , wrapper )
90
+ return decorator
88
91
89
92
90
93
class KernelManager (ConnectionFileMixin ):
@@ -114,18 +117,14 @@ def _emit(self, *, action: str) -> None:
114
117
data = {"action" : action , "kernel_id" : self .kernel_id , "caller" : "kernel_manager" },
115
118
)
116
119
117
- _ready : t .Union [Future , CFuture ]
120
+ _ready : t .Optional [CFuture ]
121
+ _shutdown_ready : t .Optional [CFuture ]
118
122
119
123
def __init__ (self , * args , ** kwargs ):
120
124
super ().__init__ (** kwargs )
121
125
self ._shutdown_status = _ShutdownStatus .Unset
122
- # Create a place holder future.
123
- try :
124
- asyncio .get_running_loop ()
125
- self ._ready = Future ()
126
- except RuntimeError :
127
- # No event loop running, use concurrent future
128
- self ._ready = CFuture ()
126
+ self ._ready = None
127
+ self ._shutdown_ready = None
129
128
130
129
_created_context : Bool = Bool (False )
131
130
@@ -143,6 +142,8 @@ def _context_default(self) -> zmq.Context:
143
142
)
144
143
client_factory : Type = Type (klass = "jupyter_client.KernelClient" )
145
144
145
+ _future_factory : t .Type [CFuture ] = CFuture
146
+
146
147
@default ("client_factory" ) # type:ignore[misc]
147
148
def _client_factory_default (self ) -> Type :
148
149
return import_item (self .client_class )
@@ -208,10 +209,21 @@ def _default_cache_ports(self) -> bool:
208
209
return self .transport == "tcp"
209
210
210
211
@property
211
- def ready (self ) -> t .Union [CFuture , Future ]:
212
- """A future that resolves when the kernel process has started for the first time"""
212
+ def ready (self ) -> CFuture :
213
+ """A future that resolves when the kernel process has started."""
214
+ if not self ._ready :
215
+ self ._ready = self ._future_factory ()
216
+ assert self ._ready is not None
213
217
return self ._ready
214
218
219
+ @property
220
+ def shutdown_ready (self ) -> CFuture :
221
+ """A future that resolves when the kernel process has shut down."""
222
+ if not self ._shutdown_ready :
223
+ self ._shutdown_ready = self ._future_factory ()
224
+ assert self ._shutdown_ready is not None
225
+ return self ._shutdown_ready
226
+
215
227
@property
216
228
def ipykernel (self ) -> bool :
217
229
return self .kernel_name in {"python" , "python2" , "python3" }
@@ -395,7 +407,7 @@ async def _async_post_start_kernel(self, **kw: t.Any) -> None:
395
407
396
408
post_start_kernel = run_sync (_async_post_start_kernel )
397
409
398
- @in_pending_state
410
+ @in_pending_state ()
399
411
async def _async_start_kernel (self , ** kw : t .Any ) -> None :
400
412
"""Starts a kernel on this host in a separate process.
401
413
@@ -491,7 +503,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None:
491
503
492
504
cleanup_resources = run_sync (_async_cleanup_resources )
493
505
494
- @in_pending_state
506
+ @in_pending_state ( '_shutdown' )
495
507
async def _async_shutdown_kernel (self , now : bool = False , restart : bool = False ) -> None :
496
508
"""Attempts to stop the kernel process cleanly.
497
509
@@ -510,6 +522,8 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
510
522
Will this kernel be restarted after it is shutdown. When this
511
523
is True, connection files will not be cleaned up.
512
524
"""
525
+ # Reset the start ready future.
526
+ self ._ready = self ._future_factory ()
513
527
self ._emit (action = "shutdown_started" )
514
528
self .shutting_down = True # Used by restarter to prevent race condition
515
529
# Stop monitoring for restarting while we shutdown.
@@ -682,6 +696,8 @@ class AsyncKernelManager(KernelManager):
682
696
# The PyZMQ Context to use for communication with the kernel.
683
697
context : Instance = Instance (zmq .asyncio .Context )
684
698
699
+ _future_factory : t .Type [Future ] = Future # type:ignore[assignment]
700
+
685
701
@default ("context" ) # type:ignore[misc]
686
702
def _context_default (self ) -> zmq .asyncio .Context :
687
703
self ._created_context = True
0 commit comments