Skip to content

Commit d0d409f

Browse files
committed
implement calls from sync to async
1 parent 350f92f commit d0d409f

File tree

2 files changed

+266
-0
lines changed

2 files changed

+266
-0
lines changed

ydb/_topic_common/common.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,89 @@ def start_event_loop():
6060

6161
_shared_event_loop = event_loop_set_done.result()
6262
return _shared_event_loop
63+
64+
65+
class CallFromSyncToAsync:
66+
_loop: asyncio.AbstractEventLoop
67+
68+
def __init__(self, loop: asyncio.AbstractEventLoop):
69+
self._loop = loop
70+
71+
def unsafe_call_with_future(self, coro: typing.Coroutine) -> concurrent.futures.Future:
72+
"""
73+
returned result from coro may be lost
74+
"""
75+
return asyncio.run_coroutine_threadsafe(coro, self._loop)
76+
77+
def unsafe_call_with_result(self, coro: typing.Coroutine, timeout: typing.Union[int, float, None]):
78+
"""
79+
returned result from coro may be lost by race future cancel by timeout and return value from coroutine
80+
"""
81+
f = self.unsafe_call_with_future(coro)
82+
try:
83+
return f.result(timeout)
84+
except concurrent.futures.TimeoutError:
85+
raise TimeoutError()
86+
finally:
87+
f.cancel()
88+
89+
def safe_call_with_result(self, coro: typing.Coroutine, timeout: typing.Union[int, float]):
90+
"""
91+
no lost returned value from coro, but may be slower especially timeout latency - it wait coroutine cancelation.
92+
"""
93+
94+
if timeout <= 0:
95+
return self._safe_call_fast(coro)
96+
97+
async def call_coro():
98+
task = self._loop.create_task(coro)
99+
try:
100+
res = await asyncio.wait_for(task, timeout)
101+
return res
102+
except BaseException as err:
103+
try:
104+
res = await task
105+
return res
106+
except asyncio.CancelledError:
107+
pass
108+
109+
# return builtin TimeoutError instead of asyncio.TimeoutError
110+
raise TimeoutError()
111+
112+
113+
return asyncio.run_coroutine_threadsafe(call_coro(), self._loop).result()
114+
115+
def _safe_call_fast(self, coro: typing.Coroutine):
116+
"""
117+
no lost returned value from coro, but may be slower especially timeout latency - it wait coroutine cancelation.
118+
Wait coroutine result only one loop.
119+
"""
120+
res = concurrent.futures.Future()
121+
122+
async def call_coro():
123+
try:
124+
res.set_result(await coro)
125+
except asyncio.CancelledError:
126+
res.set_exception(TimeoutError())
127+
128+
async def sleep0():
129+
await asyncio.sleep(0)
130+
131+
coro_future = asyncio.run_coroutine_threadsafe(call_coro(), self._loop)
132+
asyncio.run_coroutine_threadsafe(sleep0(), self._loop).result()
133+
coro_future.cancel()
134+
return res.result()
135+
136+
def call_sync(self, callback: typing.Callable[[], typing.Any]) -> typing.Any:
137+
result = concurrent.futures.Future()
138+
139+
def call_callback():
140+
try:
141+
res = callback()
142+
result.set_result(res)
143+
except BaseException as err:
144+
result.set_exception(err)
145+
146+
self._loop.call_soon_threadsafe(call_callback)
147+
148+
return result.result()

ydb/_topic_common/common_test.py

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import asyncio
2+
import threading
3+
import time
24
import typing
35

46
import grpc
57
import pytest
68

9+
from .common import CallFromSyncToAsync
710
from .._grpc.grpcwrapper.common_utils import (
811
GrpcWrapperAsyncIO,
912
ServerStatus,
@@ -25,6 +28,23 @@
2528
)
2629

2730

