4343 init /1 ,
4444 apply /3 ,
4545 live_indexes /1 ,
46+ snapshot_installed /4 ,
4647 state_enter /2 ,
4748 tick /2 ,
4849 overview /1 ,
@@ -487,7 +488,8 @@ apply(#{index := Idx} = Meta,
487488apply (#{index := Index }, # purge {},
488489 #? STATE {messages_total = Total ,
489490 returns = Returns ,
490- ra_indexes = Indexes0
491+ ra_indexes = Indexes0 ,
492+ msg_bytes_enqueue = MsgBytesEnqueue
491493 } = State0 ) ->
492494 NumReady = messages_ready (State0 ),
493495 Indexes = case Total of
@@ -514,7 +516,9 @@ apply(#{index := Index}, #purge{},
514516 returns = lqueue :new (),
515517 msg_bytes_enqueue = 0
516518 },
517- Effects0 = [{aux , force_checkpoint }, garbage_collection ],
519+ Effects0 = [{aux , {bytes_out , MsgBytesEnqueue }},
520+ {aux , force_checkpoint },
521+ garbage_collection ],
518522 Reply = {purge , NumReady },
519523 {State , _ , Effects } = evaluate_limit (Index , false , State0 ,
520524 State1 , Effects0 ),
@@ -681,14 +685,56 @@ apply(_Meta, Cmd, State) ->
681685 ? LOG_DEBUG (" rabbit_fifo: unhandled command ~W " , [Cmd , 10 ]),
682686 {State , ok , []}.
683687
684- -spec live_indexes (state ()) ->
685- [ ra : index ()].
686- live_indexes (# ? STATE { returns = Returns ,
688+ -spec live_indexes (state ()) -> [ ra : index ()].
689+ live_indexes (# ? STATE { cfg = # cfg {},
690+ returns = Returns ,
687691 messages = Messages ,
692+ ra_indexes = Indexes ,
688693 dlx = Dlx }) ->
689694 DlxIndexes = rabbit_fifo_dlx :live_indexes (Dlx ),
690695 RtnIndexes = [I || ? MSG (I , _ ) <- lqueue :to_list (Returns )],
691- DlxIndexes ++ RtnIndexes ++ rabbit_fifo_q :indexes (Messages ).
696+ CheckedIdxs = rabbit_fifo_index :indexes (Indexes ),
697+ CheckedIdxs ++
698+ DlxIndexes ++
699+ RtnIndexes ++
700+ rabbit_fifo_q :indexes (Messages ).
701+
702+
703+ -spec snapshot_installed (Meta , State , OldMeta , OldState ) ->
704+ ra_machine :effects () when
705+ Meta :: ra_snapshot :meta (),
706+ State :: state (),
707+ OldMeta :: ra_snapshot :meta (),
708+ OldState :: state ().
709+ snapshot_installed (_Meta , #? MODULE {cfg = # cfg {resource = QR },
710+ consumers = Consumers } = State ,
711+ _OldMeta , _OldState ) ->
712+ % % here we need to redliver all pending consumer messages
713+ % % to local consumers
714+ % % TODO: with some additional state (raft indexes assigned to consumer)
715+ % % we could reduce the number of resends but it is questionable if this
716+ % % complexity is worth the effort. rabbit_fifo_index will de-duplicate
717+ % % deliveries anyway
718+ SendAcc = maps :fold (
719+ fun (_ConsumerKey , # consumer {cfg = # consumer_cfg {tag = Tag ,
720+ pid = Pid },
721+ checked_out = Checked },
722+ Acc ) ->
723+ case node (Pid ) == node () of
724+ true ->
725+ Acc #{{Tag , Pid } => maps :to_list (Checked )};
726+ false ->
727+ Acc
728+ end
729+ end , #{}, Consumers ),
730+ ? LOG_DEBUG (" ~ts : rabbit_fifo: install snapshot sending ~p " ,
731+ [rabbit_misc :rs (QR ), SendAcc ]),
732+ Effs = add_delivery_effects ([], SendAcc , State ),
733+ ? LOG_DEBUG (" ~ts : rabbit_fifo: effs ~p " ,
734+ [rabbit_misc :rs (QR ), Effs ]),
735+ Effs .
736+
737+
692738
693739convert_v3_to_v4 (#{} = _Meta , StateV3 ) ->
694740 % % TODO: consider emitting release cursors as checkpoints
@@ -965,7 +1011,7 @@ which_module(8) -> ?MODULE.
9651011-record (snapshot , {index :: ra :index (),
9661012 timestamp :: milliseconds (),
9671013 % smallest_index :: undefined | ra:index(),
968- messages_total :: non_neg_integer (),
1014+ messages_total = 0 :: non_neg_integer (),
9691015 % indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
9701016 bytes_out = 0 :: non_neg_integer ()}).
9711017-record (aux_gc , {last_raft_idx = 0 :: ra :index ()}).
@@ -990,10 +1036,9 @@ init_aux(Name) when is_atom(Name) ->
9901036 Now = erlang :monotonic_time (microsecond ),
9911037 #? AUX {name = Name ,
9921038 capacity = {inactive , Now , 1 , 1.0 },
993- last_checkpoint = # snapshot {index = 0 ,
994- timestamp = erlang :system_time (millisecond ),
995- messages_total = 0 ,
996- bytes_out = 0 }}.
1039+ last_checkpoint = # checkpoint {index = 0 ,
1040+ timestamp = erlang :system_time (millisecond ),
1041+ messages_total = 0 }}.
9971042
9981043handle_aux (RaftState , Tag , Cmd , # aux {name = Name ,
9991044 capacity = Cap ,
@@ -1018,7 +1063,13 @@ handle_aux(leader, cast, eval,
10181063 ra_aux :machine_state (RaAux ),
10191064
10201065 Ts = erlang :system_time (millisecond ),
1021- EffMacVer = ra_aux :effective_machine_version (RaAux ),
1066+ EffMacVer = try ra_aux :effective_machine_version (RaAux ) of
1067+ V -> V
1068+ catch _ :_ ->
1069+ % % this function is not available in older aux states.
1070+ % % this is a guess
1071+ undefined
1072+ end ,
10221073 {Check , Effects0 } = do_checkpoints (EffMacVer , Ts , Check0 , RaAux ,
10231074 BytesIn , BytesOut , false ),
10241075
@@ -1834,7 +1885,7 @@ complete(Meta, ConsumerKey, [MsgId],
18341885 {State1 #? STATE {ra_indexes = Indexes ,
18351886 msg_bytes_checkout = BytesCheckout - SettledSize ,
18361887 messages_total = Tot - 1 },
1837- [{aux , {bytes_out , SettledSize }}, Effects ]};
1888+ [{aux , {bytes_out , SettledSize }} | Effects ]};
18381889 error ->
18391890 {State0 , Effects }
18401891 end ;
@@ -1861,7 +1912,7 @@ complete(Meta, ConsumerKey, MsgIds,
18611912 {State1 #? STATE {ra_indexes = Indexes ,
18621913 msg_bytes_checkout = BytesCheckout - SettledSize ,
18631914 messages_total = Tot - Len },
1864- [{aux , {bytes_out , SettledSize }}, Effects ]}.
1915+ [{aux , {bytes_out , SettledSize }} | Effects ]}.
18651916
18661917increase_credit (# consumer {cfg = # consumer_cfg {lifetime = once },
18671918 credit = Credit }, _ ) ->
@@ -3025,29 +3076,34 @@ priority_tag(Msg) ->
30253076 no
30263077 end .
30273078
3028-
3029- do_checkpoints (MacVer , Ts , # checkpoint {index = _ChIdx ,
3030- timestamp = _SnapTime },
3031- RaAux , BytesIn , BytesOut , Force ) when MacVer >= 8 ->
3032- do_checkpoints (MacVer , Ts , # snapshot {}, RaAux , BytesIn , BytesOut , Force );
3079+ do_checkpoints (MacVer , Ts , # checkpoint {timestamp = LastTs ,
3080+ index = Idx },
3081+ RaAux , BytesIn , BytesOut , Force )
3082+ when is_integer (MacVer ) andalso MacVer >= 8 ->
3083+ do_checkpoints (MacVer , Ts , # snapshot {index = Idx ,
3084+ timestamp = LastTs }, RaAux , BytesIn ,
3085+ BytesOut , Force );
30333086do_checkpoints (MacVer , Ts , # snapshot {index = _ChIdx ,
3034- timestamp = SnapTime ,
3035- bytes_out = LastBytesOut } = Snap0 ,
3036- RaAux , _BytesIn , BytesOut , _Force ) when MacVer >= 8 ->
3087+ timestamp = SnapTime ,
3088+ bytes_out = LastBytesOut } = Snap0 ,
3089+ RaAux , _BytesIn , BytesOut , Force )
3090+ when is_integer (MacVer ) andalso MacVer >= 8 ->
30373091 LastAppliedIdx = ra_aux :last_applied (RaAux ),
30383092 #? STATE {} = MacState = ra_aux :machine_state (RaAux ),
30393093 TimeSince = Ts - SnapTime ,
30403094 MsgsTot = messages_total (MacState ),
3041- ra_aux :overview (RaAux ),
3095+ % ra_aux:overview(RaAux),
30423096 % MaxBytesFactor = max(1, MsgsTot / CheckMaxIndexes),
3097+ % TODO: snapshots also need to be triggered by non settled commands
3098+ % that aren't enqueues
30433099 EnoughDataRemoved = BytesOut - LastBytesOut > ? SNAP_OUT_BYTES ,
30443100 {CheckMinInterval , _CheckMinIndexes , _CheckMaxIndexes } =
30453101 persistent_term :get (quorum_queue_checkpoint_config ,
30463102 {? CHECK_MIN_INTERVAL_MS , ? CHECK_MIN_INDEXES ,
30473103 ? CHECK_MAX_INDEXES }),
30483104 EnoughTimeHasPassed = TimeSince > CheckMinInterval ,
3049- case (EnoughTimeHasPassed andalso
3050- EnoughDataRemoved ) of
3105+ case (EnoughTimeHasPassed andalso EnoughDataRemoved ) orelse
3106+ Force of
30513107 true ->
30523108 {# snapshot {index = LastAppliedIdx ,
30533109 timestamp = Ts ,
@@ -3062,7 +3118,8 @@ do_checkpoints(MacVer,Ts, #checkpoint{index = ChIdx,
30623118 smallest_index = LastSmallest ,
30633119 bytes_in = LastBytesIn ,
30643120 indexes = MinIndexes } = Check0 ,
3065- RaAux , BytesIn , _BytesOut , Force ) when MacVer < 8 ->
3121+ RaAux , BytesIn , _BytesOut , Force )
3122+ when not is_integer (MacVer ) orelse MacVer < 8 ->
30663123 LastAppliedIdx = ra_aux :last_applied (RaAux ),
30673124 IndexesSince = LastAppliedIdx - ChIdx ,
30683125 #? STATE {} = MacState = ra_aux :machine_state (RaAux ),
0 commit comments