Skip to content

Commit 2f1f27a

Browse files
DaveCTurnercbuescher
authored andcommitted
Improve reaction to blob store corruptions (elastic#111954)
Today there are a couple of assertions that can trip if the contents of a snapshot repostiory are corrupted. It makes sense to assert the integrity of snapshots in most tests, but we must also (a) protect against these corruptions in production and (b) allow some tests to verify the behaviour of the system when the repository is corrupted. This commit introduces a flag to disable certain assertions, converts the relevant assertions into production failures too, and introduces a high-level test to verify that we do detect all relevant corruptions without tripping any other assertions. Extracted from elastic#93735 as this change makes sense in its own right. Relates elastic#52622.
1 parent acd961e commit 2f1f27a

File tree

6 files changed

+300
-2
lines changed

6 files changed

+300
-2
lines changed

docs/reference/release-notes/8.15.0.asciidoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ after it is killed up to four times in 24 hours. (issue: {es-issue}110530[#11053
1616
* Pipeline aggregations under `time_series` and `categorize_text` aggregations are never
1717
returned (issue: {es-issue}111679[#111679])
1818

19+
* Elasticsearch will not start on Windows machines when the recommended [bootstrap.memory_lock: true](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration-memory.html#bootstrap-memory_lock) setting is configured due to [native access refactoring](https://github.com/elastic/elasticsearch/pull/111866). The workaround for 8.15.0 is to downgrade to the previous version. This issue will be fixed in 8.15.1.
20+
1921
[[breaking-8.15.0]]
2022
[float]
2123
=== Breaking changes
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.repositories.blobstore;
10+
11+
import org.apache.lucene.tests.mockfile.ExtrasFS;
12+
import org.elasticsearch.ElasticsearchException;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
15+
import org.elasticsearch.action.support.ActionTestUtils;
16+
import org.elasticsearch.action.support.SubscribableListener;
17+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
18+
import org.elasticsearch.common.Strings;
19+
import org.elasticsearch.core.CheckedConsumer;
20+
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshotsIntegritySuppressor;
21+
import org.elasticsearch.logging.LogManager;
22+
import org.elasticsearch.logging.Logger;
23+
import org.elasticsearch.repositories.fs.FsRepository;
24+
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
25+
import org.elasticsearch.snapshots.SnapshotState;
26+
import org.elasticsearch.test.ESTestCase;
27+
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
28+
import org.junit.Before;
29+
30+
import java.io.IOException;
31+
import java.nio.file.FileVisitResult;
32+
import java.nio.file.Files;
33+
import java.nio.file.Path;
34+
import java.nio.file.SimpleFileVisitor;
35+
import java.nio.file.attribute.BasicFileAttributes;
36+
import java.util.ArrayList;
37+
import java.util.Base64;
38+
import java.util.List;
39+
40+
public class BlobStoreCorruptionIT extends AbstractSnapshotIntegTestCase {
41+
42+
private static final Logger logger = LogManager.getLogger(BlobStoreCorruptionIT.class);
43+
44+
@Before
45+
public void suppressConsistencyCheck() {
46+
disableRepoConsistencyCheck("testing corruption detection involves breaking the repo");
47+
}
48+
49+
public void testCorruptionDetection() throws Exception {
50+
final var repositoryName = randomIdentifier();
51+
final var indexName = randomIdentifier();
52+
final var snapshotName = randomIdentifier();
53+
final var repositoryRootPath = randomRepoPath();
54+
55+
createRepository(repositoryName, FsRepository.TYPE, repositoryRootPath);
56+
createIndexWithRandomDocs(indexName, between(1, 100));
57+
flushAndRefresh(indexName);
58+
createSnapshot(repositoryName, snapshotName, List.of(indexName));
59+
60+
final var corruptedFile = corruptRandomFile(repositoryRootPath);
61+
final var corruptedFileType = RepositoryFileType.getRepositoryFileType(repositoryRootPath, corruptedFile);
62+
final var corruptionDetectors = new ArrayList<CheckedConsumer<ActionListener<Exception>, ?>>();
63+
64+
// detect corruption by listing the snapshots
65+
if (corruptedFileType == RepositoryFileType.SNAPSHOT_INFO) {
66+
corruptionDetectors.add(exceptionListener -> {
67+
logger.info("--> listing snapshots");
68+
client().admin()
69+
.cluster()
70+
.prepareGetSnapshots(TEST_REQUEST_TIMEOUT, repositoryName)
71+
.execute(ActionTestUtils.assertNoSuccessListener(exceptionListener::onResponse));
72+
});
73+
}
74+
75+
// detect corruption by taking another snapshot
76+
if (corruptedFileType == RepositoryFileType.SHARD_GENERATION) {
77+
corruptionDetectors.add(exceptionListener -> {
78+
logger.info("--> taking another snapshot");
79+
client().admin()
80+
.cluster()
81+
.prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier())
82+
.setWaitForCompletion(true)
83+
.execute(exceptionListener.map(createSnapshotResponse -> {
84+
assertNotEquals(SnapshotState.SUCCESS, createSnapshotResponse.getSnapshotInfo().state());
85+
return new ElasticsearchException("create-snapshot failed as expected");
86+
}));
87+
});
88+
}
89+
90+
// detect corruption by restoring the snapshot
91+
switch (corruptedFileType) {
92+
case SNAPSHOT_INFO, GLOBAL_METADATA, INDEX_METADATA -> corruptionDetectors.add(exceptionListener -> {
93+
logger.info("--> restoring snapshot");
94+
client().admin()
95+
.cluster()
96+
.prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName)
97+
.setRestoreGlobalState(corruptedFileType == RepositoryFileType.GLOBAL_METADATA || randomBoolean())
98+
.setWaitForCompletion(true)
99+
.execute(ActionTestUtils.assertNoSuccessListener(exceptionListener::onResponse));
100+
});
101+
case SHARD_SNAPSHOT_INFO, SHARD_DATA -> corruptionDetectors.add(exceptionListener -> {
102+
logger.info("--> restoring snapshot and checking for failed shards");
103+
SubscribableListener
104+
// if shard-level data is corrupted then the overall restore succeeds but the shard recoveries fail
105+
.<AcknowledgedResponse>newForked(l -> client().admin().indices().prepareDelete(indexName).execute(l))
106+
.andThenAccept(ElasticsearchAssertions::assertAcked)
107+
108+
.<RestoreSnapshotResponse>andThen(
109+
l -> client().admin()
110+
.cluster()
111+
.prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName)
112+
.setRestoreGlobalState(randomBoolean())
113+
.setWaitForCompletion(true)
114+
.execute(l)
115+
)
116+
117+
.addListener(exceptionListener.map(restoreSnapshotResponse -> {
118+
assertNotEquals(0, restoreSnapshotResponse.getRestoreInfo().failedShards());
119+
return new ElasticsearchException("post-restore recoveries failed as expected");
120+
}));
121+
});
122+
}
123+
124+
try (var ignored = new BlobStoreIndexShardSnapshotsIntegritySuppressor()) {
125+
final var exception = safeAwait(randomFrom(corruptionDetectors));
126+
logger.info(Strings.format("--> corrupted [%s] and caught exception", corruptedFile), exception);
127+
}
128+
}
129+
130+
private static Path corruptRandomFile(Path repositoryRootPath) throws IOException {
131+
final var corruptedFileType = getRandomCorruptibleFileType();
132+
final var corruptedFile = getRandomFileToCorrupt(repositoryRootPath, corruptedFileType);
133+
if (randomBoolean()) {
134+
logger.info("--> deleting [{}]", corruptedFile);
135+
Files.delete(corruptedFile);
136+
} else {
137+
corruptFileContents(corruptedFile);
138+
}
139+
return corruptedFile;
140+
}
141+
142+
private static void corruptFileContents(Path fileToCorrupt) throws IOException {
143+
final var oldFileContents = Files.readAllBytes(fileToCorrupt);
144+
logger.info("--> contents of [{}] before corruption: [{}]", fileToCorrupt, Base64.getEncoder().encodeToString(oldFileContents));
145+
final byte[] newFileContents = new byte[randomBoolean() ? oldFileContents.length : between(0, oldFileContents.length)];
146+
System.arraycopy(oldFileContents, 0, newFileContents, 0, newFileContents.length);
147+
if (newFileContents.length == oldFileContents.length) {
148+
final var corruptionPosition = between(0, newFileContents.length - 1);
149+
newFileContents[corruptionPosition] = randomValueOtherThan(oldFileContents[corruptionPosition], ESTestCase::randomByte);
150+
logger.info(
151+
"--> updating byte at position [{}] from [{}] to [{}]",
152+
corruptionPosition,
153+
oldFileContents[corruptionPosition],
154+
newFileContents[corruptionPosition]
155+
);
156+
} else {
157+
logger.info("--> truncating file from length [{}] to length [{}]", oldFileContents.length, newFileContents.length);
158+
}
159+
Files.write(fileToCorrupt, newFileContents);
160+
logger.info("--> contents of [{}] after corruption: [{}]", fileToCorrupt, Base64.getEncoder().encodeToString(newFileContents));
161+
}
162+
163+
private static RepositoryFileType getRandomCorruptibleFileType() {
164+
return randomValueOtherThanMany(
165+
// these blob types do not have reliable corruption detection, so we must skip them
166+
t -> t == RepositoryFileType.ROOT_INDEX_N || t == RepositoryFileType.ROOT_INDEX_LATEST,
167+
() -> randomFrom(RepositoryFileType.values())
168+
);
169+
}
170+
171+
private static Path getRandomFileToCorrupt(Path repositoryRootPath, RepositoryFileType corruptedFileType) throws IOException {
172+
final var corruptibleFiles = new ArrayList<Path>();
173+
Files.walkFileTree(repositoryRootPath, new SimpleFileVisitor<>() {
174+
@Override
175+
public FileVisitResult visitFile(Path filePath, BasicFileAttributes attrs) throws IOException {
176+
if (ExtrasFS.isExtra(filePath.getFileName().toString()) == false
177+
&& RepositoryFileType.getRepositoryFileType(repositoryRootPath, filePath) == corruptedFileType) {
178+
corruptibleFiles.add(filePath);
179+
}
180+
return super.visitFile(filePath, attrs);
181+
}
182+
});
183+
return randomFrom(corruptibleFiles);
184+
}
185+
186+
}

server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.common.unit.ByteSizeValue;
1818
import org.elasticsearch.common.xcontent.XContentParserUtils;
1919
import org.elasticsearch.core.Nullable;
20+
import org.elasticsearch.gateway.CorruptStateException;
2021
import org.elasticsearch.index.store.StoreFileMetadata;
2122
import org.elasticsearch.xcontent.ParseField;
2223
import org.elasticsearch.xcontent.ToXContentFragment;
@@ -318,7 +319,11 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
318319
}
319320
case WRITER_UUID -> {
320321
writerUuid = new BytesRef(parser.binaryValue());
321-
assert writerUuid.length > 0;
322+
assert BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED == false || writerUuid.length > 0;
323+
if (writerUuid.length == 0) {
324+
// we never write UNAVAILABLE_WRITER_UUID, so this must be due to corruption
325+
throw new ElasticsearchParseException("invalid (empty) writer uuid");
326+
}
322327
}
323328
default -> XContentParserUtils.throwUnknownField(currentFieldName, parser);
324329
}
@@ -336,6 +341,12 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
336341
} else if (checksum == null) {
337342
throw new ElasticsearchParseException("missing checksum for name [" + name + "]");
338343
}
344+
try {
345+
// check for corruption before asserting writtenBy is parseable in the StoreFileMetadata constructor
346+
org.apache.lucene.util.Version.parse(writtenBy);
347+
} catch (Exception e) {
348+
throw new ElasticsearchParseException("invalid written_by [" + writtenBy + "]");
349+
}
339350
return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash, writerUuid), partSize);
340351
}
341352

