@@ -732,6 +732,75 @@ def session_count():
732732 with pytest .raises (asyncio .QueueEmpty ):
733733 stream .from_client .get_nowait ()
734734
735+ @pytest .mark .parametrize (
736+ "graceful" ,
737+ (
738+ [True ],
739+ [False ],
740+ ),
741+ )
742+ async def test_free_buffer_after_partition_stop (self , stream , stream_reader , partition_session , graceful ):
743+ initial_buffer_size = stream_reader ._buffer_size_bytes
744+ message_size = initial_buffer_size - 1
745+
746+ t = datetime .datetime .now ()
747+
748+ stream .from_server .put_nowait (
749+ StreamReadMessage .FromServer (
750+ server_status = ServerStatus (issues .StatusCode .SUCCESS , []),
751+ server_message = StreamReadMessage .ReadResponse (
752+ bytes_size = message_size ,
753+ partition_data = [
754+ StreamReadMessage .ReadResponse .PartitionData (
755+ partition_session_id = partition_session .id ,
756+ batches = [
757+ StreamReadMessage .ReadResponse .Batch (
758+ message_data = [
759+ StreamReadMessage .ReadResponse .MessageData (
760+ partition_session .committed_offset + 1 ,
761+ seq_no = 123 ,
762+ created_at = t ,
763+ data = bytes (),
764+ uncompresed_size = message_size ,
765+ message_group_id = "test-message-group" ,
766+ )
767+ ],
768+ producer_id = "asd" ,
769+ write_session_meta = {},
770+ codec = Codec .CODEC_RAW ,
771+ written_at = t ,
772+ )
773+ ],
774+ )
775+ ],
776+ ),
777+ )
778+ )
779+
780+ def message_received ():
781+ return len (stream_reader ._message_batches ) > 0
782+
783+ await wait_condition (message_received )
784+
785+ assert stream_reader ._buffer_size_bytes == initial_buffer_size - message_size
786+
787+ stream .from_server .put_nowait (
788+ StreamReadMessage .FromServer (
789+ server_status = ServerStatus (issues .StatusCode .SUCCESS , []),
790+ server_message = StreamReadMessage .StopPartitionSessionRequest (
791+ partition_session_id = partition_session .id ,
792+ graceful = graceful ,
793+ committed_offset = partition_session .committed_offset ,
794+ ),
795+ )
796+ )
797+
798+ await wait_condition (lambda : partition_session .closed )
799+
800+ batch = stream_reader .receive_batch_nowait ()
801+ assert not batch .alive
802+ assert stream_reader ._buffer_size_bytes == initial_buffer_size
803+
735804 async def test_receive_message_from_server (
736805 self ,
737806 stream_reader ,
0 commit comments