Skip to content

Commit ba3c976

Browse files
authored
vendor run_sync, fix invocation (#4141)
* vendor run_sync, fix invocation
1 parent 833d408 commit ba3c976

File tree

2 files changed

+94
-3
lines changed

2 files changed

+94
-3
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# Copyright (c) Jupyter Development Team.
2+
# Originally in jupyter_core/utils/__init__.py
3+
# https://github.com/jupyter/jupyter_core/blob/main/jupyter_core/utils/__init__.py
4+
5+
from typing import Awaitable, Callable, List, Optional, TypeVar, Union, cast
6+
import threading
7+
import atexit
8+
import asyncio
9+
import inspect
10+
11+
12+
class _TaskRunner:
13+
"""A task runner that runs an asyncio event loop on a background thread."""
14+
15+
def __init__(self):
16+
self.__io_loop: Optional[asyncio.AbstractEventLoop] = None
17+
self.__runner_thread: Optional[threading.Thread] = None
18+
self.__lock = threading.Lock()
19+
atexit.register(self._close)
20+
21+
def _close(self):
22+
if self.__io_loop:
23+
self.__io_loop.stop()
24+
25+
def _runner(self):
26+
loop = self.__io_loop
27+
assert loop is not None # noqa
28+
try:
29+
loop.run_forever()
30+
finally:
31+
loop.close()
32+
33+
def run(self, coro):
34+
"""Synchronously run a coroutine on a background thread."""
35+
with self.__lock:
36+
name = f"{threading.current_thread().name} - runner"
37+
if self.__io_loop is None:
38+
self.__io_loop = asyncio.new_event_loop()
39+
self.__runner_thread = threading.Thread(target=self._runner, daemon=True, name=name)
40+
self.__runner_thread.start()
41+
fut = asyncio.run_coroutine_threadsafe(coro, self.__io_loop)
42+
return fut.result(None)
43+
44+
45+
_runner_map = {}
46+
_loop_map = {}
47+
48+
T = TypeVar("T")
49+
50+
def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]:
51+
"""Wraps coroutine in a function that blocks until it has executed.
52+
53+
Parameters
54+
----------
55+
coro : coroutine-function
56+
The coroutine-function to be executed.
57+
58+
Returns
59+
-------
60+
result :
61+
Whatever the coroutine-function returns.
62+
"""
63+
64+
if not inspect.iscoroutinefunction(coro):
65+
raise AssertionError
66+
67+
def wrapped(*args, **kwargs):
68+
name = threading.current_thread().name
69+
inner = coro(*args, **kwargs)
70+
try:
71+
# If a loop is currently running in this thread,
72+
# use a task runner.
73+
asyncio.get_running_loop()
74+
if name not in _runner_map:
75+
_runner_map[name] = _TaskRunner()
76+
return _runner_map[name].run(inner)
77+
except RuntimeError:
78+
pass
79+
80+
# Run the loop for this thread.
81+
if name not in _loop_map:
82+
_loop_map[name] = asyncio.new_event_loop()
83+
loop = _loop_map[name]
84+
return loop.run_until_complete(inner)
85+
86+
wrapped.__doc__ = coro.__doc__
87+
return wrapped

src/resources/jupyter/notebook.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
import nbformat
1818
from nbclient import NotebookClient
1919
from jupyter_client import KernelManager
20-
from jupyter_core.utils import run_sync
20+
from jupyter_core_utils_vendor import run_sync
21+
import asyncio
2122

2223
# optional import of papermill for params support
2324
try:
@@ -261,10 +262,13 @@ def notebook_init(nb, resources, allow_errors):
261262
client.create_kernel_manager()
262263
client.start_new_kernel()
263264
client.start_new_kernel_client()
264-
info = client.kc.kernel_info()
265265

266266
async def get_info():
267-
return await client.kc.kernel_info()
267+
i = client.kc.kernel_info()
268+
if asyncio.isfuture(i):
269+
return await i
270+
else:
271+
return i
268272
info = run_sync(get_info)()
269273

270274
info_msg = client.wait_for_reply(info)

0 commit comments

Comments
 (0)