|
88 | 88 | snapshot_available/3, |
89 | 89 | adjust_membership/3, |
90 | 90 | adjust_membership/4, |
91 | | - promote/1, |
| 91 | + trigger_election/1, |
| 92 | + trigger_election/2, |
92 | 93 | promote/2, |
93 | 94 | promote/3, |
94 | 95 | promote/4, |
|
175 | 176 | disabled | |
176 | 177 | witness. |
177 | 178 |
|
| 179 | +-type term_or_offset() :: wa_raft_log:log_term() | current | next | {next, Offset :: pos_integer()}. |
| 180 | + |
178 | 181 | -type peer() :: {Name :: atom(), Node :: node()}. |
179 | 182 | -type membership() :: [peer()]. |
180 | 183 |
|
|
222 | 225 | -type legacy_rpc() :: ?LEGACY_RAFT_RPC(atom(), wa_raft_log:log_term(), node(), undefined | tuple()). |
223 | 226 | -type rpc_named() :: ?RAFT_NAMED_RPC(atom(), wa_raft_log:log_term(), atom(), node(), undefined | tuple()). |
224 | 227 |
|
225 | | --type command() :: commit_command() | read_command() | status_command() | promote_command() | resign_command() | adjust_membership_command() | snapshot_available_command() | |
226 | | - handover_candidates_command() | handover_command() | enable_command() | disable_command() | bootstrap_command(). |
| 228 | +-type command() :: commit_command() | read_command() | status_command() | trigger_election_command() | |
| 229 | + promote_command() | resign_command() | adjust_membership_command() | |
| 230 | + snapshot_available_command() | handover_candidates_command() | handover_command() | |
| 231 | + enable_command() | disable_command() | bootstrap_command(). |
| 232 | + |
227 | 233 | -type commit_command() :: ?COMMIT_COMMAND(wa_raft_acceptor:op()). |
228 | 234 | -type read_command() :: ?READ_COMMAND(wa_raft_acceptor:read_op()). |
229 | 235 | -type status_command() :: ?STATUS_COMMAND. |
230 | | --type promote_command() :: ?PROMOTE_COMMAND() | ?PROMOTE_COMMAND(wa_raft_log:log_term() | next, boolean(), config() | undefined). |
| 236 | +-type trigger_election_command() :: ?TRIGGER_ELECTION_COMMAND(term_or_offset()). |
| 237 | +-type promote_command() :: ?PROMOTE_COMMAND(wa_raft_log:log_term() | next, boolean(), config() | undefined). |
231 | 238 | -type resign_command() :: ?RESIGN_COMMAND. |
232 | 239 | -type adjust_membership_command() :: ?ADJUST_MEMBERSHIP_COMMAND(membership_action(), peer() | undefined, wa_raft_log:log_index() | undefined). |
233 | 240 | -type snapshot_available_command() :: ?SNAPSHOT_AVAILABLE_COMMAND(string(), wa_raft_log:log_pos()). |
@@ -460,9 +467,15 @@ adjust_membership(Server, Action, Peer) -> |
460 | 467 | adjust_membership(Server, Action, Peer, ConfigIndex) -> |
461 | 468 | gen_statem:call(Server, ?ADJUST_MEMBERSHIP_COMMAND(Action, Peer, ConfigIndex), ?RAFT_RPC_CALL_TIMEOUT()). |
462 | 469 |
|
463 | | --spec promote(Server :: gen_statem:server_ref()) -> ok. |
464 | | -promote(Server) -> |
465 | | - gen_statem:call(Server, ?PROMOTE_COMMAND(), ?RAFT_RPC_CALL_TIMEOUT()). |
| 470 | +%% Request the specified RAFT server to start an election in the next term. |
| 471 | +-spec trigger_election(Server :: gen_statem:server_ref()) -> ok | wa_raft:error(). |
| 472 | +trigger_election(Server) -> |
| 473 | + trigger_election(Server, current). |
| 474 | + |
| 475 | +%% Request the specified RAFT server to trigger a new election in the term *after* the specified term. |
| 476 | +-spec trigger_election(Server :: gen_statem:server_ref(), Term :: term_or_offset()) -> ok | wa_raft:error(). |
| 477 | +trigger_election(Server, Term) -> |
| 478 | + gen_statem:call(Server, ?TRIGGER_ELECTION_COMMAND(Term), ?RAFT_RPC_CALL_TIMEOUT()). |
466 | 479 |
|
467 | 480 | -spec promote( |
468 | 481 | Server :: gen_statem:server_ref(), |
@@ -756,6 +769,10 @@ stalled(_Type, ?REMOTE(Sender, ?APPEND_ENTRIES(PrevLogIndex, _PrevLogTerm, _Entr |
756 | 769 | send_rpc(Sender, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, false, 0), NewState), |
757 | 770 | {keep_state, NewState}; |
758 | 771 |
|
| 772 | +stalled({call, From}, ?TRIGGER_ELECTION_COMMAND(_), #raft_state{name = Name, current_term = CurrentTerm}) -> |
| 773 | + ?LOG_WARNING("Server[~0p, term ~0p, stalled] cannot start an election.", [Name, CurrentTerm], #{domain => [whatsapp, wa_raft]}), |
| 774 | + {keep_state_and_data, {reply, From, {error, invalid_state}}}; |
| 775 | + |
759 | 776 | stalled({call, From}, ?BOOTSTRAP_COMMAND(#raft_log_pos{index = Index, term = Term} = Position, Config, Data), |
760 | 777 | #raft_state{name = Name, self = Self, data_dir = PartitionPath, storage = Storage, current_term = CurrentTerm, last_applied = LastApplied} = State0) -> |
761 | 778 | case Index > LastApplied andalso Term > 0 of |
@@ -1546,9 +1563,9 @@ disabled(_Type, ?REMOTE(_, ?APPEND_ENTRIES(_, _, _, _, _)), _State) -> |
1546 | 1563 | disabled(_Type, ?REMOTE(_, ?REQUEST_VOTE(_, _, _)), _State) -> |
1547 | 1564 | keep_state_and_data; |
1548 | 1565 |
|
1549 | | -disabled({call, From}, ?PROMOTE_COMMAND(), #raft_state{name = Name, current_term = CurrentTerm}) -> |
1550 | | - ?LOG_WARNING("Server[~0p, term ~0p, disabled] cannot promote to candidate.", [Name, CurrentTerm], #{domain => [whatsapp, wa_raft]}), |
1551 | | - {keep_state_and_data, {reply, From, {error, rejected}}}; |
| 1566 | +disabled({call, From}, ?TRIGGER_ELECTION_COMMAND(_), #raft_state{name = Name, current_term = CurrentTerm}) -> |
| 1567 | + ?LOG_WARNING("Server[~0p, term ~0p, disabled] cannot start an election.", [Name, CurrentTerm], #{domain => [whatsapp, wa_raft]}), |
| 1568 | + {keep_state_and_data, {reply, From, {error, invalid_state}}}; |
1552 | 1569 |
|
1553 | 1570 | disabled({call, From}, ?PROMOTE_COMMAND(_Term, _Force, _Config), #raft_state{name = Name, current_term = CurrentTerm}) -> |
1554 | 1571 | ?LOG_WARNING("Server[~0p, term ~0p, disabled] cannot be promoted.", [Name, CurrentTerm], #{domain => [whatsapp, wa_raft]}), |
@@ -1627,6 +1644,10 @@ witness(_Type, ?REMOTE(Sender, ?HANDOVER(Ref, _, _, _)), |
1627 | 1644 | witness(_Type, ?REMOTE(_, ?HANDOVER_FAILED(_)), _State) -> |
1628 | 1645 | keep_state_and_data; |
1629 | 1646 |
|
| 1647 | +witness({call, From}, ?TRIGGER_ELECTION_COMMAND(_), #raft_state{name = Name, current_term = CurrentTerm}) -> |
| 1648 | + ?LOG_WARNING("Server[~0p, term ~0p, witness] cannot start an election.", [Name, CurrentTerm], #{domain => [whatsapp, wa_raft]}), |
| 1649 | + {keep_state_and_data, {reply, From, {error, invalid_state}}}; |
| 1650 | + |
1630 | 1651 | witness(Type, ?SNAPSHOT_AVAILABLE_COMMAND(undefined, #raft_log_pos{index = SnapshotIndex, term = SnapshotTerm} = SnapshotPos), |
1631 | 1652 | #raft_state{name = Name, current_term = CurrentTerm, last_applied = LastApplied} = State) -> |
1632 | 1653 | case SnapshotIndex > LastApplied orelse LastApplied =:= 0 of |
@@ -1710,25 +1731,42 @@ command(StateName, {call, From}, ?STATUS_COMMAND, State) -> |
1710 | 1731 | ], |
1711 | 1732 | {keep_state_and_data, {reply, From, Status}}; |
1712 | 1733 |
|
1713 | | -%% [Promote] Promote full replica nodes to candidate which will advance to next term. |
| 1734 | +%% [Promote] Request full replica nodes to start a new election. |
1714 | 1735 | command( |
1715 | 1736 | StateName, |
1716 | 1737 | {call, From}, |
1717 | | - ?PROMOTE_COMMAND(), |
| 1738 | + ?TRIGGER_ELECTION_COMMAND(TermOrOffset), |
1718 | 1739 | #raft_state{application = App, name = Name, current_term = CurrentTerm} = State |
1719 | 1740 | ) when StateName =/= stalled, StateName =/= witness, StateName =/= disabled -> |
1720 | | - case ?RAFT_LEADER_ELIGIBLE(App) of |
| 1741 | + Term = case TermOrOffset of |
| 1742 | + current -> CurrentTerm; |
| 1743 | + next -> CurrentTerm + 1; |
| 1744 | + {next, Offset} -> CurrentTerm + Offset; |
| 1745 | + _ -> TermOrOffset |
| 1746 | + end, |
| 1747 | + case is_integer(Term) andalso Term >= CurrentTerm of |
1721 | 1748 | true -> |
1722 | | - ?LOG_NOTICE("Server[~0p, term ~0p, ~0p] is switching to candidate after promotion request.", |
1723 | | - [Name, CurrentTerm, StateName], #{domain => [whatsapp, wa_raft]}), |
1724 | | - case StateName of |
1725 | | - candidate -> {repeat_state, State, {reply, From, ok}}; |
1726 | | - _Other -> {next_state, candidate, State, {reply, From, ok}} |
| 1749 | + case ?RAFT_LEADER_ELIGIBLE(App) of |
| 1750 | + true -> |
| 1751 | + ?LOG_NOTICE("Server[~0p, term ~0p, ~0p] is switching to candidate after promotion request.", |
| 1752 | + [Name, CurrentTerm, StateName], #{domain => [whatsapp, wa_raft]}), |
| 1753 | + NewState = case Term > CurrentTerm of |
| 1754 | + true -> advance_term(StateName, Term, undefined, State); |
| 1755 | + false -> State |
| 1756 | + end, |
| 1757 | + case StateName of |
| 1758 | + candidate -> {repeat_state, NewState, {reply, From, ok}}; |
| 1759 | + _Other -> {next_state, candidate, NewState, {reply, From, ok}} |
| 1760 | + end; |
| 1761 | + false -> |
| 1762 | + ?LOG_ERROR("Server[~0p, term ~0p, ~0p] cannot be promoted as candidate while ineligible.", |
| 1763 | + [Name, CurrentTerm, StateName], #{domain => [whatsapp, wa_raft]}), |
| 1764 | + {keep_state_and_data, {reply, From, {error, ineligible}}} |
1727 | 1765 | end; |
1728 | 1766 | false -> |
1729 | | - ?LOG_ERROR("Server[~0p, term ~0p, ~0p] cannot be promoted as candidate while ineligible.", |
1730 | | - [Name, CurrentTerm, StateName], #{domain => [whatsapp, wa_raft]}), |
1731 | | - {keep_state_and_data, {reply, From, {error, ineligible}}} |
| 1767 | + ?LOG_ERROR("Server[~0p, term ~0p, ~0p] refusing to promote to current, older, or invalid term ~0p.", |
| 1768 | + [Name, CurrentTerm, StateName, Term], #{domain => [whatsapp, wa_raft]}), |
| 1769 | + {keep_state_and_data, {reply, From, {error, rejected}}} |
1732 | 1770 | end; |
1733 | 1771 |
|
1734 | 1772 | %% [Promote] Non-disabled nodes check if eligible to promote and then promote to leader. |
|
0 commit comments