@@ -3565,12 +3565,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) ->
35653565 lists :any (fun (Id ) -> Id =:= SubscriptionId end , SubscriptionIds ).
35663566
35673567send_file_callback (? VERSION_1 ,
3568- Transport ,
35693568 _Log ,
35703569 # consumer {configuration =
3571- # consumer_configuration {socket = S ,
3572- subscription_id =
3573- SubscriptionId ,
3570+ # consumer_configuration {subscription_id = SubId ,
35743571 counters = Counters }},
35753572 Counter ) ->
35763573 fun (#{chunk_id := FirstOffsetInChunk , num_entries := NumEntries },
@@ -3581,19 +3578,16 @@ send_file_callback(?VERSION_1,
35813578 ? REQUEST :1 ,
35823579 ? COMMAND_DELIVER :15 ,
35833580 ? VERSION_1 :16 ,
3584- SubscriptionId :8 /unsigned >>,
3585- Transport :send (S , FrameBeginning ),
3581+ SubId :8 /unsigned >>,
35863582 atomics :add (Counter , 1 , Size ),
35873583 increase_messages_consumed (Counters , NumEntries ),
3588- set_consumer_offset (Counters , FirstOffsetInChunk )
3584+ set_consumer_offset (Counters , FirstOffsetInChunk ),
3585+ FrameBeginning
35893586 end ;
35903587send_file_callback (? VERSION_2 ,
3591- Transport ,
35923588 Log ,
35933589 # consumer {configuration =
3594- # consumer_configuration {socket = S ,
3595- subscription_id =
3596- SubscriptionId ,
3590+ # consumer_configuration {subscription_id = SubId ,
35973591 counters = Counters }},
35983592 Counter ) ->
35993593 fun (#{chunk_id := FirstOffsetInChunk , num_entries := NumEntries },
@@ -3605,12 +3599,12 @@ send_file_callback(?VERSION_2,
36053599 ? REQUEST :1 ,
36063600 ? COMMAND_DELIVER :15 ,
36073601 ? VERSION_2 :16 ,
3608- SubscriptionId :8 /unsigned ,
3602+ SubId :8 /unsigned ,
36093603 CommittedChunkId :64 >>,
3610- Transport :send (S , FrameBeginning ),
36113604 atomics :add (Counter , 1 , Size ),
36123605 increase_messages_consumed (Counters , NumEntries ),
3613- set_consumer_offset (Counters , FirstOffsetInChunk )
3606+ set_consumer_offset (Counters , FirstOffsetInChunk ),
3607+ FrameBeginning
36143608 end .
36153609
36163610send_chunks (DeliverVersion ,
@@ -3680,9 +3674,7 @@ send_chunks(DeliverVersion,
36803674 Retry ,
36813675 Counter ) ->
36823676 case osiris_log :send_file (Socket , Log ,
3683- send_file_callback (DeliverVersion ,
3684- Transport ,
3685- Log ,
3677+ send_file_callback (DeliverVersion , Log ,
36863678 Consumer ,
36873679 Counter ))
36883680 of
0 commit comments