Skip to content

Commit fde8c7d

Browse files
authored
RATIS-2242 change consistency criteria of heartbeat during appendLog (#1215)
1 parent 663a44b commit fde8c7d

File tree

3 files changed

+129
-5
lines changed

3 files changed

+129
-5
lines changed

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,18 @@
1717
*/
1818
package org.apache.ratis.server.impl;
1919

20-
import java.util.concurrent.CountDownLatch;
2120
import org.apache.ratis.client.impl.ClientProtoUtils;
2221
import org.apache.ratis.conf.RaftProperties;
2322
import org.apache.ratis.metrics.Timekeeper;
2423
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
2524
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
2625
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
2726
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
28-
import org.apache.ratis.proto.RaftProtos.LogInfoProto;
2927
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
3028
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
3129
import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
3230
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
31+
import org.apache.ratis.proto.RaftProtos.LogInfoProto;
3332
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
3433
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
3534
import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
@@ -82,6 +81,8 @@
8281
import org.apache.ratis.server.RaftServerRpc;
8382
import org.apache.ratis.server.impl.LeaderElection.Phase;
8483
import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
84+
import org.apache.ratis.server.impl.ServerImplUtils.ConsecutiveIndices;
85+
import org.apache.ratis.server.impl.ServerImplUtils.NavigableIndices;
8586
import org.apache.ratis.server.leader.LeaderState;
8687
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
8788
import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
@@ -113,6 +114,7 @@
113114
import org.apache.ratis.util.ReferenceCountedObject;
114115
import org.apache.ratis.util.TimeDuration;
115116
import org.apache.ratis.util.function.CheckedSupplier;
117+
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
116118

117119
import java.io.File;
118120
import java.io.IOException;
@@ -128,6 +130,7 @@
128130
import java.util.Set;
129131
import java.util.concurrent.CompletableFuture;
130132
import java.util.concurrent.CompletionException;
133+
import java.util.concurrent.CountDownLatch;
131134
import java.util.concurrent.ExecutionException;
132135
import java.util.concurrent.ExecutorService;
133136
import java.util.concurrent.ThreadLocalRandom;
@@ -260,6 +263,7 @@ public long[] getFollowerMatchIndices() {
260263
private final ThreadGroup threadGroup;
261264

262265
private final AtomicReference<CompletableFuture<Void>> appendLogFuture;
266+
private final NavigableIndices appendLogTermIndices = new NavigableIndices();
263267

264268
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
265269
throws IOException {
@@ -1687,10 +1691,19 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16871691
});
16881692
}
16891693
private CompletableFuture<Void> appendLog(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
1694+
final List<ConsecutiveIndices> entriesTermIndices;
1695+
try(UncheckedAutoCloseableSupplier<List<LogEntryProto>> entries = entriesRef.retainAndReleaseOnClose()) {
1696+
entriesTermIndices = ConsecutiveIndices.convert(entries.get());
1697+
appendLogTermIndices.append(entriesTermIndices);
1698+
}
1699+
16901700
entriesRef.retain();
16911701
return appendLogFuture.updateAndGet(f -> f.thenCompose(
16921702
ignored -> JavaUtils.allOf(state.getLog().append(entriesRef))))
1693-
.whenComplete((v, e) -> entriesRef.release());
1703+
.whenComplete((v, e) -> {
1704+
entriesRef.release();
1705+
appendLogTermIndices.removeExisting(entriesTermIndices);
1706+
});
16941707
}
16951708

16961709
private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
@@ -1717,7 +1730,7 @@ private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryPro
17171730
}
17181731

17191732
// Check if "previous" is contained in current state.
1720-
if (previous != null && !state.containsTermIndex(previous)) {
1733+
if (previous != null && !(appendLogTermIndices.contains(previous) || state.containsTermIndex(previous))) {
17211734
final long replyNextIndex = Math.min(state.getNextIndex(), previous.getIndex());
17221735
LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous);
17231736
return replyNextIndex;

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,7 +653,8 @@ public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
653653
try {
654654
final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
655655
return getImplFuture(groupId)
656-
.thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.appendEntriesAsync(requestRef)));
656+
.thenCompose(impl -> JavaUtils.callAsUnchecked(
657+
() -> impl.appendEntriesAsync(requestRef), CompletionException::new));
657658
} finally {
658659
requestRef.release();
659660
}

ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,121 @@
4040
import org.apache.ratis.util.TimeDuration;
4141

