Skip to content

Commit 0cff446

Browse files
committed
sync
1 parent 8a0bda1 commit 0cff446

File tree

7 files changed

+127
-39
lines changed

7 files changed

+127
-39
lines changed

ydb/_topic_reader/topic_reader.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
Any, Dict,
1717
)
1818

19+
from ydb import RetrySettings
1920
from ydb._topic_wrapper.common import OffsetsRange, TokenGetterFuncType
2021
from ydb._topic_wrapper.reader import StreamReadMessage
2122

@@ -281,6 +282,9 @@ def _init_message(self) -> StreamReadMessage.InitRequest:
281282
consumer=self.consumer,
282283
)
283284

285+
def _retry_settings(self)->RetrySettings:
286+
return RetrySettings(idempotent=True)
287+
284288

285289
class Events:
286290
class OnCommit:

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,18 @@
88

99
import grpc
1010

11-
from .. import _apis, issues
11+
import ydb
12+
from .. import _apis, issues, RetrySettings
1213
from ..aio import Driver
13-
from ..issues import Error as YdbError
14+
from ..issues import (
15+
Error as YdbError,
16+
_process_response
17+
)
1418
from .datatypes import PartitionSession, PublicMessage, PublicBatch
1519
from .topic_reader import PublicReaderSettings
1620
from .._topic_wrapper.common import TokenGetterFuncType, IGrpcWrapperAsyncIO, SupportedDriverType, GrpcWrapperAsyncIO
1721
from .._topic_wrapper.reader import StreamReadMessage
22+
from .._errors import check_retriable_error
1823

1924

2025
class TopicReaderError(YdbError):
@@ -48,22 +53,42 @@ class ReaderReconnector:
4853

4954
_state_changed: asyncio.Event
5055
_stream_reader: Optional["ReaderStream"]
56+
_first_error: asyncio.Future[ydb.Error]
5157

5258
def __init__(self, driver: Driver, settings: PublicReaderSettings):
5359
self._settings = settings
5460
self._driver = driver
5561
self._background_tasks = set()
62+
self._retry_settins = RetrySettings(idempotent=True) # get from settings
5663

5764
self._state_changed = asyncio.Event()
5865
self._stream_reader = None
59-
self._background_tasks.add(asyncio.create_task(self.start()))
66+
self._background_tasks.add(asyncio.create_task(self._connection_loop()))
67+
self._first_error = asyncio.get_running_loop().create_future()
6068

61-
async def start(self):
62-
self._stream_reader = await ReaderStream.create(self._driver, self._settings)
63-
self._state_changed.set()
69+
async def _connection_loop(self):
70+
attempt = 0
71+
while True:
72+
try:
73+
self._stream_reader = await ReaderStream.create(self._driver, self._settings)
74+
self._state_changed.set()
75+
self._stream_reader._state_changed.wait()
76+
except Exception as err:
77+
# todo reset attempts when connection established
78+
79+
retry_info = check_retriable_error(err, self._settings._retry_settings(), attempt)
80+
if not retry_info.is_retriable:
81+
self._set_first_error(err)
82+
return
83+
await asyncio.sleep(retry_info.sleep_timeout_seconds)
84+
85+
attempt += 1
6486

6587
async def wait_message(self):
6688
while True:
89+
if self._first_error.done():
90+
raise self._first_error.result()
91+
6792
if self._stream_reader is not None:
6893
await self._stream_reader.wait_messages()
6994
return
@@ -81,6 +106,13 @@ async def close(self):
81106

82107
await asyncio.wait(self._background_tasks)
83108

109+
def _set_first_error(self, err: issues.Error):
110+
try:
111+
self._first_error.set_result(err)
112+
self._state_changed.set()
113+
except asyncio.InvalidStateError:
114+
# skip if already has result
115+
pass
84116

85117
class ReaderStream:
86118
_token_getter: Optional[TokenGetterFuncType]
@@ -93,8 +125,8 @@ class ReaderStream:
93125

94126
_state_changed: asyncio.Event
95127
_closed: bool
96-
_first_error: Optional[YdbError]
97128
_message_batches: typing.Deque[PublicBatch]
129+
first_error: asyncio.Future[YdbError]
98130

99131
def __init__(self, settings: PublicReaderSettings):
100132
self._token_getter = settings._token_getter
@@ -107,7 +139,7 @@ def __init__(self, settings: PublicReaderSettings):
107139

108140
self._state_changed = asyncio.Event()
109141
self._closed = False
110-
self._first_error = None
142+
self.first_error = asyncio.get_running_loop().create_future()
111143
self._message_batches = deque()
112144

113145
@staticmethod
@@ -144,8 +176,8 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
144176

