Skip to content

Commit cae886a

Browse files
committed
sync
1 parent dfe7620 commit cae886a

File tree

4 files changed

+82
-6
lines changed

4 files changed

+82
-6
lines changed

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from collections import deque
77
from typing import Optional, Set, Dict
88

9+
import grpc
10+
911
from .. import _apis
1012
from ..aio import Driver
1113
from ..issues import Error as YdbError
@@ -145,6 +147,9 @@ async def wait_messages(self):
145147
raise TopicReaderStreamClosedError()
146148

147149
while len(self._message_batches) == 0:
150+
if self._first_error is not None:
151+
raise self._first_error
152+
148153
await self._state_changed.wait()
149154
self._state_changed.clear()
150155

@@ -180,6 +185,8 @@ async def _read_messages_loop(self, stream: IGrpcWrapperAsyncIO):
180185
)
181186

182187
self._state_changed.set()
188+
except grpc.RpcError as e:
189+
183190
except Exception as e:
184191
self._set_first_error(e)
185192
raise e

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@
22
import datetime
33
from unittest import mock
44

5+
import grpc
56
import pytest
67

78
from ydb import aio
8-
from ydb._topic_reader.datatypes import PublicBatch, PublicMessage
9-
from ydb._topic_reader.topic_reader import PublicReaderSettings
10-
from ydb._topic_reader.topic_reader_asyncio import ReaderStream, PartitionSession
11-
from ydb._topic_wrapper.common import OffsetsRange, Codec
12-
from ydb._topic_wrapper.reader import StreamReadMessage
13-
from ydb._topic_wrapper.test_helpers import StreamMock, wait_condition, wait_for_fast
9+
from .datatypes import PublicBatch, PublicMessage
10+
from .topic_reader import PublicReaderSettings
11+
from .topic_reader_asyncio import ReaderStream, PartitionSession
12+
from .._topic_wrapper.common import OffsetsRange, Codec
13+
from .._topic_wrapper.reader import StreamReadMessage
14+
from .._topic_wrapper.test_helpers import StreamMock, wait_condition, wait_for_fast
15+
from ..issues import Unavailable
1416

1517

1618
@pytest.fixture()
@@ -167,6 +169,21 @@ def batch_count():
167169
)))
168170
await wait_condition(lambda: batch_count() > initial_batches)
169171

172+
async def test_convert_errors_to_ydb(self, stream, stream_reader):
173+
class TestError(grpc.RpcError):
174+
_code: grpc.StatusCode
175+
176+
def __init__(self, code: grpc.StatusCode):
177+
self._code = code
178+
179+
def code(self):
180+
return self._code
181+
182+
stream.from_server.put_nowait(TestError(grpc.StatusCode.UNAVAILABLE))
183+
184+
with pytest.raises(Unavailable):
185+
await wait_for_fast(stream_reader.wait_messages())
186+
170187
async def test_init_reader(self, stream, default_reader_settings):
171188
reader = ReaderStream(default_reader_settings)
172189
init_message = StreamReadMessage.InitRequest(
@@ -563,3 +580,9 @@ async def test_receive_batch_nowait(self, stream, stream_reader, partition_sessi
563580

564581
with pytest.raises(asyncio.QueueEmpty):
565582
stream.from_client.get_nowait()
583+
584+
@pytest.mark.asyncio
585+
class TestReaderReconnector:
586+
async def test_start(self):
587+
pass
588+

ydb/_topic_wrapper/common.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,3 +238,13 @@ def from_proto(msg: ydb_topic_pb2.UpdateTokenResponse) -> typing.Any:
238238

239239

240240
TokenGetterFuncType = typing.Optional[typing.Callable[[], str]]
241+
242+
243+
def callback_from_asyncio(callback: typing.Union[typing.Callable, typing.Coroutine]) -> [asyncio.Future, asyncio.Task]:
244+
loop = asyncio.get_running_loop()
245+
246+
if asyncio.iscoroutinefunction(callback):
247+
return loop.create_task(callback())
248+
else:
249+
return loop.run_in_executor(None, callback)
250+

ydb/_topic_wrapper/common_test.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import asyncio
2+
3+
import pytest
4+
5+
from .common import callback_from_asyncio
6+
7+
8+
@pytest.mark.asyncio
9+
class Test:
10+
async def test_callback_from_asyncio(self):
11+
class TestError(Exception):
12+
pass
13+
14+
def sync_success():
15+
return 1
16+
17+
assert await callback_from_asyncio(sync_success) == 1
18+
19+
def sync_failed():
20+
raise TestError()
21+
22+
with pytest.raises(TestError):
23+
await callback_from_asyncio(sync_failed)
24+
25+
async def async_success():
26+
await asyncio.sleep(0)
27+
return 1
28+
29+
assert await callback_from_asyncio(async_success) == 1
30+
31+
async def async_failed():
32+
await asyncio.sleep(0)
33+
raise TestError()
34+
35+
with pytest.raises(TestError):
36+
await callback_from_asyncio(async_failed)

0 commit comments

Comments
 (0)