31+
@pytest.fixture()
32+
def separate_loop():
33+
loop = asyncio.new_event_loop()
34+
35+
def run_loop():
36+
loop.run_forever()
37+
pass
38+
39+
t = threading.Thread(target=run_loop, name="test separate loop")
40+
t.start()
41+
42+
yield loop
43+
44+
loop.call_soon_threadsafe(lambda: loop.stop())
45+
t.join()
46+
47+
2848
@pytest.mark.asyncio
2949
class Test:
3050
async def test_callback_from_asyncio(self):
@@ -111,3 +131,163 @@ def test_failed(self):
111131
assert not status.is_success()
112132
with pytest.raises(issues.Overloaded):
113133
issues._process_response(status)
134+
135+
136+
@pytest.mark.asyncio
137+
class TestCallFromSyncToAsync:
138+
@pytest.fixture()
139+
def caller(self, separate_loop):
140+
return CallFromSyncToAsync(separate_loop)
141+
142+
def test_unsafe_call_with_future(self, separate_loop, caller):
143+
callback_loop = None
144+
145+
async def callback():
146+
nonlocal callback_loop
147+
callback_loop = asyncio.get_running_loop()
148+
return 1
149+
150+
f = caller.unsafe_call_with_future(callback())
151+
152+
assert f.result() == 1
153+
assert callback_loop is separate_loop
154+
155+
def test_unsafe_call_with_result_ok(self, separate_loop, caller):
156+
callback_loop = None
157+
158+
async def callback():
159+
nonlocal callback_loop
160+
callback_loop = asyncio.get_running_loop()
161+
return 1
162+
163+
res = caller.unsafe_call_with_result(callback(), None)
164+
165+
assert res == 1
166+
assert callback_loop is separate_loop
167+
168+
def test_unsafe_call_with_result_timeout(self, separate_loop, caller):
169+
timeout = 0.01
170+
callback_loop = None
171+
172+
async def callback():
173+
nonlocal callback_loop
174+
callback_loop = asyncio.get_running_loop()
175+
await asyncio.sleep(1)
176+
return 1
177+
178+
start = time.monotonic()
179+
with pytest.raises(TimeoutError):
180+
caller.unsafe_call_with_result(callback(), timeout)
181+
finished = time.monotonic()
182+
183+
assert callback_loop is separate_loop
184+
assert finished - start > timeout
185+
186+
def test_safe_call_with_result_ok(self, separate_loop, caller):
187+
callback_loop = None
188+
189+
async def callback():
190+
nonlocal callback_loop
191+
callback_loop = asyncio.get_running_loop()
192+
return 1
193+
194+
res = caller.safe_call_with_result(callback(), 1)
195+
196+
assert res == 1
197+
assert callback_loop is separate_loop
198+
199+
def test_safe_call_with_result_timeout(self, separate_loop, caller):
200+
timeout = 0.01
201+
callback_loop = None
202+
cancelled = False
203+
204+
async def callback():
205+
nonlocal callback_loop, cancelled
206+
callback_loop = asyncio.get_running_loop()
207+
try:
208+
await asyncio.sleep(1)
209+
except asyncio.CancelledError:
210+
cancelled = True
211+
raise
212+
213+
return 1
214+
215+
start = time.monotonic()
216+
with pytest.raises(TimeoutError):
217+
caller.safe_call_with_result(callback(), timeout)
218+
finished = time.monotonic()
219+
220+
async def sleep0():
221+
await asyncio.sleep(0)
222+
223+
# wait one loop for handle task cancelation
224+
asyncio.run_coroutine_threadsafe(sleep0(), separate_loop)
225+
226+
assert callback_loop is separate_loop
227+
assert finished - start > timeout
228+
assert cancelled
229+
230+
def test_safe_callback_with_0_timeout_ok(self, separate_loop, caller):
231+
callback_loop = None
232+
233+
async def f1():
234+
return 1
235+
236+
async def f2():
237+
return await f1()
238+
239+
async def callback():
240+
nonlocal callback_loop
241+
callback_loop = asyncio.get_running_loop()
242+
return await f2()
243+
244+
res = caller.safe_call_with_result(callback(), 0)
245+
assert callback_loop is separate_loop
246+
assert res == 1
247+
248+
def test_safe_callback_with_0_timeout_timeout(self, separate_loop, caller):
249+
callback_loop = None
250+
cancelled = False
251+
252+
async def callback():
253+
try:
254+
nonlocal callback_loop, cancelled
255+
256+
callback_loop = asyncio.get_running_loop()
257+
await asyncio.sleep(1)
258+
except asyncio.CancelledError:
259+
cancelled = True
260+
raise
261+
262+
with pytest.raises(TimeoutError):
263+
caller.safe_call_with_result(callback(), 0)
264+
265+
assert callback_loop is separate_loop
266+
assert cancelled
267+
268+
def test_call_sync_ok(self, separate_loop, caller):
269+
callback_eventloop = None
270+
271+
def callback():
272+
nonlocal callback_eventloop
273+
callback_eventloop = asyncio.get_running_loop()
274+
return 1
275+
276+
res = caller.call_sync(callback)
277+
assert callback_eventloop is separate_loop
278+
assert res == 1
279+
280+
def test_call_sync_error(self, separate_loop, caller):
281+
callback_eventloop = None
282+
283+
class TestError(RuntimeError):
284+
pass
285+
286+
def callback():
287+
nonlocal callback_eventloop
288+
callback_eventloop = asyncio.get_running_loop()
289+
raise TestError
290+
291+
with pytest.raises(TestError):
292+
caller.call_sync(callback)
293+
assert callback_eventloop is separate_loop

0 commit comments

Comments
 (0)