@@ -566,6 +577,11 @@ public static BlobStoreIndexShardSnapshot fromXContent(XContentParser parser) th
566577
}
567578
}
568579

580+
// check for corruption before asserting snapshot != null in the BlobStoreIndexShardSnapshot ctor
581+
if (snapshot == null) {
582+
throw new CorruptStateException("snapshot missing");
583+
}
584+
569585
return new BlobStoreIndexShardSnapshot(
570586
snapshot,
571587
indexFiles == null ? List.of() : indexFiles,

server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
264264
return builder;
265265
}
266266

267+
static volatile boolean INTEGRITY_ASSERTIONS_ENABLED = true;
268+
267269
public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) throws IOException {
268270
XContentParser.Token token = parser.currentToken();
269271
if (token == null) { // New parser
@@ -317,7 +319,12 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t
317319
List<FileInfo> fileInfosBuilder = new ArrayList<>();
318320
for (String file : entry.v2()) {
319321
FileInfo fileInfo = files.get(file);
320-
assert fileInfo != null;
322+
if (fileInfo == null) {
323+
// could happen in production if the repo contents are corrupted
324+
final var exception = new IllegalStateException("shard index inconsistent at file [" + file + "]");
325+
assert INTEGRITY_ASSERTIONS_ENABLED == false : exception;
326+
throw exception;
327+
}
321328
fileInfosBuilder.add(fileInfo);
322329
}
323330
snapshots.add(new SnapshotFiles(entry.v1(), Collections.unmodifiableList(fileInfosBuilder), historyUUIDs.get(entry.v1())));
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.index.snapshots.blobstore;
10+
11+
import org.elasticsearch.core.Releasable;
12+
13+
/**
14+
* Test utility class to suppress assertions about the integrity of the contents of a blobstore repository, in order to verify the
15+
* production behaviour on encountering invalid data.
16+
*/
17+
public class BlobStoreIndexShardSnapshotsIntegritySuppressor implements Releasable {
18+
19+
public BlobStoreIndexShardSnapshotsIntegritySuppressor() {
20+
BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED = false;
21+
}
22+
23+
@Override
24+
public void close() {
25+
BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED = true;
26+
}
27+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.repositories.blobstore;
10+
11+
import org.elasticsearch.common.Strings;
12+
13+
import java.nio.file.Path;
14+
import java.util.regex.Pattern;
15+
16+
/**
17+
* The types of blobs in a {@link BlobStoreRepository}.
18+
*/
19+
public enum RepositoryFileType {
20+
21+
ROOT_INDEX_N("index-NUM"),
22+
ROOT_INDEX_LATEST("index.latest"),
23+
SNAPSHOT_INFO("snap-UUID.dat"),
24+
GLOBAL_METADATA("meta-UUID.dat"),
25+
INDEX_METADATA("indices/UUID/meta-SHORTUUID.dat"),
26+
SHARD_GENERATION("indices/UUID/NUM/index-UUID"),
27+
SHARD_SNAPSHOT_INFO("indices/UUID/NUM/snap-UUID.dat"),
28+
SHARD_DATA("indices/UUID/NUM/__UUID"),
29+
// NB no support for legacy names (yet)
30+
;
31+
32+
private final Pattern pattern;
33+
34+
RepositoryFileType(String regex) {
35+
pattern = Pattern.compile(
36+
"^("
37+
+ regex
38+
// decimal numbers
39+
.replace("NUM", "(0|[1-9][0-9]*)")
40+
// 15-byte UUIDS from TimeBasedUUIDGenerator
41+
.replace("SHORTUUID", "[0-9a-zA-Z_-]{20}")
42+
// 16-byte UUIDs from RandomBasedUUIDGenerator
43+
.replace("UUID", "[0-9a-zA-Z_-]{22}")
44+
+ ")$"
45+
);
46+
}
47+
48+
public static RepositoryFileType getRepositoryFileType(Path repositoryRoot, Path blobPath) {
49+
final var relativePath = repositoryRoot.relativize(blobPath).toString().replace(repositoryRoot.getFileSystem().getSeparator(), "/");
50+
for (final var repositoryFileType : RepositoryFileType.values()) {
51+
if (repositoryFileType.pattern.matcher(relativePath).matches()) {
52+
return repositoryFileType;
53+
}
54+
}
55+
throw new IllegalArgumentException(
56+
Strings.format("[%s] is not the path of a known blob type within [%s]", relativePath, repositoryRoot)
57+
);
58+
}
59+
60+
}

0 commit comments

Comments
 (0)