Skip to content

Commit 538d6c7

Browse files
committed
sync with declared public api
1 parent 01dd5bf commit 538d6c7

File tree

2 files changed

+95
-5
lines changed

2 files changed

+95
-5
lines changed

tests/topics/test_topic_reader.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,5 @@ async def test_read_message(self, driver, topic_path, topic_with_messages, topic
1111
consumer=topic_consumer,
1212
topic=topic_path,
1313
))
14-
await reader.wait_messages()
1514

16-
assert reader.receive_batch() is not None
15+
assert await reader.receive_batch() is not None

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,103 @@ def __init__(self, driver: Driver, settings: PublicReaderSettings):
3737
self._loop = asyncio.get_running_loop()
3838
self._reconnector = ReaderReconnector(driver, settings)
3939

40-
async def wait_messages(self):
40+
async def __aenter__(self):
41+
raise NotImplementedError()
42+
43+
async def __aexit__(self, exc_type, exc_val, exc_tb):
44+
raise NotImplementedError()
45+
46+
async def sessions_stat(self) -> typing.List["SessionStat"]:
47+
"""
48+
Receive stat from the server
49+
50+
use asyncio.wait_for for wait with timeout.
51+
"""
52+
raise NotImplementedError()
53+
54+
def messages(
55+
self, *, timeout: typing.Union[float, None] = None
56+
) -> typing.AsyncIterable["PublicMessage"]:
57+
"""
58+
Block until receive new message
59+
60+
if no new messages in timeout seconds: stop iteration by raise StopAsyncIteration
61+
"""
62+
raise NotImplementedError()
63+
64+
async def receive_message(self) -> typing.Union["PublicMessage", None]:
65+
"""
66+
Block until receive new message
67+
68+
use asyncio.wait_for for wait with timeout.
69+
"""
70+
raise NotImplementedError()
71+
72+
def batches(
73+
self,
74+
*,
75+
max_messages: typing.Union[int, None] = None,
76+
max_bytes: typing.Union[int, None] = None,
77+
timeout: typing.Union[float, None] = None,
78+
) -> typing.AsyncIterable["PublicBatch"]:
79+
"""
80+
Block until receive new batch.
81+
All messages in a batch from same partition.
82+
83+
if no new message in timeout seconds (default - infinite): stop iterations by raise StopIteration
84+
"""
85+
raise NotImplementedError()
86+
87+
async def receive_batch(
88+
self, *, max_messages: typing.Union[int, None] = None, max_bytes: typing.Union[int, None] = None
89+
) -> typing.Union["PublicBatch", None]:
90+
"""
91+
Get one messages batch from reader.
92+
All messages in a batch from same partition.
93+
94+
use asyncio.wait_for for wait with timeout.
95+
"""
4196
await self._reconnector.wait_message()
42-
43-
def receive_batch(self):
4497
return self._reconnector.receive_batch_nowait()
4598

99+
async def commit_on_exit(self, mess: "ICommittable") -> typing.AsyncContextManager:
100+
"""
101+
commit the mess match/message if exit from context manager without exceptions
102+
103+
reader will close if exit from context manager with exception
104+
"""
105+
raise NotImplementedError()
106+
107+
def commit(self, mess: "ICommittable"):
108+
"""
109+
Write commit message to a buffer.
110+
111+
For the method no way check the commit result
112+
(for example if lost connection - commits will not re-send and committed messages will receive again)
113+
"""
114+
raise NotImplementedError()
115+
116+
async def commit_with_ack(
117+
self, mess: "ICommittable"
118+
) -> typing.Union["CommitResult", typing.List["CommitResult"]]:
119+
"""
120+
write commit message to a buffer and wait ack from the server.
121+
122+
use asyncio.wait_for for wait with timeout.
123+
"""
124+
raise NotImplementedError()
125+
126+
async def flush(self):
127+
"""
128+
force send all commit messages from internal buffers to server and wait acks for all of them.
129+
130+
use asyncio.wait_for for wait with timeout.
131+
"""
132+
raise NotImplementedError()
133+
134+
async def close(self):
135+
raise NotImplementedError()
136+
46137

47138
class ReaderReconnector:
48139
_settings: PublicReaderSettings

0 commit comments

Comments
 (0)