Skip to content

Commit 7c3942d

Browse files
133tosakarin authored and OneSizeFitsQuorum committed
RATIS-2183. Detect staled snapshot request. (#1173)
1 parent 2981c6b commit 7c3942d

File tree

4 files changed

+21
-0
lines changed

4 files changed

+21
-0
lines changed

ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,8 @@ public void onNext(InstallSnapshotReplyProto reply) {
708708
LOG.error("Unrecognized the reply result {}: Leader is {}, follower is {}",
709709
reply.getResult(), getServer().getId(), getFollowerId());
710710
break;
711+
case SNAPSHOT_EXPIRED:
712+
LOG.warn("{}: Follower could not install snapshot as it is expired.", this);
711713
default:
712714
break;
713715
}

ratis-proto/src/main/proto/Raft.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ enum InstallSnapshotResult {
155155
CONF_MISMATCH = 4;
156156
SNAPSHOT_INSTALLED = 5;
157157
SNAPSHOT_UNAVAILABLE = 6;
158+
SNAPSHOT_EXPIRED = 7;
158159
}
159160

160161
message RequestVoteRequestProto {

ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ class SnapshotInstallationHandler {
7171
private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
7272
private final AtomicLong installedIndex = new AtomicLong(INVALID_LOG_INDEX);
7373
private final AtomicInteger nextChunkIndex = new AtomicInteger(-1);
74+
/** The callId of the chunk with index 0. */
75+
private final AtomicLong chunk0CallId = new AtomicLong(-1);
7476

7577
SnapshotInstallationHandler(RaftServerImpl server, RaftProperties properties) {
7678
this.server = server;
@@ -176,8 +178,22 @@ private CompletableFuture<InstallSnapshotReplyProto> checkAndInstallSnapshot(Ins
176178
state.setLeader(leaderId, "installSnapshot");
177179

178180
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
181+
long callId = chunk0CallId.get();
182+
// 1. leaderTerm < currentTerm will never come here
183+
// 2. leaderTerm == currentTerm && callId > request.getCallId()
184+
// means the snapshot request from the same leader is stale
185+
// 3. leaderTerm > currentTerm means this is a new snapshot request from a new leader,
186+
// chunk0CallId will be reset when a snapshot request with requestIndex == 0 is received.
187+
if (callId > request.getServerRequest().getCallId() && currentTerm == leaderTerm) {
188+
LOG.warn("{}: Snapshot Request Staled: chunk 0 callId is {} but {}", getMemberId(), callId,
189+
ServerStringUtils.toInstallSnapshotRequestString(request));
190+
InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
191+
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SNAPSHOT_EXPIRED);
192+
return future.thenApply(dummy -> reply);
193+
}
179194
if (snapshotChunkRequest.getRequestIndex() == 0) {
180195
nextChunkIndex.set(0);
196+
chunk0CallId.set(request.getServerRequest().getCallId());
181197
} else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) {
182198
throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get()
183199
+ "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex());
@@ -205,6 +221,7 @@ private CompletableFuture<InstallSnapshotReplyProto> checkAndInstallSnapshot(Ins
205221
// re-load the state machine if this is the last chunk
206222
if (snapshotChunkRequest.getDone()) {
207223
state.reloadStateMachine(lastIncluded);
224+
chunk0CallId.set(-1);
208225
}
209226
} finally {
210227
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);

ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ public void run() throws InterruptedException, IOException {
150150
case SUCCESS:
151151
case SNAPSHOT_UNAVAILABLE:
152152
case ALREADY_INSTALLED:
153+
case SNAPSHOT_EXPIRED:
153154
getFollower().setAttemptedToInstallSnapshot();
154155
break;
155156
default:

0 commit comments

Comments
 (0)