158158 % enqueues
159159 % rabbit_fifo_index can be slow when calculating the smallest
160160 % index when there are large gaps but should be faster than gb_trees
161- % for normal appending operations - backed by a map
161+ % for normal appending operations as it's backed by a map
162162 ra_indexes = rabbit_fifo_index :empty () :: rabbit_fifo_index :state (),
163163 % consumers need to reflect consumer state at time of snapshot
164164 % needs to be part of snapshot
170170 cancel_consumer_handler :: maybe (applied_mfa ()),
171171 become_leader_handler :: maybe (applied_mfa ()),
172172 metrics_handler :: maybe (applied_mfa ()),
173+ % % This is a special field that is only used for snapshots
174+ % % It represents the number of queued messages at the time the
175+ % % dehydrated snapshot state was cached.
176+ % % As release_cursors are only emitted for raft indexes where all
177+ % % prior messages no longer contribute to the current state we can
178+ % % replace all message payloads at some index with a single integer
179+ % % to be decremented during `checkout_one' until it's 0 after which
180+ % % it instead takes messages from the `messages' map.
181+ % % This is done so that consumers are still served in a deterministic
182+ % % order on recovery.
173183 prefix_msg_count = 0 :: non_neg_integer ()
174184 }).
175185
@@ -663,7 +673,7 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
663673 end ,
664674 {Cons , SQ , Effects } = update_or_remove_sub (ConsumerId , Con , Cons0 ,
665675 SQ0 , Effects0 ),
666- State1 = lists :foldl (fun (dummy , # state {prefix_msg_count = MsgCount } = S0 ) ->
676+ State1 = lists :foldl (fun ('$prefix_msg' , # state {prefix_msg_count = MsgCount } = S0 ) ->
667677 S0 # state {prefix_msg_count = MsgCount + 1 };
668678 ({MsgNum , Msg }, S0 ) ->
669679 return_one (MsgNum , Msg , S0 )
@@ -833,7 +843,9 @@ take_next_msg(#state{prefix_msg_count = 0,
833843 end ;
834844take_next_msg (# state {prefix_msg_count = MsgCount ,
835845 messages = Messages } = State ) ->
836- {dummy , State , Messages , MsgCount - 1 }.
846+ % % there is still a prefix message count for the consumer
847+ % % "fake" a '$prefix_msg' message
848+ {'$prefix_msg' , State , Messages , MsgCount - 1 }.
837849
838850send_msg_effect ({CTag , CPid }, Msgs ) ->
839851 {send_msg , CPid , {delivery , CTag , Msgs }, ra_event }.
@@ -871,7 +883,7 @@ checkout_one(#state{service_queue = SQ0,
871883 prefix_msg_count = PrefMsgC ,
872884 consumers = Cons },
873885 Msg = case ConsumerMsg of
874- dummy -> dummy ;
886+ '$prefix_msg' -> '$prefix_msg' ;
875887 {_ , {_ , M }} -> M
876888 end ,
877889 {success , ConsumerId , Next , Msg , State };
@@ -957,6 +969,8 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
957969 end .
958970
959971
972+ % % creates a dehydrated version of the current state to be cached and
973+ % % potentially used to for a snaphot at a later point
960974dehydrate_state (# state {messages = Messages0 ,
961975 consumers = Consumers ,
962976 prefix_msg_count = MsgCount } = State ) ->
@@ -1042,15 +1056,20 @@ credit_enq_enq_checkout_settled_credit_test() ->
10421056credit_with_drained_test () ->
10431057 Cid = {? FUNCTION_NAME , self ()},
10441058 State0 = test_init (test ),
1059+ % % checkout with a single credit
10451060 {State1 , _ , _ } =
10461061 apply (meta (1 ), {checkout , {auto , 1 , credited }, Cid }, [], State0 ),
1047- {State2 , _ } = credit (Cid , 2 , 0 , 5 , false , State1 ),
1048- {State , DrainedEffs } = credit (Cid , 3 , 0 , 5 , true , State2 ),
1062+ ? assertMatch (# state {consumers = #{Cid := # consumer {credit = 1 ,
1063+ delivery_count = 0 }}},
1064+ State1 ),
1065+ {State , _Effs , Result } =
1066+ apply (meta (3 ), {credit , 0 , 5 , true , Cid }, [], State1 ),
10491067 ? assertMatch (# state {consumers = #{Cid := # consumer {credit = 0 ,
10501068 delivery_count = 5 }}},
10511069 State ),
1052- ? ASSERT_EFF ({send_msg , _ , {send_drained , [{? FUNCTION_NAME , 5 }]}, cast },
1053- DrainedEffs ),
1070+ ? assertEqual ({multi , [{send_credit_reply , 0 },
1071+ {send_drained , [{? FUNCTION_NAME , 5 }]}]},
1072+ Result ),
10541073 ok .
10551074
10561075credit_and_drain_test () ->
@@ -1062,16 +1081,15 @@ credit_and_drain_test() ->
10621081 apply (meta (3 ), {checkout , {auto , 0 , credited }, Cid }, [], State2 ),
10631082
10641083 ? ASSERT_NO_EFF ({send_msg , _ , {delivery , _ , _ }}, CheckEffs ),
1065- {State4 , Effects , {send_credit_reply , 0 }} =
1066- apply (meta (4 ), {credit , 4 , 0 , true , Cid }, [], State3 ),
1084+ {State4 , Effects , {multi , [{send_credit_reply , 0 },
1085+ {send_drained , [{? FUNCTION_NAME , 2 }]}]}} =
1086+ apply (meta (4 ), {credit , 4 , 0 , true , Cid }, [], State3 ),
10671087 ? assertMatch (# state {consumers = #{Cid := # consumer {credit = 0 ,
10681088 delivery_count = 4 }}},
10691089 State4 ),
10701090
10711091 ? ASSERT_EFF ({send_msg , _ , {delivery , _ , [{_ , {_ , first }},
10721092 {_ , {_ , second }}]}, _ }, Effects ),
1073- ? ASSERT_EFF ({send_msg , _ , {send_drained , [{? FUNCTION_NAME , 2 }]}, cast },
1074- Effects ),
10751093 {_State5 , EnqEffs } = enq (5 , 2 , third , State4 ),
10761094 ? ASSERT_NO_EFF ({send_msg , _ , {delivery , _ , _ }}, EnqEffs ),
10771095 ok .
0 commit comments