4343 init /1 ,
4444 apply /3 ,
4545 live_indexes /1 ,
46+ snapshot_installed /4 ,
4647 state_enter /2 ,
4748 tick /2 ,
4849 overview /1 ,
@@ -487,7 +488,8 @@ apply(#{index := Idx} = Meta,
487488apply (#{index := Index }, # purge {},
488489 #? STATE {messages_total = Total ,
489490 returns = Returns ,
490- ra_indexes = Indexes0
491+ ra_indexes = Indexes0 ,
492+ msg_bytes_enqueue = MsgBytesEnqueue
491493 } = State0 ) ->
492494 NumReady = messages_ready (State0 ),
493495 Indexes = case Total of
@@ -514,7 +516,9 @@ apply(#{index := Index}, #purge{},
514516 returns = lqueue :new (),
515517 msg_bytes_enqueue = 0
516518 },
517- Effects0 = [{aux , force_checkpoint }, garbage_collection ],
519+ Effects0 = [{aux , {bytes_out , MsgBytesEnqueue }},
520+ {aux , force_checkpoint },
521+ garbage_collection ],
518522 Reply = {purge , NumReady },
519523 {State , _ , Effects } = evaluate_limit (Index , false , State0 ,
520524 State1 , Effects0 ),
@@ -681,14 +685,61 @@ apply(_Meta, Cmd, State) ->
681685 ? LOG_DEBUG (" rabbit_fifo: unhandled command ~W " , [Cmd , 10 ]),
682686 {State , ok , []}.
683687
684- -spec live_indexes (state ()) ->
685- [ ra : index ()].
686- live_indexes (# ? STATE { returns = Returns ,
688+ -spec live_indexes (state ()) -> [ ra : index ()].
689+ live_indexes (# ? STATE { cfg = # cfg {},
690+ returns = Returns ,
687691 messages = Messages ,
692+ consumers = Consumers ,
688693 dlx = Dlx }) ->
689694 DlxIndexes = rabbit_fifo_dlx :live_indexes (Dlx ),
690695 RtnIndexes = [I || ? MSG (I , _ ) <- lqueue :to_list (Returns )],
691- DlxIndexes ++ RtnIndexes ++ rabbit_fifo_q :indexes (Messages ).
696+ CheckedIdxs = maps :fold (
697+ fun (_Cid , # consumer {checked_out = Ch }, Acc0 ) ->
698+ maps :fold (
699+ fun (_MsgId , ? MSG (I , _ ), Acc ) ->
700+ [I | Acc ]
701+ end , Acc0 , Ch )
702+ end , RtnIndexes ++ DlxIndexes , Consumers ),
703+
704+
705+ CheckedIdxs ++ rabbit_fifo_q :indexes (Messages ).
706+
707+
708+ -spec snapshot_installed (Meta , State , OldMeta , OldState ) ->
709+ ra_machine :effects () when
710+ Meta :: ra_snapshot :meta (),
711+ State :: state (),
712+ OldMeta :: ra_snapshot :meta (),
713+ OldState :: state ().
714+ snapshot_installed (_Meta , #? MODULE {cfg = # cfg {resource = QR },
715+ consumers = Consumers } = State ,
716+ _OldMeta , _OldState ) ->
717+ % % here we need to redliver all pending consumer messages
718+ % % to local consumers
719+ % % TODO: with some additional state (raft indexes assigned to consumer)
720+ % % we could reduce the number of resends but it is questionable if this
721+ % % complexity is worth the effort. rabbit_fifo_index will de-duplicate
722+ % % deliveries anyway
723+ SendAcc = maps :fold (
724+ fun (_ConsumerKey , # consumer {cfg = # consumer_cfg {tag = Tag ,
725+ pid = Pid },
726+ checked_out = Checked },
727+ Acc ) ->
728+ case node (Pid ) == node () of
729+ true ->
730+ Acc #{{Tag , Pid } => maps :to_list (Checked )};
731+ false ->
732+ Acc
733+ end
734+ end , #{}, Consumers ),
735+ ? LOG_DEBUG (" ~ts : rabbit_fifo: install snapshot sending ~p " ,
736+ [rabbit_misc :rs (QR ), SendAcc ]),
737+ Effs = add_delivery_effects ([], SendAcc , State ),
738+ ? LOG_DEBUG (" ~ts : rabbit_fifo: effs ~p " ,
739+ [rabbit_misc :rs (QR ), Effs ]),
740+ Effs .
741+
742+
692743
693744convert_v3_to_v4 (#{} = _Meta , StateV3 ) ->
694745 % % TODO: consider emitting release cursors as checkpoints
@@ -965,7 +1016,7 @@ which_module(8) -> ?MODULE.
9651016-record (snapshot , {index :: ra :index (),
9661017 timestamp :: milliseconds (),
9671018 % smallest_index :: undefined | ra:index(),
968- messages_total :: non_neg_integer (),
1019+ messages_total = 0 :: non_neg_integer (),
9691020 % indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
9701021 bytes_out = 0 :: non_neg_integer ()}).
9711022-record (aux_gc , {last_raft_idx = 0 :: ra :index ()}).
@@ -990,10 +1041,9 @@ init_aux(Name) when is_atom(Name) ->
9901041 Now = erlang :monotonic_time (microsecond ),
9911042 #? AUX {name = Name ,
9921043 capacity = {inactive , Now , 1 , 1.0 },
993- last_checkpoint = # snapshot {index = 0 ,
994- timestamp = erlang :system_time (millisecond ),
995- messages_total = 0 ,
996- bytes_out = 0 }}.
1044+ last_checkpoint = # checkpoint {index = 0 ,
1045+ timestamp = erlang :system_time (millisecond ),
1046+ messages_total = 0 }}.
9971047
9981048handle_aux (RaftState , Tag , Cmd , # aux {name = Name ,
9991049 capacity = Cap ,
@@ -1018,7 +1068,13 @@ handle_aux(leader, cast, eval,
10181068 ra_aux :machine_state (RaAux ),
10191069
10201070 Ts = erlang :system_time (millisecond ),
1021- EffMacVer = ra_aux :effective_machine_version (RaAux ),
1071+ EffMacVer = try ra_aux :effective_machine_version (RaAux ) of
1072+ V -> V
1073+ catch _ :_ ->
1074+ % % this function is not available in older aux states.
1075+ % % this is a guess
1076+ undefined
1077+ end ,
10221078 {Check , Effects0 } = do_checkpoints (EffMacVer , Ts , Check0 , RaAux ,
10231079 BytesIn , BytesOut , false ),
10241080
@@ -1834,7 +1890,7 @@ complete(Meta, ConsumerKey, [MsgId],
18341890 {State1 #? STATE {ra_indexes = Indexes ,
18351891 msg_bytes_checkout = BytesCheckout - SettledSize ,
18361892 messages_total = Tot - 1 },
1837- [{aux , {bytes_out , SettledSize }}, Effects ]};
1893+ [{aux , {bytes_out , SettledSize }} | Effects ]};
18381894 error ->
18391895 {State0 , Effects }
18401896 end ;
@@ -1861,7 +1917,7 @@ complete(Meta, ConsumerKey, MsgIds,
18611917 {State1 #? STATE {ra_indexes = Indexes ,
18621918 msg_bytes_checkout = BytesCheckout - SettledSize ,
18631919 messages_total = Tot - Len },
1864- [{aux , {bytes_out , SettledSize }}, Effects ]}.
1920+ [{aux , {bytes_out , SettledSize }} | Effects ]}.
18651921
18661922increase_credit (# consumer {cfg = # consumer_cfg {lifetime = once },
18671923 credit = Credit }, _ ) ->
@@ -3025,29 +3081,34 @@ priority_tag(Msg) ->
30253081 no
30263082 end .
30273083
3028-
3029- do_checkpoints (MacVer , Ts , # checkpoint {index = _ChIdx ,
3030- timestamp = _SnapTime },
3031- RaAux , BytesIn , BytesOut , Force ) when MacVer >= 8 ->
3032- do_checkpoints (MacVer , Ts , # snapshot {}, RaAux , BytesIn , BytesOut , Force );
3084+ do_checkpoints (MacVer , Ts , # checkpoint {timestamp = LastTs ,
3085+ index = Idx },
3086+ RaAux , BytesIn , BytesOut , Force )
3087+ when is_integer (MacVer ) andalso MacVer >= 8 ->
3088+ do_checkpoints (MacVer , Ts , # snapshot {index = Idx ,
3089+ timestamp = LastTs }, RaAux , BytesIn ,
3090+ BytesOut , Force );
30333091do_checkpoints (MacVer , Ts , # snapshot {index = _ChIdx ,
3034- timestamp = SnapTime ,
3035- bytes_out = LastBytesOut } = Snap0 ,
3036- RaAux , _BytesIn , BytesOut , _Force ) when MacVer >= 8 ->
3092+ timestamp = SnapTime ,
3093+ bytes_out = LastBytesOut } = Snap0 ,
3094+ RaAux , _BytesIn , BytesOut , Force )
3095+ when is_integer (MacVer ) andalso MacVer >= 8 ->
30373096 LastAppliedIdx = ra_aux :last_applied (RaAux ),
30383097 #? STATE {} = MacState = ra_aux :machine_state (RaAux ),
30393098 TimeSince = Ts - SnapTime ,
30403099 MsgsTot = messages_total (MacState ),
3041- ra_aux :overview (RaAux ),
3100+ % ra_aux:overview(RaAux),
30423101 % MaxBytesFactor = max(1, MsgsTot / CheckMaxIndexes),
3102+ % TODO: snapshots also need to be triggered by non settled commands
3103+ % that aren't enqueues
30433104 EnoughDataRemoved = BytesOut - LastBytesOut > ? SNAP_OUT_BYTES ,
30443105 {CheckMinInterval , _CheckMinIndexes , _CheckMaxIndexes } =
30453106 persistent_term :get (quorum_queue_checkpoint_config ,
30463107 {? CHECK_MIN_INTERVAL_MS , ? CHECK_MIN_INDEXES ,
30473108 ? CHECK_MAX_INDEXES }),
30483109 EnoughTimeHasPassed = TimeSince > CheckMinInterval ,
3049- case (EnoughTimeHasPassed andalso
3050- EnoughDataRemoved ) of
3110+ case (EnoughTimeHasPassed andalso EnoughDataRemoved ) orelse
3111+ Force of
30513112 true ->
30523113 {# snapshot {index = LastAppliedIdx ,
30533114 timestamp = Ts ,
@@ -3062,7 +3123,8 @@ do_checkpoints(MacVer,Ts, #checkpoint{index = ChIdx,
30623123 smallest_index = LastSmallest ,
30633124 bytes_in = LastBytesIn ,
30643125 indexes = MinIndexes } = Check0 ,
3065- RaAux , BytesIn , _BytesOut , Force ) when MacVer < 8 ->
3126+ RaAux , BytesIn , _BytesOut , Force )
3127+ when not is_integer (MacVer ) orelse MacVer < 8 ->
30663128 LastAppliedIdx = ra_aux :last_applied (RaAux ),
30673129 IndexesSince = LastAppliedIdx - ChIdx ,
30683130 #? STATE {} = MacState = ra_aux :machine_state (RaAux ),
0 commit comments