Skip to content

Commit d4e09db

Browse files
Async jupyter client (#10)
* Asynchronous cell execution
1 parent 8174e9e commit d4e09db

File tree

5 files changed

+238
-110
lines changed

5 files changed

+238
-110
lines changed

.travis.yml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,22 @@ matrix:
1010
env: TOXENV=py37
1111
- python: 3.8
1212
env: TOXENV=py38
13-
- python: 3.6
13+
- python: 3.8
1414
env: TOXENV=flake8
15-
- python: 3.6
15+
- python: 3.8
1616
env: TOXENV=dist
17-
- python: 3.6
17+
- python: 3.8
1818
env: TOXENV=docs
19-
- python: 3.6
19+
- python: 3.8
2020
env: TOXENV=manifest
2121
install:
2222
- pip install tox coverage codecov
2323
script:
2424
- tox -e $TOXENV
2525
after_success:
2626
- test $TRAVIS_BRANCH = "master" &&
27-
test $TOXENV = "py36" &&
27+
test $TOXENV = "py38" &&
2828
coverage xml -i
2929
- test $TRAVIS_BRANCH = "master" &&
30-
test $TOXENV = "py36" &&
30+
test $TOXENV = "py38" &&
3131
codecov

nbclient/client.py

Lines changed: 123 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
import base64
22
from textwrap import dedent
3-
from contextlib import contextmanager
3+
4+
# For python 3.5 compatibility we import asynccontextmanager from async_generator instead of
5+
# contextlib, and we `await yield_()` instead of just `yield`
6+
from async_generator import asynccontextmanager, async_generator, yield_
7+
48
from time import monotonic
59
from queue import Empty
10+
import asyncio
611

712
from traitlets.config.configurable import LoggingConfigurable
813
from traitlets import List, Unicode, Bool, Enum, Any, Type, Dict, Integer, default
@@ -285,9 +290,10 @@ def start_kernel_manager(self):
285290
self.km = self.kernel_manager_class(config=self.config)
286291
else:
287292
self.km = self.kernel_manager_class(kernel_name=self.kernel_name, config=self.config)
293+
self.km.client_class = 'jupyter_client.asynchronous.AsyncKernelClient'
288294
return self.km
289295

290-
def start_new_kernel_client(self, **kwargs):
296+
async def start_new_kernel_client(self, **kwargs):
291297
"""Creates a new kernel client.
292298
293299
Parameters
@@ -314,16 +320,17 @@ def start_new_kernel_client(self, **kwargs):
314320
self.kc = self.km.client()
315321
self.kc.start_channels()
316322
try:
317-
self.kc.wait_for_ready(timeout=self.startup_timeout)
323+
await self.kc.wait_for_ready(timeout=self.startup_timeout)
318324
except RuntimeError:
319325
self.kc.stop_channels()
320326
self.km.shutdown_kernel()
321327
raise
322328
self.kc.allow_stdin = False
323329
return self.kc
324330

325-
@contextmanager
326-
def setup_kernel(self, **kwargs):
331+
@asynccontextmanager
332+
@async_generator # needed for python 3.5 compatibility
333+
async def setup_kernel(self, **kwargs):
327334
"""
328335
Context manager for setting up the kernel to execute a notebook.
329336
@@ -336,16 +343,28 @@ def setup_kernel(self, **kwargs):
336343
self.start_kernel_manager()
337344

338345
if not self.km.has_kernel:
339-
self.start_new_kernel_client(**kwargs)
346+
await self.start_new_kernel_client(**kwargs)
340347
try:
341-
yield
348+
await yield_(None) # would just yield in python >3.5
342349
finally:
343350
self.kc.stop_channels()
344351
self.kc = None
345352

346353
def execute(self, **kwargs):
347354
"""
348-
Executes each code cell.
355+
Executes each code cell (blocking).
356+
357+
Returns
358+
-------
359+
nb : NotebookNode
360+
The executed notebook.
361+
"""
362+
loop = get_loop()
363+
return loop.run_until_complete(self.async_execute(**kwargs))
364+
365+
async def async_execute(self, **kwargs):
366+
"""
367+
Executes each code cell asynchronously.
349368
350369
Returns
351370
-------
@@ -354,13 +373,15 @@ def execute(self, **kwargs):
354373
"""
355374
self.reset_execution_trackers()
356375

357-
with self.setup_kernel(**kwargs):
376+
async with self.setup_kernel(**kwargs):
358377
self.log.info("Executing notebook with kernel: %s" % self.kernel_name)
359378
for index, cell in enumerate(self.nb.cells):
360379
# Ignore `'execution_count' in content` as it's always 1
361380
# when store_history is False
362-
self.execute_cell(cell, index, execution_count=self.code_cells_executed + 1)
363-
info_msg = self._wait_for_reply(self.kc.kernel_info())
381+
await self.async_execute_cell(
382+
cell, index, execution_count=self.code_cells_executed + 1
383+
)
384+
info_msg = await self._wait_for_reply(self.kc.kernel_info())
364385
self.nb.metadata['language_info'] = info_msg['content']['language_info']
365386
self.set_widgets_metadata()
366387

@@ -408,16 +429,40 @@ def _update_display_id(self, display_id, msg):
408429
outputs[output_idx]['data'] = out['data']
409430
outputs[output_idx]['metadata'] = out['metadata']
410431

411-
def _poll_for_reply(self, msg_id, cell=None, timeout=None):
412-
try:
413-
# check with timeout if kernel is still alive
414-
msg = self.kc.shell_channel.get_msg(timeout=timeout)
415-
if msg['parent_header'].get('msg_id') == msg_id:
416-
return msg
417-
except Empty:
418-
# received no message, check if kernel is still alive
419-
self._check_alive()
420-
# kernel still alive, wait for a message
432+
async def _poll_for_reply(self, msg_id, cell, timeout, task_poll_output_msg):
433+
if timeout is not None:
434+
deadline = monotonic() + timeout
435+
while True:
436+
try:
437+
msg = await self.kc.shell_channel.get_msg(timeout=timeout)
438+
if msg['parent_header'].get('msg_id') == msg_id:
439+
try:
440+
await asyncio.wait_for(task_poll_output_msg, self.iopub_timeout)
441+
except (asyncio.TimeoutError, Empty):
442+
if self.raise_on_iopub_timeout:
443+
raise CellTimeoutError.error_from_timeout_and_cell(
444+
"Timeout waiting for IOPub output", self.iopub_timeout, cell
445+
)
446+
else:
447+
self.log.warning("Timeout waiting for IOPub output")
448+
return msg
449+
else:
450+
if timeout is not None:
451+
timeout = max(0, deadline - monotonic())
452+
except Empty:
453+
# received no message, check if kernel is still alive
454+
self._check_alive()
455+
self._handle_timeout(timeout, cell)
456+
457+
async def _poll_output_msg(self, parent_msg_id, cell, cell_index):
458+
while True:
459+
msg = await self.kc.iopub_channel.get_msg(timeout=None)
460+
if msg['parent_header'].get('msg_id') == parent_msg_id:
461+
try:
462+
# Will raise CellExecutionComplete when completed
463+
self.process_message(msg, cell, cell_index)
464+
except CellExecutionComplete:
465+
return
421466

422467
def _get_timeout(self, cell):
423468
if self.timeout_func is not None and cell is not None:
@@ -445,14 +490,14 @@ def _check_alive(self):
445490
self.log.error("Kernel died while waiting for execute reply.")
446491
raise DeadKernelError("Kernel died")
447492

448-
def _wait_for_reply(self, msg_id, cell=None):
493+
async def _wait_for_reply(self, msg_id, cell=None):
449494
# wait for finish, with timeout
450495
timeout = self._get_timeout(cell)
451496
cummulative_time = 0
452497
self.shell_timeout_interval = 5
453498
while True:
454499
try:
455-
msg = self.kc.shell_channel.get_msg(timeout=self.shell_timeout_interval)
500+
msg = await self.kc.shell_channel.get_msg(timeout=self.shell_timeout_interval)
456501
except Empty:
457502
self._check_alive()
458503
cummulative_time += self.shell_timeout_interval
@@ -488,7 +533,46 @@ def _check_raise_for_error(self, cell, exec_reply):
488533

489534
def execute_cell(self, cell, cell_index, execution_count=None, store_history=True):
490535
"""
491-
Executes a single code cell.
536+
Executes a single code cell (blocking).
537+
538+
To execute all cells see :meth:`execute`.
539+
540+
Parameters
541+
----------
542+
cell : nbformat.NotebookNode
543+
The cell which is currently being processed.
544+
cell_index : int
545+
The position of the cell within the notebook object.
546+
execution_count : int
547+
The execution count to be assigned to the cell (default: Use kernel response)
548+
store_history : bool
549+
Determines if history should be stored in the kernel (default: False).
550+
Specific to ipython kernels, which can store command histories.
551+
552+
Returns
553+
-------
554+
output : dict
555+
The execution output payload (or None for no output).
556+
557+
Raises
558+
------
559+
CellExecutionError
560+
If execution failed and should raise an exception, this will be raised
561+
with defaults about the failure.
562+
563+
Returns
564+
-------
565+
cell : NotebookNode
566+
The cell which was just processed.
567+
"""
568+
loop = get_loop()
569+
return loop.run_until_complete(
570+
self.async_execute_cell(cell, cell_index, execution_count, store_history)
571+
)
572+
573+
async def async_execute_cell(self, cell, cell_index, execution_count=None, store_history=True):
574+
"""
575+
Executes a single code cell asynchronously.
492576
493577
To execute all cells see :meth:`execute`.
494578
@@ -531,70 +615,16 @@ def execute_cell(self, cell, cell_index, execution_count=None, store_history=Tru
531615
# We launched a code cell to execute
532616
self.code_cells_executed += 1
533617
exec_timeout = self._get_timeout(cell)
534-
deadline = None
535-
if exec_timeout is not None:
536-
deadline = monotonic() + exec_timeout
537618

538619
cell.outputs = []
539620
self.clear_before_next_output = False
540621

541-
# This loop resolves nbconvert#659. By polling iopub_channel's and shell_channel's
542-
# output we avoid dropping output and important signals (like idle) from
543-
# iopub_channel. Prior to this change, iopub_channel wasn't polled until
544-
# after exec_reply was obtained from shell_channel, leading to the
545-
# aforementioned dropped data.
546-
547-
# These two variables are used to track what still needs polling:
548-
# more_output=true => continue to poll the iopub_channel
549-
more_output = True
550-
# polling_exec_reply=true => continue to poll the shell_channel
551-
polling_exec_reply = True
552-
553-
while more_output or polling_exec_reply:
554-
if polling_exec_reply:
555-
if self._passed_deadline(deadline):
556-
self._handle_timeout(exec_timeout, cell)
557-
polling_exec_reply = False
558-
continue
559-
560-
# Avoid exceeding the execution timeout (deadline), but stop
561-
# after at most 1s so we can poll output from iopub_channel.
562-
timeout = self._timeout_with_deadline(1, deadline)
563-
exec_reply = self._poll_for_reply(parent_msg_id, cell, timeout)
564-
if exec_reply is not None:
565-
polling_exec_reply = False
566-
567-
if more_output:
568-
try:
569-
timeout = self.iopub_timeout
570-
if polling_exec_reply:
571-
# Avoid exceeding the execution timeout (deadline) while
572-
# polling for output.
573-
timeout = self._timeout_with_deadline(timeout, deadline)
574-
msg = self.kc.iopub_channel.get_msg(timeout=timeout)
575-
except Empty:
576-
if polling_exec_reply:
577-
# Still waiting for execution to finish so we expect that
578-
# output may not always be produced yet.
579-
continue
580-
581-
if self.raise_on_iopub_timeout:
582-
raise CellTimeoutError.error_from_timeout_and_cell(
583-
"Timeout waiting for IOPub output", self.iopub_timeout, cell
584-
)
585-
else:
586-
self.log.warning("Timeout waiting for IOPub output")
587-
more_output = False
588-
continue
589-
if msg['parent_header'].get('msg_id') != parent_msg_id:
590-
# not an output from our execution
591-
continue
592-
593-
try:
594-
# Will raise CellExecutionComplete when completed
595-
self.process_message(msg, cell, cell_index)
596-
except CellExecutionComplete:
597-
more_output = False
622+
task_poll_output_msg = asyncio.ensure_future(
623+
self._poll_output_msg(parent_msg_id, cell, cell_index)
624+
)
625+
exec_reply = await self._poll_for_reply(
626+
parent_msg_id, cell, exec_timeout, task_poll_output_msg
627+
)
598628

599629
if execution_count:
600630
cell['execution_count'] = execution_count
@@ -748,3 +778,12 @@ def execute(nb, cwd=None, km=None, **kwargs):
748778
if cwd is not None:
749779
resources['metadata'] = {'path': cwd}
750780
return NotebookClient(nb=nb, resources=resources, km=km, **kwargs).execute()
781+
782+
783+
def get_loop():
784+
try:
785+
loop = asyncio.get_event_loop()
786+
except RuntimeError:
787+
loop = asyncio.new_event_loop()
788+
asyncio.set_event_loop(loop)
789+
return loop

0 commit comments

Comments
 (0)