Skip to content

Commit 6b76603

Browse files
authored
Merge pull request #712 from blink1073/pending-kernels
Add support for pending kernels
2 parents 670ee79 + 40e8878 commit 6b76603

File tree

6 files changed

+205
-54
lines changed

6 files changed

+205
-54
lines changed

jupyter_client/manager.py

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import sys
99
import typing as t
1010
import uuid
11+
from asyncio.futures import Future
12+
from concurrent.futures import Future as CFuture
1113
from contextlib import contextmanager
1214
from enum import Enum
1315

@@ -58,6 +60,11 @@ class KernelManager(ConnectionFileMixin):
5860
def __init__(self, *args, **kwargs):
5961
super().__init__(**kwargs)
6062
self._shutdown_status = _ShutdownStatus.Unset
63+
try:
64+
self._ready = Future()
65+
except RuntimeError:
66+
# No event loop running, use concurrent future
67+
self._ready = CFuture()
6168

6269
_created_context: Bool = Bool(False)
6370

@@ -139,6 +146,11 @@ def kernel_spec(self) -> t.Optional[kernelspec.KernelSpec]:
139146
def _default_cache_ports(self) -> bool:
140147
return self.transport == "tcp"
141148

149+
@property
150+
def ready(self) -> Future:
151+
"""A future that resolves when the kernel process has started for the first time"""
152+
return self._ready
153+
142154
@property
143155
def ipykernel(self) -> bool:
144156
return self.kernel_name in {"python", "python2", "python3"}
@@ -329,12 +341,25 @@ async def _async_start_kernel(self, **kw):
329341
keyword arguments that are passed down to build the kernel_cmd
330342
and launching the kernel (e.g. Popen kwargs).
331343
"""
332-
kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw))
344+
done = self._ready.done()
333345

334-
# launch the kernel subprocess
335-
self.log.debug("Starting kernel: %s", kernel_cmd)
336-
await ensure_async(self._launch_kernel(kernel_cmd, **kw))
337-
await ensure_async(self.post_start_kernel(**kw))
346+
try:
347+
kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw))
348+
349+
# launch the kernel subprocess
350+
self.log.debug("Starting kernel: %s", kernel_cmd)
351+
await ensure_async(self._launch_kernel(kernel_cmd, **kw))
352+
await ensure_async(self.post_start_kernel(**kw))
353+
if not done:
354+
# Add a small sleep to ensure tests can capture the state before done
355+
await asyncio.sleep(0.01)
356+
self._ready.set_result(None)
357+
358+
except Exception as e:
359+
if not done:
360+
self._ready.set_exception(e)
361+
self.log.exception(self._ready.exception())
362+
raise e
338363

339364
start_kernel = run_sync(_async_start_kernel)
340365

@@ -427,6 +452,10 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
427452
Will this kernel be restarted after it is shutdown. When this
428453
is True, connection files will not be cleaned up.
429454
"""
455+
# Shutdown is a no-op for a kernel that had a failed startup
456+
if self._ready.exception():
457+
return
458+
430459
self.shutting_down = True # Used by restarter to prevent race condition
431460
# Stop monitoring for restarting while we shutdown.
432461
self.stop_restarter()
@@ -473,16 +502,19 @@ async def _async_restart_kernel(self, now: bool = False, newports: bool = False,
473502
"""
474503
if self._launch_args is None:
475504
raise RuntimeError("Cannot restart the kernel. " "No previous call to 'start_kernel'.")
476-
else:
477-
# Stop currently running kernel.
478-
await ensure_async(self.shutdown_kernel(now=now, restart=True))
479505

480-
if newports:
481-
self.cleanup_random_ports()
506+
if not self._ready.done():
507+
raise RuntimeError("Cannot restart the kernel. " "Kernel has not fully started.")
508+
509+
# Stop currently running kernel.
510+
await ensure_async(self.shutdown_kernel(now=now, restart=True))
511+
512+
if newports:
513+
self.cleanup_random_ports()
482514

483-
# Start new kernel.
484-
self._launch_args.update(kw)
485-
await ensure_async(self.start_kernel(**self._launch_args))
515+
# Start new kernel.
516+
self._launch_args.update(kw)
517+
await ensure_async(self.start_kernel(**self._launch_args))
486518

487519
restart_kernel = run_sync(_async_restart_kernel)
488520

jupyter_client/multikernelmanager.py

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,17 @@ class MultiKernelManager(LoggingConfigurable):
5252
"""A class for managing multiple kernels."""
5353

5454
default_kernel_name = Unicode(
55-
NATIVE_KERNEL_NAME, config=True, help="The name of the default kernel to start"
56-
)
55+
NATIVE_KERNEL_NAME, help="The name of the default kernel to start"
56+
).tag(config=True)
5757

5858
kernel_spec_manager = Instance(KernelSpecManager, allow_none=True)
5959

6060
kernel_manager_class = DottedObjectName(
6161
"jupyter_client.ioloop.IOLoopKernelManager",
62-
config=True,
6362
help="""The kernel manager class. This is configurable to allow
6463
subclassing of the KernelManager for customized behavior.
6564
""",
66-
)
65+
).tag(config=True)
6766

6867
@observe("kernel_manager_class")
6968
def _kernel_manager_class_changed(self, change):
@@ -91,9 +90,8 @@ def create_kernel_manager(*args, **kwargs) -> KernelManager:
9190

9291
shared_context = Bool(
9392
True,
94-
config=True,
9593
help="Share a single zmq.Context to talk to all my kernels",
96-
)
94+
).tag(config=True)
9795

9896
_created_context = Bool(False)
9997

@@ -163,8 +161,11 @@ def pre_start_kernel(
163161
async def _add_kernel_when_ready(
164162
self, kernel_id: str, km: KernelManager, kernel_awaitable: t.Awaitable
165163
) -> None:
166-
await kernel_awaitable
167-
self._kernels[kernel_id] = km
164+
try:
165+
await kernel_awaitable
166+
self._kernels[kernel_id] = km
167+
finally:
168+
self._starting_kernels.pop(kernel_id, None)
168169

169170
async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwargs) -> str:
170171
"""Start a new kernel.
@@ -182,12 +183,16 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg
182183
)
183184
)
184185
kwargs['kernel_id'] = kernel_id # Make kernel_id available to manager and provisioner
185-
fut = asyncio.ensure_future(
186-
self._add_kernel_when_ready(kernel_id, km, ensure_async(km.start_kernel(**kwargs)))
187-
)
186+
187+
starter = ensure_async(km.start_kernel(**kwargs))
188+
fut = asyncio.ensure_future(self._add_kernel_when_ready(kernel_id, km, starter))
188189
self._starting_kernels[kernel_id] = fut
189-
await fut
190-
del self._starting_kernels[kernel_id]
190+
191+
if getattr(self, 'use_pending_kernels', False):
192+
self._kernels[kernel_id] = km
193+
else:
194+
await fut
195+
191196
return kernel_id
192197

