Skip to content

Commit 0bb4de0

Browse files
authored
RATIS-2318. Add a test to show how to manually restore a snapshot. (#1279)
1 parent d7d6a2c commit 0bb4de0

File tree

8 files changed

+225
-23
lines changed

8 files changed

+225
-23
lines changed

ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.nio.file.FileVisitResult;
3434
import java.nio.file.Files;
3535
import java.nio.file.LinkOption;
36+
import java.nio.file.NotDirectoryException;
3637
import java.nio.file.OpenOption;
3738
import java.nio.file.Path;
3839
import java.nio.file.Paths;
@@ -43,7 +44,10 @@
4344
import java.util.Arrays;
4445
import java.util.List;
4546
import java.util.Objects;
47+
import java.util.function.BiConsumer;
48+
import java.util.function.Consumer;
4649
import java.util.function.Supplier;
50+
import java.util.stream.Stream;
4751

4852
public interface FileUtils {
4953
Logger LOG = LoggerFactory.getLogger(FileUtils.class);
@@ -382,4 +386,26 @@ public FileVisitResult postVisitDirectory(Path dir, IOException e) throws IOExce
382386
}
383387
});
384388
}
389+
390+
static void listDir(File dir, Consumer<Object> out, BiConsumer<String, Throwable> err) {
391+
listDir(dir.toPath(), out, err);
392+
}
393+
394+
static void listDir(Path dir, Consumer<Object> out, BiConsumer<String, Throwable> err) {
395+
try {
396+
listDir(dir, out);
397+
} catch (IOException e) {
398+
err.accept("Failed to listDir: " + dir, e);
399+
}
400+
}
401+
402+
static void listDir(Path dir, Consumer<Object> out) throws IOException {
403+
if (!Files.isDirectory(dir, LinkOption.NOFOLLOW_LINKS)) {
404+
throw new NotDirectoryException( "Failed to listDir: " + dir + " is not a directory.");
405+
}
406+
407+
try(Stream<Path> s = Files.list(dir)) {
408+
s.forEach(out);
409+
}
410+
}
385411
}

ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ TermIndex getApplied() {
7878
int getCounter() {
7979
return counter;
8080
}
81+
82+
@Override
83+
public String toString() {
84+
return counter + "@" + applied;
85+
}
8186
}
8287

8388
private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
@@ -94,11 +99,11 @@ public CounterStateMachine() {
9499
}
95100

