9292 trigger_election /2 ,
9393 promote /2 ,
9494 promote /3 ,
95- promote /4 ,
9695 resign /1 ,
9796 handover /1 ,
9897 handover /2 ,
234233-type read_command () :: ? READ_COMMAND (wa_raft_acceptor :read_op ()).
235234-type status_command () :: ? STATUS_COMMAND .
236235-type trigger_election_command () :: ? TRIGGER_ELECTION_COMMAND (term_or_offset ()).
237- -type promote_command () :: ? PROMOTE_COMMAND (wa_raft_log : log_term () | next , boolean (), config () | undefined ).
236+ -type promote_command () :: ? PROMOTE_COMMAND (term_or_offset () , boolean ()).
238237-type resign_command () :: ? RESIGN_COMMAND .
239238-type adjust_membership_command () :: ? ADJUST_MEMBERSHIP_COMMAND (membership_action (), peer () | undefined , wa_raft_log :log_index () | undefined ).
240239-type snapshot_available_command () :: ? SNAPSHOT_AVAILABLE_COMMAND (string (), wa_raft_log :log_pos ()).
@@ -477,29 +476,14 @@ trigger_election(Server) ->
477476trigger_election (Server , Term ) ->
478477 gen_statem :call (Server , ? TRIGGER_ELECTION_COMMAND (Term ), ? RAFT_RPC_CALL_TIMEOUT ()).
479478
480- -spec promote (
481- Server :: gen_statem :server_ref (),
482- Term :: wa_raft_log :log_term () | next
483- ) -> ok | wa_raft :error ().
479+ % % Request the specified RAFT server to promote itself to leader of the specified term.
480+ -spec promote (Server :: gen_statem :server_ref (), Term :: term_or_offset ()) -> ok | wa_raft :error ().
484481promote (Server , Term ) ->
485482 promote (Server , Term , false ).
486483
487- -spec promote (
488- Server :: gen_statem :server_ref (),
489- Term :: wa_raft_log :log_term () | next ,
490- Force :: boolean ()
491- ) -> ok | wa_raft :error ().
484+ -spec promote (Server :: gen_statem :server_ref (), Term :: term_or_offset (), Force :: boolean ()) -> ok | wa_raft :error ().
492485promote (Server , Term , Force ) ->
493- promote (Server , Term , Force , undefined ).
494-
495- -spec promote (
496- Server :: gen_statem :server_ref (),
497- Term :: wa_raft_log :log_term () | next ,
498- Force :: boolean (),
499- Config :: config () | config_all () | undefined
500- ) -> ok | wa_raft :error ().
501- promote (Server , Term , Force , Config ) ->
502- gen_statem :call (Server , ? PROMOTE_COMMAND (Term , Force , Config ), ? RAFT_RPC_CALL_TIMEOUT ()).
486+ gen_statem :call (Server , ? PROMOTE_COMMAND (Term , Force ), ? RAFT_RPC_CALL_TIMEOUT ()).
503487
504488-spec resign (Server :: gen_statem :server_ref ()) -> ok | wa_raft :error ().
505489resign (Server ) ->
@@ -773,6 +757,10 @@ stalled({call, From}, ?TRIGGER_ELECTION_COMMAND(_), #raft_state{name = Name, cur
773757 ? LOG_WARNING (" Server[~0p , term ~0p , stalled] cannot start an election." , [Name , CurrentTerm ], #{domain => [whatsapp , wa_raft ]}),
774758 {keep_state_and_data , {reply , From , {error , invalid_state }}};
775759
760+ stalled ({call , From }, ? PROMOTE_COMMAND (_ , _ ), # raft_state {name = Name , current_term = CurrentTerm }) ->
761+ ? LOG_WARNING (" Server[~0p , term ~0p , stalled] cannot be promoted to leader." , [Name , CurrentTerm ], #{domain => [whatsapp , wa_raft ]}),
762+ {keep_state_and_data , {reply , From , {error , invalid_state }}};
763+
776764stalled ({call , From }, ? BOOTSTRAP_COMMAND (# raft_log_pos {index = Index , term = Term } = Position , Config , Data ),
777765 # raft_state {name = Name , self = Self , data_dir = PartitionPath , storage = Storage , current_term = CurrentTerm , last_applied = LastApplied } = State0 ) ->
778766 case LastApplied =:= 0 of
@@ -882,36 +870,28 @@ leader(enter, PreviousStateName, #raft_state{name = Name, self = Self, current_t
882870 State1 = enter_state (? FUNCTION_NAME , State0 ),
883871 State2 = set_leader (? FUNCTION_NAME , Self , State1 ),
884872
885- % During promotion, we may add a config update operation to the end of the log.
886- % If there is such an operation and it has the same term number as the current term,
887- % then make sure to set the start of the current term so that it gets replicated
888- % immediately.
873+ % Attempt to refresh the label state as necessary for new log entries
889874 LastLogIndex = wa_raft_log :last_index (View0 ),
890- State5 = case wa_raft_log :get (View0 , LastLogIndex ) of
891- {ok , {CurrentTerm , {_Ref , {config , _NewConfig }}}} ->
892- State2 # raft_state {first_current_term_log_index = LastLogIndex , last_label = undefined };
893- {ok , {CurrentTerm , {_Ref , LastLabel , {config , _NewConfig }}}} ->
894- State2 # raft_state {first_current_term_log_index = LastLogIndex , last_label = LastLabel };
895- {ok , LastLogEntry } ->
896- % Otherwise, the server should add a new log entry to start the current term.
897- % This will flush pending commits, clear out and log mismatches on replicas,
898- % and establish a quorum as soon as possible.
899- State3 = case LastLogEntry of
900- {_CurrentTerm , {_Ref , LastLabel , _Command }} ->
901- State2 # raft_state {last_label = LastLabel };
902- {_ , undefined } ->
903- % The RAFT log could have been reset (i.e. after snapshot installation).
904- % In such case load the log label state from storage.
905- LastLabel = load_label_state (State2 ),
906- State2 # raft_state {last_label = LastLabel };
907- _ ->
908- State2 # raft_state {last_label = undefined }
909- end ,
910- {State4 , LogEntry } = get_log_entry (State3 , {make_ref (), noop }),
911- {ok , NewLastLogIndex , View1 } = wa_raft_log :append (View0 , [LogEntry ]),
912- State4 # raft_state {log_view = View1 , first_current_term_log_index = NewLastLogIndex }
875+ State3 = case wa_raft_log :get (View0 , LastLogIndex ) of
876+ {ok , {_Term , {_Key , LastLabel , _Command }}} ->
877+ State2 # raft_state {last_label = LastLabel };
878+ {ok , {_Term , undefined }} ->
879+ % The RAFT log could have been reset (i.e. after snapshot installation).
880+ % In such case load the log label state from storage.
881+ LastLabel = load_label_state (State2 ),
882+ State2 # raft_state {last_label = LastLabel };
883+ {ok , _Entry } ->
884+ State2 # raft_state {last_label = undefined }
913885 end ,
914886
887+ % At the start of a new term, the leader should append a new log
888+ % entry that will start the process of establishing the first
889+ % quorum in the new term by starting replication and clearing out
890+ % any log mismatches on follower replicas.
891+ {State4 , LogEntry } = get_log_entry (State3 , {make_ref (), noop }),
892+ {ok , NewLastLogIndex , View1 } = wa_raft_log :append (View0 , [LogEntry ]),
893+ State5 = State4 # raft_state {log_view = View1 , first_current_term_log_index = NewLastLogIndex },
894+
915895 % Perform initial heartbeat and log entry resolution
916896 State6 = append_entries_to_followers (State5 ),
917897 State7 = apply_single_node_cluster (State6 ), % apply immediately for single node cluster
@@ -1568,9 +1548,9 @@ disabled({call, From}, ?TRIGGER_ELECTION_COMMAND(_), #raft_state{name = Name, cu
15681548 ? LOG_WARNING (" Server[~0p , term ~0p , disabled] cannot start an election." , [Name , CurrentTerm ], #{domain => [whatsapp , wa_raft ]}),
15691549 {keep_state_and_data , {reply , From , {error , invalid_state }}};
15701550
1571- disabled ({call , From }, ? PROMOTE_COMMAND (_Term , _Force , _Config ), # raft_state {name = Name , current_term = CurrentTerm }) ->
1572- ? LOG_WARNING (" Server[~0p , term ~0p , disabled] cannot be promoted." , [Name , CurrentTerm ], #{domain => [whatsapp , wa_raft ]}),
1573- {keep_state_and_data , {reply , From , {error , disabled }}};
1551+ disabled ({call , From }, ? PROMOTE_COMMAND (_ , _ ), # raft_state {name = Name , current_term = CurrentTerm }) ->
1552+ ? LOG_WARNING (" Server[~0p , term ~0p , disabled] cannot be promoted to leader ." , [Name , CurrentTerm ], #{domain => [whatsapp , wa_raft ]}),
1553+ {keep_state_and_data , {reply , From , {error , invalid_state }}};
15741554
15751555disabled ({call , From }, ? ENABLE_COMMAND , # raft_state {name = Name , current_term = CurrentTerm } = State0 ) ->
15761556 ? LOG_NOTICE (" Server[~0p , term ~0p , disabled] re-enabling by request from ~0p by moving to stalled state." ,
@@ -1649,6 +1629,10 @@ witness({call, From}, ?TRIGGER_ELECTION_COMMAND(_), #raft_state{name = Name, cur
16491629 ? LOG_WARNING (" Server[~0p , term ~0p , witness] cannot start an election." , [Name , CurrentTerm ], #{domain => [whatsapp , wa_raft ]}),
16501630 {keep_state_and_data , {reply , From , {error , invalid_state }}};
16511631
1632+ witness ({call , From }, ? PROMOTE_COMMAND (_ , _ ), # raft_state {name = Name , current_term = CurrentTerm }) ->
1633+ ? LOG_WARNING (" Server[~0p , term ~0p , witness] cannot be promoted to leader." , [Name , CurrentTerm ], #{domain => [whatsapp , wa_raft ]}),
1634+ {keep_state_and_data , {reply , From , {error , invalid_state }}};
1635+
16521636witness (Type , ? SNAPSHOT_AVAILABLE_COMMAND (undefined , # raft_log_pos {index = SnapshotIndex , term = SnapshotTerm } = SnapshotPos ),
16531637 # raft_state {name = Name , current_term = CurrentTerm , last_applied = LastApplied } = State0 ) ->
16541638 case SnapshotIndex > LastApplied orelse LastApplied =:= 0 of
@@ -1775,25 +1759,22 @@ command(
17751759 end ;
17761760
17771761% % [Promote] Non-disabled nodes check if eligible to promote and then promote to leader.
1778- command (StateName , {call , From }, ? PROMOTE_COMMAND (RawTerm , Force , ConfigAll ),
1779- # raft_state {application = App , name = Name , log_view = View0 , current_term = CurrentTerm , leader_heartbeat_ts = HeartbeatTs , leader_id = LeaderId } = State0 ) when StateName =/= disabled ->
1780- ElectionWeight = ? RAFT_ELECTION_WEIGHT (App ),
1781- Term = case RawTerm of
1782- next -> CurrentTerm + 1 ;
1783- _ when is_integer (RawTerm ) -> RawTerm
1784- end ,
1785- Config = case ConfigAll of
1786- undefined -> undefined ;
1787- _ -> maybe_upgrade_config (ConfigAll )
1788- end ,
1789- SavedConfig = config (State0 ),
1790- SavedMembership = get_config_members (SavedConfig ),
1791- OverrideMembership = case Config of
1792- undefined -> [];
1793- _ -> get_config_members (Config )
1794- end ,
1762+ command (
1763+ StateName ,
1764+ {call , From },
1765+ ? PROMOTE_COMMAND (TermOrOffset , Force ),
1766+ # raft_state {application = App , name = Name , current_term = CurrentTerm , leader_heartbeat_ts = HeartbeatTs , leader_id = LeaderId } = State
1767+ ) when StateName =/= stalled ; StateName =/= witness ; StateName =/= disabled ->
17951768 Now = erlang :monotonic_time (millisecond ),
1769+ Eligible = ? RAFT_LEADER_ELIGIBLE (App ),
17961770 HeartbeatGracePeriodMs = ? RAFT_PROMOTION_GRACE_PERIOD (App ) * 1000 ,
1771+ Term = case TermOrOffset of
1772+ current -> CurrentTerm ;
1773+ next -> CurrentTerm + 1 ;
1774+ {next , Offset } -> CurrentTerm + Offset ;
1775+ _ -> TermOrOffset
1776+ end ,
1777+ Membership = get_config_members (config (State )),
17971778 Allowed = if
17981779 % Prevent promotions to older or invalid terms
17991780 not is_integer (Term ) orelse Term < CurrentTerm ->
@@ -1804,65 +1785,42 @@ command(StateName, {call, From}, ?PROMOTE_COMMAND(RawTerm, Force, ConfigAll),
18041785 ? LOG_ERROR (" Server[~0p , term ~0p , ~0p ] refusing to promote to leader of current term already led by ~0p ." ,
18051786 [Name , CurrentTerm , StateName , LeaderId ], #{domain => [whatsapp , wa_raft ]}),
18061787 invalid_term ;
1807- ElectionWeight =:= 0 ->
1808- ? LOG_ERROR (" Server[~0p , term ~0p , ~0p ] node election weight is zero and cannot be promoted as leader." ,
1788+ % Prevent promotions that will immediately result in a resignation.
1789+ not Eligible ->
1790+ ? LOG_ERROR (" Server[~0p , term ~0p , ~0p ] cannot promote to leader as the node is ineligible." ,
18091791 [Name , CurrentTerm , StateName ], #{domain => [whatsapp , wa_raft ]}),
18101792 ineligible ;
18111793 StateName =:= witness ->
18121794 ? LOG_ERROR (" Server[~0p , term ~0p , ~0p ] cannot promote a witness node." ,
18131795 [Name , CurrentTerm , StateName ], #{domain => [whatsapp , wa_raft ]}),
18141796 invalid_state ;
18151797 % Prevent promotions to any operational state when there is no cluster membership configuration.
1816- SavedMembership =:= [] andalso OverrideMembership =:= [] ->
1817- ? LOG_ERROR (" Server[~0p , term ~0p , ~0p ] cannot promote with neither existing nor forced config having configured membership." ,
1798+ Membership =:= [] ->
1799+ ? LOG_ERROR (" Server[~0p , term ~0p , ~0p ] cannot promote to leader with no existing membership." ,
18181800 [Name , CurrentTerm , StateName ], #{domain => [whatsapp , wa_raft ]}),
18191801 invalid_configuration ;
18201802 Force ->
18211803 true ;
1822- Config =/= undefined ->
1823- ? LOG_ERROR (" Server[~0p , term ~0p , ~0p ] cannot forcibly apply a configuration when doing a non-forced promotion." ,
1824- [Name , CurrentTerm , StateName ], #{domain => [whatsapp , wa_raft ]}),
1825- not_forced ;
1826- StateName =:= stalled ->
1827- ? LOG_ERROR (" Server[~0p , term ~0p , ~0p ] cannot promote a stalled node." ,
1828- [Name , CurrentTerm , StateName ], #{domain => [whatsapp , wa_raft ]}),
1829- invalid_state ;
18301804 HeartbeatTs =:= undefined ->
18311805 true ;
18321806 Now - HeartbeatTs >= HeartbeatGracePeriodMs ->
18331807 true ;
18341808 true ->
1809+ ? LOG_ERROR (" Server[~0p , term ~0p , ~0p ] rejecting request to promote to leader as a valid heartbeat was recently received." ,
1810+ [Name , CurrentTerm , StateName ], #{domain => [whatsapp , wa_raft ]}),
18351811 rejected
18361812 end ,
18371813 case Allowed of
18381814 true ->
1839- NewStateName = case Force of
1840- true -> leader ;
1841- false -> candidate
1842- end ,
1843- ? LOG_NOTICE (" Server[~0p , term ~0p , ~0p ] promoted to ~p of term ~p ." ,
1844- [Name , CurrentTerm , StateName , NewStateName , Term ], #{domain => [whatsapp , wa_raft ]}),
1845-
1846- % Advance to the term requested for promotion to
1847- State1 = case Term > CurrentTerm of
1848- true -> advance_term (StateName , Term , undefined , State0 );
1849- false -> State0
1850- end ,
1851- State3 = case Config of
1852- undefined ->
1853- State1 ;
1854- _ ->
1855- % If this promotion is part of the bootstrapping operation, then we must append
1856- % the new configuration to the log before we can transition to leader; otherwise,
1857- % the server will not know who to replicate to as leader.
1858- Op = {make_ref (), {config , Config }},
1859- {State2 , LogEntry } = get_log_entry (State1 , Op ),
1860- {ok , _ , View1 } = wa_raft_log :append (View0 , [LogEntry ]),
1861- State2 # raft_state {log_view = View1 }
1815+ ? LOG_NOTICE (" Server[~0p , term ~0p , ~0p ] is promoting to leader of term ~0p ." ,
1816+ [Name , CurrentTerm , StateName , Term ], #{domain => [whatsapp , wa_raft ]}),
1817+ NewState = case Term > CurrentTerm of
1818+ true -> advance_term (StateName , Term , node (), State );
1819+ false -> State
18621820 end ,
1863- case StateName =:= NewStateName of
1864- true -> {repeat_state , State3 , {reply , From , ok }};
1865- false -> {next_state , NewStateName , State3 , {reply , From , ok }}
1821+ case StateName of
1822+ leader -> {repeat_state , NewState , {reply , From , ok }};
1823+ _ -> {next_state , leader , NewState , {reply , From , ok }}
18661824 end ;
18671825 Reason ->
18681826 ? LOG_NOTICE (" Server[~0p , term ~0p , ~0p ] rejected leader promotion to term ~0p due to ~0p ." ,
0 commit comments