193198
start_kernel = run_sync(_async_start_kernel)
@@ -210,9 +215,13 @@ async def _async_shutdown_kernel(
210215
Will the kernel be restarted?
211216
"""
212217
self.log.info("Kernel shutdown: %s" % kernel_id)
213-
218+
if kernel_id in self._starting_kernels:
219+
try:
220+
await self._starting_kernels[kernel_id]
221+
except Exception:
222+
self.remove_kernel(kernel_id)
223+
return
214224
km = self.get_kernel(kernel_id)
215-
216225
await ensure_async(km.shutdown_kernel(now, restart))
217226
self.remove_kernel(kernel_id)
218227

@@ -246,18 +255,11 @@ def remove_kernel(self, kernel_id: str) -> KernelManager:
246255
"""
247256
return self._kernels.pop(kernel_id, None)
248257

249-
async def _shutdown_starting_kernel(self, kid: str, now: bool) -> None:
250-
if kid in self._starting_kernels:
251-
await self._starting_kernels[kid]
252-
await ensure_async(self.shutdown_kernel(kid, now=now))
253-
254258
async def _async_shutdown_all(self, now: bool = False) -> None:
255259
"""Shutdown all kernels."""
256260
kids = self.list_kernel_ids()
257-
futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in kids]
258-
futs += [
259-
self._shutdown_starting_kernel(kid, now=now) for kid in self._starting_kernels.keys()
260-
]
261+
kids += list(self._starting_kernels)
262+
futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in set(kids)]
261263
await asyncio.gather(*futs)
262264

