@@ -932,7 +932,7 @@ which_module(5) -> ?MODULE.
932932 smallest_index :: undefined | ra :index (),
933933 messages_total :: non_neg_integer (),
934934 indexes = ? CHECK_MIN_INDEXES :: non_neg_integer (),
935- unused_1 = ? NIL }).
935+ bytes_in = 0 :: non_neg_integer () }).
936936-record (aux_gc , {last_raft_idx = 0 :: ra :index ()}).
937937-record (aux , {name :: atom (),
938938 capacity :: term (),
@@ -943,7 +943,9 @@ which_module(5) -> ?MODULE.
943943 gc = # aux_gc {} :: # aux_gc {},
944944 tick_pid :: undefined | pid (),
945945 cache = #{} :: map (),
946- last_checkpoint :: # checkpoint {}}).
946+ last_checkpoint :: # checkpoint {},
947+ bytes_in = 0 :: non_neg_integer (),
948+ bytes_out = 0 :: non_neg_integer ()}).
947949
948950init_aux (Name ) when is_atom (Name ) ->
949951 % % TODO: catch specific exception throw if table already exists
@@ -956,7 +958,7 @@ init_aux(Name) when is_atom(Name) ->
956958 last_checkpoint = # checkpoint {index = 0 ,
957959 timestamp = erlang :system_time (millisecond ),
958960 messages_total = 0 ,
959- unused_1 = ? NIL }}.
961+ bytes_in = 0 }}.
960962
961963handle_aux (RaftState , Tag , Cmd , # aux {name = Name ,
962964 capacity = Cap ,
@@ -973,13 +975,14 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux)
973975 handle_aux (RaftState , Tag , Cmd , AuxV3 , RaAux );
974976handle_aux (leader , cast , eval ,
975977 #? AUX {last_decorators_state = LastDec ,
978+ bytes_in = BytesIn ,
976979 last_checkpoint = Check0 } = Aux0 ,
977980 RaAux ) ->
978981 #? STATE {cfg = # cfg {resource = QName }} = MacState =
979982 ra_aux :machine_state (RaAux ),
980983
981984 Ts = erlang :system_time (millisecond ),
982- {Check , Effects0 } = do_checkpoints (Ts , Check0 , RaAux , false ),
985+ {Check , Effects0 } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , false ),
983986
984987 % % this is called after each batch of commands have been applied
985988 % % set timer for message expire
@@ -995,11 +998,16 @@ handle_aux(leader, cast, eval,
995998 last_decorators_state = NewLast }, RaAux , Effects }
996999 end ;
%% `eval` cast for any raft state not matched by an earlier clause (the
%% leader-specific `eval` clause precedes this one). Runs the non-forced
%% checkpoint check via do_checkpoints, passing the aux state's current
%% bytes_in counter, stores the returned #checkpoint{} as last_checkpoint
%% and returns whatever effects do_checkpoints produced.
9971000handle_aux (_RaftState , cast , eval ,
998- #? AUX {last_checkpoint = Check0 } = Aux0 ,
1001+ #? AUX {last_checkpoint = Check0 ,
1002+ bytes_in = BytesIn } = Aux0 ,
9991003 RaAux ) ->
10001004 Ts = erlang :system_time (millisecond ),
1001- {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , false ),
1005+ {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , false ),
10021006 {no_reply , Aux0 #? AUX {last_checkpoint = Check }, RaAux , Effects };
%% `{bytes_in, {MetaSize, BodySize}}` cast, handled in any raft state.
%% Adds both sizes to the running bytes_in counter kept in the aux state
%% and emits no effects. apply_enqueue emits this command as an
%% `{aux, {bytes_in, Size}}` effect; presumably Size is always a
%% {MetaSize, BodySize} pair -- verify against the callers.
1007+ handle_aux (_RaftState , cast , {bytes_in , {MetaSize , BodySize }},
1008+ #? AUX {bytes_in = Bytes } = Aux0 ,
1009+ RaAux ) ->
1010+ {no_reply , Aux0 #? AUX {bytes_in = Bytes + MetaSize + BodySize }, RaAux , []};
10031011handle_aux (_RaftState , cast , {# return {msg_ids = MsgIds ,
10041012 consumer_key = Key } = Ret , Corr , Pid },
10051013 Aux0 , RaAux0 ) ->
@@ -1129,12 +1137,13 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
%% `garbage_collection` command, accepted for any raft state and any
%% message type. Replaces the aux state with the result of
%% force_eval_gc/2 and returns the 3-tuple form (no effects list).
11291137handle_aux (_ , _ , garbage_collection , Aux , RaAux ) ->
11301138 {no_reply , force_eval_gc (RaAux , Aux ), RaAux };
%% `force_checkpoint` command, accepted for any raft state and any message
%% type. Logs the machine's configured resource and the last applied index
%% at debug level, then calls do_checkpoints with Force = true and stores
%% the returned #checkpoint{} as last_checkpoint.
%% NOTE(review): in do_checkpoints/5 as written in this change, Force is
%% evaluated inside the `EnoughTimeHasPassed andalso (...)` expression, so
%% a forced checkpoint appears to still be skipped when the minimum
%% interval has not elapsed -- confirm this is intended.
11311139handle_aux (_RaState , _ , force_checkpoint ,
1132- #? AUX {last_checkpoint = Check0 } = Aux , RaAux ) ->
1140+ #? AUX {last_checkpoint = Check0 ,
1141+ bytes_in = BytesIn } = Aux , RaAux ) ->
11331142 Ts = erlang :system_time (millisecond ),
11341143 #? STATE {cfg = # cfg {resource = QR }} = ra_aux :machine_state (RaAux ),
11351144 rabbit_log :debug (" ~ts : rabbit_fifo: forcing checkpoint at ~b " ,
11361145 [rabbit_misc :rs (QR ), ra_aux :last_applied (RaAux )]),
1137- {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , true ),
1146+ {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , true ),
11381147 {no_reply , Aux #? AUX {last_checkpoint = Check }, RaAux , Effects };
11391148handle_aux (RaState , _ , {dlx , _ } = Cmd , Aux0 , RaAux ) ->
11401149 #? STATE {dlx = DlxState ,
@@ -1578,7 +1587,9 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerKey,
15781587apply_enqueue (#{index := RaftIdx ,
15791588 system_time := Ts } = Meta , From ,
15801589 Seq , RawMsg , Size , State0 ) ->
1581- case maybe_enqueue (RaftIdx , Ts , From , Seq , RawMsg , Size , [], State0 ) of
1590+ Effects0 = [{aux , {bytes_in , Size }}],
1591+ case maybe_enqueue (RaftIdx , Ts , From , Seq , RawMsg , Size ,
1592+ Effects0 , State0 ) of
15821593 {ok , State1 , Effects1 } ->
15831594 checkout (Meta , State0 , State1 , Effects1 );
15841595 {out_of_sequence , State , Effects } ->
@@ -2918,11 +2929,12 @@ priority_tag(Msg) ->
29182929 end .
29192930
29202931
2921- do_checkpoints (Ts ,
2922- # checkpoint {index = ChIdx ,
2923- timestamp = ChTime ,
2924- smallest_index = LastSmallest ,
2925- indexes = MinIndexes } = Check0 , RaAux , Force ) ->
2932+ do_checkpoints (Ts , # checkpoint {index = ChIdx ,
2933+ timestamp = ChTime ,
2934+ smallest_index = LastSmallest ,
2935+ bytes_in = LastBytesIn ,
2936+ indexes = MinIndexes } = Check0 ,
2937+ RaAux , BytesIn , Force ) ->
29262938 LastAppliedIdx = ra_aux :last_applied (RaAux ),
29272939 IndexesSince = LastAppliedIdx - ChIdx ,
29282940 #? STATE {} = MacState = ra_aux :machine_state (RaAux ),
@@ -2934,21 +2946,30 @@ do_checkpoints(Ts,
29342946 Smallest
29352947 end ,
29362948 MsgsTot = messages_total (MacState ),
2949+ % % more than 64MB (by default) of message data has been written to the log
2950+ % % best take a checkpoint
2951+ EnoughDataWritten = BytesIn - LastBytesIn > ? CHECK_MAX_BYTES ,
2952+
29372953 {CheckMinInterval , CheckMinIndexes , CheckMaxIndexes } =
29382954 persistent_term :get (quorum_queue_checkpoint_config ,
29392955 {? CHECK_MIN_INTERVAL_MS , ? CHECK_MIN_INDEXES ,
29402956 ? CHECK_MAX_INDEXES }),
29412957 EnoughTimeHasPassed = TimeSince > CheckMinInterval ,
29422958
2943- % % enough time has passed and enough indexes have been committed
2944- case (IndexesSince > MinIndexes andalso
2945- EnoughTimeHasPassed ) orelse
2946- % % the queue is empty and some commands have been
2947- % % applied since the last checkpoint
2948- (MsgsTot == 0 andalso
2949- IndexesSince > CheckMinIndexes andalso
2950- EnoughTimeHasPassed ) orelse
2951- Force of
2959+ case EnoughTimeHasPassed andalso
2960+ (
2961+ % % condition 1: enough indexes have been committed since the last
2962+ % % checkpoint
2963+ (IndexesSince > MinIndexes ) orelse
2964+ % % condition 2: the queue is empty and _some_ commands (more than 32)
2965+ % % have been applied since the last checkpoint
2966+ (MsgsTot == 0 andalso IndexesSince > 32 ) orelse
2967+ % % condition 3: enough message data has been written to warrant a new
2968+ % % checkpoint
2969+ EnoughDataWritten orelse
2970+ % % force was requested, e.g. after a purge
2971+ Force
2972+ ) of
29522973 true ->
29532974 % % take fewer checkpoints the more messages there are on queue
29542975 NextIndexes = min (max (MsgsTot , CheckMinIndexes ), CheckMaxIndexes ),
@@ -2957,6 +2978,7 @@ do_checkpoints(Ts,
29572978 timestamp = Ts ,
29582979 smallest_index = NewSmallest ,
29592980 messages_total = MsgsTot ,
2981+ bytes_in = BytesIn ,
29602982 indexes = NextIndexes },
29612983 [{checkpoint , LastAppliedIdx , MacState } |
29622984 release_cursor (LastSmallest , NewSmallest )]};
0 commit comments