Skip to content

Commit 0d87835

Browse files
authored
Revert "Backport PR #845: Fix pending kernels again" (#851)
1 parent 66f5114 commit 0d87835

File tree

5 files changed

+57
-70
lines changed

5 files changed

+57
-70
lines changed

.github/workflows/main.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ jobs:
130130
pip check
131131
- name: Run the tests
132132
run: |
133-
pytest -vv -W default jupyter_client || pytest -vv -W default jupyter_client --lf
133+
pytest -vv jupyter_client || pytest -vv jupyter_client --lf
134134
135135
make_sdist:
136136
name: Make SDist

jupyter_client/manager.py

Lines changed: 39 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -55,37 +55,34 @@ class _ShutdownStatus(Enum):
5555
F = t.TypeVar('F', bound=t.Callable[..., t.Any])
5656

5757

58-
def in_pending_state(prefix: str = '') -> t.Callable[[F], F]:
59-
def decorator(method: F) -> F:
60-
"""Sets the kernel to a pending state by
61-
creating a fresh Future for the KernelManager's `ready`
62-
attribute. Once the method is finished, set the Future's results.
63-
"""
64-
65-
@t.no_type_check
66-
@functools.wraps(method)
67-
async def wrapper(self, *args, **kwargs):
68-
# Create a future for the decorated method
69-
name = f"{prefix}_ready"
70-
future = getattr(self, name)
71-
if not future or future.done():
72-
future = self._future_factory()
73-
setattr(self, name, future)
74-
try:
75-
# call wrapped method, await, and set the result or exception.
76-
out = await method(self, *args, **kwargs)
77-
# Add a small sleep to ensure tests can capture the state before done
78-
await asyncio.sleep(0.01)
79-
future.set_result(None)
80-
return out
81-
except Exception as e:
82-
future.set_exception(e)
83-
self.log.exception(future.exception())
84-
raise e
58+
def in_pending_state(method: F) -> F:
59+
"""Sets the kernel to a pending state by
60+
creating a fresh Future for the KernelManager's `ready`
61+
attribute. Once the method is finished, set the Future's results.
62+
"""
8563

86-
return t.cast(F, wrapper)
64+
@t.no_type_check
65+
@functools.wraps(method)
66+
async def wrapper(self, *args, **kwargs):
67+
# Create a future for the decorated method
68+
try:
69+
self._ready = Future()
70+
except RuntimeError:
71+
# No event loop running, use concurrent future
72+
self._ready = CFuture()
73+
try:
74+
# call wrapped method, await, and set the result or exception.
75+
out = await method(self, *args, **kwargs)
76+
# Add a small sleep to ensure tests can capture the state before done
77+
await asyncio.sleep(0.01)
78+
self._ready.set_result(None)
79+
return out
80+
except Exception as e:
81+
self._ready.set_exception(e)
82+
self.log.exception(self._ready.exception())
83+
raise e
8784

88-
return decorator
85+
return t.cast(F, wrapper)
8986

9087

9188
class KernelManager(ConnectionFileMixin):
@@ -94,14 +91,18 @@ class KernelManager(ConnectionFileMixin):
9491
This version starts kernels with Popen.
9592
"""
9693

97-
_ready: t.Optional[t.Union[Future, CFuture]]
98-
_shutdown_ready: t.Optional[CFuture]
94+
_ready: t.Union[Future, CFuture]
9995

10096
def __init__(self, *args, **kwargs):
10197
super().__init__(**kwargs)
10298
self._shutdown_status = _ShutdownStatus.Unset
103-
self._ready = None
104-
self._shutdown_ready = None
99+
# Create a place holder future.
100+
try:
101+
asyncio.get_running_loop()
102+
self._ready = Future()
103+
except RuntimeError:
104+
# No event loop running, use concurrent future
105+
self._ready = CFuture()
105106

106107
_created_context: Bool = Bool(False)
107108

@@ -119,8 +120,6 @@ def _context_default(self) -> zmq.Context:
119120
)
120121
client_factory: Type = Type(klass="jupyter_client.KernelClient")
121122

122-
_future_factory: t.Type[CFuture] = CFuture
123-
124123
@default("client_factory") # type:ignore[misc]
125124
def _client_factory_default(self) -> Type:
126125
return import_item(self.client_class)
@@ -186,20 +185,9 @@ def _default_cache_ports(self) -> bool:
186185
return self.transport == "tcp"
187186

188187
@property
189-
def ready(self) -> CFuture:
190-
"""A future that resolves when the kernel process has started."""
191-
if not self._ready:
192-
self._ready = self._future_factory()
193-
assert self._ready is not None
194-
return self._ready # type:ignore[return-value]
195-
196-
@property
197-
def shutdown_ready(self) -> CFuture:
198-
"""A future that resolves when the kernel process has shut down."""
199-
if not self._shutdown_ready:
200-
self._shutdown_ready = self._future_factory()
201-
assert self._shutdown_ready is not None
202-
return self._shutdown_ready
188+
def ready(self) -> t.Union[CFuture, Future]:
189+
"""A future that resolves when the kernel process has started for the first time"""
190+
return self._ready
203191

204192
@property
205193
def ipykernel(self) -> bool:
@@ -381,7 +369,7 @@ async def _async_post_start_kernel(self, **kw: t.Any) -> None:
381369

382370
post_start_kernel = run_sync(_async_post_start_kernel)
383371

384-
@in_pending_state()
372+
@in_pending_state
385373
async def _async_start_kernel(self, **kw: t.Any) -> None:
386374
"""Starts a kernel on this host in a separate process.
387375
@@ -474,7 +462,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None:
474462

475463
cleanup_resources = run_sync(_async_cleanup_resources)
476464

477-
@in_pending_state('_shutdown')
465+
@in_pending_state
478466
async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
479467
"""Attempts to stop the kernel process cleanly.
480468
@@ -493,8 +481,6 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
493481
Will this kernel be restarted after it is shutdown. When this
494482
is True, connection files will not be cleaned up.
495483
"""
496-
# Reset the start ready future.
497-
self._ready = self._future_factory()
498484
self.shutting_down = True # Used by restarter to prevent race condition
499485
# Stop monitoring for restarting while we shutdown.
500486
self.stop_restarter()
@@ -657,7 +643,6 @@ class AsyncKernelManager(KernelManager):
657643
)
658644
client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient")
659645

