4242 % % ra_machine callbacks
4343 init /1 ,
4444 apply /3 ,
45+ live_indexes /1 ,
4546 state_enter /2 ,
4647 tick /2 ,
4748 overview /1 ,
@@ -680,6 +681,15 @@ apply(_Meta, Cmd, State) ->
680681 ? LOG_DEBUG (" rabbit_fifo: unhandled command ~W " , [Cmd , 10 ]),
681682 {State , ok , []}.
682683
684+ -spec live_indexes (state ()) ->
685+ [ra :index ()].
686+ live_indexes (#? STATE {returns = Returns ,
687+ messages = Messages ,
688+ dlx = Dlx }) ->
689+ DlxIndexes = rabbit_fifo_dlx :live_indexes (Dlx ),
690+ RtnIndexes = [I || ? MSG (I , _ ) <- lqueue :to_list (Returns )],
691+ DlxIndexes ++ RtnIndexes ++ rabbit_fifo_q :indexes (Messages ).
692+
683693convert_v3_to_v4 (#{} = _Meta , StateV3 ) ->
684694 % % TODO: consider emitting release cursors as checkpoints
685695 Messages0 = rabbit_fifo_v3 :get_field (messages , StateV3 ),
@@ -932,7 +942,7 @@ get_checked_out(CKey, From, To, #?STATE{consumers = Consumers}) ->
932942 end .
933943
934944-spec version () -> pos_integer ().
935- version () -> 7 .
945+ version () -> 8 .
936946
937947which_module (0 ) -> rabbit_fifo_v0 ;
938948which_module (1 ) -> rabbit_fifo_v1 ;
@@ -941,7 +951,8 @@ which_module(3) -> rabbit_fifo_v3;
941951which_module (4 ) -> ? MODULE ;
942952which_module (5 ) -> ? MODULE ;
943953which_module (6 ) -> ? MODULE ;
944- which_module (7 ) -> ? MODULE .
954+ which_module (7 ) -> ? MODULE ;
955+ which_module (8 ) -> ? MODULE .
945956
946957-define (AUX , aux_v3 ).
947958
@@ -951,6 +962,12 @@ which_module(7) -> ?MODULE.
951962 messages_total :: non_neg_integer (),
952963 indexes = ? CHECK_MIN_INDEXES :: non_neg_integer (),
953964 bytes_in = 0 :: non_neg_integer ()}).
965+ -record (snapshot , {index :: ra :index (),
966+ timestamp :: milliseconds (),
967+ % smallest_index :: undefined | ra:index(),
968+ messages_total :: non_neg_integer (),
969+ % indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
970+ bytes_out = 0 :: non_neg_integer ()}).
954971-record (aux_gc , {last_raft_idx = 0 :: ra :index ()}).
955972-record (aux , {name :: atom (),
956973 capacity :: term (),
@@ -961,7 +978,7 @@ which_module(7) -> ?MODULE.
961978 gc = # aux_gc {} :: # aux_gc {},
962979 tick_pid :: undefined | pid (),
963980 cache = #{} :: map (),
964- last_checkpoint :: # checkpoint {},
981+ last_checkpoint :: # checkpoint {} | # snapshot {} ,
965982 bytes_in = 0 :: non_neg_integer (),
966983 bytes_out = 0 :: non_neg_integer ()}).
967984
@@ -973,10 +990,10 @@ init_aux(Name) when is_atom(Name) ->
973990 Now = erlang :monotonic_time (microsecond ),
974991 #? AUX {name = Name ,
975992 capacity = {inactive , Now , 1 , 1.0 },
976- last_checkpoint = # checkpoint {index = 0 ,
977- timestamp = erlang :system_time (millisecond ),
978- messages_total = 0 ,
979- bytes_in = 0 }}.
993+ last_checkpoint = # snapshot {index = 0 ,
994+ timestamp = erlang :system_time (millisecond ),
995+ messages_total = 0 ,
996+ bytes_out = 0 }}.
980997
981998handle_aux (RaftState , Tag , Cmd , # aux {name = Name ,
982999 capacity = Cap ,
@@ -994,13 +1011,16 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux)
9941011handle_aux (leader , cast , eval ,
9951012 #? AUX {last_decorators_state = LastDec ,
9961013 bytes_in = BytesIn ,
1014+ bytes_out = BytesOut ,
9971015 last_checkpoint = Check0 } = Aux0 ,
9981016 RaAux ) ->
9991017 #? STATE {cfg = # cfg {resource = QName }} = MacState =
10001018 ra_aux :machine_state (RaAux ),
10011019
10021020 Ts = erlang :system_time (millisecond ),
1003- {Check , Effects0 } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , false ),
1021+ EffMacVer = ra_aux :effective_machine_version (RaAux ),
1022+ {Check , Effects0 } = do_checkpoints (EffMacVer , Ts , Check0 , RaAux ,
1023+ BytesIn , BytesOut , false ),
10041024
10051025 % % this is called after each batch of commands have been applied
10061026 % % set timer for message expire
@@ -1017,15 +1037,23 @@ handle_aux(leader, cast, eval,
10171037 end ;
10181038handle_aux (_RaftState , cast , eval ,
10191039 #? AUX {last_checkpoint = Check0 ,
1020- bytes_in = BytesIn } = Aux0 ,
1040+ bytes_in = BytesIn ,
1041+ bytes_out = BytesOut
1042+ } = Aux0 ,
10211043 RaAux ) ->
10221044 Ts = erlang :system_time (millisecond ),
1023- {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , false ),
1045+ EffMacVer = ra_aux :effective_machine_version (RaAux ),
1046+ {Check , Effects } = do_checkpoints (EffMacVer , Ts , Check0 , RaAux ,
1047+ BytesIn , BytesOut , false ),
10241048 {no_reply , Aux0 #? AUX {last_checkpoint = Check }, RaAux , Effects };
10251049handle_aux (_RaftState , cast , {bytes_in , {MetaSize , BodySize }},
10261050 #? AUX {bytes_in = Bytes } = Aux0 ,
10271051 RaAux ) ->
10281052 {no_reply , Aux0 #? AUX {bytes_in = Bytes + MetaSize + BodySize }, RaAux , []};
1053+ handle_aux (_RaftState , cast , {bytes_out , BodySize },
1054+ #? AUX {bytes_out = Bytes } = Aux0 ,
1055+ RaAux ) ->
1056+ {no_reply , Aux0 #? AUX {bytes_out = Bytes + BodySize }, RaAux , []};
10291057handle_aux (_RaftState , cast , {# return {msg_ids = MsgIds ,
10301058 consumer_key = Key } = Ret , Corr , Pid },
10311059 Aux0 , RaAux0 ) ->
@@ -1156,12 +1184,15 @@ handle_aux(_, _, garbage_collection, Aux, RaAux) ->
11561184 {no_reply , force_eval_gc (RaAux , Aux ), RaAux };
11571185handle_aux (_RaState , _ , force_checkpoint ,
11581186 #? AUX {last_checkpoint = Check0 ,
1159- bytes_in = BytesIn } = Aux , RaAux ) ->
1187+ bytes_in = BytesIn ,
1188+ bytes_out = BytesOut } = Aux , RaAux ) ->
11601189 Ts = erlang :system_time (millisecond ),
11611190 #? STATE {cfg = # cfg {resource = QR }} = ra_aux :machine_state (RaAux ),
11621191 ? LOG_DEBUG (" ~ts : rabbit_fifo: forcing checkpoint at ~b " ,
11631192 [rabbit_misc :rs (QR ), ra_aux :last_applied (RaAux )]),
1164- {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , true ),
1193+ EffMacVer = ra_aux :effective_machine_version (RaAux ),
1194+ {Check , Effects } = do_checkpoints (EffMacVer , Ts , Check0 , RaAux ,
1195+ BytesIn , BytesOut , true ),
11651196 {no_reply , Aux #? AUX {last_checkpoint = Check }, RaAux , Effects };
11661197handle_aux (RaState , _ , {dlx , _ } = Cmd , Aux0 , RaAux ) ->
11671198 #? STATE {dlx = DlxState ,
@@ -1791,25 +1822,27 @@ complete(Meta, ConsumerKey, [MsgId],
17911822 # consumer {checked_out = Checked0 } = Con0 ,
17921823 #? STATE {ra_indexes = Indexes0 ,
17931824 msg_bytes_checkout = BytesCheckout ,
1794- messages_total = Tot } = State0 ) ->
1825+ messages_total = Tot } = State0 ,
1826+ Effects ) ->
17951827 case maps :take (MsgId , Checked0 ) of
17961828 {? MSG (Idx , Hdr ), Checked } ->
17971829 SettledSize = get_header (size , Hdr ),
17981830 Indexes = rabbit_fifo_index :delete (Idx , Indexes0 ),
17991831 Con = Con0 # consumer {checked_out = Checked ,
18001832 credit = increase_credit (Con0 , 1 )},
18011833 State1 = update_or_remove_con (Meta , ConsumerKey , Con , State0 ),
1802- State1 #? STATE {ra_indexes = Indexes ,
1834+ { State1 #? STATE {ra_indexes = Indexes ,
18031835 msg_bytes_checkout = BytesCheckout - SettledSize ,
1804- messages_total = Tot - 1 };
1836+ messages_total = Tot - 1 },
1837+ [{aux , {bytes_out , SettledSize }}, Effects ]};
18051838 error ->
1806- State0
1839+ { State0 , Effects }
18071840 end ;
18081841complete (Meta , ConsumerKey , MsgIds ,
18091842 # consumer {checked_out = Checked0 } = Con0 ,
18101843 #? STATE {ra_indexes = Indexes0 ,
18111844 msg_bytes_checkout = BytesCheckout ,
1812- messages_total = Tot } = State0 ) ->
1845+ messages_total = Tot } = State0 , Effects ) ->
18131846 {SettledSize , Checked , Indexes }
18141847 = lists :foldl (
18151848 fun (MsgId , {S0 , Ch0 , Idxs }) ->
@@ -1825,9 +1858,10 @@ complete(Meta, ConsumerKey, MsgIds,
18251858 Con = Con0 # consumer {checked_out = Checked ,
18261859 credit = increase_credit (Con0 , Len )},
18271860 State1 = update_or_remove_con (Meta , ConsumerKey , Con , State0 ),
1828- State1 #? STATE {ra_indexes = Indexes ,
1861+ { State1 #? STATE {ra_indexes = Indexes ,
18291862 msg_bytes_checkout = BytesCheckout - SettledSize ,
1830- messages_total = Tot - Len }.
1863+ messages_total = Tot - Len },
1864+ [{aux , {bytes_out , SettledSize }}, Effects ]}.
18311865
18321866increase_credit (# consumer {cfg = # consumer_cfg {lifetime = once },
18331867 credit = Credit }, _ ) ->
@@ -1854,11 +1888,12 @@ increase_credit(#consumer{credit = Current}, Credit) ->
18541888complete_and_checkout (#{} = Meta , MsgIds , ConsumerKey ,
18551889 # consumer {} = Con0 ,
18561890 Effects0 , State0 ) ->
1857- State1 = complete (Meta , ConsumerKey , MsgIds , Con0 , State0 ),
1891+ {State1 , Effects1 } = complete (Meta , ConsumerKey , MsgIds ,
1892+ Con0 , State0 , Effects0 ),
18581893 % % a completion could have removed the active/quiescing consumer
1859- Effects1 = add_active_effect (Con0 , State1 , Effects0 ),
1860- {State2 , Effects2 } = activate_next_consumer (State1 , Effects1 ),
1861- checkout (Meta , State0 , State2 , Effects2 ).
1894+ Effects2 = add_active_effect (Con0 , State1 , Effects1 ),
1895+ {State2 , Effects } = activate_next_consumer (State1 , Effects2 ),
1896+ checkout (Meta , State0 , State2 , Effects ).
18621897
18631898add_active_effect (# consumer {status = quiescing } = Consumer ,
18641899 #? STATE {cfg = # cfg {consumer_strategy = single_active },
@@ -1950,8 +1985,9 @@ return_one(Meta, MsgId, ?MSG(_, _) = Msg0, DelivFailed, Anns,
19501985 {DlxState , DlxEffects } =
19511986 rabbit_fifo_dlx :discard ([Msg ], delivery_limit , DLH , DlxState0 ),
19521987 State1 = State0 #? STATE {dlx = DlxState },
1953- State = complete (Meta , ConsumerKey , [MsgId ], Con0 , State1 ),
1954- {State , DlxEffects ++ Effects0 };
1988+ {State , Effects } = complete (Meta , ConsumerKey , [MsgId ],
1989+ Con0 , State1 , Effects0 ),
1990+ {State , DlxEffects ++ Effects };
19551991 _ ->
19561992 Checked = maps :remove (MsgId , Checked0 ),
19571993 Con = Con0 # consumer {checked_out = Checked ,
@@ -2817,7 +2853,10 @@ convert(Meta, 5, To, State) ->
28172853 convert (Meta , 6 , To , State );
28182854convert (Meta , 6 , To , State ) ->
28192855 % % no conversion needed, this version only includes a logic change
2820- convert (Meta , 7 , To , State ).
2856+ convert (Meta , 7 , To , State );
2857+ convert (Meta , 7 , To , State ) ->
2858+ % % no conversion needed, this version only includes a logic change
2859+ convert (Meta , 8 , To , State ).
28212860
28222861smallest_raft_index (#? STATE {messages = Messages ,
28232862 ra_indexes = Indexes ,
@@ -2987,12 +3026,43 @@ priority_tag(Msg) ->
29873026 end .
29883027
29893028
2990- do_checkpoints (Ts , # checkpoint {index = ChIdx ,
2991- timestamp = ChTime ,
2992- smallest_index = LastSmallest ,
2993- bytes_in = LastBytesIn ,
2994- indexes = MinIndexes } = Check0 ,
2995- RaAux , BytesIn , Force ) ->
3029+ do_checkpoints (MacVer , Ts , # checkpoint {index = _ChIdx ,
3030+ timestamp = _SnapTime },
3031+ RaAux , BytesIn , BytesOut , Force ) when MacVer >= 8 ->
3032+ do_checkpoints (MacVer , Ts , # snapshot {}, RaAux , BytesIn , BytesOut , Force );
3033+ do_checkpoints (MacVer , Ts , # snapshot {index = _ChIdx ,
3034+ timestamp = SnapTime ,
3035+ bytes_out = LastBytesOut } = Snap0 ,
3036+ RaAux , _BytesIn , BytesOut , _Force ) when MacVer >= 8 ->
3037+ LastAppliedIdx = ra_aux :last_applied (RaAux ),
3038+ #? STATE {} = MacState = ra_aux :machine_state (RaAux ),
3039+ TimeSince = Ts - SnapTime ,
3040+ MsgsTot = messages_total (MacState ),
3041+ ra_aux :overview (RaAux ),
3042+ % MaxBytesFactor = max(1, MsgsTot / CheckMaxIndexes),
3043+ EnoughDataRemoved = BytesOut - LastBytesOut > ? SNAP_OUT_BYTES ,
3044+ {CheckMinInterval , _CheckMinIndexes , _CheckMaxIndexes } =
3045+ persistent_term :get (quorum_queue_checkpoint_config ,
3046+ {? CHECK_MIN_INTERVAL_MS , ? CHECK_MIN_INDEXES ,
3047+ ? CHECK_MAX_INDEXES }),
3048+ EnoughTimeHasPassed = TimeSince > CheckMinInterval ,
3049+ case (EnoughTimeHasPassed andalso
3050+ EnoughDataRemoved ) of
3051+ true ->
3052+ {# snapshot {index = LastAppliedIdx ,
3053+ timestamp = Ts ,
3054+ messages_total = MsgsTot ,
3055+ bytes_out = BytesOut },
3056+ [{release_cursor , LastAppliedIdx , MacState }]};
3057+ false ->
3058+ {Snap0 , []}
3059+ end ;
3060+ do_checkpoints (MacVer ,Ts , # checkpoint {index = ChIdx ,
3061+ timestamp = ChTime ,
3062+ smallest_index = LastSmallest ,
3063+ bytes_in = LastBytesIn ,
3064+ indexes = MinIndexes } = Check0 ,
3065+ RaAux , BytesIn , _BytesOut , Force ) when MacVer < 8 ->
29963066 LastAppliedIdx = ra_aux :last_applied (RaAux ),
29973067 IndexesSince = LastAppliedIdx - ChIdx ,
29983068 #? STATE {} = MacState = ra_aux :machine_state (RaAux ),
@@ -3022,7 +3092,7 @@ do_checkpoints(Ts, #checkpoint{index = ChIdx,
30223092 % % condition 1: enough indexes have been committed since the last
30233093 % % checkpoint
30243094 (IndexesSince > MinIndexes ) orelse
3025- % % condition 2: the queue is empty and _some_ commands
3095+ % % condition 2: the queue is empty and _some_ commands
30263096 % % have been applied since the last checkpoint
30273097 (MsgsTot == 0 andalso IndexesSince > 32 )
30283098 )
0 commit comments