4242
import java.io.IOException;
43+
import java.util.ArrayList;
44+
import java.util.Collections;
4345
import java.util.List;
46+
import java.util.Map;
47+
import java.util.NavigableMap;
48+
import java.util.TreeMap;
4449
import java.util.concurrent.TimeUnit;
4550

4651
/** Server utilities for internal use. */
4752
public final class ServerImplUtils {
53+
/** The consecutive indices within the same term. */
54+
static class ConsecutiveIndices {
55+
/** Convert the given entries to a list of {@link ConsecutiveIndices} */
56+
static List<ConsecutiveIndices> convert(List<LogEntryProto> entries) {
57+
if (entries == null || entries.isEmpty()) {
58+
return Collections.emptyList();
59+
}
60+
61+
List<ConsecutiveIndices> indices = null;
62+
63+
LogEntryProto previous = entries.get(0);
64+
long startIndex = previous.getIndex();
65+
int count = 1;
66+
67+
for (int i = 1; i < entries.size(); i++) {
68+
final LogEntryProto current = entries.get(i);
69+
// validate if the indices are consecutive
70+
Preconditions.assertSame(previous.getIndex() + 1, current.getIndex(), "index");
71+
72+
if (current.getTerm() == previous.getTerm()) {
73+
count++;
74+
} else {
75+
// validate if the terms are increasing
76+
Preconditions.assertTrue(previous.getTerm() < current.getTerm(), "term");
77+
if (indices == null) {
78+
indices = new ArrayList<>();
79+
}
80+
indices.add(new ConsecutiveIndices(previous.getTerm(), startIndex, count));
81+
82+
startIndex = current.getIndex();
83+
count = 1;
84+
}
85+
previous = current;
86+
}
87+
88+
final ConsecutiveIndices last = new ConsecutiveIndices(previous.getTerm(), startIndex, count);
89+
if (indices == null) {
90+
return Collections.singletonList(last);
91+
} else {
92+
indices.add(last);
93+
return indices;
94+
}
95+
}
96+
97+
private final long term;
98+
private final long startIndex;
99+
private final int count;
100+
101+
ConsecutiveIndices(long term, long startIndex, int count) {
102+
Preconditions.assertTrue(count > 0, () -> "count = " + count + " <= 0 ");
103+
this.term = term;
104+
this.startIndex = startIndex;
105+
this.count = count;
106+
}
107+
108+
long getNextIndex() {
109+
return startIndex + count;
110+
}
111+
112+
Long getTerm(long index) {
113+
final long diff = index - startIndex;
114+
return diff < 0 || diff >= count ? null: term;
115+
}
116+
}
117+
118+
/** A data structure to support the {@link #contains(TermIndex)} method. */
119+
static class NavigableIndices {
120+
private final NavigableMap<Long, ConsecutiveIndices> map = new TreeMap<>();
121+
122+
boolean contains(TermIndex ti) {
123+
final Long term = getTerm(ti.getIndex());
124+
return term != null && term == ti.getTerm();
125+
}
126+
127+
synchronized Long getTerm(long index) {
128+
if (map.isEmpty()) {
129+
return null;
130+
}
131+
132+
final Map.Entry<Long, ConsecutiveIndices> floorEntry = map.floorEntry(index);
133+
if (floorEntry == null) {
134+
return null;
135+
}
136+
return floorEntry.getValue().getTerm(index);
137+
}
138+
139+
synchronized void append(List<ConsecutiveIndices> entriesTermIndices) {
140+
for(ConsecutiveIndices indices : entriesTermIndices) {
141+
// validate startIndex
142+
final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry();
143+
if (lastEntry != null) {
144+
Preconditions.assertSame(lastEntry.getValue().getNextIndex(), indices.startIndex, "startIndex");
145+
}
146+
map.put(indices.startIndex, indices);
147+
}
148+
}
149+
150+
synchronized void removeExisting(List<ConsecutiveIndices> entriesTermIndices) {
151+
for(ConsecutiveIndices indices : entriesTermIndices) {
152+
final ConsecutiveIndices removed = map.remove(indices.startIndex);
153+
Preconditions.assertSame(indices, removed, "removed");
154+
}
155+
}
156+
}
157+
48158
private ServerImplUtils() {
49159
//Never constructed
50160
}

0 commit comments

Comments
 (0)