660-
_future_factory: t.Type[Future] = Future # type:ignore[assignment]
661646
_launch_kernel = KernelManager._async_launch_kernel
662647
start_kernel = KernelManager._async_start_kernel
663648
pre_start_kernel = KernelManager._async_pre_start_kernel

jupyter_client/multikernelmanager.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,9 @@ async def _async_start_kernel(
221221
self._kernels[kernel_id] = km
222222
else:
223223
await task
224-
await asyncio.wrap_future(km.ready)
224+
# raise an exception if one occurred during kernel startup.
225+
if km.ready.exception():
226+
raise km.ready.exception() # type: ignore
225227

226228
return kernel_id
227229

@@ -251,17 +253,15 @@ async def _async_shutdown_kernel(
251253
try:
252254
await task
253255
km = self.get_kernel(kernel_id)
254-
await asyncio.wrap_future(km.ready)
256+
await t.cast(asyncio.Future, km.ready)
255257
except asyncio.CancelledError:
256258
pass
257259
except Exception:
258260
self.remove_kernel(kernel_id)
259261
return
260262
km = self.get_kernel(kernel_id)
261263
# If a pending kernel raised an exception, remove it.
262-
try:
263-
await asyncio.wrap_future(km.ready)
264-
except Exception:
264+
if not km.ready.cancelled() and km.ready.exception():
265265
self.remove_kernel(kernel_id)
266266
return
267267
stopper = ensure_async(km.shutdown_kernel(now, restart))
@@ -270,7 +270,9 @@ async def _async_shutdown_kernel(
270270
# Await the kernel if not using pending kernels.
271271
if not self._using_pending_kernels():
272272
await fut
273-
await asyncio.wrap_future(km.shutdown_ready)
273+
# raise an exception if one occurred during kernel shutdown.
274+
if km.ready.exception():
275+
raise km.ready.exception() # type: ignore
274276

275277
shutdown_kernel = run_sync(_async_shutdown_kernel)
276278

@@ -313,7 +315,7 @@ async def _async_shutdown_all(self, now: bool = False) -> None:
313315
if self._using_pending_kernels():
314316
for km in kms:
315317
try:
316-
await km.shutdown_ready
318+
await km.ready
317319
except asyncio.CancelledError:
318320
self._pending_kernels[km.kernel_id].cancel()
319321
except Exception:

jupyter_client/tests/test_kernelmanager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,13 @@ def zmq_context():
112112

113113

114114
@pytest.fixture(params=[AsyncKernelManager, AsyncKMSubclass])
115-
async def async_km(request, config):
115+
def async_km(request, config):
116116
km = request.param(config=config)
117117
return km
118118

119119

120120
@pytest.fixture
121-
async def async_km_subclass(config):
121+
def async_km_subclass(config):
122122
km = AsyncKMSubclass(config=config)
123123
return km
124124

@@ -451,11 +451,11 @@ async def test_lifecycle(self, async_km):
451451
await async_km.start_kernel(stdout=PIPE, stderr=PIPE)
452452
is_alive = await async_km.is_alive()
453453
assert is_alive
454-
await async_km.ready
454+
is_ready = async_km.ready.done()
455+
assert is_ready
455456
await async_km.restart_kernel(now=True)
456457
is_alive = await async_km.is_alive()
457458
assert is_alive
458-
await async_km.ready
459459
await async_km.interrupt_kernel()
460460
assert isinstance(async_km, AsyncKernelManager)
461461
await async_km.shutdown_kernel(now=True)

jupyter_client/tests/test_multikernelmanager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ async def test_use_pending_kernels(self):
395395
assert isinstance(k, AsyncKernelManager)
396396
await ensure_future(km.shutdown_kernel(kid, now=True))
397397
# Wait for the kernel to shutdown
398-
await kernel.shutdown_ready
398+
await kernel.ready
399399
assert kid not in km, f"{kid} not in {km}"
400400

401401
@gen_test
@@ -409,7 +409,7 @@ async def test_use_pending_kernels_early_restart(self):
409409
await kernel.ready
410410
await ensure_future(km.shutdown_kernel(kid, now=True))
411411
# Wait for the kernel to shutdown
412-
await kernel.shutdown_ready
412+
await kernel.ready
413413
assert kid not in km, f"{kid} not in {km}"
414414

415415
@gen_test
@@ -421,7 +421,7 @@ async def test_use_pending_kernels_early_shutdown(self):
421421
# Try shutting down while the kernel is pending
422422
await ensure_future(km.shutdown_kernel(kid, now=True))
423423
# Wait for the kernel to shutdown
424-
await kernel.shutdown_ready
424+
await kernel.ready
425425
assert kid not in km, f"{kid} not in {km}"
426426

427427
@gen_test
@@ -436,7 +436,7 @@ async def test_use_pending_kernels_early_interrupt(self):
436436
await kernel.ready
437437
await ensure_future(km.shutdown_kernel(kid, now=True))
438438
# Wait for the kernel to shutdown
439-
await kernel.shutdown_ready
439+
await kernel.ready
440440
assert kid not in km, f"{kid} not in {km}"
441441

442442
@gen_test

0 commit comments

Comments
 (0)