263265
shutdown_all = run_sync(_async_shutdown_all)
@@ -466,6 +468,12 @@ class AsyncMultiKernelManager(MultiKernelManager):
466468
""",
467469
)
468470

471+
use_pending_kernels = Bool(
472+
False,
473+
help="""Whether to make kernels available before the process has started. The
474+
kernel has a `.ready` future which can be awaited before connecting""",
475+
).tag(config=True)
476+
469477
start_kernel = MultiKernelManager._async_start_kernel
470478
shutdown_kernel = MultiKernelManager._async_shutdown_kernel
471479
shutdown_all = MultiKernelManager._async_shutdown_all

jupyter_client/tests/test_kernelmanager.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ class TestKernelManager:
188188
def test_lifecycle(self, km):
189189
km.start_kernel(stdout=PIPE, stderr=PIPE)
190190
assert km.is_alive()
191+
is_done = km.ready.done()
192+
assert is_done
191193
km.restart_kernel(now=True)
192194
assert km.is_alive()
193195
km.interrupt_kernel()
@@ -439,6 +441,8 @@ async def test_lifecycle(self, async_km):
439441
await async_km.start_kernel(stdout=PIPE, stderr=PIPE)
440442
is_alive = await async_km.is_alive()
441443
assert is_alive
444+
is_ready = async_km.ready.done()
445+
assert is_ready
442446
await async_km.restart_kernel(now=True)
443447
is_alive = await async_km.is_alive()
444448
assert is_alive

jupyter_client/tests/test_kernelspec.py

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,18 @@
1818
import pytest
1919
from jupyter_core import paths
2020

21+
from .utils import install_kernel
22+
from .utils import sample_kernel_json
2123
from .utils import test_env
2224
from jupyter_client import kernelspec
2325

24-
sample_kernel_json = {
25-
"argv": ["cat", "{connection_file}"],
26-
"display_name": "Test kernel",
27-
}
28-
2926

3027
class KernelSpecTests(unittest.TestCase):
31-
def _install_sample_kernel(self, kernels_dir):
32-
"""install a sample kernel in a kernels directory"""
33-
sample_kernel_dir = pjoin(kernels_dir, "sample")
34-
os.makedirs(sample_kernel_dir)
35-
json_file = pjoin(sample_kernel_dir, "kernel.json")
36-
with open(json_file, "w") as f:
37-
json.dump(sample_kernel_json, f)
38-
return sample_kernel_dir
39-
4028
def setUp(self):
4129
self.env_patch = test_env()
4230
self.env_patch.start()
43-
self.sample_kernel_dir = self._install_sample_kernel(
44-
pjoin(paths.jupyter_data_dir(), "kernels")
31+
self.sample_kernel_dir = install_kernel(
32+
pjoin(paths.jupyter_data_dir(), "kernels"), name="sample"
4533
)
4634

4735
self.ksm = kernelspec.KernelSpecManager()
@@ -87,7 +75,7 @@ def test_find_all_specs(self):
8775
def test_kernel_spec_priority(self):
8876
td = TemporaryDirectory()
8977
self.addCleanup(td.cleanup)
90-
sample_kernel = self._install_sample_kernel(td.name)
78+
sample_kernel = install_kernel(td.name, name="sample")
9179
self.ksm.kernel_dirs.append(td.name)
9280
kernels = self.ksm.find_kernel_specs()
9381
self.assertEqual(kernels["sample"], self.sample_kernel_dir)

0 commit comments

Comments
 (0)