Skip to content

Commit 32e7784

Browse files
authored
Merge pull request #348 add test for free buffer after stop partition
2 parents 6c745e1 + 643205b commit 32e7784

File tree

2 files changed

+70
-1
lines changed

2 files changed

+70
-1
lines changed

ydb/_topic_common/test_helpers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ async def wait_condition(
6868

6969

7070
async def wait_for_fast(
71-
awaitable: typing.Awaitable,
71+
awaitable: typing.Union[typing.Awaitable, typing.Coroutine],
7272
timeout: typing.Optional[typing.Union[float, int]] = None,
7373
):
7474
fut = asyncio.ensure_future(awaitable)

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,75 @@ def session_count():
732732
with pytest.raises(asyncio.QueueEmpty):
733733
stream.from_client.get_nowait()
734734

735+
@pytest.mark.parametrize(
736+
"graceful",
737+
(
738+
[True],
739+
[False],
740+
),
741+
)
742+
async def test_free_buffer_after_partition_stop(self, stream, stream_reader, partition_session, graceful):
743+
initial_buffer_size = stream_reader._buffer_size_bytes
744+
message_size = initial_buffer_size - 1
745+
746+
t = datetime.datetime.now()
747+
748+
stream.from_server.put_nowait(
749+
StreamReadMessage.FromServer(
750+
server_status=ServerStatus(issues.StatusCode.SUCCESS, []),
751+
server_message=StreamReadMessage.ReadResponse(
752+
bytes_size=message_size,
753+
partition_data=[
754+
StreamReadMessage.ReadResponse.PartitionData(
755+
partition_session_id=partition_session.id,
756+
batches=[
757+
StreamReadMessage.ReadResponse.Batch(
758+
message_data=[
759+
StreamReadMessage.ReadResponse.MessageData(
760+
partition_session.committed_offset + 1,
761+
seq_no=123,
762+
created_at=t,
763+
data=bytes(),
764+
uncompresed_size=message_size,
765+
message_group_id="test-message-group",
766+
)
767+
],
768+
producer_id="asd",
769+
write_session_meta={},
770+
codec=Codec.CODEC_RAW,
771+
written_at=t,
772+
)
773+
],
774+
)
775+
],
776+
),
777+
)
778+
)
779+
780+
def message_received():
781+
return len(stream_reader._message_batches) > 0
782+
783+
await wait_condition(message_received)
784+
785+
assert stream_reader._buffer_size_bytes == initial_buffer_size - message_size
786+
787+
stream.from_server.put_nowait(
788+
StreamReadMessage.FromServer(
789+
server_status=ServerStatus(issues.StatusCode.SUCCESS, []),
790+
server_message=StreamReadMessage.StopPartitionSessionRequest(
791+
partition_session_id=partition_session.id,
792+
graceful=graceful,
793+
committed_offset=partition_session.committed_offset,
794+
),
795+
)
796+
)
797+
798+
await wait_condition(lambda: partition_session.closed)
799+
800+
batch = stream_reader.receive_batch_nowait()
801+
assert not batch.alive
802+
assert stream_reader._buffer_size_bytes == initial_buffer_size
803+
735804
async def test_receive_message_from_server(
736805
self,
737806
stream_reader,

0 commit comments

Comments
 (0)