Skip to content

Commit f502655

Browse files
authored
Merge pull request #1 from davidbrochart/zombieProcFix
Replace await_or_block with ensure_async
2 parents fb2285a + ca653f1 commit f502655

File tree

3 files changed

+38
-34
lines changed

3 files changed

+38
-34
lines changed

nbclient/client.py

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
CellExecutionComplete,
2424
CellExecutionError
2525
)
26-
from .util import run_sync, await_or_block
26+
from .util import run_sync, ensure_async
2727

2828

2929
def timestamp():
@@ -334,20 +334,20 @@ def start_kernel_manager(self):
334334
async def _async_cleanup_kernel(self):
335335
try:
336336
# Send a polite shutdown request
337-
await await_or_block(self.kc.shutdown)
337+
await ensure_async(self.kc.shutdown())
338338
try:
339339
# Queue the manager to kill the process, sometimes the built-in and above
340340
# shutdowns have not been successful or called yet, so give a direct kill
341341
# call here and recover gracefully if it's already dead.
342-
await await_or_block(self.km.shutdown_kernel, now=True)
342+
await ensure_async(self.km.shutdown_kernel(now=True))
343343
except RuntimeError as e:
344344
# The error isn't specialized, so we have to check the message
345345
if 'No kernel is running!' not in str(e):
346346
raise
347347
finally:
348348
# Remove any state left over even if we failed to stop the kernel
349-
await await_or_block(self.km.cleanup)
350-
await await_or_block(self.kc.stop_channels)
349+
await ensure_async(self.km.cleanup())
350+
await ensure_async(self.kc.stop_channels())
351351
self.kc = None
352352

353353
_cleanup_kernel = run_sync(_async_cleanup_kernel)
@@ -374,12 +374,12 @@ async def async_start_new_kernel_client(self, **kwargs):
374374
if self.km.ipykernel and self.ipython_hist_file:
375375
self.extra_arguments += ['--HistoryManager.hist_file={}'.format(self.ipython_hist_file)]
376376

377-
await await_or_block(self.km.start_kernel, extra_arguments=self.extra_arguments, **kwargs)
377+
await ensure_async(self.km.start_kernel(extra_arguments=self.extra_arguments, **kwargs))
378378

