From 669528264010a413daff35cfef93dec25fccce9a Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 21 Mar 2025 14:30:16 +0000 Subject: [PATCH] QQ: Revise checkpointing logic To take more frequent checkpoints for large message workloads. Lower the min_checkpoint_interval substantially to allow quorum queues better control over when checkpoints are taken. Track bytes enqueued in the aux state and suggest a checkpoint after every 64MB enqueued (this value is scaled according to backlog just like the indexes condition). This should help with more timely checkpointing when very large messages are used. Try evaluating byte size independently of the time window; also increase the max size. --- deps/rabbit/src/rabbit_fifo.erl | 73 +++++++++++++++++-------- deps/rabbit/src/rabbit_fifo.hrl | 5 +- deps/rabbit/src/rabbit_quorum_queue.erl | 5 +- deps/rabbit/test/quorum_queue_SUITE.erl | 2 + 4 files changed, 59 insertions(+), 26 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 7fd616245532..29740cc325da 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -932,7 +932,7 @@ which_module(5) -> ?MODULE. smallest_index :: undefined | ra:index(), messages_total :: non_neg_integer(), indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(), - unused_1 = ?NIL}). + bytes_in = 0 :: non_neg_integer()}). -record(aux_gc, {last_raft_idx = 0 :: ra:index()}). -record(aux, {name :: atom(), capacity :: term(), @@ -943,7 +943,9 @@ which_module(5) -> ?MODULE. gc = #aux_gc{} :: #aux_gc{}, tick_pid :: undefined | pid(), cache = #{} :: map(), - last_checkpoint :: #checkpoint{}}). + last_checkpoint :: #checkpoint{}, + bytes_in = 0 :: non_neg_integer(), + bytes_out = 0 :: non_neg_integer()}). 
init_aux(Name) when is_atom(Name) -> %% TODO: catch specific exception throw if table already exists @@ -956,7 +958,7 @@ init_aux(Name) when is_atom(Name) -> last_checkpoint = #checkpoint{index = 0, timestamp = erlang:system_time(millisecond), messages_total = 0, - unused_1 = ?NIL}}. + bytes_in = 0}}. handle_aux(RaftState, Tag, Cmd, #aux{name = Name, capacity = Cap, @@ -973,13 +975,14 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux) handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux); handle_aux(leader, cast, eval, #?AUX{last_decorators_state = LastDec, + bytes_in = BytesIn, last_checkpoint = Check0} = Aux0, RaAux) -> #?STATE{cfg = #cfg{resource = QName}} = MacState = ra_aux:machine_state(RaAux), Ts = erlang:system_time(millisecond), - {Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, false), + {Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, BytesIn, false), %% this is called after each batch of commands have been applied %% set timer for message expire @@ -995,11 +998,16 @@ handle_aux(leader, cast, eval, last_decorators_state = NewLast}, RaAux, Effects} end; handle_aux(_RaftState, cast, eval, - #?AUX{last_checkpoint = Check0} = Aux0, + #?AUX{last_checkpoint = Check0, + bytes_in = BytesIn} = Aux0, RaAux) -> Ts = erlang:system_time(millisecond), - {Check, Effects} = do_checkpoints(Ts, Check0, RaAux, false), + {Check, Effects} = do_checkpoints(Ts, Check0, RaAux, BytesIn, false), {no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects}; +handle_aux(_RaftState, cast, {bytes_in, {MetaSize, BodySize}}, + #?AUX{bytes_in = Bytes} = Aux0, + RaAux) -> + {no_reply, Aux0#?AUX{bytes_in = Bytes + MetaSize + BodySize}, RaAux, []}; handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds, consumer_key = Key} = Ret, Corr, Pid}, Aux0, RaAux0) -> @@ -1129,12 +1137,13 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0, handle_aux(_, _, garbage_collection, Aux, RaAux) -> {no_reply, force_eval_gc(RaAux, Aux), RaAux}; handle_aux(_RaState, _, force_checkpoint, - 
#?AUX{last_checkpoint = Check0} = Aux, RaAux) -> + #?AUX{last_checkpoint = Check0, + bytes_in = BytesIn} = Aux, RaAux) -> Ts = erlang:system_time(millisecond), #?STATE{cfg = #cfg{resource = QR}} = ra_aux:machine_state(RaAux), rabbit_log:debug("~ts: rabbit_fifo: forcing checkpoint at ~b", [rabbit_misc:rs(QR), ra_aux:last_applied(RaAux)]), - {Check, Effects} = do_checkpoints(Ts, Check0, RaAux, true), + {Check, Effects} = do_checkpoints(Ts, Check0, RaAux, BytesIn, true), {no_reply, Aux#?AUX{last_checkpoint = Check}, RaAux, Effects}; handle_aux(RaState, _, {dlx, _} = Cmd, Aux0, RaAux) -> #?STATE{dlx = DlxState, @@ -1578,7 +1587,9 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerKey, apply_enqueue(#{index := RaftIdx, system_time := Ts} = Meta, From, Seq, RawMsg, Size, State0) -> - case maybe_enqueue(RaftIdx, Ts, From, Seq, RawMsg, Size, [], State0) of + Effects0 = [{aux, {bytes_in, Size}}], + case maybe_enqueue(RaftIdx, Ts, From, Seq, RawMsg, Size, + Effects0, State0) of {ok, State1, Effects1} -> checkout(Meta, State0, State1, Effects1); {out_of_sequence, State, Effects} -> @@ -2918,11 +2929,12 @@ priority_tag(Msg) -> end. 
-do_checkpoints(Ts, - #checkpoint{index = ChIdx, - timestamp = ChTime, - smallest_index = LastSmallest, - indexes = MinIndexes} = Check0, RaAux, Force) -> +do_checkpoints(Ts, #checkpoint{index = ChIdx, + timestamp = ChTime, + smallest_index = LastSmallest, + bytes_in = LastBytesIn, + indexes = MinIndexes} = Check0, + RaAux, BytesIn, Force) -> LastAppliedIdx = ra_aux:last_applied(RaAux), IndexesSince = LastAppliedIdx - ChIdx, #?STATE{} = MacState = ra_aux:machine_state(RaAux), @@ -2934,21 +2946,35 @@ do_checkpoints(Ts, Smallest end, MsgsTot = messages_total(MacState), + %% more than 64MB (by default) of message data has been written to the log + %% best take a checkpoint + {CheckMinInterval, CheckMinIndexes, CheckMaxIndexes} = persistent_term:get(quorum_queue_checkpoint_config, {?CHECK_MIN_INTERVAL_MS, ?CHECK_MIN_INDEXES, ?CHECK_MAX_INDEXES}), + + %% scale the bytes limit as the backlog increases + MaxBytesFactor = max(1, MsgsTot / CheckMaxIndexes), + EnoughDataWritten = BytesIn - LastBytesIn > (?CHECK_MAX_BYTES * MaxBytesFactor), EnoughTimeHasPassed = TimeSince > CheckMinInterval, - %% enough time has passed and enough indexes have been committed - case (IndexesSince > MinIndexes andalso - EnoughTimeHasPassed) orelse - %% the queue is empty and some commands have been - %% applied since the last checkpoint - (MsgsTot == 0 andalso - IndexesSince > CheckMinIndexes andalso - EnoughTimeHasPassed) orelse - Force of + case (EnoughTimeHasPassed andalso + ( + %% condition 1: enough indexes have been committed since the last + %% checkpoint + (IndexesSince > MinIndexes) orelse + %% condition 2: the queue is empty and _some_ commands + %% have been applied since the last checkpoint + (MsgsTot == 0 andalso IndexesSince > 32) + ) + ) orelse + %% condition 3: enough message data has been written to warrant a new + %% checkpoint, this ignores the time windowing + EnoughDataWritten orelse + %% force was requested, e.g. 
 after a purge + Force + of true -> %% take fewer checkpoints the more messages there are on queue NextIndexes = min(max(MsgsTot, CheckMinIndexes), CheckMaxIndexes), @@ -2957,6 +2983,7 @@ do_checkpoints(Ts, timestamp = Ts, smallest_index = NewSmallest, messages_total = MsgsTot, + bytes_in = BytesIn, indexes = NextIndexes}, [{checkpoint, LastAppliedIdx, MacState} | release_cursor(LastSmallest, NewSmallest)]}; diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index c74740149925..b8b69bff7f45 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -100,8 +100,11 @@ % represents a partially applied module call -define(CHECK_MIN_INTERVAL_MS, 1000). --define(CHECK_MIN_INDEXES, 4096). +-define(CHECK_MIN_INDEXES, 4096 * 2). -define(CHECK_MAX_INDEXES, 666_667). +%% once this many bytes have been written since the last checkpoint +%% we request a checkpoint regardless +-define(CHECK_MAX_BYTES, 128_000_000). -define(USE_AVG_HALF_LIFE, 10000.0). %% an average QQ without any message uses about 100KB so setting this limit diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 69dc09b97c19..156a2092fe53 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -145,8 +145,9 @@ -define(DELETE_TIMEOUT, 5000). -define(MEMBER_CHANGE_TIMEOUT, 20_000). -define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096 -% -define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra --define(MIN_CHECKPOINT_INTERVAL, 8192). %% the ra default is 16384 +%% setting a low default here to allow quorum queues to better choose for themselves +%% when to take a checkpoint +-define(MIN_CHECKPOINT_INTERVAL, 64). -define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000). -define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000). 
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 6a3167bdcc51..a47ce4ec8119 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1527,6 +1527,8 @@ gh_12635(Config) -> publish_confirm(Ch0, QQ), publish_confirm(Ch0, QQ), + %% a QQ will not take checkpoints more frequently than every 1s + timer:sleep(1000), %% force a checkpoint on leader ok = rpc:call(Server0, ra, cast_aux_command, [{RaName, Server0}, force_checkpoint]), rabbit_ct_helpers:await_condition(