Skip to content

Commit c62147d

Browse files
committed
parse stop partition message
handle unexpected message for topic reader
1 parent 6718229 commit c62147d

File tree

5 files changed

+103
-33
lines changed

5 files changed

+103
-33
lines changed

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import datetime
24
import enum
35
import typing
@@ -642,11 +644,20 @@ def to_proto(
642644
return res
643645

644646
@dataclass
645-
class StopPartitionSessionRequest:
647+
class StopPartitionSessionRequest(IFromProto):
646648
partition_session_id: int
647649
graceful: bool
648650
committed_offset: int
649651

652+
@staticmethod
653+
def from_proto(msg: ydb_topic_pb2.StreamReadMessage.StopPartitionSessionRequest) -> StreamReadMessage.StopPartitionSessionRequest:
654+
return StreamReadMessage.StopPartitionSessionRequest(
655+
partition_session_id=msg.partition_session_id,
656+
graceful=msg.graceful,
657+
committed_offset=msg.committed_offset,
658+
)
659+
660+
650661
@dataclass
651662
class StopPartitionSessionResponse:
652663
partition_session_id: int
@@ -707,6 +718,13 @@ def from_proto(
707718
msg.start_partition_session_request,
708719
),
709720
)
721+
elif mess_type == "stop_partition_session_request":
722+
return StreamReadMessage.FromServer(
723+
server_status=server_status,
724+
server_message=StreamReadMessage.StopPartitionSessionRequest.from_proto(
725+
msg.stop_partition_session_request
726+
)
727+
)
710728
elif mess_type == "update_token_response":
711729
return StreamReadMessage.FromServer(
712730
server_status=server_status,
@@ -720,9 +738,6 @@ def from_proto(
720738
)
721739
)
722740

723-
# todo replace exception to log
724-
raise NotImplementedError()
725-
726741