145177
async def wait_messages(self):
146178
while True:
147-
if self._first_error is not None:
148-
raise self._first_error
179+
if self._get_first_error() is not None:
180+
raise self._get_first_error()
149181

150182
if len(self._message_batches) > 0:
151183
return
@@ -154,8 +186,8 @@ async def wait_messages(self):
154186
self._state_changed.clear()
155187

156188
def receive_batch_nowait(self):
157-
if self._first_error is not None:
158-
raise self._first_error
189+
if self._get_first_error() is not None:
190+
raise self._get_first_error()
159191

160192
try:
161193
batch = self._message_batches.popleft()
@@ -173,6 +205,7 @@ async def _read_messages_loop(self, stream: IGrpcWrapperAsyncIO):
173205
))
174206
while True:
175207
message = await stream.receive() # type: StreamReadMessage.FromServer
208+
_process_response(message.server_status)
176209
if isinstance(message.server_message, StreamReadMessage.ReadResponse):
177210
self._on_read_response(message.server_message)
178211
elif isinstance(message.server_message, StreamReadMessage.StartPartitionSessionRequest):
@@ -285,9 +318,18 @@ def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) ->
285318
return batches
286319

287320
def _set_first_error(self, err):
288-
if self._first_error is None:
289-
self._first_error = err
321+
try:
322+
self.first_error.set_result(err)
290323
self._state_changed.set()
324+
except asyncio.InvalidStateError:
325+
# skip later set errors
326+
pass
327+
328+
def _get_first_error(self):
329+
if self.first_error.done():
330+
return self.first_error.result()
331+
else:
332+
return None
291333

292334
async def close(self):
293335
if self._closed:

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
import pytest
77

88
import ydb
9-
from ydb import aio
9+
from ydb import aio, issues
1010
from .datatypes import PublicBatch, PublicMessage
1111
from .topic_reader import PublicReaderSettings
12-
from .topic_reader_asyncio import ReaderStream, PartitionSession
13-
from .._topic_wrapper.common import OffsetsRange, Codec, ServerStatus
12+
from .topic_reader_asyncio import ReaderStream, PartitionSession, ReaderReconnector
13+
from .._topic_wrapper.common import OffsetsRange, Codec, ServerStatus, UpdateTokenResponse
1414
from .._topic_wrapper.reader import StreamReadMessage
1515
from .._topic_wrapper.test_helpers import StreamMock, wait_condition, wait_for_fast
1616
from ..issues import Unavailable
@@ -135,14 +135,14 @@ async def stream_reader_started(self, stream, default_reader_settings, partition
135135
async def stream_reader(self, stream_reader_started: ReaderStream):
136136
yield stream_reader_started
137137

138-
assert stream_reader_started._first_error is None
138+
assert stream_reader_started._get_first_error() is None
139139
await stream_reader_started.close()
140140

141141
@pytest.fixture()
142142
async def stream_reader_finish_with_error(self, stream_reader_started: ReaderStream):
143143
yield stream_reader_started
144144

145-
assert stream_reader_started._first_error is not None
145+
assert stream_reader_started._get_first_error() is not None
146146
await stream_reader_started.close()
147147

148148

@@ -195,16 +195,9 @@ def batch_count():
195195
)))
196196
await wait_condition(lambda: batch_count() > initial_batches)
197197

198-
async def test_first_error(self, stream, stream_reader_finish_with_error):
199-
class TestError(grpc.RpcError, grpc.Call):
200-
def __init__(self):
201-
pass
202-
203-
def code(self):
204-
return grpc.StatusCode.UNAUTHENTICATED
205-
206-
def details(self):
207-
return "test error"
198+
async def test_unknown_error(self, stream, stream_reader_finish_with_error):
199+
class TestError(Exception):
200+
pass
208201

209202
test_err = TestError()
210203
stream.from_server.put_nowait(test_err)
@@ -215,6 +208,24 @@ def details(self):
215208
with pytest.raises(TestError):
216209
stream_reader_finish_with_error.receive_batch_nowait()
217210

