@@ -213,6 +213,16 @@ def create_message(
213213 )
214214
215215 async def send_message (self , stream_reader , message : PublicMessage ):
216+ await self .send_batch (stream_reader , [message ])
217+
218+ async def send_batch (self , stream_reader , batch : typing .List [PublicMessage ]):
219+ if len (batch ) == 0 :
220+ return
221+
222+ first_message = batch [0 ]
223+ for message in batch :
224+ assert message ._partition_session is first_message ._partition_session
225+
216226 def batch_count ():
217227 return len (stream_reader ._message_batches )
218228
@@ -225,7 +235,7 @@ def batch_count():
225235 server_message = StreamReadMessage .ReadResponse (
226236 partition_data = [
227237 StreamReadMessage .ReadResponse .PartitionData (
228- partition_session_id = message ._partition_session .id ,
238+ partition_session_id = first_message ._partition_session .id ,
229239 batches = [
230240 StreamReadMessage .ReadResponse .Batch (
231241 message_data = [
@@ -237,11 +247,12 @@ def batch_count():
237247 uncompresed_size = len (message .data ),
238248 message_group_id = message .message_group_id ,
239249 )
250+ for message in batch
240251 ],
241- producer_id = message .producer_id ,
242- write_session_meta = message .session_metadata ,
252+ producer_id = first_message .producer_id ,
253+ write_session_meta = first_message .session_metadata ,
243254 codec = Codec .CODEC_RAW ,
244- written_at = message .written_at ,
255+ written_at = first_message .written_at ,
245256 )
246257 ],
247258 )
@@ -1066,13 +1077,15 @@ async def test_read_message(
10661077 async def test_receive_batch_nowait (self , stream , stream_reader , partition_session ):
10671078 assert stream_reader .receive_batch_nowait () is None
10681079
1080+ initial_buffer_size = stream_reader ._buffer_size_bytes
1081+
10691082 mess1 = self .create_message (partition_session , 1 , 1 )
10701083 await self .send_message (stream_reader , mess1 )
10711084
10721085 mess2 = self .create_message (partition_session , 2 , 1 )
10731086 await self .send_message (stream_reader , mess2 )
10741087
1075- initial_buffer_size = stream_reader ._buffer_size_bytes
1088+ assert stream_reader ._buffer_size_bytes == initial_buffer_size - 2 * self . default_batch_size
10761089
10771090 received = stream_reader .receive_batch_nowait ()
10781091 assert received == PublicBatch (
@@ -1090,14 +1103,37 @@ async def test_receive_batch_nowait(self, stream, stream_reader, partition_sessi
10901103 _codec = Codec .CODEC_RAW ,
10911104 )
10921105
1093- assert stream_reader ._buffer_size_bytes == initial_buffer_size + 2 * self . default_batch_size
1106+ assert stream_reader ._buffer_size_bytes == initial_buffer_size
10941107
10951108 assert StreamReadMessage .ReadRequest (self .default_batch_size ) == stream .from_client .get_nowait ().client_message
10961109 assert StreamReadMessage .ReadRequest (self .default_batch_size ) == stream .from_client .get_nowait ().client_message
10971110
10981111 with pytest .raises (asyncio .QueueEmpty ):
10991112 stream .from_client .get_nowait ()
11001113
1114+ async def test_receive_message_nowait (self , stream , stream_reader , partition_session ):
1115+ assert stream_reader .receive_batch_nowait () is None
1116+
1117+ initial_buffer_size = stream_reader ._buffer_size_bytes
1118+
1119+ await self .send_batch (
1120+ stream_reader , [self .create_message (partition_session , 1 , 1 ), self .create_message (partition_session , 2 , 1 )]
1121+ )
1122+ await self .send_batch (
1123+ stream_reader ,
1124+ [
1125+ self .create_message (partition_session , 10 , 1 ),
1126+ ],
1127+ )
1128+
1129+ assert stream_reader ._buffer_size_bytes == initial_buffer_size - 2 * self .default_batch_size
1130+
1131+ for expected_seqno in [1 , 2 , 10 ]:
1132+ mess = stream_reader .receive_message_nowait ()
1133+ assert mess .seqno == expected_seqno
1134+
1135+ assert stream_reader ._buffer_size_bytes == initial_buffer_size
1136+
11011137 async def test_update_token (self , stream ):
11021138 settings = PublicReaderSettings (
11031139 consumer = "test-consumer" ,
0 commit comments