Skip to content

Commit 3c8aaf0

Browse files
authored
Merge pull request #229 V3 read one by one
2 parents 9c8950b + 8c10924 commit 3c8aaf0

File tree

5 files changed

+208
-19
lines changed

5 files changed

+208
-19
lines changed

tests/topics/test_topic_reader.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,26 @@
55

66
@pytest.mark.asyncio
77
class TestTopicReaderAsyncIO:
8+
async def test_read_batch(
    self, driver, topic_path, topic_with_messages, topic_consumer
):
    """Receive one batch through the asyncio reader and check it carries messages."""
    topic_reader = driver.topic_client.reader(topic_consumer, topic_path)
    received = await topic_reader.receive_batch()

    # The batch must exist and hold at least one message.
    assert received is not None
    assert len(received.messages) > 0

    await topic_reader.close()
18+
819
async def test_read_message(
    self, driver, topic_path, topic_with_messages, topic_consumer
):
    """Receive a single message through the asyncio reader and check its seqno is set."""
    topic_reader = driver.topic_client.reader(topic_consumer, topic_path)
    received = await topic_reader.receive_message()

    # A message must be returned and must carry a truthy seqno.
    assert received is not None
    assert received.seqno

    await topic_reader.close()
1529

1630
async def test_read_and_commit_message(
@@ -59,12 +73,26 @@ def decode(b: bytes):
5973

6074

6175
class TestTopicReaderSync:
76+
def test_read_batch(
    self, driver_sync, topic_path, topic_with_messages, topic_consumer
):
    """Receive one batch through the synchronous reader and check it carries messages."""
    topic_reader = driver_sync.topic_client.reader(topic_consumer, topic_path)
    received = topic_reader.receive_batch()

    # The batch must exist and hold at least one message.
    assert received is not None
    assert len(received.messages) > 0

    topic_reader.close()
86+
6287
def test_read_message(
    self, driver_sync, topic_path, topic_with_messages, topic_consumer
):
    """Receive a single message through the synchronous reader and check its seqno is set."""
    topic_reader = driver_sync.topic_client.reader(topic_consumer, topic_path)
    received = topic_reader.receive_message()

    # A message must be returned and must carry a truthy seqno.
    assert received is not None
    assert received.seqno

    topic_reader.close()
6997

7098
def test_read_and_commit_message(

ydb/_topic_reader/datatypes.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,9 @@ def _commit_get_offsets_range(self) -> OffsetsRange:
179179
self.messages[-1]._commit_get_offsets_range().end,
180180
)
181181

182+
def empty(self) -> bool:
    """Return True when this batch holds no messages."""
    return not self.messages
184+
182185
# ISessionAlive implementation
183186
@property
184187
def is_alive(self) -> bool:
@@ -187,3 +190,6 @@ def is_alive(self) -> bool:
187190
state == PartitionSession.State.Active
188191
or state == PartitionSession.State.GracefulShutdown
189192
)
193+
194+
def pop_message(self) -> PublicMessage:
    """
    Remove the first message from this batch and return it.

    Raises IndexError if self.messages is an empty list.
    """
    first = self.messages.pop(0)
    return first

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,6 @@ def messages(
9595
"""
9696
raise NotImplementedError()
9797

98-
async def receive_message(self) -> typing.Union[topic_reader.PublicMessage, None]:
99-
"""
100-
Block until receive new message
101-
102-
use asyncio.wait_for for wait with timeout.
103-
"""
104-
raise NotImplementedError()
105-
10698
def batches(
10799
self,
108100
*,
@@ -133,6 +125,15 @@ async def receive_batch(
133125
await self._reconnector.wait_message()
134126
return self._reconnector.receive_batch_nowait()
135127

128+
async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]:
    """
    Wait until the reconnector signals a message, then return the next one.

    The result is whatever the reconnector's receive_message_nowait() yields,
    which may be None.

    Use asyncio.wait_for to wait with a timeout.
    """
    await self._reconnector.wait_message()
    message = self._reconnector.receive_message_nowait()
    return message
136+
136137
async def commit_on_exit(
137138
self, mess: datatypes.ICommittable
138139
) -> typing.AsyncContextManager:
@@ -244,6 +245,9 @@ async def wait_message(self):
244245
def receive_batch_nowait(self):
245246
return self._stream_reader.receive_batch_nowait()
246247

248+
def receive_message_nowait(self):
    """Delegate to the current stream reader and return its next message without waiting."""
    message = self._stream_reader.receive_message_nowait()
    return message
250+
247251
def commit(
248252
self, batch: datatypes.ICommittable
249253
) -> datatypes.PartitionSession.CommitAckWaiter:
@@ -397,12 +401,24 @@ def receive_batch_nowait(self):
397401
raise self._get_first_error()
398402

399403
if not self._message_batches:
400-
return
404+
return None
401405

402406
batch = self._message_batches.popleft()
403407
self._buffer_release_bytes(batch._bytes_size)
404408
return batch
405409

410+
def receive_message_nowait(self):
    """
    Return the next buffered message without waiting, or None if there is none.

    Messages are taken one at a time from the first buffered batch. Once that
    batch is exhausted it is removed from the queue and its bytes are released
    back to the read buffer, the same way receive_batch_nowait() does when it
    hands out a whole batch.
    """
    try:
        batch = self._message_batches[0]
        message = batch.pop_message()
    except IndexError:
        # Either no batch is buffered or the head batch has no messages.
        return None

    if batch.empty():
        self._message_batches.popleft()
        # Without this the buffer space of a batch consumed message-by-message
        # would never be returned.
        self._buffer_release_bytes(batch._bytes_size)

    return message
421+
406422
def commit(
407423
self, batch: datatypes.ICommittable
408424
) -> datatypes.PartitionSession.CommitAckWaiter:

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import datetime
55
import gzip
66
import typing
7+
from collections import deque
78
from dataclasses import dataclass
89
from unittest import mock
910

@@ -53,6 +54,34 @@ def default_executor():
5354
executor.shutdown()
5455

5556

57+
def stub_partition_session():
    """Build a PartitionSession in the Active state with fixed, arbitrary identifiers."""
    active_state = datatypes.PartitionSession.State.Active
    session = datatypes.PartitionSession(
        id=0,
        state=active_state,
        topic_path="asd",
        partition_id=1,
        committed_offset=0,
        reader_reconnector_id=415,
        reader_stream_id=513,
    )
    return session
67+
68+
69+
def stub_message(id: int):
    """
    Build a PublicMessage stub whose seqno is the given id.

    All other fields are fixed: empty payload and ids, offset 0, commit range
    0..1, and the same timestamp for created_at and written_at.
    """
    fixed_moment = datetime.datetime(2023, 3, 18, 14, 15)
    return PublicMessage(
        seqno=id,
        created_at=fixed_moment,
        message_group_id="",
        session_metadata={},
        offset=0,
        written_at=fixed_moment,
        producer_id="",
        data=bytes(),
        _partition_session=stub_partition_session(),
        _commit_start_offset=0,
        _commit_end_offset=1,
    )
83+
84+
5685
@pytest.fixture()
5786
def default_reader_settings(default_executor):
5887
return PublicReaderSettings(
@@ -179,7 +208,9 @@ async def stream_reader_finish_with_error(
179208

180209
@staticmethod
181210
def create_message(
182-
partition_session: datatypes.PartitionSession, seqno: int, offset_delta: int
211+
partition_session: typing.Optional[datatypes.PartitionSession],
212+
seqno: int,
213+
offset_delta: int,
183214
):
184215
return PublicMessage(
185216
seqno=seqno,
@@ -963,6 +994,101 @@ async def test_read_batches(
963994
_codec=Codec.CODEC_RAW,
964995
)
965996

997+
def _stub_batch(*seqnos):
    # Builder for the parametrization below: a zero-size, raw-codec batch
    # holding one stub message per given seqno.
    return PublicBatch(
        session_metadata={},
        messages=[stub_message(seqno) for seqno in seqnos],
        _partition_session=stub_partition_session(),
        _bytes_size=0,
        _codec=Codec.CODEC_RAW,
    )

@pytest.mark.parametrize(
    "batches_before,expected_message,batches_after",
    [
        # Nothing buffered: no message, queue stays empty.
        ([], None, []),
        # Single batch with a single message: the batch is removed once drained.
        (
            [_stub_batch(1)],
            stub_message(1),
            [],
        ),
        # First batch keeps its remaining message; the second batch is untouched.
        (
            [_stub_batch(1, 2), _stub_batch(3, 4)],
            stub_message(1),
            [_stub_batch(2), _stub_batch(3, 4)],
        ),
        # First batch is drained and removed; the second batch is untouched.
        (
            [_stub_batch(1), _stub_batch(2, 3)],
            stub_message(1),
            [_stub_batch(2, 3)],
        ),
    ],
)
async def test_read_message(
    self,
    stream_reader,
    batches_before: typing.List[datatypes.PublicBatch],
    expected_message: PublicMessage,
    batches_after: typing.List[datatypes.PublicBatch],
):
    """Check the message returned by receive_message_nowait() and the batches left in the queue."""
    stream_reader._message_batches = deque(batches_before)
    received = stream_reader.receive_message_nowait()

    assert received == expected_message
    assert list(stream_reader._message_batches) == batches_after
1091+
9661092
async def test_receive_batch_nowait(self, stream, stream_reader, partition_session):
9671093
assert stream_reader.receive_batch_nowait() is None
9681094

ydb/_topic_reader/topic_reader_sync.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,19 +83,28 @@ def messages(
8383
It has no async_ version, to prevent losing messages; use async_wait_message as a signal that new batches are available.
8484
8585
if no new message arrives within timeout seconds (default - infinite): stop iteration by raising StopIteration
86-
if timeout <= 0 - it will fast non block method, get messages from internal buffer only.
86+
if timeout <= 0 - wait for only one event loop cycle, without waiting for any I/O operations or pauses;
87+
messages are taken from the internal buffer only.
8788
"""
8889
raise NotImplementedError()
8990

90-
def receive_message(self, *, timeout: Union[float, None] = None) -> PublicMessage:
91+
def receive_message(
    self, *, timeout: TimeoutType = None
) -> datatypes.PublicMessage:
    """
    Block until a new message is received.

    There is no async_ version of this method, to prevent losing messages;
    use async_wait_message as a signal that new batches are available.
    receive_message(timeout=0) may return None even right after
    async_wait_message() completed successfully, because the partition or
    the connection to the server may have been lost in between.

    If no new message arrives within timeout seconds (default - infinite):
    raise TimeoutError().
    If timeout <= 0: wait for only one event loop cycle, without waiting for
    any I/O operations or pauses; messages are taken from the internal
    buffer only.
    """
    self._check_closed()

    pending = self._async_reader.receive_message()
    return self._caller.safe_call_with_result(pending, timeout)
99108

100109
def async_wait_message(self) -> concurrent.futures.Future:
101110
"""
@@ -105,7 +114,11 @@ def async_wait_message(self) -> concurrent.futures.Future:
105114
It is possible to receive a signal that a message is available and then find no messages when trying to receive one,
106115
if the message expired between the event being sent and the attempt to retrieve it (for example, because the connection was broken).
107116
"""
108-
raise NotImplementedError()
117+
self._check_closed()
118+
119+
return self._caller.unsafe_call_with_future(
120+
self._async_reader._reconnector.wait_message()
121+
)
109122

110123
def batches(
111124
self,
@@ -119,7 +132,7 @@ def batches(
119132
It has no async_ version, to prevent losing messages; use async_wait_message as a signal that new batches are available.
120133
121134
if no new message arrives within timeout seconds (default - infinite): stop iteration by raising StopIteration
122-
if timeout <= 0 - it will fast non block method, get messages from internal buffer only.
135+
if timeout <= 0 - wait for only one event loop cycle, without waiting for any I/O operations or pauses; messages are taken from the internal buffer only.
123136
"""
124137
raise NotImplementedError()
125138

@@ -135,7 +148,7 @@ def receive_batch(
135148
It has no async_ version, to prevent losing messages; use async_wait_message as a signal that new batches are available.
136149
137150
if no new message arrives within timeout seconds (default - infinite): raise TimeoutError()
138-
if timeout <= 0 - it will fast non block method, get messages from internal buffer only.
151+
if timeout <= 0 - wait for only one event loop cycle, without waiting for any I/O operations or pauses; messages are taken from the internal buffer only.
139152
"""
140153
self._check_closed()
141154

0 commit comments

Comments
 (0)