Skip to content

Commit 65fd444

Browse files
authored
RATIS-1995. Prevent data loss when a storage is accidentally re-formatted. (#1261)
1 parent 282d7a1 commit 65fd444

File tree

7 files changed

+382
-27
lines changed

7 files changed

+382
-27
lines changed

ratis-proto/src/main/proto/Raft.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ message RequestVoteReplyProto {
169169
RaftRpcReplyProto serverReply = 1;
170170
uint64 term = 2;
171171
bool shouldShutdown = 3;
172+
TermIndexProto lastEntry = 4; // to determine if the voter log is empty.
172173
}
173174

174175
message CommitInfoProto {

ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
import org.apache.ratis.proto.RaftProtos.TermIndexProto;
2222
import org.apache.ratis.server.raftlog.RaftLog;
2323
import org.apache.ratis.util.BiWeakValueCache;
24+
import org.apache.ratis.util.MemoizedSupplier;
2425

2526
import java.util.Comparator;
2627
import java.util.Optional;
28+
import java.util.function.Supplier;
2729

2830
/** The term and the log index defined in the Raft consensus algorithm. */
2931
public interface TermIndex extends Comparable<TermIndex> {
@@ -37,6 +39,7 @@ public interface TermIndex extends Comparable<TermIndex> {
3739
* are respectively 1 and 0 (= {@link RaftLog#LEAST_VALID_LOG_INDEX}).
3840
*/
3941
TermIndex INITIAL_VALUE = valueOf(0, RaftLog.INVALID_LOG_INDEX);
42+
TermIndex PROTO_DEFAULT = valueOf(TermIndexProto.getDefaultInstance());
4043

4144
/** An empty {@link TermIndex} array. */
4245
TermIndex[] EMPTY_ARRAY = {};
@@ -93,6 +96,8 @@ static BiWeakValueCache<Long, Long, TermIndex> getCache() {
9396

9497
private static TermIndex newTermIndex(long term, long index) {
9598
return new TermIndex() {
99+
private final Supplier<TermIndexProto> protoSupplier = MemoizedSupplier.valueOf(TermIndex.super::toProto);
100+
96101
@Override
97102
public long getTerm() {
98103
return term;
@@ -121,12 +126,22 @@ public int hashCode() {
121126
return Long.hashCode(term) ^ Long.hashCode(index);
122127
}
123128

129+
@Override
130+
public TermIndexProto toProto() {
131+
return protoSupplier.get();
132+
}
133+
124134
private String longToString(long n) {
125135
return n >= 0L ? String.valueOf(n) : "~";
126136
}
127137

128138
@Override
129139
public String toString() {
140+
if (this.equals(INITIAL_VALUE)) {
141+
return "<INITIAL_VALUE>";
142+
} else if (this.equals(PROTO_DEFAULT)) {
143+
return "<PROTO_DEFAULT>";
144+
}
130145
return String.format("(t:%s, i:%s)", longToString(term), longToString(index));
131146
}
132147
};

ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java

Lines changed: 167 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
import org.apache.ratis.metrics.Timekeeper;
2121
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
2222
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
23+
import org.apache.ratis.proto.RaftProtos.TermIndexProto;
24+
import org.apache.ratis.protocol.RaftGroupMemberId;
2325
import org.apache.ratis.protocol.RaftPeer;
2426
import org.apache.ratis.protocol.RaftPeerId;
25-
import org.apache.ratis.server.DivisionInfo;
2627
import org.apache.ratis.server.RaftConfiguration;
2728
import org.apache.ratis.server.RaftServerConfigKeys;
2829
import org.apache.ratis.server.protocol.TermIndex;
30+
import org.apache.ratis.server.raftlog.RaftLog;
2931
import org.apache.ratis.server.util.ServerStringUtils;
3032
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
3133
import org.apache.ratis.util.Daemon;
@@ -78,6 +80,121 @@
7880
class LeaderElection implements Runnable {
7981
public static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class);
8082

83+
interface ServerInterface {
84+
default RaftPeerId getId() {
85+
return getMemberId().getPeerId();
86+
}
87+
88+
RaftGroupMemberId getMemberId();
89+
boolean isAlive();
90+
boolean isCandidate();
91+
92+
long getCurrentTerm();
93+
long getLastCommittedIndex();
94+
TermIndex getLastEntry();
95+
96+
boolean isPreVoteEnabled();
97+
ConfAndTerm initElection(Phase phase) throws IOException;
98+
RequestVoteReplyProto requestVote(RequestVoteRequestProto r) throws IOException;
99+
100+
void changeToLeader();
101+
void rejected(long term, ResultAndTerm result) throws IOException;
102+
void shutdown();
103+
104+
Timekeeper getLeaderElectionTimer();
105+
void onNewLeaderElectionCompletion();
106+
107+
TimeDuration getRandomElectionTimeout();
108+
ThreadGroup getThreadGroup();
109+
110+
static ServerInterface get(RaftServerImpl server) {
111+
final boolean preVote = RaftServerConfigKeys.LeaderElection.preVote(server.getRaftServer().getProperties());
112+
113+
return new ServerInterface() {
114+
@Override
115+
public RaftGroupMemberId getMemberId() {
116+
return server.getMemberId();
117+
}
118+
119+
@Override
120+
public boolean isAlive() {
121+
return server.getInfo().isAlive();
122+
}
123+
124+
@Override
125+
public boolean isCandidate() {
126+
return server.getInfo().isCandidate();
127+
}
128+
129+
@Override
130+
public long getCurrentTerm() {
131+
return server.getState().getCurrentTerm();
132+
}
133+
134+
@Override
135+
public long getLastCommittedIndex() {
136+
return server.getRaftLog().getLastCommittedIndex();
137+
}
138+
139+
@Override
140+
public TermIndex getLastEntry() {
141+
return server.getState().getLastEntry();
142+
}
143+
144+
@Override
145+
public boolean isPreVoteEnabled() {
146+
return preVote;
147+
}
148+
149+
@Override
150+
public ConfAndTerm initElection(Phase phase) throws IOException {
151+
return server.getState().initElection(phase);
152+
}
153+
154+
@Override
155+
public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) throws IOException {
156+
return server.getServerRpc().requestVote(r);
157+
}
158+
159+
@Override
160+
public void changeToLeader() {
161+
server.changeToLeader();
162+
}
163+
164+
@Override
165+
public void rejected(long term, ResultAndTerm result) throws IOException {
166+
server.changeToFollowerAndPersistMetadata(term, false, result);
167+
}
168+
169+
@Override
170+
public void shutdown() {
171+
server.close();
172+
server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto(), false);
173+
}
174+
175+
@Override
176+
public Timekeeper getLeaderElectionTimer() {
177+
return server.getLeaderElectionMetrics().getLeaderElectionTimer();
178+
}
179+
180+
@Override
181+
public void onNewLeaderElectionCompletion() {
182+
server.getLeaderElectionMetrics().onNewLeaderElectionCompletion();
183+
}
184+
185+
@Override
186+
public TimeDuration getRandomElectionTimeout() {
187+
return server.getRandomElectionTimeout();
188+
}
189+
190+
@Override
191+
public ThreadGroup getThreadGroup() {
192+
return server.getThreadGroup();
193+
}
194+
};
195+
}
196+
}
197+
81198
private ResultAndTerm logAndReturn(Phase phase, Result result, Map<RaftPeerId, RequestVoteReplyProto> responses,
82199
List<Exception> exceptions) {
83200
return logAndReturn(phase, result, responses, exceptions, null);
@@ -106,7 +223,7 @@ enum Phase {
106223

107224
enum Result {PASSED, SINGLE_MODE_PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN, NOT_IN_CONF}
108225

109-
private static class ResultAndTerm {
226+
static class ResultAndTerm {
110227
private final Result result;
111228
private final Long term;
112229

@@ -185,22 +302,24 @@ public String toString() {
185302
private final Daemon daemon;
186303
private final CompletableFuture<Void> stopped = new CompletableFuture<>();
187304

188-
private final RaftServerImpl server;
305+
private final ServerInterface server;
189306
private final boolean skipPreVote;
190307
private final ConfAndTerm round0;
191308

192309
LeaderElection(RaftServerImpl server, boolean force) {
310+
this(ServerInterface.get(server), force);
311+
}
312+
313+
LeaderElection(ServerInterface server, boolean force) {
193314
this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass()) + COUNT.incrementAndGet();
194315
this.lifeCycle = new LifeCycle(this);
195316
this.daemon = Daemon.newBuilder().setName(name).setRunnable(this)
196317
.setThreadGroup(server.getThreadGroup()).build();
197318
this.server = server;
198-
this.skipPreVote = force ||
199-
!RaftServerConfigKeys.LeaderElection.preVote(
200-
server.getRaftServer().getProperties());
319+
this.skipPreVote = force || !server.isPreVoteEnabled();
201320
try {
202321
// increase term of the candidate in advance if it's forced to election
203-
this.round0 = force ? server.getState().initElection(Phase.ELECTION) : null;
322+
this.round0 = force ? server.initElection(Phase.ELECTION) : null;
204323
} catch (IOException e) {
205324
throw new IllegalStateException(name + ": Failed to initialize election", e);
206325
}
@@ -250,7 +369,7 @@ private void runImpl() {
250369
return;
251370
}
252371

253-
try (AutoCloseable ignored = Timekeeper.start(server.getLeaderElectionMetrics().getLeaderElectionTimer())) {
372+
try (AutoCloseable ignored = Timekeeper.start(server.getLeaderElectionTimer())) {
254373
for (int round = 0; shouldRun(); round++) {
255374
if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) {
256375
if (askForVotes(Phase.ELECTION, round)) {
@@ -264,29 +383,28 @@ private void runImpl() {
264383
}
265384
final LifeCycle.State state = lifeCycle.getCurrentState();
266385
if (state.isClosingOrClosed()) {
267-
LOG.info(this + ": since this is already " + state + ", safely ignore " + e);
386+
LOG.info("{}: since this is already {}, safely ignore {}", this, state, e.toString());
268387
} else {
269-
if (!server.getInfo().isAlive()) {
270-
LOG.info(this + ": since the server is not alive, safely ignore " + e);
388+
if (!server.isAlive()) {
389+
LOG.info("{}: since the server is not alive, safely ignore {}", this, e.toString());
271390
} else {
272391
LOG.error("{}: Failed, state={}", this, state, e);
273392
}
274393
shutdown();
275394
}
276395
} finally {
277396
// Update leader election completion metric(s).
278-
server.getLeaderElectionMetrics().onNewLeaderElectionCompletion();
397+
server.onNewLeaderElectionCompletion();
279398
lifeCycle.checkStateAndClose(() -> {});
280399
}
281400
}
282401

283402
private boolean shouldRun() {
284-
final DivisionInfo info = server.getInfo();
285-
return lifeCycle.getCurrentState().isRunning() && info.isCandidate() && info.isAlive();
403+
return lifeCycle.getCurrentState().isRunning() && server.isCandidate() && server.isAlive();
286404
}
287405

288406
private boolean shouldRun(long electionTerm) {
289-
return shouldRun() && server.getState().getCurrentTerm() == electionTerm;
407+
return shouldRun() && server.getCurrentTerm() == electionTerm;
290408
}
291409

292410
private ResultAndTerm submitRequestAndWaitResult(Phase phase, RaftConfigurationImpl conf, long electionTerm)
@@ -299,7 +417,7 @@ private ResultAndTerm submitRequestAndWaitResult(Phase phase, RaftConfigurationI
299417
if (others.isEmpty()) {
300418
r = new ResultAndTerm(Result.PASSED, electionTerm);
301419
} else {
302-
final TermIndex lastEntry = server.getState().getLastEntry();
420+
final TermIndex lastEntry = server.getLastEntry();
303421
final Executor voteExecutor = new Executor(this, others.size());
304422
try {
305423
final int submitted = submitRequests(phase, electionTerm, lastEntry, others, voteExecutor);
@@ -322,8 +440,7 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException,
322440
}
323441
// If round0 is non-null, we have already called initElection in the constructor,
324442
// reuse round0 to avoid initElection again for the first round
325-
final ConfAndTerm confAndTerm = (round == 0 && round0 != null) ?
326-
round0 : server.getState().initElection(phase);
443+
final ConfAndTerm confAndTerm = (round == 0 && round0 != null) ? round0 : server.initElection(phase);
327444
electionTerm = confAndTerm.getTerm();
328445
conf = confAndTerm.getConf();
329446
}
@@ -343,15 +460,14 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException,
343460
return true;
344461
case NOT_IN_CONF:
345462
case SHUTDOWN:
346-
server.close();
347-
server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto(), false);
463+
server.shutdown();
348464
return false;
349465
case TIMEOUT:
350466
return false; // should retry
351467
case REJECTED:
352468
case DISCOVERED_A_NEW_TERM:
353-
final long term = r.maxTerm(server.getState().getCurrentTerm());
354-
server.changeToFollowerAndPersistMetadata(term, false, r);
469+
final long term = r.maxTerm(server.getCurrentTerm());
470+
server.rejected(term, r);
355471
return false;
356472
default: throw new IllegalArgumentException("Unable to process result " + r.result);
357473
}
@@ -364,7 +480,7 @@ private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
364480
for (final RaftPeer peer : others) {
365481
final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
366482
server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase == Phase.PRE_VOTE);
367-
voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
483+
voteExecutor.submit(() -> server.requestVote(r));
368484
submitted++;
369485
}
370486
return submitted;
@@ -390,6 +506,9 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt
390506
Set<RaftPeerId> higherPriorityPeers = getHigherPriorityPeers(conf);
391507
final boolean singleMode = conf.isSingleMode(server.getId());
392508

509+
// true iff this server does not have any commits
510+
final boolean emptyCommit = server.getLastCommittedIndex() < RaftLog.LEAST_VALID_LOG_INDEX;
511+
393512
while (waitForNum > 0 && shouldRun(electionTerm)) {
394513
final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n);
395514
if (waitTime.isNonPositive()) {
@@ -439,7 +558,10 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt
439558
// all higher priority peers have replied
440559
higherPriorityPeers.remove(replierId);
441560

442-
if (r.getServerReply().getSuccess()) {
561+
final boolean acceptVote = r.getServerReply().getSuccess()
562+
// When the commits are non-empty, do not accept votes from empty log voters.
563+
&& (emptyCommit || nonEmptyLog(r));
564+
if (acceptVote) {
443565
votedPeers.add(replierId);
444566
// If majority and all peers with higher priority have voted, candidate pass vote
445567
if (higherPriorityPeers.isEmpty() && conf.hasMajority(votedPeers, server.getId())) {
@@ -448,6 +570,7 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt
448570
} else {
449571
rejectedPeers.add(replierId);
450572
if (conf.majorityRejectVotes(rejectedPeers)) {
573+
LOG.info("rejectedPeers: {}, emptyCommit? {}", rejectedPeers, emptyCommit);
451574
return logAndReturn(phase, Result.REJECTED, responses, exceptions);
452575
}
453576
}
@@ -467,6 +590,26 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt
467590
}
468591
}
469592

593+
/**
594+
* @return true if the given reply indicates that the voter has a non-empty raft log.
595+
* Note that a voter running with an old version may not include the lastEntry in the reply.
596+
* For compatibility, this method returns true for such case.
597+
*/
598+
static boolean nonEmptyLog(RequestVoteReplyProto reply) {
599+
final TermIndexProto lastEntry = reply.getLastEntry();
600+
// valid term >= 1 and valid index >= 0; therefore, (0, 0) can only be the proto default
601+
if (lastEntry.equals(TermIndexProto.getDefaultInstance())) { // default: (0,0)
602+
LOG.info("Reply missing lastEntry: {} ", ServerStringUtils.toRequestVoteReplyString(reply));
603+
return true; // accept voters with an older version
604+
}
605+
if (lastEntry.getTerm() > 0) { // when log is empty, lastEntry is (0,-1).
606+
return true; // accept voters with a non-empty log
607+
}
608+
609+
LOG.info("Replier log is empty: {} ", ServerStringUtils.toRequestVoteReplyString(reply));
610+
return false; // reject voters with an empty log
611+
}
612+
470613
@Override
471614
public String toString() {
472615
return name;

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1504,7 +1504,7 @@ private RequestVoteReplyProto requestVote(Phase phase,
15041504
shouldShutdown = true;
15051505
}
15061506
reply = toRequestVoteReplyProto(candidateId, getMemberId(),
1507-
voteGranted, state.getCurrentTerm(), shouldShutdown);
1507+
voteGranted, state.getCurrentTerm(), shouldShutdown, state.getLastEntry());
15081508
if (LOG.isInfoEnabled()) {
15091509
LOG.info("{} replies to {} vote request: {}. Peer's state: {}",
15101510
getMemberId(), phase, toRequestVoteReplyString(reply), state);

0 commit comments

Comments
 (0)