Skip to content

Commit 81c714d

Browse files
authored
RATIS-2350. Fix readAfterWrite bugs. (#1311)
1 parent f971f8c commit 81c714d

File tree

6 files changed

+56
-37
lines changed

6 files changed

+56
-37
lines changed

ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,13 +1140,15 @@ public boolean checkLeadership() {
11401140
* @return current readIndex.
11411141
*/
11421142
CompletableFuture<Long> getReadIndex(Long readAfterWriteConsistentIndex) {
1143+
final long commitIndex = server.getRaftLog().getLastCommittedIndex();
11431144
final long readIndex;
1144-
if (readAfterWriteConsistentIndex != null) {
1145+
if (readAfterWriteConsistentIndex != null && readAfterWriteConsistentIndex > commitIndex) {
11451146
readIndex = readAfterWriteConsistentIndex;
11461147
} else {
1147-
readIndex = server.getRaftLog().getLastCommittedIndex();
1148+
readIndex = commitIndex;
11481149
}
1149-
LOG.debug("readIndex={}, readAfterWriteConsistentIndex={}", readIndex, readAfterWriteConsistentIndex);
1150+
LOG.debug("readIndex={} (commitIndex={}, readAfterWriteConsistentIndex={})",
1151+
readIndex, commitIndex, readAfterWriteConsistentIndex);
11501152

11511153
// if group contains only one member, fast path
11521154
if (server.getRaftConf().isSingleton()) {

ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.ratis.conf.RaftProperties;
2121
import org.apache.ratis.protocol.exceptions.ReadException;
2222
import org.apache.ratis.server.RaftServerConfigKeys;
23-
import org.apache.ratis.statemachine.StateMachine;
23+
import org.apache.ratis.util.Preconditions;
2424
import org.apache.ratis.util.TimeDuration;
2525
import org.apache.ratis.util.TimeoutExecutor;
2626
import org.slf4j.Logger;
@@ -29,25 +29,36 @@
2929
import java.util.NavigableMap;
3030
import java.util.TreeMap;
3131
import java.util.concurrent.CompletableFuture;
32-
import java.util.function.Consumer;
32+
import java.util.function.LongConsumer;
3333

3434
/** For supporting linearizable read. */
3535
class ReadRequests {
3636
private static final Logger LOG = LoggerFactory.getLogger(ReadRequests.class);
3737

3838
static class ReadIndexQueue {
3939
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
40+
/** The log index known to be applied. */
41+
private long lastAppliedIndex;
42+
/**
43+
* Map : readIndex -> appliedIndexFuture (when completes, readIndex <= appliedIndex).
44+
* Invariant: all keys > lastAppliedIndex.
45+
*/
4046
private final NavigableMap<Long, CompletableFuture<Long>> sorted = new TreeMap<>();
47+
4148
private final TimeDuration readTimeout;
4249

43-
ReadIndexQueue(TimeDuration readTimeout) {
50+
ReadIndexQueue(long lastAppliedIndex, TimeDuration readTimeout) {
51+
this.lastAppliedIndex = lastAppliedIndex;
4452
this.readTimeout = readTimeout;
4553
}
4654

4755
CompletableFuture<Long> add(long readIndex) {
4856
final CompletableFuture<Long> returned;
4957
final boolean create;
5058
synchronized (this) {
59+
if (readIndex <= lastAppliedIndex) {
60+
return CompletableFuture.completedFuture(lastAppliedIndex);
61+
}
5162
// The same as computeIfAbsent except that it also tells if a new value is created.
5263
final CompletableFuture<Long> existing = sorted.get(readIndex);
5364
create = existing == null;
@@ -79,35 +90,36 @@ private void handleTimeout(long readIndex) {
7990

8091

8192
/** Complete all the entries less than or equal to the given applied index. */
82-
synchronized void complete(Long appliedIndex) {
93+
synchronized void complete(long appliedIndex) {
94+
if (appliedIndex > lastAppliedIndex) {
95+
lastAppliedIndex = appliedIndex;
96+
} else {
97+
// appliedIndex <= lastAppliedIndex: nothing to do
98+
if (!sorted.isEmpty()) {
99+
// Assert: all keys > lastAppliedIndex.
100+
final long first = sorted.firstKey();
101+
Preconditions.assertTrue(first > lastAppliedIndex,
102+
() -> "first = " + first + " <= lastAppliedIndex = " + lastAppliedIndex);
103+
}
104+
return;
105+
}
83106
final NavigableMap<Long, CompletableFuture<Long>> headMap = sorted.headMap(appliedIndex, true);
84107
headMap.values().forEach(f -> f.complete(appliedIndex));
85108
headMap.clear();
86109
}
87110
}
88111

89112
private final ReadIndexQueue readIndexQueue;
90-
private final StateMachine stateMachine;
91113

92-
ReadRequests(RaftProperties properties, StateMachine stateMachine) {
93-
this.readIndexQueue = new ReadIndexQueue(RaftServerConfigKeys.Read.timeout(properties));
94-
this.stateMachine = stateMachine;
114+
ReadRequests(long appliedIndex, RaftProperties properties) {
115+
this.readIndexQueue = new ReadIndexQueue(appliedIndex, RaftServerConfigKeys.Read.timeout(properties));
95116
}
96117

97-
Consumer<Long> getAppliedIndexConsumer() {
118+
LongConsumer getAppliedIndexConsumer() {
98119
return readIndexQueue::complete;
99120
}
100121

101122
CompletableFuture<Long> waitToAdvance(long readIndex) {
102-
final long lastApplied = stateMachine.getLastAppliedTermIndex().getIndex();
103-
if (lastApplied >= readIndex) {
104-
return CompletableFuture.completedFuture(lastApplied);
105-
}
106-
final CompletableFuture<Long> f = readIndexQueue.add(readIndex);
107-
final long current = stateMachine.getLastAppliedTermIndex().getIndex();
108-
if (current > lastApplied) {
109-
readIndexQueue.complete(current);
110-
}
111-
return f;
123+
return readIndexQueue.add(readIndex);
112124
}
113125
}

ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class ServerState {
125125
this.lastNoLeaderTime = new AtomicReference<>(Timestamp.currentTime());
126126
this.noLeaderTimeout = RaftServerConfigKeys.Notification.noLeaderTimeout(prop);
127127
this.log = JavaUtils.memoize(() -> initRaftLog(() -> getSnapshotIndexFromStateMachine(stateMachine), prop));
128-
this.readRequests = new ReadRequests(prop, stateMachine);
128+
this.readRequests = new ReadRequests(stateMachine.getLastAppliedTermIndex().getIndex(), prop);
129129
this.stateMachineUpdater = JavaUtils.memoize(() -> new StateMachineUpdater(
130130
stateMachine, server, this, getLog().getSnapshotIndex(), prop,
131131
this.readRequests.getAppliedIndexConsumer()));

ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.concurrent.TimeUnit;
4646
import java.util.concurrent.atomic.AtomicReference;
4747
import java.util.function.Consumer;
48+
import java.util.function.LongConsumer;
4849
import java.util.stream.LongStream;
4950

5051
/**
@@ -90,12 +91,12 @@ enum State {
9091

9192
private final MemoizedSupplier<StateMachineMetrics> stateMachineMetrics;
9293

93-
private final Consumer<Long> appliedIndexConsumer;
94+
private final LongConsumer appliedIndexConsumer;
9495

9596
private volatile boolean isRemoving;
9697

9798
StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
98-
ServerState serverState, long lastAppliedIndex, RaftProperties properties, Consumer<Long> appliedIndexConsumer) {
99+
ServerState serverState, long lastAppliedIndex, RaftProperties properties, LongConsumer appliedIndexConsumer) {
99100
this.name = ServerStringUtils.generateUnifiedName(serverState.getMemberId(), getClass());
100101
this.appliedIndexConsumer = appliedIndexConsumer;
101102
this.infoIndexChange = s -> LOG.info("{}: {}", name, s);

ratis-server/src/main/java/org/apache/ratis/server/impl/WriteIndexCache.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,15 @@ class WriteIndexCache {
4646
.build();
4747
}
4848

49-
void add(ClientId key, CompletableFuture<Long> future) {
49+
void add(ClientId key, CompletableFuture<Long> current) {
5050
final AtomicReference<CompletableFuture<Long>> ref;
5151
try {
5252
ref = cache.get(key, AtomicReference::new);
5353
} catch (ExecutionException e) {
5454
throw new IllegalStateException(e);
5555
}
56-
ref.set(future);
56+
ref.updateAndGet(previous -> previous == null ? current
57+
: previous.thenCombine(current, Math::max));
5758
}
5859

5960
CompletableFuture<Long> getWriteIndexFuture(RaftClientRequest request) {

ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import static org.apache.ratis.ReadOnlyRequestTests.WAIT_AND_INCREMENT;
4848
import static org.apache.ratis.ReadOnlyRequestTests.assertReplyAtLeast;
4949
import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact;
50-
import static org.apache.ratis.ReadOnlyRequestTests.retrieve;
5150
import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE;
5251

5352
/** Test for the {@link RaftServerConfigKeys.Read.Option#LINEARIZABLE} feature. */
@@ -233,20 +232,24 @@ static <C extends MiniRaftCluster> void runTestReadAfterWrite(C cluster) throws
233232
assertReplyExact(1, client.io().sendReadAfterWrite(QUERY));
234233

235234
// test asynchronous read-after-write
236-
client.async().send(INCREMENT);
235+
final CompletableFuture<RaftClientReply> writeReply = client.async().send(INCREMENT);
237236
final CompletableFuture<RaftClientReply> asyncReply = client.async().sendReadAfterWrite(QUERY);
238237

239-
for (int i = 0; i < 20; i++) {
240-
client.async().send(INCREMENT);
238+
final int n = 100;
239+
final List<Reply> writeReplies = new ArrayList<>(n);
240+
final List<Reply> readAfterWriteReplies = new ArrayList<>(n);
241+
for (int i = 0; i < n; i++) {
242+
final int count = i + 3;
243+
writeReplies.add(new Reply(count, client.async().send(INCREMENT)));
244+
readAfterWriteReplies.add(new Reply(count, client.async().sendReadAfterWrite(QUERY)));
241245
}
242246

243-
// read-after-write is more consistent than linearizable read
244-
final CompletableFuture<RaftClientReply> linearizable = client.async().sendReadOnly(QUERY);
245-
final CompletableFuture<RaftClientReply> readAfterWrite = client.async().sendReadAfterWrite(QUERY);
246-
final int r = retrieve(readAfterWrite.get());
247-
final int l = retrieve(linearizable.get());
248-
Assertions.assertTrue(r >= l, () -> "readAfterWrite = " + r + " < linearizable = " + l);
247+
for (int i = 0; i < n; i++) {
248+
writeReplies.get(i).assertExact();
249+
readAfterWriteReplies.get(i).assertAtLeast();
250+
}
249251

252+
assertReplyAtLeast(2, writeReply.join());
250253
assertReplyAtLeast(2, asyncReply.join());
251254
}
252255
}

0 commit comments

Comments
 (0)