379379
self.kc = self.km.client()
380-
await await_or_block(self.kc.start_channels)
380+
await ensure_async(self.kc.start_channels())
381381
try:
382-
await await_or_block(self.kc.wait_for_ready, timeout=self.startup_timeout)
382+
await ensure_async(self.kc.wait_for_ready(timeout=self.startup_timeout))
383383
except RuntimeError:
384384
await self._async_cleanup_kernel()
385385
raise
@@ -449,7 +449,7 @@ async def async_execute(self, **kwargs):
449449
await self.async_execute_cell(
450450
cell, index, execution_count=self.code_cells_executed + 1
451451
)
452-
msg_id = await await_or_block(self.kc.kernel_info)
452+
msg_id = await ensure_async(self.kc.kernel_info())
453453
info_msg = await self.async_wait_for_reply(msg_id)
454454
self.nb.metadata['language_info'] = info_msg['content']['language_info']
455455
self.set_widgets_metadata()
@@ -505,7 +505,7 @@ async def _async_poll_for_reply(self, msg_id, cell, timeout, task_poll_output_ms
505505
deadline = monotonic() + timeout
506506
while True:
507507
try:
508-
msg = await await_or_block(self.kc.shell_channel.get_msg, timeout=timeout)
508+
msg = await ensure_async(self.kc.shell_channel.get_msg(timeout=timeout))
509509
if msg['parent_header'].get('msg_id') == msg_id:
510510
if self.record_timing:
511511
cell['metadata']['execution']['shell.execute_reply'] = timestamp()
@@ -529,7 +529,7 @@ async def _async_poll_for_reply(self, msg_id, cell, timeout, task_poll_output_ms
529529

530530
async def _async_poll_output_msg(self, parent_msg_id, cell, cell_index):
531531
while True:
532-
msg = await await_or_block(self.kc.iopub_channel.get_msg, timeout=None)
532+
msg = await ensure_async(self.kc.iopub_channel.get_msg(timeout=None))
533533
if msg['parent_header'].get('msg_id') == parent_msg_id:
534534
try:
535535
# Will raise CellExecutionComplete when completed
@@ -552,14 +552,14 @@ async def _async_handle_timeout(self, timeout, cell=None):
552552
self.log.error("Timeout waiting for execute reply (%is)." % timeout)
553553
if self.interrupt_on_timeout:
554554
self.log.error("Interrupting kernel")
555-
await await_or_block(self.km.interrupt_kernel)
555+
await ensure_async(self.km.interrupt_kernel())
556556
else:
557557
raise CellTimeoutError.error_from_timeout_and_cell(
558558
"Cell execution timed out", timeout, cell
559559
)
560560

561561
async def _async_check_alive(self):
562-
if not await await_or_block(self.kc.is_alive):
562+
if not await ensure_async(self.kc.is_alive()):
563563
self.log.error("Kernel died while waiting for execute reply.")
564564
raise DeadKernelError("Kernel died")
565565

@@ -569,9 +569,10 @@ async def async_wait_for_reply(self, msg_id, cell=None):
569569
cummulative_time = 0
570570
while True:
571571
try:
572-
msg = await await_or_block(
573-
self.kc.shell_channel.get_msg,
574-
timeout=self.shell_timeout_interval
572+
msg = await ensure_async(
573+
self.kc.shell_channel.get_msg(
574+
timeout=self.shell_timeout_interval
575+
)
575576
)
576577
except Empty:
577578
await self._async_check_alive()
@@ -652,11 +653,12 @@ async def async_execute_cell(self, cell, cell_index, execution_count=None, store
652653
cell['metadata']['execution'] = {}
653654

654655
self.log.debug("Executing cell:\n%s", cell.source)
655-
parent_msg_id = await await_or_block(
656-
self.kc.execute,
657-
cell.source,
658-
store_history=store_history,
659-
stop_on_error=not self.allow_errors
656+
parent_msg_id = await ensure_async(
657+
self.kc.execute(
658+
cell.source,
659+
store_history=store_history,
660+
stop_on_error=not self.allow_errors
661+
)
660662
)
661663
# We launched a code cell to execute
662664
self.code_cells_executed += 1

nbclient/tests/test_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,7 @@ def test_kernel_death(self):
506506
km = executor.start_kernel_manager()
507507

508508
with patch.object(km, "is_alive") as alive_mock:
509-
alive_mock.return_value = make_async(False)
509+
alive_mock.return_value = False
510510
# Will be a RuntimeError or subclass DeadKernelError depending
511511
# on if jupyter_client or nbconvert catches the dead client first
512512
with pytest.raises(RuntimeError):

nbclient/util.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
# Distributed under the terms of the Modified BSD License.
55

66
import asyncio
7-
8-
from typing import Coroutine
7+
import inspect
98

109

1110
def run_sync(coro):
@@ -49,15 +48,18 @@ def wrapped(self, *args, **kwargs):
4948
return wrapped
5049

5150

52-
async def await_or_block(func, *args, **kwargs):
53-
"""Awaits the function if it's an asynchronous function. Otherwise block
54-
on execution.
51+
async def ensure_async(obj):
52+
"""Convert a non-awaitable object to a coroutine if needed,
53+
and await it if it was not already awaited.
5554
"""
56-
if asyncio.iscoroutinefunction(func):
57-
return await func(*args, **kwargs)
58-
else:
59-
result = func(*args, **kwargs)
60-
# Mocks mask that the function is a coroutine :/
61-
if isinstance(result, Coroutine):
62-
return await result
55+
if inspect.isawaitable(obj):
56+
try:
57+
result = await obj
58+
except RuntimeError as e:
59+
if str(e) == 'cannot reuse already awaited coroutine':
60+
# obj is already the coroutine's result
61+
return obj
62+
raise
6363
return result
64+
# obj doesn't need to be awaited
65+
return obj

0 commit comments

Comments
 (0)