Skip to content

Commit ae998d3

Browse files
committed
fix pending kernels
1 parent 5874148 commit ae998d3

File tree

4 files changed

+67
-55
lines changed

4 files changed

+67
-55
lines changed

jupyter_client/manager.py

Lines changed: 52 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -57,34 +57,37 @@ class _ShutdownStatus(Enum):
5757
F = t.TypeVar('F', bound=t.Callable[..., t.Any])
5858

5959

60-
def in_pending_state(method: F) -> F:
61-
"""Sets the kernel to a pending state by
62-
creating a fresh Future for the KernelManager's `ready`
63-
attribute. Once the method is finished, set the Future's results.
64-
"""
60+
def in_pending_state(prefix=''):
61+
def decorator(method: F) -> F:
62+
"""Sets the kernel to a pending state by
63+
creating a fresh Future for the KernelManager's `ready`
64+
attribute. Once the method is finished, set the Future's results.
65+
"""
6566

66-
@t.no_type_check
67-
@functools.wraps(method)
68-
async def wrapper(self, *args, **kwargs):
69-
# Create a future for the decorated method
70-
try:
71-
self._ready = Future()
72-
except RuntimeError:
73-
# No event loop running, use concurrent future
74-
self._ready = CFuture()
75-
try:
76-
# call wrapped method, await, and set the result or exception.
77-
out = await method(self, *args, **kwargs)
78-
# Add a small sleep to ensure tests can capture the state before done
79-
await asyncio.sleep(0.01)
80-
self._ready.set_result(None)
81-
return out
82-
except Exception as e:
83-
self._ready.set_exception(e)
84-
self.log.exception(self._ready.exception())
85-
raise e
67+
@t.no_type_check
68+
@functools.wraps(method)
69+
async def wrapper(self, *args, **kwargs):
70+
# Create a future for the decorated method
71+
name = f"{prefix}_ready"
72+
future = getattr(self, name)
73+
if not future or future.done():
74+
future = self._future_factory()
75+
setattr(self, name, future)
76+
try:
77+
# call wrapped method, await, and set the result or exception.
78+
out = await method(self, *args, **kwargs)
79+
# Add a small sleep to ensure tests can capture the state before done
80+
await asyncio.sleep(0.01)
81+
future.set_result(None)
82+
return out
83+
except Exception as e:
84+
future.set_exception(e)
85+
self.log.exception(future.exception())
86+
raise e
87+
88+
return t.cast(F, wrapper)
8689

87-
return t.cast(F, wrapper)
90+
return decorator
8891

8992

9093
class KernelManager(ConnectionFileMixin):
@@ -114,18 +117,14 @@ def _emit(self, *, action: str) -> None:
114117
data={"action": action, "kernel_id": self.kernel_id, "caller": "kernel_manager"},
115118
)
116119

117-
_ready: t.Union[Future, CFuture]
120+
_ready: CFuture
121+
_shutdown_ready: CFuture
118122

119123
def __init__(self, *args, **kwargs):
120124
super().__init__(**kwargs)
121125
self._shutdown_status = _ShutdownStatus.Unset
122-
# Create a place holder future.
123-
try:
124-
asyncio.get_running_loop()
125-
self._ready = Future()
126-
except RuntimeError:
127-
# No event loop running, use concurrent future
128-
self._ready = CFuture()
126+
self._ready = None
127+
self._shutdown_ready = None
129128

130129
_created_context: Bool = Bool(False)
131130

@@ -143,6 +142,8 @@ def _context_default(self) -> zmq.Context:
143142
)
144143
client_factory: Type = Type(klass="jupyter_client.KernelClient")
145144

145+
_future_factory = Future
146+
146147
@default("client_factory") # type:ignore[misc]
147148
def _client_factory_default(self) -> Type:
148149
return import_item(self.client_class)
@@ -208,10 +209,19 @@ def _default_cache_ports(self) -> bool:
208209
return self.transport == "tcp"
209210

210211
@property
211-
def ready(self) -> t.Union[CFuture, Future]:
212-
"""A future that resolves when the kernel process has started for the first time"""
212+
def ready(self) -> CFuture:
213+
"""A future that resolves when the kernel process has started."""
214+
if not self._ready:
215+
self._ready = self._future_factory()
213216
return self._ready
214217

218+
@property
219+
def shutdown_ready(self) -> CFuture:
220+
"""A future that resolves when the kernel process has shut down."""
221+
if not self._shutdown_ready:
222+
self._shutdown_ready = self._future_factory()
223+
return self._shutdown_ready
224+
215225
@property
216226
def ipykernel(self) -> bool:
217227
return self.kernel_name in {"python", "python2", "python3"}
@@ -395,7 +405,7 @@ async def _async_post_start_kernel(self, **kw: t.Any) -> None:
395405

396406
post_start_kernel = run_sync(_async_post_start_kernel)
397407