96101
/** @return the current state. */
97-
private synchronized CounterState getState() {
102+
synchronized CounterState getState() {
98103
return new CounterState(getLastAppliedTermIndex(), counter.get());
99104
}
100105

101-
private synchronized void updateState(TermIndex applied, int counterValue) {
106+
synchronized void updateState(TermIndex applied, int counterValue) {
102107
updateLastAppliedTermIndex(applied);
103108
counter.set(counterValue);
104109
}
@@ -141,36 +146,45 @@ public void reinitialize() throws IOException {
141146
load(storage.loadLatestSnapshot());
142147
}
143148

149+
@Override
150+
public SimpleStateMachineStorage getStateMachineStorage() {
151+
return storage;
152+
}
153+
144154
/**
145155
* Store the current state as a snapshot file in the {@link #storage}.
146156
*
147157
* @return the index of the snapshot
148158
*/
149159
@Override
150-
public long takeSnapshot() {
160+
public long takeSnapshot() throws IOException {
151161
//get the current state
152162
final CounterState state = getState();
153163
final long index = state.getApplied().getIndex();
154164

155165
//create a file with a proper name to store the snapshot
156166
final File snapshotFile = storage.getSnapshotFile(state.getApplied().getTerm(), index);
167+
try {
168+
saveSnapshot(state, snapshotFile);
169+
} catch (Exception e) {
170+
throw new IOException("Failed to save snapshot (" + state + ") to file " + snapshotFile, e);
171+
}
157172

173+
//return the index of the stored snapshot (which is the last applied one)
174+
return index;
175+
}
176+
177+
void saveSnapshot(CounterState state, File snapshotFile) throws IOException {
158178
//write the counter value into the snapshot file
159179
try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(
160180
Files.newOutputStream(snapshotFile.toPath())))) {
161181
out.writeInt(state.getCounter());
162-
} catch (IOException ioe) {
163-
LOG.warn("Failed to write snapshot file \"" + snapshotFile
164-
+ "\", last applied index=" + state.getApplied());
165182
}
166183

167184
// update storage
168185
final MD5Hash md5 = MD5FileUtil.computeAndSaveMd5ForFile(snapshotFile);
169186
final FileInfo info = new FileInfo(snapshotFile.toPath(), md5);
170187
storage.updateLatestSnapshot(new SingleFileSnapshotInfo(info, state.getApplied()));
171-
172-
//return the index of the stored snapshot (which is the last applied one)
173-
return index;
174188
}
175189

176190
/**
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.ratis.examples.counter.server;
19+
20+
import org.apache.ratis.BaseTest;
21+
import org.apache.ratis.RaftTestUtil;
22+
import org.apache.ratis.client.RaftClient;
23+
import org.apache.ratis.examples.counter.CounterCommand;
24+
import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
25+
import org.apache.ratis.protocol.Message;
26+
import org.apache.ratis.protocol.RaftClientReply;
27+
import org.apache.ratis.protocol.RaftGroup;
28+
import org.apache.ratis.protocol.RaftPeerId;
29+
import org.apache.ratis.server.RaftServer;
30+
import org.apache.ratis.server.impl.MiniRaftCluster;
31+
import org.apache.ratis.server.protocol.TermIndex;
32+
import org.apache.ratis.statemachine.SnapshotInfo;
33+
import org.apache.ratis.statemachine.StateMachine;
34+
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
35+
import org.apache.ratis.util.FileUtils;
36+
import org.apache.ratis.util.JavaUtils;
37+
import org.apache.ratis.util.TimeDuration;
38+
import org.junit.jupiter.api.Test;
39+
40+
import java.io.File;
41+
import java.util.ArrayList;
42+
import java.util.List;
43+
44+
import static org.junit.jupiter.api.Assertions.assertNotNull;
45+
import static org.junit.jupiter.api.Assertions.assertTrue;
46+
47+
/**
48+
* Test manually restoring a snapshot.
49+
* Due to hardware failures or software bugs,
50+
* the state of a state machine can become corrupted.
51+
* In such case, we may manually copy a snapshot from the leader
52+
* and then install it to the corrupted state machine.
53+
*/
54+
public class TestManualRestoreSnapshot extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet {
55+
public static final int NUM_SERVERS = 3;
56+
57+
{
58+
getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, CounterStateMachine.class, StateMachine.class);
59+
}
60+
61+
@Test
62+
public void testManualRestoreSnapshot() throws Exception {
63+
runWithNewCluster(NUM_SERVERS, this::run);
64+
}
65+
66+
void run(MiniRaftCluster cluster) throws Exception {
67+
final RaftGroup group = cluster.getGroup();
68+
69+
// send some messages
70+
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
71+
LOG.info("Leader: {}", leader);
72+
sendMessages(cluster, 5);
73+
74+
// kill a follower
75+
final RaftServer.Division toBeKilled = cluster.getFollowers().get(0);
76+
LOG.info("Follower to be killed: {}", toBeKilled.getId());
77+
final SimpleStateMachineStorage smStorage = ((CounterStateMachine) toBeKilled.getStateMachine())
78+
.getStateMachineStorage();
79+
final File raftLogCurrentDir = toBeKilled.getRaftStorage().getStorageDir().getCurrentDir();
80+
cluster.killServer(toBeKilled.getId());
81+
82+
// send more messages
83+
sendMessages(cluster, 3);
84+
85+
// get a snapshot from the leader
86+
final CounterStateMachine leaderStateMachine = (CounterStateMachine) leader.getStateMachine();
87+
final CounterStateMachine.CounterState snapshot = leaderStateMachine.getState();
88+
LOG.info("{}: Leader {}", leader.getId(), snapshot);
89+
90+
// remove raft log from the killed follower
91+
FileUtils.listDir(raftLogCurrentDir, s -> LOG.info("{}", s), LOG::error);
92+
final String[] logFiles = raftLogCurrentDir.list((dir, name) -> name.startsWith("log"));
93+
assertNotNull(logFiles);
94+
for (String logFile : logFiles) {
95+
FileUtils.deleteFile(new File(raftLogCurrentDir, logFile));
96+
}
97+
98+
// remove the killed follower
99+
final RaftPeerId followerId = toBeKilled.getId();
100+
cluster.removeServer(followerId);
101+
102+
// save the leader snapshot to the killed follower
103+
final TermIndex applied = snapshot.getApplied();
104+
final File snapshotFile = smStorage.getSnapshotFile(applied.getTerm(), applied.getIndex());
105+
final RaftServer toSaveSnapshot = cluster.putNewServer(followerId, group, false);
106+
((CounterStateMachine) toSaveSnapshot.getDivision(group.getGroupId()).getStateMachine())
107+
.saveSnapshot(snapshot, snapshotFile);
108+
109+
// start follower and verify last applied
110+
LOG.info("Restarting {}", followerId);
111+
final RaftServer.Division restartedFollower = cluster.restartServer(followerId, group, false);
112+
final StateMachine stateMachine = restartedFollower.getStateMachine();
113+
final SnapshotInfo info = stateMachine.getLatestSnapshot();
114+
LOG.info("{} restarted snapshot info {} from {}", followerId, info, stateMachine);
115+
116+
JavaUtils.attemptUntilTrue(() -> {
117+
System.out.println(cluster.printServers());
118+
final TermIndex leaderLastApplied = leaderStateMachine.getLastAppliedTermIndex();
119+
LOG.info("Leader {} last applied {}", leader.getId(), leaderLastApplied);
120+
final TermIndex followerLastApplied = stateMachine.getLastAppliedTermIndex();
121+
LOG.info("Follower {} last applied {}", followerId, followerLastApplied);
122+
return followerLastApplied.equals(leaderLastApplied);
123+
}, 10, TimeDuration.ONE_SECOND, "followerLastApplied", LOG);
124+
125+
sendMessages(cluster, 7);
126+
}
127+
128+
static void sendMessages(MiniRaftCluster cluster, int numMessages) throws Exception {
129+
final List<Message> messages = getUpdateRequests(numMessages);
130+
try(final RaftClient client = cluster.createClient()) {
131+
for (Message message : messages) {
132+
final RaftClientReply reply = client.io().send(message);
133+
assertTrue(reply.isSuccess());
134+
}
135+
}
136+
}
137+
138+
static List<Message> getUpdateRequests(int numMessages) {
139+
final List<Message> messages = new ArrayList<>();
140+
for(int i = 0; i < numMessages; i++) {
141+
messages.add(CounterCommand.INCREMENT.getMessage());
142+
}
143+
return messages;
144+
}
145+
146+
}

ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,7 @@ class ServerState {
123123
// On start the leader is null, start the clock now
124124
this.lastNoLeaderTime = new AtomicReference<>(Timestamp.currentTime());
125125
this.noLeaderTimeout = RaftServerConfigKeys.Notification.noLeaderTimeout(prop);
126-
127-
final LongSupplier getSnapshotIndexFromStateMachine = () -> Optional.ofNullable(stateMachine.getLatestSnapshot())
128-
.map(SnapshotInfo::getIndex)
129-
.filter(i -> i >= 0)
130-
.orElse(RaftLog.INVALID_LOG_INDEX);
131-
this.log = JavaUtils.memoize(() -> initRaftLog(getSnapshotIndexFromStateMachine, prop));
126+
this.log = JavaUtils.memoize(() -> initRaftLog(() -> getSnapshotIndexFromStateMachine(stateMachine), prop));
132127
this.readRequests = new ReadRequests(prop, stateMachine);
133128
this.stateMachineUpdater = JavaUtils.memoize(() -> new StateMachineUpdater(
134129
stateMachine, server, this, getLog().getSnapshotIndex(), prop,
@@ -154,6 +149,16 @@ RaftGroupMemberId getMemberId() {
154149
return memberId;
155150
}
156151

152+
private long getSnapshotIndexFromStateMachine(StateMachine stateMachine) {
153+
final SnapshotInfo latest = stateMachine.getLatestSnapshot();
154+
LOG.info("{}: getLatestSnapshot({}) returns {}", getMemberId(), stateMachine, latest);
155+
if (latest == null) {
156+
return RaftLog.INVALID_LOG_INDEX;
157+
}
158+
final long index = latest.getIndex();
159+
return index >= 0 ? index : RaftLog.INVALID_LOG_INDEX;
160+
}
161+
157162
void writeRaftConfiguration(LogEntryProto conf) {
158163
getStorage().writeRaftConfiguration(conf);
159164
}

ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ protected RaftLogBase(RaftGroupMemberId memberId,
8686
RaftProperties properties) {
8787
this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass());
8888
this.memberId = memberId;
89-
long index = getSnapshotIndexFromStateMachine.getAsLong();
89+
final long index = getSnapshotIndexFromStateMachine.getAsLong();
90+
LOG.info("{}: snapshotIndexFromStateMachine = {}", name, index);
9091
this.commitIndex = new RaftLogIndex("commitIndex", index);
9192
this.snapshotIndex = new RaftLogIndex("snapshotIndex", index);
9293
this.purgeIndex = new RaftLogIndex("purgeIndex", LEAST_VALID_LOG_INDEX - 1);

ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,17 @@
4242
import java.util.SortedMap;
4343
import java.util.TreeMap;
4444
import java.util.concurrent.CompletableFuture;
45+
import java.util.concurrent.atomic.AtomicInteger;
4546
import java.util.concurrent.atomic.AtomicReference;
4647

4748
/**
4849
* Base implementation for StateMachines.
4950
*/
5051
public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
5152
StateMachine.EventApi, StateMachine.LeaderEventApi, StateMachine.FollowerEventApi {
53+
private static final AtomicInteger ID_GENERATOR = new AtomicInteger();
54+
55+
private final int id = ID_GENERATOR.incrementAndGet();
5256
private final CompletableFuture<RaftServer> server = new CompletableFuture<>();
5357
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
5458
private volatile RaftGroupId groupId;
@@ -226,7 +230,7 @@ public void close() throws IOException {
226230

227231
@Override
228232
public String toString() {
229-
return JavaUtils.getClassSimpleName(getClass()) + ":"
233+
return JavaUtils.getClassSimpleName(getClass()) + "-" + id + ":"
230234
+ (!server.isDone()? "uninitialized": getId() + ":" + groupId);
231235
}
232236
}

ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,12 @@ public SingleFileSnapshotInfo loadLatestSnapshot() {
226226
return null;
227227
}
228228
try {
229-
return updateLatestSnapshot(findLatestSnapshot(dir.toPath()));
230-
} catch (IOException ignored) {
229+
final SingleFileSnapshotInfo latest = updateLatestSnapshot(findLatestSnapshot(dir.toPath()));
230+
LOG.info("Latest snapshot is {} in {}", latest, dir);
231+
return latest;
232+
} catch (IOException e) {
233+
LOG.warn("Failed to updateLatestSnapshot from {}", dir, e);
234+
FileUtils.listDir(dir, s -> LOG.warn(" {}", s), LOG::error);
231235
return null;
232236
}
233237
}

ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -351,17 +351,19 @@ public void start() throws IOException {
351351
: JavaUtils.runRepeatedly(() -> LOG.info("TIMED-PRINT: {}.", printServers()), 10, 10, TimeUnit.SECONDS));
352352
}
353353

354-
/**
355-
* start a stopped server again.
356-
*/
354+
public void removeServer(RaftPeerId serverId) {
355+
servers.remove(serverId);
356+
}
357+
358+
/** Restart the server with the given id. */
357359
public RaftServer.Division restartServer(RaftPeerId serverId, boolean format) throws IOException {
358360
return restartServer(serverId, group, format);
359361
}
360362

361363
public RaftServer.Division restartServer(RaftPeerId serverId, RaftGroup raftGroup, boolean format)
362364
throws IOException {
363365
killServer(serverId);
364-
servers.remove(serverId);
366+
removeServer(serverId);
365367

366368
final RaftServer proxy = putNewServer(serverId, raftGroup, format);
367369
proxy.start();

0 commit comments

Comments
 (0)