Skip to content

Commit 331e1b4

Browse files
committed
use tornado for better compatiblity than raw asyncio
when we are running our own loops for sync apis
1 parent 0873112 commit 331e1b4

File tree

2 files changed

+44
-9
lines changed

2 files changed

+44
-9
lines changed

ipyparallel/_async.py

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,21 @@
11
"""Async utilities"""
22
import asyncio
3+
import concurrent.futures
34
import inspect
4-
from concurrent.futures import ThreadPoolExecutor
5+
import threading
56
from functools import partial
67

8+
from tornado.ioloop import IOLoop
9+
10+
11+
def _asyncio_run(coro):
12+
"""Like asyncio.run, but works when there's no event loop"""
13+
# for now: using tornado for broader compatibility with FDs,
14+
# e.g. when using the only partially functional default
15+
# Proactor on windows
16+
loop = IOLoop()
17+
return loop.run_sync(lambda: asyncio.ensure_future(coro))
18+
719

820
class AsyncFirst:
921
"""Wrapper class that defines synchronous `_sync` method wrappers
@@ -16,13 +28,30 @@ class AsyncFirst:
1628

1729
_async_thread = None
1830

31+
def _thread_main(self):
32+
asyncio_loop = asyncio.new_event_loop()
33+
asyncio.set_event_loop(asyncio_loop)
34+
loop = self._thread_loop = IOLoop.current()
35+
loop.add_callback(self._loop_started.set)
36+
loop.start()
37+
1938
def _in_thread(self, async_f, *args, **kwargs):
2039
"""Run an async function in a background thread"""
2140
if self._async_thread is None:
22-
self._async_thread = ThreadPoolExecutor(1)
23-
future = self._async_thread.submit(
24-
lambda: asyncio.run(async_f(*args, **kwargs))
25-
)
41+
self._loop_started = threading.Event()
42+
self._async_thread = threading.Thread(target=self._thread_main, daemon=True)
43+
self._async_thread.start()
44+
self._loop_started.wait(timeout=5)
45+
46+
future = concurrent.futures.Future()
47+
48+
async def thread_callback():
49+
try:
50+
future.set_result(await async_f(*args, **kwargs))
51+
except Exception as e:
52+
future.set_exception(e)
53+
54+
self._thread_loop.add_callback(thread_callback)
2655
return future.result()
2756

2857
def _synchronize(self, async_f, *args, **kwargs):
@@ -31,10 +60,16 @@ def _synchronize(self, async_f, *args, **kwargs):
3160
Uses asyncio.run if asyncio is not running,
3261
otherwise puts it in a background thread
3362
"""
34-
if asyncio.get_event_loop().is_running():
63+
try:
64+
loop = asyncio.get_event_loop()
65+
except RuntimeError:
66+
# sometimes get returns a RuntimeError
67+
# if there's no current loop under certain policies
68+
loop = None
69+
if loop and loop.is_running():
3570
return self._in_thread(async_f, *args, **kwargs)
3671
else:
37-
return asyncio.run(async_f(*args, **kwargs))
72+
return _asyncio_run(async_f(*args, **kwargs))
3873

3974
def __getattr__(self, name):
4075
if name.endswith("_sync"):

ipyparallel/tests/test_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ def test_getattr():
3232
a.sync_method_sync
3333

3434

35-
def test_asyncio_run():
35+
def test_sync_no_asyncio():
3636
a = A()
3737
assert a.async_method_sync() == 'async'
3838
assert a._async_thread is None
3939

4040

41-
async def test_asyncio_sync():
41+
async def test_sync_asyncio():
4242
a = A()
4343
assert a.async_method_sync() == 'async'
4444
assert a._async_thread is not None

0 commit comments

Comments
 (0)