Skip to content

Commit e71cb81

Browse files
Improve translog corruption detection (#47873)
Today we do not throw a `TranslogCorruptedException` in certain cases of translog corruption, such as for a corrupted checkpoint file or when an expected file (either checkpoint or translog) is completely missing or truncated. This means that `elasticsearch-shard` will not truncate the translog in those cases. This commit strengthens the translog corruption tests to corrupt and/or delete both translog and checkpoint files, and ensures that a `TranslogCorruptedException` is thrown in all cases. It also sometimes simulates a recovery after a crash while rolling the translog generation, including cases where the rolled checkpoint contains incorrect data. This backports #42980, #42744 and #44217 to 6.8. It also backports #41480 to adjust the tool to check shards regardless of whether there is a corruption marker. Co-authored-by: Henning Andersen <[email protected]>
1 parent 5bb1256 commit e71cb81

16 files changed

+457
-407
lines changed

server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedLuceneSegmentsAction.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,7 @@ public Tuple<RemoveCorruptedShardDataCommand.CleanStatus, String> getCleanStatus
3838
Lock writeLock,
3939
PrintStream printStream,
4040
boolean verbose) throws IOException {
41-
if (RemoveCorruptedShardDataCommand.isCorruptMarkerFileIsPresent(indexDirectory) == false) {
42-
return Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.CLEAN, null);
43-
}
41+
boolean markedCorrupted = RemoveCorruptedShardDataCommand.isCorruptMarkerFileIsPresent(indexDirectory);
4442

4543
final CheckIndex.Status status;
4644
try (CheckIndex checker = new CheckIndex(indexDirectory, writeLock)) {
@@ -55,7 +53,9 @@ public Tuple<RemoveCorruptedShardDataCommand.CleanStatus, String> getCleanStatus
5553
}
5654

5755
return status.clean
58-
? Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.CLEAN_WITH_CORRUPTED_MARKER, null)
56+
? Tuple.tuple(markedCorrupted
57+
? RemoveCorruptedShardDataCommand.CleanStatus.CLEAN_WITH_CORRUPTED_MARKER
58+
: RemoveCorruptedShardDataCommand.CleanStatus.CLEAN, null)
5959
: Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.CORRUPTED,
6060
"Corrupted Lucene index segments found - " + status.totLoseDocCount + " documents will be lost.");
6161
}
@@ -67,8 +67,6 @@ public void execute(Terminal terminal,
6767
Lock writeLock,
6868
PrintStream printStream,
6969
boolean verbose) throws IOException {
70-
checkCorruptMarkerFileIsPresent(indexDirectory);
71-
7270
final CheckIndex.Status status;
7371
try (CheckIndex checker = new CheckIndex(indexDirectory, writeLock)) {
7472

@@ -90,11 +88,4 @@ public void execute(Terminal terminal,
9088
}
9189
}
9290
}
93-
94-
protected void checkCorruptMarkerFileIsPresent(Directory directory) throws IOException {
95-
if (RemoveCorruptedShardDataCommand.isCorruptMarkerFileIsPresent(directory) == false) {
96-
throw new ElasticsearchException("There is no corruption file marker");
97-
}
98-
}
99-
10091
}

server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -511,8 +511,7 @@ private void printRerouteCommand(ShardPath shardPath, Terminal terminal, boolean
511511
: new AllocateEmptyPrimaryAllocationCommand(index, id, nodeId, false));
512512

513513
terminal.println("");
514-
terminal.println("POST /_cluster/reroute'\n"
515-
+ Strings.toString(commands, true, true) + "'");
514+
terminal.println("POST /_cluster/reroute\n" + Strings.toString(commands, true, true));
516515
terminal.println("");
517516
terminal.println("You must accept the possibility of data loss by changing parameter `accept_data_loss` to `true`.");
518517
terminal.println("");

server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public long getLastModifiedTime() throws IOException {
148148
}
149149

