Skip to content

Commit 97966b0

Browse files
authored
Revert "Fix pending kernels again" (#853)
1 parent c33e6b6 commit 97966b0

File tree

4 files changed

+55
-69
lines changed

4 files changed

+55
-69
lines changed

jupyter_client/manager.py

Lines changed: 38 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -57,37 +57,34 @@ class _ShutdownStatus(Enum):
5757
F = t.TypeVar('F', bound=t.Callable[..., t.Any])
5858

5959

60-
def in_pending_state(prefix: str = '') -> t.Callable[[F], F]:
61-
def decorator(method: F) -> F:
62-
"""Sets the kernel to a pending state by
63-
creating a fresh Future for the KernelManager's `ready`
64-
attribute. Once the method is finished, set the Future's results.
65-
"""
66-
67-
@t.no_type_check
68-
@functools.wraps(method)
69-
async def wrapper(self, *args, **kwargs):
70-
# Create a future for the decorated method
71-
name = f"{prefix}_ready"
72-
future = getattr(self, name)
73-
if not future or future.done():
74-
future = self._future_factory()
75-
setattr(self, name, future)
76-
try:
77-
# call wrapped method, await, and set the result or exception.
78-
out = await method(self, *args, **kwargs)
79-
# Add a small sleep to ensure tests can capture the state before done
80-
await asyncio.sleep(0.01)
81-
future.set_result(None)
82-
return out
83-
except Exception as e:
84-
future.set_exception(e)
85-
self.log.exception(future.exception())
86-
raise e
60+
def in_pending_state(method: F) -> F:
61+
"""Sets the kernel to a pending state by
62+
creating a fresh Future for the KernelManager's `ready`
63+
attribute. Once the method is finished, set the Future's results.
64+
"""
8765

88-
return t.cast(F, wrapper)
66+
@t.no_type_check
67+
@functools.wraps(method)
68+
async def wrapper(self, *args, **kwargs):
69+
# Create a future for the decorated method
70+
try:
71+
self._ready = Future()
72+
except RuntimeError:
73+
# No event loop running, use concurrent future
74+
self._ready = CFuture()
75+
try:
76+
# call wrapped method, await, and set the result or exception.
77+
out = await method(self, *args, **kwargs)
78+
# Add a small sleep to ensure tests can capture the state before done
79+
await asyncio.sleep(0.01)
80+
self._ready.set_result(None)
81+
return out
82+
except Exception as e:
83+
self._ready.set_exception(e)
84+
self.log.exception(self._ready.exception())
85+
raise e
8986

90-
return decorator
87+
return t.cast(F, wrapper)
9188

9289

9390
class KernelManager(ConnectionFileMixin):
@@ -117,14 +114,18 @@ def _emit(self, *, action: str) -> None:
117114
data={"action": action, "kernel_id": self.kernel_id, "caller": "kernel_manager"},
118115
)
119116

120-
_ready: t.Optional[CFuture]
121-
_shutdown_ready: t.Optional[CFuture]
117+
_ready: t.Union[Future, CFuture]
122118

123119
def __init__(self, *args, **kwargs):
124120
super().__init__(**kwargs)
125121
self._shutdown_status = _ShutdownStatus.Unset
126-
self._ready = None
127-
self._shutdown_ready = None
122+
# Create a place holder future.
123+
try:
124+
asyncio.get_running_loop()
125+
self._ready = Future()
126+
except RuntimeError:
127+
# No event loop running, use concurrent future
128+
self._ready = CFuture()
128129

129130
_created_context: Bool = Bool(False)
130131

@@ -142,8 +143,6 @@ def _context_default(self) -> zmq.Context:
142143
)
143144
client_factory: Type = Type(klass="jupyter_client.KernelClient")
144145

145-
_future_factory: t.Type[CFuture] = CFuture
146-
147146
@default("client_factory") # type:ignore[misc]
148147
def _client_factory_default(self) -> Type:
149148
return import_item(self.client_class)
@@ -209,21 +208,10 @@ def _default_cache_ports(self) -> bool:
209208
return self.transport == "tcp"
210209

211210
@property
212-
def ready(self) -> CFuture:
213-
"""A future that resolves when the kernel process has started."""
214-
if not self._ready:
215-
self._ready = self._future_factory()
216-
assert self._ready is not None
211+
def ready(self) -> t.Union[CFuture, Future]:
212+
"""A future that resolves when the kernel process has started for the first time"""
217213
return self._ready
218214

219-
@property
220-
def shutdown_ready(self) -> CFuture:
221-
"""A future that resolves when the kernel process has shut down."""
222-
if not self._shutdown_ready:
223-
self._shutdown_ready = self._future_factory()
224-
assert self._shutdown_ready is not None
225-
return self._shutdown_ready
226-
227215
@property
228216
def ipykernel(self) -> bool:
229217
return self.kernel_name in {"python", "python2", "python3"}
@@ -407,7 +395,7 @@ async def _async_post_start_kernel(self, **kw: t.Any) -> None:
407395

408396
post_start_kernel = run_sync(_async_post_start_kernel)
409397