211+
async def test_error_from_status_code(self, stream, stream_reader_finish_with_error):
212+
# noinspection PyTypeChecker
213+
stream.from_server.put_nowait(
214+
StreamReadMessage.FromServer(
215+
server_status=ServerStatus(
216+
status=issues.StatusCode.OVERLOADED,
217+
issues=[],
218+
),
219+
server_message=None,
220+
)
221+
)
222+
223+
with pytest.raises(issues.Overloaded):
224+
await wait_for_fast(stream_reader_finish_with_error.wait_messages())
225+
226+
with pytest.raises(issues.Overloaded):
227+
stream_reader_finish_with_error.receive_batch_nowait()
228+
218229
async def test_init_reader(self, stream, default_reader_settings):
219230
reader = ReaderStream(default_reader_settings)
220231
init_message = StreamReadMessage.InitRequest(
@@ -619,8 +630,15 @@ async def test_receive_batch_nowait(self, stream, stream_reader, partition_sessi
619630
with pytest.raises(asyncio.QueueEmpty):
620631
stream.from_client.get_nowait()
621632

633+
622634
@pytest.mark.asyncio
623635
class TestReaderReconnector:
624-
async def test_start(self):
625-
pass
636+
async def test_reconnect_on_repeatable_error(self, monkeypatch):
637+
def stream_create():
638+
pass
639+
640+
with mock.patch.object(ReaderStream, "create", stream_create):
641+
reconnector = ReaderReconnector(None, PublicReaderSettings("", ""))
642+
await reconnector.wait_message()
626643

644+
raise NotImplementedError()

ydb/_topic_wrapper/common.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -211,11 +211,11 @@ class ServerStatus(IFromProto):
211211

212212
def __init__(
213213
self,
214-
status_code: ydb_status_codes_pb2.StatusIds.StatusCode,
215-
grpc_issues: typing.Iterable[ydb_issue_message_pb2.IssueMessage],
214+
status: issues.StatusCode,
215+
issues: typing.Iterable[typing.Any],
216216
):
217-
self._grpc_status_code = status_code
218-
self._issues = grpc_issues
217+
self.status = status
218+
self.issues = issues
219219

220220
def __str__(self):
221221
return self.__repr__()
@@ -228,7 +228,7 @@ def from_proto(msg: typing.Union[
228228
return ServerStatus(msg.status, msg.issues)
229229

230230
def is_success(self) -> bool:
231-
return self._grpc_status_code == ydb_status_codes_pb2.StatusIds.SUCCESS
231+
return self.status == issues.StatusCode.SUCCESS
232232

233233
@classmethod
234234
def issue_to_str(cls, issue: ydb_issue_message_pb2.IssueMessage):

ydb/_topic_wrapper/common_test.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import grpc
44
import pytest
55

6-
from .common import callback_from_asyncio, GrpcWrapperAsyncIO
6+
from .common import callback_from_asyncio, GrpcWrapperAsyncIO, ServerStatus
77
from .. import issues
88

99
# Workaround for good autocomplete in IDE and universal import at runtime
@@ -90,3 +90,22 @@ async def __anext__(self):
9090
with pytest.raises(issues.Overloaded):
9191
await wrapper.receive()
9292

93+
94+
class TestServerStatus:
95+
def test_success(self):
96+
status = ServerStatus(
97+
status=ydb_status_codes_pb2.StatusIds.SUCCESS,
98+
issues=[],
99+
)
100+
assert status.is_success()
101+
assert issues._process_response(status) is None
102+
103+
def test_failed(self):
104+
status = ServerStatus(
105+
status=ydb_status_codes_pb2.StatusIds.OVERLOADED,
106+
issues=[],
107+
)
108+
assert not status.is_success()
109+
with pytest.raises(issues.Overloaded):
110+
issues._process_response(status)
111+

ydb/_topic_wrapper/reader.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,17 +247,20 @@ class FromServer(IFromProto):
247247
@staticmethod
248248
def from_proto(msg: ydb_topic_pb2.StreamReadMessage.FromServer) -> "StreamReadMessage.FromServer":
249249
mess_type = msg.WhichOneof("server_message")
250-
server_status = ServerStatus.from_proto(ms)
250+
server_status = ServerStatus.from_proto(msg)
251251
if mess_type == "read_response":
252252
return StreamReadMessage.FromServer(
253-
server_message=StreamReadMessage.ReadResponse.from_proto(msg.read_response)
253+
server_status=server_status,
254+
server_message=StreamReadMessage.ReadResponse.from_proto(msg.read_response),
254255
)
255256
elif mess_type == "init_response":
256257
return StreamReadMessage.FromServer(
258+
server_status=server_status,
257259
server_message=StreamReadMessage.InitResponse.from_proto(msg.init_response),
258260
)
259261
elif mess_type == "start_partition_session_request":
260262
return StreamReadMessage.FromServer(
263+
server_status=server_status,
261264
server_message=StreamReadMessage.StartPartitionSessionRequest.from_proto(msg.start_partition_session_request)
262265
)
263266

ydb/issues.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
# -*- coding: utf-8 -*-
2+
import abc
3+
24
from google.protobuf import text_format
35
import enum
46
from six.moves import queue

0 commit comments

Comments
 (0)