150150
/**
151-
* Reads a single opertation from the given location.
151+
* Reads a single operation from the given location.
152152
*/
153153
Translog.Operation read(Translog.Location location) throws IOException {
154154
assert location.generation == this.generation : "generation mismatch expected: " + generation + " got: " + location.generation;

server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
package org.elasticsearch.index.translog;
2121

2222
import org.apache.lucene.codecs.CodecUtil;
23+
import org.apache.lucene.index.CorruptIndexException;
24+
import org.apache.lucene.index.IndexFormatTooNewException;
25+
import org.apache.lucene.index.IndexFormatTooOldException;
2326
import org.apache.lucene.store.DataInput;
2427
import org.apache.lucene.store.DataOutput;
2528
import org.apache.lucene.store.Directory;
@@ -33,6 +36,7 @@
3336
import java.io.ByteArrayOutputStream;
3437
import java.io.IOException;
3538
import java.nio.channels.FileChannel;
39+
import java.nio.file.NoSuchFileException;
3640
import java.nio.file.OpenOption;
3741
import java.nio.file.Path;
3842

@@ -200,6 +204,8 @@ public static Checkpoint read(Path path) throws IOException {
200204
assert indexInput.length() == V3_FILE_SIZE : indexInput.length();
201205
return Checkpoint.readCheckpointV6_4_0(indexInput);
202206
}
207+
} catch (CorruptIndexException | NoSuchFileException | IndexFormatTooOldException | IndexFormatTooNewException e) {
208+
throw new TranslogCorruptedException(path.toString(), e);
203209
}
204210
}
205211
}

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ public Translog(
215215
private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws IOException {
216216
boolean success = false;
217217
ArrayList<TranslogReader> foundTranslogs = new ArrayList<>();
218-
try (ReleasableLock lock = writeLock.acquire()) {
218+
try (ReleasableLock ignored = writeLock.acquire()) {
219219
logger.debug("open uncommitted translog checkpoint {}", checkpoint);
220220

221221
final long minGenerationToRecoverFrom;
@@ -228,22 +228,22 @@ private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws
228228
minGenerationToRecoverFrom = checkpoint.minTranslogGeneration;
229229
}
230230

231-
final String checkpointTranslogFile = getFilename(checkpoint.generation);
232-
// we open files in reverse order in order to validate tranlsog uuid before we start traversing the translog based on
231+
// we open files in reverse order in order to validate the translog uuid before we start traversing the translog based on
233232
// the generation id we found in the lucene commit. This gives for better error messages if the wrong
234233
// translog was found.
235-
foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint));
236-
for (long i = checkpoint.generation - 1; i >= minGenerationToRecoverFrom; i--) {
234+
for (long i = checkpoint.generation; i >= minGenerationToRecoverFrom; i--) {
237235
Path committedTranslogFile = location.resolve(getFilename(i));
238236
if (Files.exists(committedTranslogFile) == false) {
239-
throw new IllegalStateException("translog file doesn't exist with generation: " + i + " recovering from: " +
240-
minGenerationToRecoverFrom + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive");
237+
throw new TranslogCorruptedException(committedTranslogFile.toString(),
238+
"translog file doesn't exist with generation: " + i + " recovering from: " + minGenerationToRecoverFrom
239+
+ " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive");
241240
}
242-
final TranslogReader reader = openReader(committedTranslogFile,
243-
Checkpoint.read(location.resolve(getCommitCheckpointFileName(i))));
241+
final Checkpoint readerCheckpoint = i == checkpoint.generation ? checkpoint
242+
: Checkpoint.read(location.resolve(getCommitCheckpointFileName(i)));
243+
final TranslogReader reader = openReader(committedTranslogFile, readerCheckpoint);
244244
assert reader.getPrimaryTerm() <= primaryTermSupplier.getAsLong() :
245-
"Primary terms go backwards; current term [" + primaryTermSupplier.getAsLong() + "]" +
246-
"translog path [ " + committedTranslogFile + ", existing term [" + reader.getPrimaryTerm() + "]";
245+
"Primary terms go backwards; current term [" + primaryTermSupplier.getAsLong() + "] translog path [ "
246+
+ committedTranslogFile + ", existing term [" + reader.getPrimaryTerm() + "]";
247247
foundTranslogs.add(reader);
248248
logger.debug("recovered local translog from checkpoint {}", checkpoint);
249249
}
@@ -258,8 +258,9 @@ private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws
258258
if (Files.exists(commitCheckpoint)) {
259259
Checkpoint checkpointFromDisk = Checkpoint.read(commitCheckpoint);
260260
if (checkpoint.equals(checkpointFromDisk) == false) {
261-
throw new IllegalStateException("Checkpoint file " + commitCheckpoint.getFileName() +
262-
" already exists but has corrupted content expected: " + checkpoint + " but got: " + checkpointFromDisk);
261+
throw new TranslogCorruptedException(commitCheckpoint.toString(),
262+
"checkpoint file " + commitCheckpoint.getFileName() + " already exists but has corrupted content: expected "
263+
+ checkpoint + " but got " + checkpointFromDisk);
263264
}
264265
} else {
265266
copyCheckpointTo(commitCheckpoint);

server/src/main/java/org/elasticsearch/index/translog/TranslogHeader.java

Lines changed: 51 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
3131
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
3232

33+
import java.io.EOFException;
3334
import java.io.IOException;
3435
import java.nio.channels.FileChannel;
3536
import java.nio.file.Path;
@@ -108,56 +109,61 @@ private static int headerSizeInBytes(int version, int uuidLength) {
108109
* Read a translog header from the given path and file channel
109110
*/
110111
static TranslogHeader read(final String translogUUID, final Path path, final FileChannel channel) throws IOException {
111-
// This input is intentionally not closed because closing it will close the FileChannel.
112-
final BufferedChecksumStreamInput in =
113-
new BufferedChecksumStreamInput(
112+
try {
113+
// This input is intentionally not closed because closing it will close the FileChannel.
114+
final BufferedChecksumStreamInput in =
115+
new BufferedChecksumStreamInput(
114116
new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel), channel.size()),
115117
path.toString());
116-
final int version;
117-
try {
118-
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM);
119-
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
120-
tryReportOldVersionError(path, channel);
121-
throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e);
122-
}
123-
if (version == VERSION_CHECKSUMS) {
124-
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
125-
}
126-
// Read the translogUUID
127-
final int uuidLen = in.readInt();
128-
if (uuidLen > channel.size()) {
129-
throw new TranslogCorruptedException(
130-
path.toString(),
131-
"UUID length can't be larger than the translog");
132-
}
133-
final BytesRef uuid = new BytesRef(uuidLen);
134-
uuid.length = uuidLen;
135-
in.read(uuid.bytes, uuid.offset, uuid.length);
136-
final BytesRef expectedUUID = new BytesRef(translogUUID);
137-
if (uuid.bytesEquals(expectedUUID) == false) {
138-
throw new TranslogCorruptedException(
118+
final int version;
119+
try {
120+
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM);
121+
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
122+
tryReportOldVersionError(path, channel);
123+
throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e);
124+
}
125+
if (version == VERSION_CHECKSUMS) {
126+
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
127+
}
128+
// Read the translogUUID
129+
final int uuidLen = in.readInt();
130+
if (uuidLen > channel.size()) {
131+
throw new TranslogCorruptedException(path.toString(), "UUID length can't be larger than the translog");
132+
}
133+
if (uuidLen <= 0) {
134+
throw new TranslogCorruptedException(path.toString(), "UUID length must be positive");
135+
}
136+
final BytesRef uuid = new BytesRef(uuidLen);
137+
uuid.length = uuidLen;
138+
in.read(uuid.bytes, uuid.offset, uuid.length);
139+
final BytesRef expectedUUID = new BytesRef(translogUUID);
140+
if (uuid.bytesEquals(expectedUUID) == false) {
141+
throw new TranslogCorruptedException(
139142
path.toString(),
140143
"expected shard UUID " + expectedUUID + " but got: " + uuid +
141-
" this translog file belongs to a different translog");
142-
}
143-
// Read the primary term
144-
final long primaryTerm;
145-
if (version == VERSION_PRIMARY_TERM) {
146-
primaryTerm = in.readLong();
147-
} else {
148-
assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]";
149-
primaryTerm = UNASSIGNED_PRIMARY_TERM;
150-
}
151-
// Verify the checksum
152-
if (version >= VERSION_PRIMARY_TERM) {
153-
Translog.verifyChecksum(in);
144+
" this translog file belongs to a different translog");
145+
}
146+
// Read the primary term
147+
final long primaryTerm;
148+
if (version == VERSION_PRIMARY_TERM) {
149+
primaryTerm = in.readLong();
150+
} else {
151+
assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]";
152+
primaryTerm = UNASSIGNED_PRIMARY_TERM;
153+
}
154+
// Verify the checksum
155+
if (version >= VERSION_PRIMARY_TERM) {
156+
Translog.verifyChecksum(in);
157+
}
158+
assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]";
159+
160+
final int headerSizeInBytes = headerSizeInBytes(version, uuid.length);
161+
assert channel.position() == headerSizeInBytes :
162+
"Header is not fully read; header size [" + headerSizeInBytes + "], position [" + channel.position() + "]";
163+
return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes);
164+
} catch (EOFException e) {
165+
throw new TranslogCorruptedException(path.toString(), "translog header truncated", e);
154166
}
155-
assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]";
156-
157-
final int headerSizeInBytes = headerSizeInBytes(version, uuid.length);
158-
assert channel.position() == headerSizeInBytes :
159-
"Header is not fully read; header size [" + headerSizeInBytes + "], position [" + channel.position() + "]";
160-
return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes);
161167
}
162168

