Skip to content

Commit 5d94c17

Browse files
author
Valeriya Popova
committed
topic reader: add flush method
1 parent 918d1b2 commit 5d94c17

File tree

3 files changed

+63
-12
lines changed

3 files changed

+63
-12
lines changed

ydb/_topic_reader/datatypes.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter":
9696
return waiter
9797

9898
# fast way
99-
if len(self._ack_waiters) > 0 and self._ack_waiters[-1].end_offset < end_offset:
99+
if self._ack_waiters and self._ack_waiters[-1].end_offset < end_offset:
100100
self._ack_waiters.append(waiter)
101101
else:
102102
bisect.insort(self._ack_waiters, waiter)
@@ -106,25 +106,23 @@ def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter":
106106
def _create_future(self) -> asyncio.Future:
107107
if self._loop:
108108
return self._loop.create_future()
109-
else:
110-
return asyncio.Future()
109+
return asyncio.Future()
111110

112111
def ack_notify(self, offset: int):
113112
self._ensure_not_closed()
114113

115114
self.committed_offset = offset
116115

117-
if len(self._ack_waiters) == 0:
116+
if not self._ack_waiters:
118117
# todo log warning
119118
# must be never receive ack for not sended request
120119
return
121120

122-
while len(self._ack_waiters) > 0:
123-
if self._ack_waiters[0].end_offset <= offset:
124-
waiter = self._ack_waiters.popleft()
125-
waiter._finish_ok()
126-
else:
121+
while self._ack_waiters:
122+
if self._ack_waiters[0].end_offset > offset:
127123
break
124+
waiter = self._ack_waiters.popleft()
125+
waiter._finish_ok()
128126

129127
def close(self):
130128
if self.closed:

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,20 @@ async def commit_with_ack(
120120
waiter = self._reconnector.commit(batch)
121121
await waiter.future
122122

123-
async def close(self):
123+
async def flush(self):
124+
"""
125+
force send all commit messages from internal buffers to server and wait acks for all of them.
126+
127+
use asyncio.wait_for for wait with timeout.
128+
"""
129+
await self._reconnector.flush()
130+
131+
async def close(self, flush: bool = True):
124132
if self._closed:
125133
raise TopicReaderClosedError()
126134

127135
self._closed = True
128-
await self._reconnector.close()
136+
await self._reconnector.close(flush)
129137

130138

131139
class ReaderReconnector:
@@ -199,14 +207,20 @@ def commit(
199207
) -> datatypes.PartitionSession.CommitAckWaiter:
200208
return self._stream_reader.commit(batch)
201209

202-
async def close(self):
210+
async def close(self, flush: bool):
203211
if self._stream_reader:
212+
if flush:
213+
await self.flush()
204214
await self._stream_reader.close()
205215
for task in self._background_tasks:
206216
task.cancel()
207217

208218
await asyncio.wait(self._background_tasks)
209219

220+
async def flush(self):
221+
if self._stream_reader:
222+
await self._stream_reader.flush()
223+
210224
def _set_first_error(self, err: issues.Error):
211225
try:
212226
self._first_error.set_result(err)
@@ -641,6 +655,16 @@ def _get_first_error(self) -> Optional[YdbError]:
641655
if self._first_error.done():
642656
return self._first_error.result()
643657

658+
async def flush(self):
659+
if self._closed:
660+
raise RuntimeError("Flush on closed Stream")
661+
662+
futures = []
663+
for session in self._partition_sessions.values():
664+
futures.extend(w.future for w in session._ack_waiters)
665+
666+
await asyncio.gather(*futures)
667+
644668
async def close(self):
645669
if self._closed:
646670
return

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,35 @@ async def test_close_ack_waiters_when_close_stream_reader(
389389
with pytest.raises(topic_reader_asyncio.TopicReaderCommitToExpiredPartition):
390390
waiter.future.result()
391391

392+
async def test_flush(
393+
self, stream, stream_reader_started: ReaderStream, partition_session
394+
):
395+
offset = self.partition_session_committed_offset + 1
396+
waiter = partition_session.add_waiter(offset)
397+
398+
with pytest.raises(WaitConditionError):
399+
await wait_for_fast(stream_reader_started.flush(), timeout=0.1)
400+
401+
stream.from_server.put_nowait(
402+
StreamReadMessage.FromServer(
403+
server_status=ServerStatus(ydb_status_codes_pb2.StatusIds.SUCCESS, []),
404+
server_message=StreamReadMessage.CommitOffsetResponse(
405+
partitions_committed_offsets=[
406+
StreamReadMessage.CommitOffsetResponse.PartitionCommittedOffset(
407+
partition_session_id=partition_session.id,
408+
committed_offset=offset,
409+
),
410+
]
411+
),
412+
)
413+
)
414+
415+
await stream_reader_started.flush()
416+
# don't raises
417+
assert waiter.future.result() is None
418+
419+
await wait_for_fast(stream_reader_started.close())
420+
392421
async def test_commit_ranges_for_received_messages(
393422
self, stream, stream_reader_started: ReaderStream, partition_session
394423
):

0 commit comments

Comments
 (0)