@@ -972,10 +972,8 @@ leader(cast, ?REMOTE(?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender, ?APPE
972972 State2 = maybe_apply (State1 ),
973973 {keep_state , maybe_heartbeat (State2 ), ? HEARTBEAT_TIMEOUT (State2 )};
974974
975- % % [RequestVote RPC] We are already leader for the current term, so always decline votes (5.1, 5.2)
976- leader (_Type , ? REMOTE (Sender , ? REQUEST_VOTE (_ElectionType , _LastLogIndex , _LastLogTerm )),
977- # raft_state {} = State ) ->
978- send_rpc (Sender , ? VOTE (false ), State ),
975+ % % [RequestVote RPC] Ignore any vote requests as leadership is aleady established (5.1, 5.2)
976+ leader (_Type , ? REMOTE (_ , ? REQUEST_VOTE (_ , _ , _ )), _State ) ->
979977 keep_state_and_data ;
980978
981979% % [Vote RPC] We are already leader, so we don't need to consider any more votes (5.1)
@@ -1295,51 +1293,9 @@ follower(Type, ?REMOTE(Leader, ?APPEND_ENTRIES(PrevLogIndex, PrevLogTerm, Entrie
12951293follower (_Type , ? REMOTE (_ , ? APPEND_ENTRIES_RESPONSE (_ , _ , _ )), _State ) ->
12961294 keep_state_and_data ;
12971295
1298- % % [Follower] Handle RequestVote RPCs (5.2)
1299- % % [RequestVote] A follower with an unallocated vote should decide if the requesting candidate is eligible to receive
1300- % % its vote for the current term and affirm or reject accordingly.
1301- follower (_Type , ? REMOTE (? IDENTITY_REQUIRES_MIGRATION (_ , CandidateId ) = Candidate , ? REQUEST_VOTE (_ElectionType , CandidateIndex , CandidateTerm )),
1302- # raft_state {name = Name , log_view = View , current_term = CurrentTerm , voted_for = undefined } = State ) ->
1303- Index = wa_raft_log :last_index (View ),
1304- {ok , Term } = wa_raft_log :term (View , Index ),
1305- % Followers should only vote for candidates whose logs are at least as up-to-date as the local log.
1306- % Logs are ordered in up-to-dateness by the lexicographic order of the {Term, Index} pair of their latest entry. (5.4.1)
1307- case {CandidateTerm , CandidateIndex } >= {Term , Index } of
1308- true ->
1309- ? LOG_NOTICE (" Server[~0p , term ~0p , follower] decides to vote for candidate ~0p with up-to-date log at ~0p :~0p versus local log at ~0p :~0p ." ,
1310- [Name , CurrentTerm , Candidate , CandidateIndex , CandidateTerm , Index , Term ], #{domain => [whatsapp , wa_raft ]}),
1311- NewState = State # raft_state {voted_for = CandidateId },
1312- % Persist the vote to stable storage before responding to the vote request. (Fig. 2)
1313- wa_raft_durable_state :store (NewState ),
1314- send_rpc (Candidate , ? VOTE (true ), State ),
1315- {keep_state , NewState };
1316- false ->
1317- ? LOG_NOTICE (" Server[~0p , term ~0p , follower] refuses to vote for candidate ~0p with outdated log at ~0p :~0p versus local log at ~0p :~0p ." ,
1318- [Name , CurrentTerm , Candidate , CandidateIndex , CandidateTerm , Index , Term ], #{domain => [whatsapp , wa_raft ]}),
1319- send_rpc (Candidate , ? VOTE (false ), State ),
1320- keep_state_and_data
1321- end ;
1322- % % [RequestVote] A follower should affirm any vote requests for the candidate it already voted for in the current term.
1323- follower (_Type , ? REMOTE (? IDENTITY_REQUIRES_MIGRATION (_ , CandidateId ) = Candidate , ? REQUEST_VOTE (_ElectionType , _CandidateIndex , _CandidateTerm )),
1324- # raft_state {name = Name , current_term = CurrentTerm , voted_for = CandidateId } = State ) ->
1325- ? LOG_NOTICE (" Server[~0p , term ~0p , follower] repeating prior vote for candidate ~0p ." ,
1326- [Name , CurrentTerm , Candidate ], #{domain => [whatsapp , wa_raft ]}),
1327- send_rpc (Candidate , ? VOTE (true ), State ),
1328- keep_state_and_data ;
1329- % % [RequestVote] A follower should reject any vote requests for the candidate it did not vote for in the current term.
1330- follower (_Type , ? REMOTE (Candidate , ? REQUEST_VOTE (_ElectionType , _CandidateIndex , _CandidateTerm )),
1331- # raft_state {name = Name , current_term = CurrentTerm , voted_for = VotedFor } = State ) ->
1332- ? LOG_NOTICE (" Server[~0p , term ~0p , follower] refusing to vote for candidate ~0p after previously voting for candidate ~0p in the current term." ,
1333- [Name , CurrentTerm , Candidate , VotedFor ], #{domain => [whatsapp , wa_raft ]}),
1334- send_rpc (Candidate , ? VOTE (false ), State ),
1335- keep_state_and_data ;
1336-
1337- % % [Follower] Handle responses to RequestVote RPCs (5.2)
1338- % % [Vote] A follower should ignore any votes before it never initiated or has already lost the election for the current term.
1339- follower (_Type , ? REMOTE (Sender , ? VOTE (Voted )), # raft_state {name = Name , current_term = CurrentTerm }) ->
1340- ? LOG_WARNING (" Server[~0p , term ~0p , follower] got extra vote ~p from ~p ." ,
1341- [Name , CurrentTerm , Voted , Sender ], #{domain => [whatsapp , wa_raft ]}),
1342- keep_state_and_data ;
1296+ % % [RequestVote RPC] Handle incoming vote requests (5.2)
1297+ follower (_Type , ? REMOTE (Candidate , ? REQUEST_VOTE (_ , CandidateIndex , CandidateTerm )), State ) ->
1298+ request_vote_impl (? FUNCTION_NAME , Candidate , CandidateIndex , CandidateTerm , State );
13431299
13441300% % [Handover][Handover RPC] The leader is requesting this follower to take over leadership in a new term
13451301follower (_Type , ? REMOTE (? IDENTITY_REQUIRES_MIGRATION (_ , LeaderId ) = Sender , ? HANDOVER (Ref , PrevLogIndex , PrevLogTerm , LogEntries )),
@@ -1475,9 +1431,8 @@ candidate(Type, ?REMOTE(Sender, ?APPEND_ENTRIES(_PrevLogIndex, _PrevLogTerm, _En
14751431 [Name , CurrentTerm , Sender ], #{domain => [whatsapp , wa_raft ]}),
14761432 {next_state , follower , State , {next_event , Type , Event }};
14771433
1478- % % [RequestVote RPC] Candidate has always voted for itself, so vote false on anyone else (5.2)
1479- candidate (_Type , ? REMOTE (Sender , ? REQUEST_VOTE (_ElectionType , _LastLogIndex , _LastLogTerm )), State ) ->
1480- send_rpc (Sender , ? VOTE (false ), State ),
1434+ % % [RequestVote RPC] Candidates should ignore incoming vote requests as they always vote for themselves (5.2)
1435+ candidate (_Type , ? REMOTE (_ , ? REQUEST_VOTE (_ , _ , _ )), _State ) ->
14811436 keep_state_and_data ;
14821437
14831438% % [Vote RPC] Candidate receives an affirmative vote (5.2)
@@ -1501,10 +1456,9 @@ candidate(cast, ?REMOTE(?IDENTITY_REQUIRES_MIGRATION(_, NodeId), ?VOTE(true)),
15011456 {keep_state , State1 }
15021457 end ;
15031458
1504- % % [Vote RPC] Candidate receives a negative vote (Candidate cannot become leader here. Losing
1505- % % an election does not need to convert candidate to follower.) (5.2)
1506- candidate (cast , ? REMOTE (? IDENTITY_REQUIRES_MIGRATION (_ , NodeId ), ? VOTE (false )), # raft_state {votes = Votes } = State ) ->
1507- {keep_state , State # raft_state {votes = Votes #{NodeId => false }}};
1459+ % % [Vote RPC] Candidates should ignore negative votes (5.2)
1460+ candidate (cast , ? REMOTE (_ , ? VOTE (_ )), _State ) ->
1461+ keep_state_and_data ;
15081462
15091463% % [Handover][Handover RPC] Switch to follower because current term now has a leader (5.2, 5.3)
15101464candidate (Type , ? REMOTE (_Sender , ? HANDOVER (_Ref , _PrevLogIndex , _PrevLogTerm , _LogEntries )) = Event ,
@@ -1678,45 +1632,9 @@ witness(Type, ?SNAPSHOT_AVAILABLE_COMMAND(_, #raft_log_pos{index = SnapshotIndex
16781632 keep_state_and_data
16791633 end ;
16801634
1681- % % [RequestVote RPC] A witness with an unallocated vote should decide if the requesting candidate is eligible to receive
1682- % % its vote for the current term and affirm or reject accordingly.
1683- witness (_Type , ? REMOTE (? IDENTITY_REQUIRES_MIGRATION (_ , CandidateId ) = Candidate , ? REQUEST_VOTE (_ElectionType , CandidateIndex , CandidateTerm )),
1684- # raft_state {name = Name , log_view = View , current_term = CurrentTerm , voted_for = undefined } = State ) ->
1685- Index = wa_raft_log :last_index (View ),
1686- {ok , Term } = wa_raft_log :term (View , Index ),
1687- % Witnesses should only vote for candidates whose logs are at least as up-to-date as the local log.
1688- % Logs are ordered in up-to-dateness by the lexicographic order of the {Term, Index} pair of their latest entry. (5.4.1)
1689- case {CandidateTerm , CandidateIndex } >= {Term , Index } of
1690- true ->
1691- ? LOG_NOTICE (" Server[~0p , term ~0p , witness] decides to vote for candidate ~0p with up-to-date log at ~0p :~0p versus local log at ~0p :~0p ." ,
1692- [Name , CurrentTerm , Candidate , CandidateIndex , CandidateTerm , Index , Term ], #{domain => [whatsapp , wa_raft ]}),
1693- NewState = State # raft_state {voted_for = CandidateId },
1694- % Persist the vote to stable storage before responding to the vote request. (Fig. 2)
1695- wa_raft_durable_state :store (NewState ),
1696- send_rpc (Candidate , ? VOTE (true ), State ),
1697- {keep_state , NewState };
1698- false ->
1699- ? LOG_NOTICE (" Server[~0p , term ~0p , witness] refuses to vote for candidate ~0p with outdated log at ~0p :~0p versus local log at ~0p :~0p ." ,
1700- [Name , CurrentTerm , Candidate , CandidateIndex , CandidateTerm , Index , Term ], #{domain => [whatsapp , wa_raft ]}),
1701- send_rpc (Candidate , ? VOTE (false ), State ),
1702- keep_state_and_data
1703- end ;
1704-
1705- % % [RequestVote RPC] A witness should affirm any vote requests for the candidate it already voted for in the current term.
1706- witness (_Type , ? REMOTE (? IDENTITY_REQUIRES_MIGRATION (_ , CandidateId ) = Candidate , ? REQUEST_VOTE (_ElectionType , _CandidateIndex , _CandidateTerm )),
1707- # raft_state {name = Name , current_term = CurrentTerm , voted_for = CandidateId } = State ) ->
1708- ? LOG_NOTICE (" Server[~0p , term ~0p , witness] repeating prior vote for candidate ~0p ." ,
1709- [Name , CurrentTerm , Candidate ], #{domain => [whatsapp , wa_raft ]}),
1710- send_rpc (Candidate , ? VOTE (true ), State ),
1711- keep_state_and_data ;
1712-
1713- % % [RequestVote RPC] A witness should reject any vote requests for the candidate it did not vote for in the current term.
1714- witness (_Type , ? REMOTE (Candidate , ? REQUEST_VOTE (_ElectionType , _CandidateIndex , _CandidateTerm )),
1715- # raft_state {name = Name , current_term = CurrentTerm , voted_for = VotedFor } = State ) ->
1716- ? LOG_NOTICE (" Server[~0p , term ~0p , witness] refusing to vote for candidate ~0p after previously voting for candidate ~0p in the current term." ,
1717- [Name , CurrentTerm , Candidate , VotedFor ], #{domain => [whatsapp , wa_raft ]}),
1718- send_rpc (Candidate , ? VOTE (false ), State ),
1719- keep_state_and_data ;
1635+ % % [RequestVote RPC] Handle incoming vote requests (5.2)
1636+ witness (_Type , ? REMOTE (Candidate , ? REQUEST_VOTE (_ , CandidateIndex , CandidateTerm )), State ) ->
1637+ request_vote_impl (? FUNCTION_NAME , Candidate , CandidateIndex , CandidateTerm , State );
17201638
17211639% % [Vote RPC] Witnesses should not act upon any incoming votes as they cannot become leader
17221640witness (_Type , ? REMOTE (_ , ? VOTE (_ )), _State ) ->
@@ -2492,14 +2410,23 @@ adjust_config(Action, Config, #raft_state{self = Self}) ->
24922410
24932411
24942412% %------------------------------------------------------------------------------
2495- % % RAFT Server - State Machine Implementation - Follower Methods
2413+ % % RAFT Server - State Machine Implementation - Heartbeat
24962414% %------------------------------------------------------------------------------
24972415
24982416% % Attempt to append the log entries declared by a leader in a heartbeat,
24992417% % apply committed but not yet applied log entries, trim the log, and reset
25002418% % the election timeout timer as necessary.
2501- -spec handle_heartbeat (State :: state (), Event :: gen_statem :event_type (), Leader :: # raft_identity {}, PrevLogIndex :: wa_raft_log :log_index (), PrevLogTerm :: wa_raft_log :log_term (),
2502- Entries :: [wa_raft_log :log_entry ()], CommitIndex :: wa_raft_log :log_index (), TrimIndex :: wa_raft_log :log_index (), Data :: # raft_state {}) -> gen_statem :event_handler_result (state (), # raft_state {}).
2419+ -spec handle_heartbeat (
2420+ State :: state (),
2421+ Event :: gen_statem :event_type (),
2422+ Leader :: # raft_identity {},
2423+ PrevLogIndex :: wa_raft_log :log_index (),
2424+ PrevLogTerm :: wa_raft_log :log_term (),
2425+ Entries :: [wa_raft_log :log_entry ()],
2426+ CommitIndex :: wa_raft_log :log_index (),
2427+ TrimIndex :: wa_raft_log :log_index (),
2428+ Data :: # raft_state {}
2429+ ) -> gen_statem :event_handler_result (state (), # raft_state {}).
25032430handle_heartbeat (State , Event , Leader , PrevLogIndex , PrevLogTerm , Entries , CommitIndex , TrimIndex , # raft_state {application = App , name = Name , current_term = CurrentTerm , log_view = View } = Data0 ) ->
25042431 EntryCount = length (Entries ),
25052432
@@ -2533,8 +2460,14 @@ handle_heartbeat(State, Event, Leader, PrevLogIndex, PrevLogTerm, Entries, Commi
25332460% % the previous log entry is not available locally. If an unrecoverable error
25342461% % is encountered, returns a diagnostic that can be used as a reason to
25352462% % disable the current replica.
2536- -spec append_entries (State :: state (), PrevLogIndex :: wa_raft_log :log_index (), PrevLogTerm :: wa_raft_log :log_term (), Entries :: [wa_raft_log :log_entry ()], EntryCount :: non_neg_integer (), Data :: # raft_state {}) ->
2537- {ok , Accepted :: boolean (), NewMatchIndex :: wa_raft_log :log_index (), NewData :: # raft_state {}} | {fatal , Reason :: term ()}.
2463+ -spec append_entries (
2464+ State :: state (),
2465+ PrevLogIndex :: wa_raft_log :log_index (),
2466+ PrevLogTerm :: wa_raft_log :log_term (),
2467+ Entries :: [wa_raft_log :log_entry ()],
2468+ EntryCount :: non_neg_integer (),
2469+ Data :: # raft_state {}
2470+ ) -> {ok , Accepted :: boolean (), NewMatchIndex :: wa_raft_log :log_index (), NewData :: # raft_state {}} | {fatal , Reason :: term ()}.
25382471append_entries (State , PrevLogIndex , PrevLogTerm , Entries , EntryCount , # raft_state {name = Name , log_view = View , last_applied = LastApplied , current_term = CurrentTerm , leader_id = LeaderId } = Data ) ->
25392472 % Inspect the locally stored term associated with the previous log entry to discern if
25402473 % appending the provided range of log entries is allowed.
@@ -2585,6 +2518,58 @@ append_entries(State, PrevLogIndex, PrevLogTerm, Entries, EntryCount, #raft_stat
25852518 {ok , false , wa_raft_log :last_index (View ), Data }
25862519 end .
25872520
2521+ % %------------------------------------------------------------------------------
2522+ % % RAFT Server - State Machine Implementation - Vote Requests
2523+ % %------------------------------------------------------------------------------
2524+
2525+ % % [RequestVote RPC]
2526+ -spec request_vote_impl (
2527+ State :: state (),
2528+ Candidate :: # raft_identity {},
2529+ CandidateIndex :: wa_raft_log :log_index (),
2530+ CandidateTerm :: wa_raft_log :log_term (),
2531+ Data :: # raft_state {}
2532+ ) -> gen_statem :event_handler_result (state (), # raft_state {}).
2533+ % % A replica with an available vote in the current term should allocate its vote
2534+ % % if the candidate's log is at least as up-to-date as the local log. (5.4.1)
2535+ request_vote_impl (
2536+ State ,
2537+ ? IDENTITY_REQUIRES_MIGRATION (_ , CandidateId ) = Candidate ,
2538+ CandidateIndex ,
2539+ CandidateTerm ,
2540+ # raft_state {name = Name , log_view = View , current_term = CurrentTerm , voted_for = VotedFor } = Data
2541+ ) when VotedFor =:= undefined ; VotedFor =:= CandidateId ->
2542+ Index = wa_raft_log :last_index (View ),
2543+ {ok , Term } = wa_raft_log :term (View , Index ),
2544+ case {CandidateTerm , CandidateIndex } >= {Term , Index } of
2545+ true ->
2546+ ? LOG_NOTICE (" Server[~0p , term ~0p , ~0p ] decides to vote for candidate ~0p with up-to-date log at ~0p :~0p versus local log at ~0p :~0p ." ,
2547+ [Name , CurrentTerm , State , Candidate , CandidateIndex , CandidateTerm , Index , Term ], #{domain => [whatsapp , wa_raft ]}),
2548+ case VotedFor of
2549+ undefined ->
2550+ % If this vote request causes the current replica to allocate its vote, then
2551+ % persist the vote before responding. (Fig. 2)
2552+ NewData = Data # raft_state {voted_for = CandidateId },
2553+ wa_raft_durable_state :store (NewData ),
2554+ send_rpc (Candidate , ? VOTE (true ), NewData ),
2555+ {keep_state , NewData };
2556+ CandidateId ->
2557+ % Otherwise, the vote allocation did not change, so just send the response.
2558+ send_rpc (Candidate , ? VOTE (true ), Data ),
2559+ keep_state_and_data
2560+ end ;
2561+ false ->
2562+ ? LOG_NOTICE (" Server[~0p , term ~0p , ~0p ] refuses to vote for candidate ~0p with outdated log at ~0p :~0p versus local log at ~0p :~0p ." ,
2563+ [Name , CurrentTerm , State , Candidate , CandidateIndex , CandidateTerm , Index , Term ], #{domain => [whatsapp , wa_raft ]}),
2564+ keep_state_and_data
2565+ end ;
2566+ % % A replica that was already allocated its vote to a specific candidate in the
2567+ % % current term should ignore vote requests from other candidates. (5.4.1)
2568+ request_vote_impl (State , Candidate , _ , _ , # raft_state {name = Name , current_term = CurrentTerm , voted_for = VotedFor }) ->
2569+ ? LOG_NOTICE (" Server[~0p , term ~0p , ~0p ] refusing to vote for candidate ~0p after previously voting for candidate ~0p in the current term." ,
2570+ [Name , CurrentTerm , State , Candidate , VotedFor ], #{domain => [whatsapp , wa_raft ]}),
2571+ keep_state_and_data .
2572+
25882573% %------------------------------------------------------------------------------
25892574% % RAFT Server - State Machine Implementation - Helpers
25902575% %------------------------------------------------------------------------------
0 commit comments