163169
private static void tryReportOldVersionError(final Path path, final FileChannel channel) throws IOException {

server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public Translog.Operation next() throws IOException {
7676
return null;
7777
}
7878

79-
protected Translog.Operation readOperation() throws IOException {
79+
private Translog.Operation readOperation() throws IOException {
8080
final int opSize = readSize(reusableBuffer, position);
8181
reuse = checksummedStream(reusableBuffer, position, opSize, reuse);
8282
Translog.Operation op = read(reuse);
@@ -93,15 +93,19 @@ public long sizeInBytes() {
9393
* reads an operation at the given position into the given buffer.
9494
*/
9595
protected void readBytes(ByteBuffer buffer, long position) throws IOException {
96-
if (position >= length) {
97-
throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "], generation: [" +
98-
getGeneration() + "], path: [" + path + "]");
99-
}
100-
if (position < getFirstOperationOffset()) {
101-
throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" +
102-
getFirstOperationOffset() + "], generation: [" + getGeneration() + "], path: [" + path + "]");
96+
try {
97+
if (position >= length) {
98+
throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "], generation: [" +
99+
getGeneration() + "], path: [" + path + "]");
100+
}
101+
if (position < getFirstOperationOffset()) {
102+
throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" +
103+
getFirstOperationOffset() + "], generation: [" + getGeneration() + "], path: [" + path + "]");
104+
}
105+
Channels.readFromFileChannelWithEofException(channel, position, buffer);
106+
} catch (EOFException e) {
107+
throw new TranslogCorruptedException(path.toString(), "translog truncated", e);
103108
}
104-
Channels.readFromFileChannelWithEofException(channel, position, buffer);
105109
}
106110

107111
@Override

server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,6 @@ public void execute(Terminal terminal, ShardPath shardPath, Directory indexDirec
168168

169169
private boolean isTranslogClean(ShardPath shardPath, String translogUUID) throws IOException {
170170
// perform clean check of translog instead of corrupted marker file
171-
boolean clean = true;
172171
try {
173172
final Path translogPath = shardPath.resolveTranslog();
174173
final long translogGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
@@ -184,18 +183,19 @@ private boolean isTranslogClean(ShardPath shardPath, String translogUUID) throws
184183
try (Translog translog = new Translog(translogConfig, translogUUID,
185184
translogDeletionPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm);
186185
Translog.Snapshot snapshot = translog.newSnapshot()) {
186+
//noinspection StatementWithEmptyBody we are just checking that we can iterate through the whole snapshot
187187
while (snapshot.next() != null) {
188-
// just iterate over snapshot
189188
}
190189
}
190+
return true;
191191
} catch (TranslogCorruptedException e) {
192-
clean = false;
192+
return false;
193193
}
194-
return clean;
195194
}
196195

197196
/** Write a checkpoint file to the given location with the given generation */
198-
static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException {
197+
private static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint)
198+
throws IOException {
199199
Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration,
200200
globalCheckpoint, translogGeneration);
201201
Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
@@ -234,7 +234,7 @@ private String deletingFilesDetails(Path translogPath, Set<Path> files) {
234234
}
235235

236236
/** Return a Set of all files in a given directory */
237-
public static Set<Path> filesInDirectory(Path directory) throws IOException {
237+
private static Set<Path> filesInDirectory(Path directory) throws IOException {
238238
Set<Path> files = new TreeSet<>();
239239
try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory)) {
240240
for (Path file : stream) {

server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,7 @@ private int indexDocs(String indexName, Object ... source) throws InterruptedExc
190190
}
191191

192192
private Path getIndexPath(String nodeName, ShardId shardId) {
193-
final Set<Path> indexDirs = RemoveCorruptedShardDataCommandIT.getDirs(nodeName, shardId, ShardPath.INDEX_FOLDER_NAME);
194-
assertThat(indexDirs, hasSize(1));
195-
return indexDirs.iterator().next();
193+
return RemoveCorruptedShardDataCommandIT.getPathToShardData(nodeName, shardId, ShardPath.INDEX_FOLDER_NAME);
196194
}
197195

198196
private Set<String> getAllocationIds(String indexName) {

0 commit comments

Comments
 (0)