Skip to content

Commit 59b4037

Browse files
SzyWilliam authored and OneSizeFitsQuorum committed
RATIS-2242 change consistency criteria of heartbeat during appendLog (#1215)
1 parent 29fbc50 commit 59b4037

File tree

3 files changed

+130
-8
lines changed

3 files changed

+130
-8
lines changed

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 14 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -17,19 +17,18 @@
1717
*/
1818
package org.apache.ratis.server.impl;
1919

20-
import java.util.concurrent.CountDownLatch;
2120
import org.apache.ratis.client.impl.ClientProtoUtils;
2221
import org.apache.ratis.conf.RaftProperties;
2322
import org.apache.ratis.metrics.Timekeeper;
2423
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
2524
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
2625
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
2726
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
28-
import org.apache.ratis.proto.RaftProtos.LogInfoProto;
2927
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
3028
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
3129
import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
3230
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
31+
import org.apache.ratis.proto.RaftProtos.LogInfoProto;
3332
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
3433
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
3534
import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
@@ -82,6 +81,8 @@
8281
import org.apache.ratis.server.RaftServerRpc;
8382
import org.apache.ratis.server.impl.LeaderElection.Phase;
8483
import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
84+
import org.apache.ratis.server.impl.ServerImplUtils.ConsecutiveIndices;
85+
import org.apache.ratis.server.impl.ServerImplUtils.NavigableIndices;
8586
import org.apache.ratis.server.leader.LeaderState;
8687
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
8788
import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
@@ -112,6 +113,7 @@
112113
import org.apache.ratis.util.ProtoUtils;
113114
import org.apache.ratis.util.TimeDuration;
114115
import org.apache.ratis.util.function.CheckedSupplier;
116+
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
115117

116118
import java.io.File;
117119
import java.io.IOException;
@@ -126,6 +128,7 @@
126128
import java.util.Set;
127129
import java.util.concurrent.CompletableFuture;
128130
import java.util.concurrent.CompletionException;
131+
import java.util.concurrent.CountDownLatch;
129132
import java.util.concurrent.ExecutionException;
130133
import java.util.concurrent.ExecutorService;
131134
import java.util.concurrent.ThreadLocalRandom;
@@ -250,6 +253,7 @@ public long[] getFollowerNextIndices() {
250253
private final ThreadGroup threadGroup;
251254

252255
private final AtomicReference<CompletableFuture<Void>> appendLogFuture;
256+
private final NavigableIndices appendLogTermIndices = new NavigableIndices();
253257

254258
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
255259
throws IOException {
@@ -1621,9 +1625,15 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16211625
return reply;
16221626
});
16231627
}
1628+
16241629
private CompletableFuture<Void> appendLog(List<LogEntryProto> entries) {
1630+
final List<ConsecutiveIndices> entriesTermIndices = ConsecutiveIndices.convert(entries);
1631+
appendLogTermIndices.append(entriesTermIndices);
16251632
return appendLogFuture.updateAndGet(f -> f.thenCompose(
1626-
ignored -> JavaUtils.allOf(state.getLog().append(entries))));
1633+
ignored -> JavaUtils.allOf(state.getLog().append(entries))))
1634+
.whenComplete((v, e) -> {
1635+
appendLogTermIndices.removeExisting(entriesTermIndices);
1636+
});
16271637
}
16281638

16291639
private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
@@ -1650,7 +1660,7 @@ private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryPro
16501660
}
16511661

16521662
// Check if "previous" is contained in current state.
1653-
if (previous != null && !state.containsTermIndex(previous)) {
1663+
if (previous != null && !(appendLogTermIndices.contains(previous) || state.containsTermIndex(previous))) {
16541664
final long replyNextIndex = Math.min(state.getNextIndex(), previous.getIndex());
16551665
LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous);
16561666
return replyNextIndex;

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -640,10 +640,12 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ
640640
}
641641

642642
@Override
643-
public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) {
644-
final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
645-
return getImplFuture(groupId)
646-
.thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.appendEntriesAsync(request)));
643+
public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
644+
AppendEntriesRequestProto request) {
645+
final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
646+
return getImplFuture(groupId)
647+
.thenCompose(impl -> JavaUtils.callAsUnchecked(
648+
() -> impl.appendEntriesAsync(request), CompletionException::new));
647649
}
648650

649651
@Override

ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java

