Skip to content

Commit df43312

Browse files
Sarah Hassan authored and facebook-github-bot committed
transfer full log entries to witnesses
Reviewed By: jaher
Differential Revision: D72331130
Privacy Context Container: L1179610
fbshipit-source-id: cb74b67b4ffd11f273e1bf8ae1877beeddf42b4d
1 parent 6bed0a2 commit df43312

File tree

3 files changed

+20
-105
lines changed

3 files changed

+20
-105
lines changed

src/wa_raft_log.erl

Lines changed: 0 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,11 @@
3030
fold/5,
3131
fold/6,
3232

33-
fold_terms/5,
3433

3534
term/2,
3635
get/2,
3736
get/3,
3837
get/4,
39-
get_terms/3,
4038

4139
config/1
4240
]).
@@ -437,45 +435,6 @@ fold_impl(Log, First, Last, SizeLimit, Func, AccIn) ->
437435
{error, Reason}
438436
end.
439437

440-
%% Folds over the terms in the log view of raw entries from the log provider
441-
%% between the provided first and last log indices (inclusive).
442-
%% If there exists a log term between the provided first and last indices then
443-
%% the accumulator function will be called on at least that term.
444-
%% This API provides no validation of the log indices and term passed by the
445-
%% provider to the callback function.
446-
-spec fold_terms(LogOrView :: log() | view(),
447-
First :: log_index(),
448-
Last :: log_index(),
449-
Func :: fun((Index :: log_index(), Term :: log_term(), Acc) -> Acc),
450-
Acc) ->
451-
{ok, Acc} | wa_raft:error().
452-
fold_terms(#log_view{log = Log, first = LogFirst, last = LogLast}, First, Last, Func, Acc) ->
453-
fold_terms_impl(Log, max(First, LogFirst), min(Last, LogLast), Func, Acc);
454-
fold_terms(Log, First, Last, Func, Acc) ->
455-
Provider = provider(Log),
456-
LogFirst = Provider:first_index(Log),
457-
LogLast = Provider:last_index(Log),
458-
fold_terms_impl(Log, max(First, LogFirst), min(Last, LogLast), Func, Acc).
459-
460-
-spec fold_terms_impl(
461-
Log :: log(),
462-
First :: log_index(),
463-
Last :: log_index(),
464-
Func :: fun((Index :: log_index(), Term :: log_term(), Acc) -> Acc),
465-
Acc :: term()
466-
) -> {ok, Acc} | wa_raft:error().
467-
fold_terms_impl(Log, First, Last, Func, AccIn) ->
468-
?RAFT_COUNT('raft.log.fold_terms'),
469-
?RAFT_COUNTV('raft.log.fold_terms.total', Last - First + 1),
470-
Provider = provider(Log),
471-
case Provider:fold_terms(Log, First, Last, Func, AccIn) of
472-
{ok, AccOut} ->
473-
{ok, AccOut};
474-
{error, Reason} ->
475-
?RAFT_COUNT('raft.log.fold_terms.error'),
476-
{error, Reason}
477-
end.
478-
479438
%% Gets the term of the entry at the provided log index. When using a log view
480439
%% this function may return 'not_found' even if the underlying log entry still
481440
%% exists if the entry is outside of the log view.
@@ -534,25 +493,6 @@ get(LogOrView, First, CountLimit, SizeLimit) ->
534493
{error, corruption}
535494
end.
536495

537-
-spec get_terms(LogOrView :: log() | view(), First :: log_index(), Limit :: non_neg_integer()) ->
538-
{ok, Terms :: [wa_raft_log:log_term()]} | wa_raft:error().
539-
get_terms(LogOrView, First, Limit) ->
540-
try
541-
fold_terms(LogOrView, First, First + Limit - 1,
542-
fun
543-
(Index, Term, {Index, Acc}) -> {Index + 1, [Term | Acc]};
544-
(_Index, _Term, {ExpectedIndex, _Acc}) -> throw({missing, ExpectedIndex})
545-
end, {First, []})
546-
of
547-
{ok, {_, TermsRev}} -> {ok, lists:reverse(TermsRev)};
548-
{error, Reason} -> {error, Reason}
549-
catch
550-
throw:{missing, Index} ->
551-
?LOG_WARNING("[~p] detected log is missing index ~p during get of ~p ~~ ~p",
552-
[log_name(LogOrView), Index, First, First + Limit - 1], #{domain => [whatsapp, wa_raft]}),
553-
{error, corruption}
554-
end.
555-
556496
-spec config(LogOrView :: log() | view()) -> {ok, Index :: log_index(), Config :: wa_raft_server:config() | undefined} | not_found.
557497
config(#log_view{config_index = undefined}) ->
558498
not_found;

src/wa_raft_log_catchup.erl

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141

4242
%% API
4343
-export([
44-
start_catchup_request/6,
44+
start_catchup_request/5,
4545
cancel_catchup_request/2,
4646
is_catching_up/2
4747
]).
@@ -81,7 +81,7 @@
8181

8282
%% An entry in the catchup request ETS table representing a request to
8383
%% trigger log catchup for a particular peer.
84-
-define(CATCHUP_REQUEST(Peer, FollowerLastIndex, LeaderTerm, LeaderCommitIndex, Witness), {Peer, FollowerLastIndex, LeaderTerm, LeaderCommitIndex, Witness}).
84+
-define(CATCHUP_REQUEST(Peer, FollowerLastIndex, LeaderTerm, LeaderCommitIndex), {Peer, FollowerLastIndex, LeaderTerm, LeaderCommitIndex}).
8585

8686
%% An entry in the catchup ETS table that indicates an in-progress log
8787
%% catchup to the specified node.
@@ -116,9 +116,9 @@ start_link(#raft_options{log_catchup_name = Name} = Options) ->
116116

117117
%% Submit a request to trigger log catchup for a particular follower starting at the index provided.
118118
-spec start_catchup_request(Catchup :: atom(), Peer :: #raft_identity{}, FollowerLastIndex :: wa_raft_log:log_index(),
119-
LeaderTerm :: wa_raft_log:log_term(), LeaderCommitIndex :: wa_raft_log:log_index(), Witness :: boolean()) -> ok.
120-
start_catchup_request(Catchup, Peer, FollowerLastIndex, LeaderTerm, LeaderCommitIndex, Witness) ->
121-
ets:insert(Catchup, ?CATCHUP_REQUEST(Peer, FollowerLastIndex, LeaderTerm, LeaderCommitIndex, Witness)),
119+
LeaderTerm :: wa_raft_log:log_term(), LeaderCommitIndex :: wa_raft_log:log_index()) -> ok.
120+
start_catchup_request(Catchup, Peer, FollowerLastIndex, LeaderTerm, LeaderCommitIndex) ->
121+
ets:insert(Catchup, ?CATCHUP_REQUEST(Peer, FollowerLastIndex, LeaderTerm, LeaderCommitIndex)),
122122
ok.
123123

124124
%% Cancel a request to trigger log catchup for a particular follower.
@@ -206,8 +206,8 @@ handle_info(timeout, #state{name = Name} = State) ->
206206
{noreply, State, ?IDLE_TIMEOUT};
207207
Requests ->
208208
% Select a random log catchup request to process.
209-
?CATCHUP_REQUEST(Peer, FollowerLastIndex, LeaderTerm, LeaderCommitIndex, Witness) = lists:nth(rand:uniform(length(Requests)), Requests),
210-
NewState = send_logs(Peer, FollowerLastIndex, LeaderTerm, LeaderCommitIndex, Witness, State),
209+
?CATCHUP_REQUEST(Peer, FollowerLastIndex, LeaderTerm, LeaderCommitIndex) = lists:nth(rand:uniform(length(Requests)), Requests),
210+
NewState = send_logs(Peer, FollowerLastIndex, LeaderTerm, LeaderCommitIndex, State),
211211
% erlint-ignore garbage_collect
212212
erlang:garbage_collect(),
213213
{noreply, NewState, ?CONTINUE_TIMEOUT}
@@ -224,8 +224,8 @@ terminate(_Reason, #state{name = Name}) ->
224224
%% Private functions - Send logs to follower
225225
%%
226226

227-
-spec send_logs(#raft_identity{}, wa_raft_log:log_index(), wa_raft_log:log_term(), wa_raft_log:log_index(), boolean(), #state{}) -> #state{}.
228-
send_logs(Peer, NextLogIndex, LeaderTerm, LeaderCommitIndex, Witness, #state{name = Name, lockouts = Lockouts} = State) ->
227+
-spec send_logs(#raft_identity{}, wa_raft_log:log_index(), wa_raft_log:log_term(), wa_raft_log:log_index(), #state{}) -> #state{}.
228+
send_logs(Peer, NextLogIndex, LeaderTerm, LeaderCommitIndex, #state{name = Name, lockouts = Lockouts} = State) ->
229229
StartMillis = erlang:system_time(millisecond),
230230
LockoutMillis = maps:get(Peer, Lockouts, 0),
231231
NewState = case LockoutMillis =< StartMillis of
@@ -235,7 +235,7 @@ send_logs(Peer, NextLogIndex, LeaderTerm, LeaderCommitIndex, Witness, #state{nam
235235
true ->
236236
counters:add(Counters, ?COUNTER_CONCURRENT_CATCHUP, 1),
237237
ets:insert(?MODULE, ?CATCHUP_RECORD(Name, Peer)),
238-
try send_logs_impl(Peer, NextLogIndex, LeaderTerm, LeaderCommitIndex, Witness, State) catch
238+
try send_logs_impl(Peer, NextLogIndex, LeaderTerm, LeaderCommitIndex, State) catch
239239
T:E:S ->
240240
?RAFT_COUNT('raft.catchup.error'),
241241
?LOG_ERROR("Catchup[~p, term ~p] bulk logs transfer to ~0p failed with ~0p ~0p at ~p",
@@ -258,22 +258,15 @@ send_logs(Peer, NextLogIndex, LeaderTerm, LeaderCommitIndex, Witness, #state{nam
258258
ets:delete(?MODULE, Name),
259259
NewState.
260260

261-
-spec send_logs_impl(#raft_identity{}, wa_raft_log:log_index(), wa_raft_log:log_term(), wa_raft_log:log_index(), boolean(), #state{}) -> term().
262-
send_logs_impl(#raft_identity{node = PeerNode} = Peer, NextLogIndex, LeaderTerm, LeaderCommitIndex, Witness,
261+
-spec send_logs_impl(#raft_identity{}, wa_raft_log:log_index(), wa_raft_log:log_term(), wa_raft_log:log_index(), #state{}) -> term().
262+
send_logs_impl(#raft_identity{node = PeerNode} = Peer, NextLogIndex, LeaderTerm, LeaderCommitIndex,
263263
#state{application = App, name = Name, self = Self, identifier = Identifier, distribution_module = DistributionModule, server_name = Server, log = Log} = State) ->
264264
PrevLogIndex = NextLogIndex - 1,
265265
{ok, PrevLogTerm} = wa_raft_log:term(Log, PrevLogIndex),
266266

267267
LogBatchEntries = ?RAFT_CATCHUP_MAX_ENTRIES_PER_BATCH(App),
268-
Entries = case Witness of
269-
false ->
270-
LogBatchBytes = ?RAFT_CATCHUP_MAX_BYTES_PER_BATCH(App),
271-
{ok, E} = wa_raft_log:get(Log, NextLogIndex, LogBatchEntries, LogBatchBytes),
272-
E;
273-
true ->
274-
{ok, T} = wa_raft_log:get_terms(Log, NextLogIndex, min(LogBatchEntries, LeaderCommitIndex - NextLogIndex + 1)),
275-
[{Term, []} || Term <- T]
276-
end,
268+
LogBatchBytes = ?RAFT_CATCHUP_MAX_BYTES_PER_BATCH(App),
269+
{ok, Entries} = wa_raft_log:get(Log, NextLogIndex, LogBatchEntries, LogBatchBytes),
277270

278271
case Entries of
279272
[] ->
@@ -287,7 +280,7 @@ send_logs_impl(#raft_identity{node = PeerNode} = Peer, NextLogIndex, LeaderTerm,
287280

288281
try wa_raft_server:parse_rpc(Self, DistributionModule:call(Dest, Identifier, Command, Timeout)) of
289282
{LeaderTerm, _, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, true, FollowerEndIndex)} ->
290-
send_logs_impl(Peer, FollowerEndIndex + 1, LeaderTerm, LeaderCommitIndex, Witness, State);
283+
send_logs_impl(Peer, FollowerEndIndex + 1, LeaderTerm, LeaderCommitIndex, State);
291284
{LeaderTerm, _, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, false, _FollowerEndIndex)} ->
292285
exit(append_failed);
293286
{LeaderTerm, _, Other} ->

src/wa_raft_server.erl

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2101,10 +2101,6 @@ max_index_to_apply(MatchIndex, LastIndex, Config) ->
21012101
to_member_list(Mapping, Default, Config) ->
21022102
[maps:get(Node, Mapping, Default) || {_, Node} <- config_membership(Config)].
21032103

2104-
-spec compute_max(Mapping :: #{node() => Value}, Default :: Value, Config :: config()) -> Max :: Value.
2105-
compute_max(Mapping, Default, Config) ->
2106-
lists:max(to_member_list(Mapping, Default, Config)).
2107-
21082104
%% Compute the quorum maximum value for the current membership given a config for
21092105
%% the values represented by the given mapping of peers (see note on config about
21102106
%% RAFT RPC ids) to values assuming a default value for peers who are not represented
@@ -2314,8 +2310,6 @@ heartbeat(?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender,
23142310
LastFollowerHeartbeatTs = maps:get(FollowerId, LastHeartbeatTs, undefined),
23152311
State1 = State0#raft_state{last_heartbeat_ts = LastHeartbeatTs#{FollowerId => NowTs}, leader_heartbeat_ts = NowTs},
23162312
LastIndex = wa_raft_log:last_index(View),
2317-
Config = config(State0),
2318-
Witnesses = config_witnesses(Config),
23192313
case PrevLogTermRes =:= not_found orelse IsCatchingUp of %% catching up, or prep
23202314
true ->
23212315
{ok, LastTerm} = wa_raft_log:term(View, LastIndex),
@@ -2326,20 +2320,9 @@ heartbeat(?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender,
23262320
LastFollowerHeartbeatTs =/= undefined andalso ?RAFT_GATHER('raft.leader.heartbeat.interval_ms', erlang:monotonic_time(millisecond) - LastFollowerHeartbeatTs),
23272321
State1;
23282322
false ->
2329-
Entries =
2330-
case lists:member({Name, FollowerId}, Witnesses) of
2331-
true ->
2332-
MaxWitnessLogEntries = ?RAFT_HEARTBEAT_MAX_ENTRIES_TO_WITNESS(App),
2333-
MaxMatchIndex = compute_max(MatchIndex, 0, Config),
2334-
Limit = max(0, min(MaxWitnessLogEntries, MaxMatchIndex - FollowerNextIndex + 1)),
2335-
{ok, Terms} = wa_raft_log:get_terms(View, FollowerNextIndex, Limit),
2336-
[{Term, []} || Term <- Terms];
2337-
false ->
2338-
MaxLogEntries = ?RAFT_HEARTBEAT_MAX_ENTRIES(App),
2339-
MaxHeartbeatSize = ?RAFT_HEARTBEAT_MAX_BYTES(App),
2340-
{ok, Ret} = wa_raft_log:get(View, FollowerNextIndex, MaxLogEntries, MaxHeartbeatSize),
2341-
Ret
2342-
end,
2323+
MaxLogEntries = ?RAFT_HEARTBEAT_MAX_ENTRIES(App),
2324+
MaxHeartbeatSize = ?RAFT_HEARTBEAT_MAX_BYTES(App),
2325+
{ok, Entries} = wa_raft_log:get(View, FollowerNextIndex, MaxLogEntries, MaxHeartbeatSize),
23432326
{ok, PrevLogTerm} = PrevLogTermRes,
23442327
?RAFT_GATHER('raft.leader.heartbeat.size', length(Entries)),
23452328
?LOG_DEBUG("Server[~0p, term ~0p, leader] heartbeat to follower ~p from ~p(~p entries). Commit index ~p",
@@ -2707,11 +2690,10 @@ request_snapshot_for_follower(FollowerId, #raft_state{application = App, name =
27072690
end.
27082691

27092692
-spec request_bulk_logs_for_follower(#raft_identity{}, wa_raft_log:log_index(), #raft_state{}) -> ok.
2710-
request_bulk_logs_for_follower(#raft_identity{node = FollowerId} = Peer, FollowerEndIndex, #raft_state{name = Name, catchup = Catchup, current_term = CurrentTerm, commit_index = CommitIndex} = State) ->
2693+
request_bulk_logs_for_follower(Peer, FollowerEndIndex, #raft_state{name = Name, catchup = Catchup, current_term = CurrentTerm, commit_index = CommitIndex}) ->
27112694
?LOG_DEBUG("Server[~0p, term ~0p, leader] requesting bulk logs catchup for follower ~0p.",
27122695
[Name, CurrentTerm, Peer], #{domain => [whatsapp, wa_raft]}),
2713-
Witness = lists:member({Name, FollowerId}, config_witnesses(config(State))),
2714-
wa_raft_log_catchup:start_catchup_request(Catchup, Peer, FollowerEndIndex, CurrentTerm, CommitIndex, Witness).
2696+
wa_raft_log_catchup:start_catchup_request(Catchup, Peer, FollowerEndIndex, CurrentTerm, CommitIndex).
27152697

27162698
-spec cancel_bulk_logs_for_follower(#raft_identity{}, #raft_state{}) -> ok.
27172699
cancel_bulk_logs_for_follower(Peer, #raft_state{catchup = Catchup}) ->

0 commit comments

Comments
 (0)