410-
@in_pending_state()
398+
@in_pending_state
411399
async def _async_start_kernel(self, **kw: t.Any) -> None:
412400
"""Starts a kernel on this host in a separate process.
413401
@@ -503,7 +491,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None:
503491

504492
cleanup_resources = run_sync(_async_cleanup_resources)
505493

506-
@in_pending_state('_shutdown')
494+
@in_pending_state
507495
async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
508496
"""Attempts to stop the kernel process cleanly.
509497
@@ -522,8 +510,6 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
522510
Will this kernel be restarted after it is shutdown. When this
523511
is True, connection files will not be cleaned up.
524512
"""
525-
# Reset the start ready future.
526-
self._ready = self._future_factory()
527513
self._emit(action="shutdown_started")
528514
self.shutting_down = True # Used by restarter to prevent race condition
529515
# Stop monitoring for restarting while we shutdown.
@@ -696,8 +682,6 @@ class AsyncKernelManager(KernelManager):
696682
# The PyZMQ Context to use for communication with the kernel.
697683
context: Instance = Instance(zmq.asyncio.Context)
698684

699-
_future_factory: t.Type[Future] = Future # type:ignore[assignment]
700-
701685
@default("context") # type:ignore[misc]
702686
def _context_default(self) -> zmq.asyncio.Context:
703687
self._created_context = True

jupyter_client/multikernelmanager.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,9 @@ async def _async_start_kernel(
221221
self._kernels[kernel_id] = km
222222
else:
223223
await task
224-
await asyncio.wrap_future(km.ready)
224+
# raise an exception if one occurred during kernel startup.
225+
if km.ready.exception():
226+
raise km.ready.exception() # type: ignore
225227

226228
return kernel_id
227229

@@ -251,17 +253,15 @@ async def _async_shutdown_kernel(
251253
try:
252254
await task
253255
km = self.get_kernel(kernel_id)
254-
await asyncio.wrap_future(km.ready)
256+
await t.cast(asyncio.Future, km.ready)
255257
except asyncio.CancelledError:
256258
pass
257259
except Exception:
258260
self.remove_kernel(kernel_id)
259261
return
260262
km = self.get_kernel(kernel_id)
261263
# If a pending kernel raised an exception, remove it.
262-
try:
263-
await asyncio.wrap_future(km.ready)
264-
except Exception:
264+
if not km.ready.cancelled() and km.ready.exception():
265265
self.remove_kernel(kernel_id)
266266
return
267267
stopper = ensure_async(km.shutdown_kernel(now, restart))
@@ -270,7 +270,9 @@ async def _async_shutdown_kernel(
270270
# Await the kernel if not using pending kernels.
271271
if not self._using_pending_kernels():
272272
await fut
273-
await asyncio.wrap_future(km.shutdown_ready)
273+
# raise an exception if one occurred during kernel shutdown.
274+
if km.ready.exception():
275+
raise km.ready.exception() # type: ignore
274276

275277
shutdown_kernel = run_sync(_async_shutdown_kernel)
276278

@@ -313,7 +315,7 @@ async def _async_shutdown_all(self, now: bool = False) -> None:
313315
if self._using_pending_kernels():
314316
for km in kms:
315317
try:
316-
await km.shutdown_ready
318+
await km.ready
317319
except asyncio.CancelledError:
318320
self._pending_kernels[km.kernel_id].cancel()
319321
except Exception:

tests/test_kernelmanager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,13 @@ def _(*expected_list):
135135

136136

137137
@pytest.fixture(params=[AsyncKernelManager, AsyncKMSubclass])
138-
async def async_km(request, config, jp_event_logger):
138+
def async_km(request, config, jp_event_logger):
139139
km = request.param(config=config, event_logger=jp_event_logger)
140140
return km
141141

142142

143143
@pytest.fixture
144-
async def async_km_subclass(config, jp_event_logger):
144+
def async_km_subclass(config, jp_event_logger):
145145
km = AsyncKMSubclass(config=config, event_logger=jp_event_logger)
146146
return km
147147

@@ -489,11 +489,11 @@ async def test_lifecycle(
489489
await async_km.start_kernel(stdout=PIPE, stderr=PIPE)
490490
is_alive = await async_km.is_alive()
491491
assert is_alive
492-
await async_km.ready
492+
is_ready = async_km.ready.done()
493+
assert is_ready
493494
await async_km.restart_kernel(now=True)
494495
is_alive = await async_km.is_alive()
495496
assert is_alive
496-
await async_km.ready
497497
await async_km.interrupt_kernel()
498498
assert isinstance(async_km, AsyncKernelManager)
499499
await async_km.shutdown_kernel(now=True)

tests/test_multikernelmanager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ async def test_use_pending_kernels(self):
429429
assert isinstance(k, AsyncKernelManager)
430430
await ensure_future(km.shutdown_kernel(kid, now=True))
431431
# Wait for the kernel to shutdown
432-
await kernel.shutdown_ready
432+
await kernel.ready
433433
assert kid not in km, f"{kid} not in {km}"
434434

435435
@gen_test
@@ -443,7 +443,7 @@ async def test_use_pending_kernels_early_restart(self):
443443
await kernel.ready
444444
await ensure_future(km.shutdown_kernel(kid, now=True))
445445
# Wait for the kernel to shutdown
446-
await kernel.shutdown_ready
446+
await kernel.ready
447447
assert kid not in km, f"{kid} not in {km}"
448448

449449
@gen_test
@@ -455,7 +455,7 @@ async def test_use_pending_kernels_early_shutdown(self):
455455
# Try shutting down while the kernel is pending
456456
await ensure_future(km.shutdown_kernel(kid, now=True))
457457
# Wait for the kernel to shutdown
458-
await kernel.shutdown_ready
458+
await kernel.ready
459459
assert kid not in km, f"{kid} not in {km}"
460460

461461
@gen_test
@@ -470,7 +470,7 @@ async def test_use_pending_kernels_early_interrupt(self):
470470
await kernel.ready
471471
await ensure_future(km.shutdown_kernel(kid, now=True))
472472
# Wait for the kernel to shutdown
473-
await kernel.shutdown_ready
473+
await kernel.ready
474474
assert kid not in km, f"{kid} not in {km}"
475475

476476
@gen_test

0 commit comments

Comments
 (0)