398-
@in_pending_state
408+
@in_pending_state()
399409
async def _async_start_kernel(self, **kw: t.Any) -> None:
400410
"""Starts a kernel on this host in a separate process.
401411
@@ -491,7 +501,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None:
491501

492502
cleanup_resources = run_sync(_async_cleanup_resources)
493503

494-
@in_pending_state
504+
@in_pending_state('_shutdown')
495505
async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
496506
"""Attempts to stop the kernel process cleanly.
497507
@@ -510,6 +520,8 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
510520
Will this kernel be restarted after it is shutdown. When this
511521
is True, connection files will not be cleaned up.
512522
"""
523+
# Reset the start ready future.
524+
self._ready = self._future_factory()
513525
self._emit(action="shutdown_started")
514526
self.shutting_down = True # Used by restarter to prevent race condition
515527
# Stop monitoring for restarting while we shutdown.
@@ -682,6 +694,8 @@ class AsyncKernelManager(KernelManager):
682694
# The PyZMQ Context to use for communication with the kernel.
683695
context: Instance = Instance(zmq.asyncio.Context)
684696

697+
_future_factory = Future
698+
685699
@default("context") # type:ignore[misc]
686700
def _context_default(self) -> zmq.asyncio.Context:
687701
self._created_context = True

jupyter_client/multikernelmanager.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,7 @@ async def _async_start_kernel(
221221
self._kernels[kernel_id] = km
222222
else:
223223
await task
224-
# raise an exception if one occurred during kernel startup.
225-
if km.ready.exception():
226-
raise km.ready.exception() # type: ignore
224+
await asyncio.wrap_future(km.ready)
227225

228226
return kernel_id
229227

@@ -253,15 +251,17 @@ async def _async_shutdown_kernel(
253251
try:
254252
await task
255253
km = self.get_kernel(kernel_id)
256-
await t.cast(asyncio.Future, km.ready)
254+
await asyncio.wrap_future(km.ready)
257255
except asyncio.CancelledError:
258256
pass
259257
except Exception:
260258
self.remove_kernel(kernel_id)
261259
return
262260
km = self.get_kernel(kernel_id)
263261
# If a pending kernel raised an exception, remove it.
264-
if not km.ready.cancelled() and km.ready.exception():
262+
try:
263+
await asyncio.wrap_future(km.ready)
264+
except Exception:
265265
self.remove_kernel(kernel_id)
266266
return
267267
stopper = ensure_async(km.shutdown_kernel(now, restart))
@@ -270,9 +270,7 @@ async def _async_shutdown_kernel(
270270
# Await the kernel if not using pending kernels.
271271
if not self._using_pending_kernels():
272272
await fut
273-
# raise an exception if one occurred during kernel shutdown.
274-
if km.ready.exception():
275-
raise km.ready.exception() # type: ignore
273+
await asyncio.wrap_future(km.shutdown_ready)
276274

277275
shutdown_kernel = run_sync(_async_shutdown_kernel)
278276

@@ -315,7 +313,7 @@ async def _async_shutdown_all(self, now: bool = False) -> None:
315313
if self._using_pending_kernels():
316314
for km in kms:
317315
try:
318-
await km.ready
316+
await km.shutdown_ready
319317
except asyncio.CancelledError:
320318
self._pending_kernels[km.kernel_id].cancel()
321319
except Exception:

tests/test_kernelmanager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,13 @@ def _(*expected_list):
135135

136136

137137
@pytest.fixture(params=[AsyncKernelManager, AsyncKMSubclass])
138-
def async_km(request, config, jp_event_logger):
138+
async def async_km(request, config, jp_event_logger):
139139
km = request.param(config=config, event_logger=jp_event_logger)
140140
return km
141141

142142

143143
@pytest.fixture
144-
def async_km_subclass(config, jp_event_logger):
144+
async def async_km_subclass(config, jp_event_logger):
145145
km = AsyncKMSubclass(config=config, event_logger=jp_event_logger)
146146
return km
147147

@@ -489,11 +489,11 @@ async def test_lifecycle(
489489
await async_km.start_kernel(stdout=PIPE, stderr=PIPE)
490490
is_alive = await async_km.is_alive()
491491
assert is_alive
492-
is_ready = async_km.ready.done()
493-
assert is_ready
492+
await async_km.ready
494493
await async_km.restart_kernel(now=True)
495494
is_alive = await async_km.is_alive()
496495
assert is_alive
496+
await async_km.ready
497497
await async_km.interrupt_kernel()
498498
assert isinstance(async_km, AsyncKernelManager)
499499
await async_km.shutdown_kernel(now=True)

tests/test_multikernelmanager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ async def test_use_pending_kernels(self):
429429
assert isinstance(k, AsyncKernelManager)
430430
await ensure_future(km.shutdown_kernel(kid, now=True))
431431
# Wait for the kernel to shutdown
432-
await kernel.ready
432+
await kernel.shutdown_ready
433433
assert kid not in km, f"{kid} not in {km}"
434434

435435
@gen_test
@@ -443,7 +443,7 @@ async def test_use_pending_kernels_early_restart(self):
443443
await kernel.ready
444444
await ensure_future(km.shutdown_kernel(kid, now=True))
445445
# Wait for the kernel to shutdown
446-
await kernel.ready
446+
await kernel.shutdown_ready
447447
assert kid not in km, f"{kid} not in {km}"
448448

449449
@gen_test
@@ -455,7 +455,7 @@ async def test_use_pending_kernels_early_shutdown(self):
455455
# Try shutting down while the kernel is pending
456456
await ensure_future(km.shutdown_kernel(kid, now=True))
457457
# Wait for the kernel to shutdown
458-
await kernel.ready
458+
await kernel.shutdown_ready
459459
assert kid not in km, f"{kid} not in {km}"
460460

461461
@gen_test
@@ -470,7 +470,7 @@ async def test_use_pending_kernels_early_interrupt(self):
470470
await kernel.ready
471471
await ensure_future(km.shutdown_kernel(kid, now=True))
472472
# Wait for the kernel to shutdown
473-
await kernel.ready
473+
await kernel.shutdown_ready
474474
assert kid not in km, f"{kid} not in {km}"
475475

476476
@gen_test

0 commit comments

Comments
 (0)