727742
ReaderMessagesFromClientToServer = Union[
728743
StreamReadMessage.InitRequest,

ydb/_topic_reader/topic_reader.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import concurrent.futures
22
import enum
33
import datetime
4+
import logging
45
from dataclasses import dataclass
56
from typing import (
67
Union,
@@ -52,6 +53,7 @@ class PublicReaderSettings:
5253
# decoder_executor, must be set for handle non raw messages
5354
decoder_executor: Optional[concurrent.futures.Executor] = None
5455
update_token_interval: Union[int, float] = 3600
56+
logger: Optional[logging.Logger] = None
5557

5658
def __post_init__(self):
5759
# check possible create init message

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import asyncio
44
import concurrent.futures
5+
import copy
56
import gzip
67
import typing
78
from asyncio import Task
@@ -26,7 +27,9 @@
2627
Codec,
2728
)
2829
from .._errors import check_retriable_error
30+
import logging
2931

32+
_module_logger = logging.getLogger(__name__)
3033

3134
class TopicReaderError(YdbError):
3235
pass
@@ -62,6 +65,7 @@ class PublicAsyncIOReader:
6265
_closed: bool
6366
_reconnector: ReaderReconnector
6467
_parent: typing.Any # need for prevent close parent client by GC
68+
_logger: logging.Logger
6569

6670
def __init__(
6771
self,
@@ -70,8 +74,14 @@ def __init__(
7074
*,
7175
_parent=None,
7276
):
77+
if settings.logger:
78+
self._logger = settings.logger
79+
else:
80+
self._logger = _module_logger
81+
7382
self._loop = asyncio.get_running_loop()
7483
self._closed = False
84+
self._logger = settings.logger
7585
self._reconnector = ReaderReconnector(driver, settings)
7686
self._parent = _parent
7787

@@ -139,14 +149,19 @@ class ReaderReconnector:
139149
_settings: topic_reader.PublicReaderSettings
140150
_driver: Driver
141151
_background_tasks: Set[Task]
152+
_logger: logging.Logger
142153

143154
_state_changed: asyncio.Event
144155
_stream_reader: Optional["ReaderStream"]
145156
_first_error: asyncio.Future[YdbError]
146157

147158
def __init__(self, driver: Driver, settings: topic_reader.PublicReaderSettings):
148-
self._id = self._static_reader_reconnector_counter.inc_and_get()
159+
if settings.logger:
160+
self._logger = settings.logger
161+
else:
162+
self._logger = _module_logger
149163

164+
self._id = self._static_reader_reconnector_counter.inc_and_get()
150165
self._settings = settings
151166
self._driver = driver
152167
self._background_tasks = set()
@@ -234,6 +249,7 @@ class ReaderStream:
234249
_buffer_size_bytes: int # use for init request, then for debug purposes only
235250
_decode_executor: concurrent.futures.Executor
236251
_decoders: Dict[int, typing.Callable[[bytes], bytes]] # dict[codec_code] func(encoded_bytes)->decoded_bytes
252+
_logger: logging.Logger
237253

238254
if typing.TYPE_CHECKING:
239255
_batches_to_decode: asyncio.Queue[datatypes.PublicBatch]
@@ -255,6 +271,11 @@ def __init__(
255271
settings: topic_reader.PublicReaderSettings,
256272
get_token_function: Optional[Callable[[], str]] = None,
257273
):
274+
if settings.logger:
275+
self._logger = settings.logger
276+
else:
277+
self._logger = _module_logger
278+
258279
self._loop = asyncio.get_running_loop()
259280
self._id = ReaderStream._static_id_counter.inc_and_get()
260281
self._reader_reconnector_id = reader_reconnector_id
@@ -395,34 +416,37 @@ async def _read_messages_loop(self):
395416
)
396417
)
397418
while True:
398-
message = await self._stream.receive() # type: StreamReadMessage.FromServer
399-
_process_response(message.server_status)
400-
401-
if isinstance(message.server_message, StreamReadMessage.ReadResponse):
402-
self._on_read_response(message.server_message)
403-
404-
elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse):
405-
self._on_commit_response(message.server_message)
406-
407-
elif isinstance(
408-
message.server_message,
409-
StreamReadMessage.StartPartitionSessionRequest,
410-
):
411-
self._on_start_partition_session(message.server_message)
412-
413-
elif isinstance(
414-
message.server_message,
415-
StreamReadMessage.StopPartitionSessionRequest,
416-
):
417-
self._on_partition_session_stop(message.server_message)
418-
419-
elif isinstance(message.server_message, UpdateTokenResponse):
420-
self._update_token_event.set()
421-
422-
else:
423-
raise NotImplementedError(
424-
"Unexpected type of StreamReadMessage.FromServer message: %s" % message.server_message
425-
)
419+
try:
420+
message = await self._stream.receive() # type: StreamReadMessage.FromServer
421+
_process_response(message.server_status)
422+
423+
if isinstance(message.server_message, StreamReadMessage.ReadResponse):
424+
self._on_read_response(message.server_message)
425+
426+
elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse):
427+
self._on_commit_response(message.server_message)
428+
429+
elif isinstance(
430+
message.server_message,
431+
StreamReadMessage.StartPartitionSessionRequest,
432+
):
433+
self._on_start_partition_session(message.server_message)
434+
435+
elif isinstance(
436+
message.server_message,
437+
StreamReadMessage.StopPartitionSessionRequest,
438+
):
439+
self._on_partition_session_stop(message.server_message)
440+
441+
elif isinstance(message.server_message, UpdateTokenResponse):
442+
self._update_token_event.set()
443+
444+
else:
445+
raise issues.UnexpectedGrpcMessage(
446+
"Unexpected message in _read_messages_loop: %s" % type(message.server_message)
447+
)
448+
except issues.UnexpectedGrpcMessage:
449+
self._logger.exception("unexpected message in stream reader")
426450

427451
self._state_changed.set()
428452
except Exception as e:

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,6 +1127,30 @@ async def test_update_token(self, stream):
11271127

11281128
await reader.close()
11291129

1130+
async def test_read_unknown_message(self, stream, stream_reader):
1131+
class TestMessage:
1132+
pass
1133+
1134+
logged = asyncio.Event()
1135+
1136+
def set_logged(*args, **kwargs):
1137+
logged.set()
1138+
1139+
stream_reader._logger = mock.Mock()
1140+
stream_reader._logger.exception = mock.Mock(side_effect=set_logged)
1141+
1142+
# noinspection PyTypeChecker
1143+
stream.from_server.put_nowait(StreamReadMessage.FromServer(
1144+
server_status=ServerStatus(
1145+
status=issues.StatusCode.SUCCESS,
1146+
issues=[],
1147+
),
1148+
server_message=TestMessage(),
1149+
))
1150+
await wait_for_fast(logged.wait())
1151+
1152+
stream_reader._logger.exception.assert_called_once()
1153+
11301154

11311155
@pytest.mark.asyncio
11321156
class TestReaderReconnector:

ydb/issues.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ class SessionPoolEmpty(Error, queue.Empty):
156156
status = StatusCode.SESSION_POOL_EMPTY
157157

158158

159+
class UnexpectedGrpcMessage(Error):
160+
def __init__(self, message: str):
161+
super().__init__(message)
162+
163+
159164
def _format_issues(issues):
160165
if not issues:
161166
return ""

0 commit comments

Comments
 (0)