@@ -3571,12 +3571,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) ->
35713571 lists :any (fun (Id ) -> Id =:= SubscriptionId end , SubscriptionIds ).
35723572
35733573send_file_callback (? VERSION_1 ,
3574- Transport ,
35753574 _Log ,
35763575 # consumer {configuration =
3577- # consumer_configuration {socket = S ,
3578- subscription_id =
3579- SubscriptionId ,
3576+ # consumer_configuration {subscription_id = SubId ,
35803577 counters = Counters }},
35813578 Counter ) ->
35823579 fun (#{chunk_id := FirstOffsetInChunk , num_entries := NumEntries },
@@ -3587,19 +3584,16 @@ send_file_callback(?VERSION_1,
35873584 ? REQUEST :1 ,
35883585 ? COMMAND_DELIVER :15 ,
35893586 ? VERSION_1 :16 ,
3590- SubscriptionId :8 /unsigned >>,
3591- Transport :send (S , FrameBeginning ),
3587+ SubId :8 /unsigned >>,
35923588 atomics :add (Counter , 1 , Size ),
35933589 increase_messages_consumed (Counters , NumEntries ),
3594- set_consumer_offset (Counters , FirstOffsetInChunk )
3590+ set_consumer_offset (Counters , FirstOffsetInChunk ),
3591+ FrameBeginning
35953592 end ;
35963593send_file_callback (? VERSION_2 ,
3597- Transport ,
35983594 Log ,
35993595 # consumer {configuration =
3600- # consumer_configuration {socket = S ,
3601- subscription_id =
3602- SubscriptionId ,
3596+ # consumer_configuration {subscription_id = SubId ,
36033597 counters = Counters }},
36043598 Counter ) ->
36053599 fun (#{chunk_id := FirstOffsetInChunk , num_entries := NumEntries },
@@ -3611,12 +3605,12 @@ send_file_callback(?VERSION_2,
36113605 ? REQUEST :1 ,
36123606 ? COMMAND_DELIVER :15 ,
36133607 ? VERSION_2 :16 ,
3614- SubscriptionId :8 /unsigned ,
3608+ SubId :8 /unsigned ,
36153609 CommittedChunkId :64 >>,
3616- Transport :send (S , FrameBeginning ),
36173610 atomics :add (Counter , 1 , Size ),
36183611 increase_messages_consumed (Counters , NumEntries ),
3619- set_consumer_offset (Counters , FirstOffsetInChunk )
3612+ set_consumer_offset (Counters , FirstOffsetInChunk ),
3613+ FrameBeginning
36203614 end .
36213615
36223616send_chunks (DeliverVersion ,
@@ -3686,9 +3680,7 @@ send_chunks(DeliverVersion,
36863680 Retry ,
36873681 Counter ) ->
36883682 case osiris_log :send_file (Socket , Log ,
3689- send_file_callback (DeliverVersion ,
3690- Transport ,
3691- Log ,
3683+ send_file_callback (DeliverVersion , Log ,
36923684 Consumer ,
36933685 Counter ))
36943686 of
0 commit comments