Lines changed: 110 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -40,11 +40,121 @@
4040
import org.apache.ratis.util.TimeDuration;
4141

4242
import java.io.IOException;
43+
import java.util.ArrayList;
44+
import java.util.Collections;
4345
import java.util.List;
46+
import java.util.Map;
47+
import java.util.NavigableMap;
48+
import java.util.TreeMap;
4449
import java.util.concurrent.TimeUnit;
4550

4651
/** Server utilities for internal use. */
4752
public final class ServerImplUtils {
53+
/** The consecutive indices within the same term. */
54+
static class ConsecutiveIndices {
55+
/** Convert the given entries to a list of {@link ConsecutiveIndices} */
56+
static List<ConsecutiveIndices> convert(List<LogEntryProto> entries) {
57+
if (entries == null || entries.isEmpty()) {
58+
return Collections.emptyList();
59+
}
60+
61+
List<ConsecutiveIndices> indices = null;
62+
63+
LogEntryProto previous = entries.get(0);
64+
long startIndex = previous.getIndex();
65+
int count = 1;
66+
67+
for (int i = 1; i < entries.size(); i++) {
68+
final LogEntryProto current = entries.get(i);
69+
// validate if the indices are consecutive
70+
Preconditions.assertSame(previous.getIndex() + 1, current.getIndex(), "index");
71+
72+
if (current.getTerm() == previous.getTerm()) {
73+
count++;
74+
} else {
75+
// validate if the terms are increasing
76+
Preconditions.assertTrue(previous.getTerm() < current.getTerm(), "term");
77+
if (indices == null) {
78+
indices = new ArrayList<>();
79+
}
80+
indices.add(new ConsecutiveIndices(previous.getTerm(), startIndex, count));
81+
82+
startIndex = current.getIndex();
83+
count = 1;
84+
}
85+
previous = current;
86+
}
87+
88+
final ConsecutiveIndices last = new ConsecutiveIndices(previous.getTerm(), startIndex, count);
89+
if (indices == null) {
90+
return Collections.singletonList(last);
91+
} else {
92+
indices.add(last);
93+
return indices;
94+
}
95+
}
96+
97+
private final long term;
98+
private final long startIndex;
99+
private final int count;
100+
101+
ConsecutiveIndices(long term, long startIndex, int count) {
102+
Preconditions.assertTrue(count > 0, () -> "count = " + count + " <= 0 ");
103+
this.term = term;
104+
this.startIndex = startIndex;
105+
this.count = count;
106+
}
107+
108+
long getNextIndex() {
109+
return startIndex + count;
110+
}
111+
112+
Long getTerm(long index) {
113+
final long diff = index - startIndex;
114+
return diff < 0 || diff >= count ? null: term;
115+
}
116+
}
117+
118+
/** A data structure to support the {@link #contains(TermIndex)} method. */
119+
static class NavigableIndices {
120+
private final NavigableMap<Long, ConsecutiveIndices> map = new TreeMap<>();
121+
122+
boolean contains(TermIndex ti) {
123+
final Long term = getTerm(ti.getIndex());
124+
return term != null && term == ti.getTerm();
125+
}
126+
127+
synchronized Long getTerm(long index) {
128+
if (map.isEmpty()) {
129+
return null;
130+
}
131+
132+
final Map.Entry<Long, ConsecutiveIndices> floorEntry = map.floorEntry(index);
133+
if (floorEntry == null) {
134+
return null;
135+
}
136+
return floorEntry.getValue().getTerm(index);
137+
}
138+
139+
synchronized void append(List<ConsecutiveIndices> entriesTermIndices) {
140+
for(ConsecutiveIndices indices : entriesTermIndices) {
141+
// validate startIndex
142+
final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry();
143+
if (lastEntry != null) {
144+
Preconditions.assertSame(lastEntry.getValue().getNextIndex(), indices.startIndex, "startIndex");
145+
}
146+
map.put(indices.startIndex, indices);
147+
}
148+
}
149+
150+
synchronized void removeExisting(List<ConsecutiveIndices> entriesTermIndices) {
151+
for(ConsecutiveIndices indices : entriesTermIndices) {
152+
final ConsecutiveIndices removed = map.remove(indices.startIndex);
153+
Preconditions.assertSame(indices, removed, "removed");
154+
}
155+
}
156+
}
157+
48158
private ServerImplUtils() {
49159
//Never constructed
50160
}

0 commit comments

Comments
 (0)