diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index b771a5cc1cd7..66d4f1e04eaf 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -929,7 +929,7 @@ which_module(5) -> ?MODULE. smallest_index :: undefined | ra:index(), messages_total :: non_neg_integer(), indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(), - unused_1 = ?NIL}). + bytes_in = 0 :: non_neg_integer()}). -record(aux_gc, {last_raft_idx = 0 :: ra:index()}). -record(aux, {name :: atom(), capacity :: term(), @@ -940,7 +940,9 @@ which_module(5) -> ?MODULE. gc = #aux_gc{} :: #aux_gc{}, tick_pid :: undefined | pid(), cache = #{} :: map(), - last_checkpoint :: #checkpoint{}}). + last_checkpoint :: #checkpoint{}, + bytes_in = 0 :: non_neg_integer(), + bytes_out = 0 :: non_neg_integer()}). init_aux(Name) when is_atom(Name) -> %% TODO: catch specific exception throw if table already exists @@ -953,7 +955,7 @@ init_aux(Name) when is_atom(Name) -> last_checkpoint = #checkpoint{index = 0, timestamp = erlang:system_time(millisecond), messages_total = 0, - unused_1 = ?NIL}}. + bytes_in = 0}}. 
handle_aux(RaftState, Tag, Cmd, #aux{name = Name, capacity = Cap, @@ -970,13 +972,14 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux) handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux); handle_aux(leader, cast, eval, #?AUX{last_decorators_state = LastDec, + bytes_in = BytesIn, last_checkpoint = Check0} = Aux0, RaAux) -> #?STATE{cfg = #cfg{resource = QName}} = MacState = ra_aux:machine_state(RaAux), Ts = erlang:system_time(millisecond), - {Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, false), + {Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, BytesIn, false), %% this is called after each batch of commands have been applied %% set timer for message expire @@ -992,11 +995,16 @@ handle_aux(leader, cast, eval, last_decorators_state = NewLast}, RaAux, Effects} end; handle_aux(_RaftState, cast, eval, - #?AUX{last_checkpoint = Check0} = Aux0, + #?AUX{last_checkpoint = Check0, + bytes_in = BytesIn} = Aux0, RaAux) -> Ts = erlang:system_time(millisecond), - {Check, Effects} = do_checkpoints(Ts, Check0, RaAux, false), + {Check, Effects} = do_checkpoints(Ts, Check0, RaAux, BytesIn, false), {no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects}; +handle_aux(_RaftState, cast, {bytes_in, {MetaSize, BodySize}}, + #?AUX{bytes_in = Bytes} = Aux0, + RaAux) -> + {no_reply, Aux0#?AUX{bytes_in = Bytes + MetaSize + BodySize}, RaAux, []}; handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds, consumer_key = Key} = Ret, Corr, Pid}, Aux0, RaAux0) -> @@ -1126,12 +1134,13 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0, handle_aux(_, _, garbage_collection, Aux, RaAux) -> {no_reply, force_eval_gc(RaAux, Aux), RaAux}; handle_aux(_RaState, _, force_checkpoint, - #?AUX{last_checkpoint = Check0} = Aux, RaAux) -> + #?AUX{last_checkpoint = Check0, + bytes_in = BytesIn} = Aux, RaAux) -> Ts = erlang:system_time(millisecond), #?STATE{cfg = #cfg{resource = QR}} = ra_aux:machine_state(RaAux), rabbit_log:debug("~ts: rabbit_fifo: forcing checkpoint at ~b", [rabbit_misc:rs(QR), 
ra_aux:last_applied(RaAux)]), - {Check, Effects} = do_checkpoints(Ts, Check0, RaAux, true), + {Check, Effects} = do_checkpoints(Ts, Check0, RaAux, BytesIn, true), {no_reply, Aux#?AUX{last_checkpoint = Check}, RaAux, Effects}; handle_aux(RaState, _, {dlx, _} = Cmd, Aux0, RaAux) -> #?STATE{dlx = DlxState, @@ -1575,7 +1584,9 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerKey, apply_enqueue(#{index := RaftIdx, system_time := Ts} = Meta, From, Seq, RawMsg, Size, State0) -> - case maybe_enqueue(RaftIdx, Ts, From, Seq, RawMsg, Size, [], State0) of + Effects0 = [{aux, {bytes_in, Size}}], + case maybe_enqueue(RaftIdx, Ts, From, Seq, RawMsg, Size, + Effects0, State0) of {ok, State1, Effects1} -> checkout(Meta, State0, State1, Effects1); {out_of_sequence, State, Effects} -> @@ -2918,11 +2929,12 @@ priority_tag(Msg) -> end. -do_checkpoints(Ts, - #checkpoint{index = ChIdx, - timestamp = ChTime, - smallest_index = LastSmallest, - indexes = MinIndexes} = Check0, RaAux, Force) -> +do_checkpoints(Ts, #checkpoint{index = ChIdx, + timestamp = ChTime, + smallest_index = LastSmallest, + bytes_in = LastBytesIn, + indexes = MinIndexes} = Check0, + RaAux, BytesIn, Force) -> LastAppliedIdx = ra_aux:last_applied(RaAux), IndexesSince = LastAppliedIdx - ChIdx, #?STATE{} = MacState = ra_aux:machine_state(RaAux), @@ -2934,21 +2946,35 @@ do_checkpoints(Ts, Smallest end, MsgsTot = messages_total(MacState), + %% once more than ?CHECK_MAX_BYTES of message data has been written to the log + %% since the last checkpoint it is best to take a checkpoint + {CheckMinInterval, CheckMinIndexes, CheckMaxIndexes} = persistent_term:get(quorum_queue_checkpoint_config, {?CHECK_MIN_INTERVAL_MS, ?CHECK_MIN_INDEXES, ?CHECK_MAX_INDEXES}), + + %% scale the bytes limit as the backlog increases + MaxBytesFactor = max(1, MsgsTot / CheckMaxIndexes), + EnoughDataWritten = BytesIn - LastBytesIn > (?CHECK_MAX_BYTES * MaxBytesFactor), EnoughTimeHasPassed = TimeSince > CheckMinInterval, - %% enough time has passed and enough indexes have been committed 
- case (IndexesSince > MinIndexes andalso - EnoughTimeHasPassed) orelse - %% the queue is empty and some commands have been - %% applied since the last checkpoint - (MsgsTot == 0 andalso - IndexesSince > CheckMinIndexes andalso - EnoughTimeHasPassed) orelse - Force of + case (EnoughTimeHasPassed andalso + ( + %% condition 1: enough indexes have been committed since the last + %% checkpoint + (IndexesSince > MinIndexes) orelse + %% condition 2: the queue is empty and _some_ commands + %% have been applied since the last checkpoint + (MsgsTot == 0 andalso IndexesSince > 32) + ) + ) orelse + %% condition 3: enough message data has been written to warrant a new + %% checkpoint, this ignores the time windowing + EnoughDataWritten orelse + %% force was requested, e.g. after a purge + Force + of true -> %% take fewer checkpoints the more messages there are on queue NextIndexes = min(max(MsgsTot, CheckMinIndexes), CheckMaxIndexes), @@ -2957,6 +2983,7 @@ do_checkpoints(Ts, timestamp = Ts, smallest_index = NewSmallest, messages_total = MsgsTot, + bytes_in = BytesIn, indexes = NextIndexes}, [{checkpoint, LastAppliedIdx, MacState} | release_cursor(LastSmallest, NewSmallest)]}; diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index c74740149925..b8b69bff7f45 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -100,8 +100,11 @@ % represents a partially applied module call -define(CHECK_MIN_INTERVAL_MS, 1000). --define(CHECK_MIN_INDEXES, 4096). +-define(CHECK_MIN_INDEXES, 4096 * 2). -define(CHECK_MAX_INDEXES, 666_667). +%% once this many bytes have been written since the last checkpoint +%% we request a checkpoint regardless of the time elapsed +-define(CHECK_MAX_BYTES, 128_000_000). -define(USE_AVG_HALF_LIFE, 10000.0). 
%% an average QQ without any message uses about 100KB so setting this limit diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index c46b59bda21a..547ab7b871d6 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -145,8 +145,9 @@ -define(DELETE_TIMEOUT, 5000). -define(MEMBER_CHANGE_TIMEOUT, 20_000). -define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096 -% -define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra --define(MIN_CHECKPOINT_INTERVAL, 8192). %% the ra default is 16384 +%% setting a low default here to allow quorum queues to better choose for themselves +%% when to take a checkpoint +-define(MIN_CHECKPOINT_INTERVAL, 64). -define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000). -define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000). diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 0d89a18d1d0d..b46ad2a8cb9f 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1488,6 +1488,8 @@ gh_12635(Config) -> publish_confirm(Ch0, QQ), publish_confirm(Ch0, QQ), + %% a QQ will not take checkpoints more frequently than every 1s + timer:sleep(1000), %% force a checkpoint on leader ok = rpc:call(Server0, ra, cast_aux_command, [{RaName, Server0}, force_checkpoint]), rabbit_ct_helpers:await_condition(