Skip to content

Commit f861f21

Browse files
committed
Add repository metadata integrity check API
Relates #52622
1 parent a3f8abb commit f861f21

File tree

14 files changed

+1182
-4
lines changed

14 files changed

+1182
-4
lines changed
Lines changed: 178 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.repositories.blobstore;
10+
11+
import org.elasticsearch.action.admin.cluster.repositories.integrity.VerifyRepositoryIntegrityAction;
12+
import org.elasticsearch.action.index.IndexRequestBuilder;
13+
import org.elasticsearch.action.support.PlainActionFuture;
14+
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.core.Releasable;
16+
import org.elasticsearch.core.Releasables;
17+
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshotsIntegritySuppressor;
18+
import org.elasticsearch.repositories.RepositoriesService;
19+
import org.elasticsearch.repositories.RepositoryData;
20+
import org.elasticsearch.repositories.RepositoryException;
21+
import org.elasticsearch.repositories.RepositoryVerificationException;
22+
import org.elasticsearch.rest.RestStatus;
23+
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
24+
import org.elasticsearch.test.CorruptionUtils;
25+
import org.junit.After;
26+
import org.junit.Before;
27+
28+
import java.nio.file.Files;
29+
import java.nio.file.Path;
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicBoolean;
34+
import java.util.stream.Collectors;
35+
36+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
37+
import static org.hamcrest.Matchers.allOf;
38+
import static org.hamcrest.Matchers.containsString;
39+
import static org.hamcrest.Matchers.empty;
40+
import static org.hamcrest.Matchers.equalTo;
41+
import static org.hamcrest.Matchers.not;
42+
43+
public class BlobStoreMetadataIntegrityIT extends AbstractSnapshotIntegTestCase {
44+
45+
private static final String REPOSITORY_NAME = "test-repo";
46+
47+
private Releasable integrityCheckSuppressor;
48+
49+
/**
 * Per-test setup. Calls {@code disableRepoConsistencyCheck} (the test deliberately breaks the repository, as the reason
 * string says) and stores a new {@link BlobStoreIndexShardSnapshotsIntegritySuppressor} in
 * {@code integrityCheckSuppressor} so that {@link #enableIntegrityChecks()} can close it after the test.
 * NOTE(review): what exactly the suppressor disables is defined in that class, not visible here.
 */
@Before
50+
public void suppressIntegrityChecks() {
51+
disableRepoConsistencyCheck("testing integrity checks involves breaking the repo");
52+
// guard against creating a second suppressor while an earlier one is still held in the field
assertNull(integrityCheckSuppressor);
53+
integrityCheckSuppressor = new BlobStoreIndexShardSnapshotsIntegritySuppressor();
54+
}
55+
56+
/**
 * Per-test teardown. Closes the {@link Releasable} stored by {@link #suppressIntegrityChecks()} and resets the field to
 * {@code null}, which is the state the {@code assertNull} in the setup method expects.
 */
@After
57+
public void enableIntegrityChecks() {
58+
Releasables.closeExpectNoException(integrityCheckSuppressor);
59+
integrityCheckSuppressor = null;
60+
}
61+
62+
/**
 * End-to-end check of repository metadata integrity verification.
 * <ol>
 * <li>Creates an {@code fs} repository, 1-3 indices, and four full snapshots with random re-creation/indexing of the
 * indices between snapshots.</li>
 * <li>Runs {@link VerifyRepositoryIntegrityAction} on the untouched repository and expects {@code OK} with no
 * exceptions.</li>
 * <li>2000 times: picks a random blob file, moves it away or corrupts it, runs {@code verifyMetadataIntegrity} directly on
 * the repository and expects a non-empty list of problems (or a {@link RepositoryException}), puts the original blob
 * back, and confirms the action reports a clean repository again.</li>
 * </ol>
 *
 * @throws Exception if any file operation, future, or assertion in the test fails
 */
public void testIntegrityCheck() throws Exception {
63+
final var repoPath = randomRepoPath();
64+
createRepository(
65+
REPOSITORY_NAME,
66+
"fs",
67+
Settings.builder().put(BlobStoreRepository.SUPPORT_URL_REPO.getKey(), false).put("location", repoPath)
68+
);
69+
// the repository instance is taken from the current master node so verifyMetadataIntegrity can be called on it directly
final var repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(REPOSITORY_NAME);
70+
71+
final var indexCount = between(1, 3);
72+
for (int i = 0; i < indexCount; i++) {
73+
createIndexWithRandomDocs("test-index-" + i, between(1, 1000));
74+
}
75+
76+
// take four snapshots; before each one, every index is (at random) left alone, or optionally deleted and re-created
// and then given between 1 and 1000 additional documents
for (int snapshotIndex = 0; snapshotIndex < 4; snapshotIndex++) {
77+
final var indexRequests = new ArrayList<IndexRequestBuilder>();
78+
for (int i = 0; i < indexCount; i++) {
79+
if (randomBoolean()) {
80+
final var indexName = "test-index-" + i;
81+
if (randomBoolean()) {
82+
assertAcked(client().admin().indices().prepareDelete(indexName));
83+
createIndexWithRandomDocs(indexName, between(1, 1000));
84+
}
85+
final var numDocs = between(1, 1000);
86+
for (int doc = 0; doc < numDocs; doc++) {
87+
indexRequests.add(client().prepareIndex(indexName).setSource("field1", "bar " + doc));
88+
}
89+
}
90+
}
91+
indexRandom(true, indexRequests);
92+
// every shard must flush successfully before the snapshot is taken
assertEquals(0, client().admin().indices().prepareFlush().get().getFailedShards());
93+
createFullSnapshot(REPOSITORY_NAME, "test-snapshot-" + snapshotIndex);
94+
}
95+
96+
// NOTE(review): the meaning of the numeric and boolean arguments is defined by VerifyRepositoryIntegrityAction.Request,
// which is not visible here; the same request object is reused for every verification below
final var request = new VerifyRepositoryIntegrityAction.Request(REPOSITORY_NAME, 5, 5, 5, 5, 10000, false);
97+
98+
// baseline: the intact repository must verify with status OK and no reported exceptions
final var response = PlainActionFuture.<VerifyRepositoryIntegrityAction.Response, RuntimeException>get(
99+
listener -> client().execute(VerifyRepositoryIntegrityAction.INSTANCE, request, listener),
100+
30,
101+
TimeUnit.SECONDS
102+
);
103+
assertThat(response.getRestStatus(), equalTo(RestStatus.OK));
104+
assertThat(response.getExceptions(), empty());
105+
106+
// holds the single backup file "tmp" used to restore whichever blob is damaged in the current iteration
final var tempDir = createTempDir();
107+
108+
// collect every regular file under the repository root, sorted, as the candidates for damage
final List<Path> blobs;
109+
try (var paths = Files.walk(repoPath)) {
110+
blobs = paths.filter(Files::isRegularFile).sorted().toList();
111+
}
112+
for (final var blob : blobs) {
113+
logger.info("repo contents: {}", blob);
114+
}
115+
116+
// path of the "index-<generation>" file at the repository root for the generation reported by the repository data
final var repositoryDataFuture = new PlainActionFuture<RepositoryData>();
117+
repository.getRepositoryData(repositoryDataFuture);
118+
final var repositoryData = repositoryDataFuture.get();
119+
final var repositoryDataBlob = repoPath.resolve("index-" + repositoryData.getGenId());
120+
121+
for (int i = 0; i < 2000; i++) {
122+
final var blobToDamage = randomFrom(blobs);
123+
final var isDataBlob = blobToDamage.getFileName().toString().startsWith(BlobStoreRepository.UPLOADED_DATA_BLOB_PREFIX);
124+
final var isIndexBlob = blobToDamage.equals(repositoryDataBlob);
125+
// data blobs and the repository-data blob are always removed, never corrupted; any other blob is removed or
// corrupted at random. Either way the pristine bytes end up in tempDir/tmp.
if (isDataBlob || isIndexBlob || randomBoolean()) {
126+
logger.info("--> deleting {}", blobToDamage);
127+
Files.move(blobToDamage, tempDir.resolve("tmp"));
128+
} else {
129+
logger.info("--> corrupting {}", blobToDamage);
130+
Files.copy(blobToDamage, tempDir.resolve("tmp"));
131+
CorruptionUtils.corruptFile(random(), blobToDamage);
132+
}
133+
try {
134+
final var isCancelled = new AtomicBoolean();
135+
136+
// the third argument is a supplier that occasionally flips isCancelled to true and, once flipped, keeps
// returning true; presumably this is how the verification polls for cancellation (see the assertion below)
final var verificationResponse = PlainActionFuture.get(
137+
(PlainActionFuture<List<RepositoryVerificationException>> listener) -> repository.verifyMetadataIntegrity(
138+
request,
139+
listener,
140+
() -> {
141+
if (rarely() && rarely()) {
142+
isCancelled.set(true);
143+
return true;
144+
}
145+
return isCancelled.get();
146+
}
147+
),
148+
30,
149+
TimeUnit.SECONDS
150+
);
151+
// a damaged repository must always yield at least one reported problem
assertThat(verificationResponse, not(empty()));
152+
final var responseString = verificationResponse.stream().map(Throwable::getMessage).collect(Collectors.joining("\n"));
153+
if (isCancelled.get()) {
154+
assertThat(responseString, containsString("verification task cancelled before completion"));
155+
}
156+
// a removed data blob must be named in a "missing blob" message, unless the run was cancelled
if (isDataBlob && isCancelled.get() == false) {
157+
assertThat(
158+
responseString,
159+
allOf(containsString(blobToDamage.getFileName().toString()), containsString("missing blob"))
160+
);
161+
}
162+
} catch (RepositoryException e) {
163+
// ok, this means e.g. we couldn't even read the index blob
164+
} finally {
165+
// drop the corrupted copy if there is one (a no-op after the "deleting" branch), then restore the original
Files.deleteIfExists(blobToDamage);
166+
Files.move(tempDir.resolve("tmp"), blobToDamage);
167+
}
168+
169+
// with the blob restored, the repository must verify cleanly again before the next iteration
final var repairResponse = PlainActionFuture.<VerifyRepositoryIntegrityAction.Response, RuntimeException>get(
170+
listener -> client().execute(VerifyRepositoryIntegrityAction.INSTANCE, request, listener),
171+
30,
172+
TimeUnit.SECONDS
173+
);
174+
assertThat(repairResponse.getRestStatus(), equalTo(RestStatus.OK));
175+
assertThat(repairResponse.getExceptions(), empty());
176+
}
177+
}
178+
}

server/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -70,6 +70,7 @@
7070
exports org.elasticsearch.action.admin.cluster.repositories.cleanup;
7171
exports org.elasticsearch.action.admin.cluster.repositories.delete;
7272
exports org.elasticsearch.action.admin.cluster.repositories.get;
73+
exports org.elasticsearch.action.admin.cluster.repositories.integrity;
7374
exports org.elasticsearch.action.admin.cluster.repositories.put;
7475
exports org.elasticsearch.action.admin.cluster.repositories.verify;
7576
exports org.elasticsearch.action.admin.cluster.reroute;

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -60,6 +60,7 @@
6060
import org.elasticsearch.action.admin.cluster.repositories.delete.TransportDeleteRepositoryAction;
6161
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesAction;
6262
import org.elasticsearch.action.admin.cluster.repositories.get.TransportGetRepositoriesAction;
63+
import org.elasticsearch.action.admin.cluster.repositories.integrity.VerifyRepositoryIntegrityAction;
6364
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction;
6465
import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
6566
import org.elasticsearch.action.admin.cluster.repositories.verify.TransportVerifyRepositoryAction;
@@ -336,6 +337,7 @@
336337
import org.elasticsearch.rest.action.admin.cluster.RestSnapshottableFeaturesAction;
337338
import org.elasticsearch.rest.action.admin.cluster.RestUpdateDesiredNodesAction;
338339
import org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryAction;
340+
import org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryIntegrityAction;
339341
import org.elasticsearch.rest.action.admin.cluster.dangling.RestDeleteDanglingIndexAction;
340342
import org.elasticsearch.rest.action.admin.cluster.dangling.RestImportDanglingIndexAction;
341343
import org.elasticsearch.rest.action.admin.cluster.dangling.RestListDanglingIndicesAction;
@@ -588,6 +590,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
588590
actions.register(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
589591
actions.register(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
590592
actions.register(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class);
593+
actions.register(VerifyRepositoryIntegrityAction.INSTANCE, VerifyRepositoryIntegrityAction.TransportAction.class);
591594
actions.register(CleanupRepositoryAction.INSTANCE, TransportCleanupRepositoryAction.class);
592595
actions.register(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
593596
actions.register(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
@@ -757,6 +760,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
757760
registerHandler.accept(new RestGetRepositoriesAction(settingsFilter));
758761
registerHandler.accept(new RestDeleteRepositoryAction());
759762
registerHandler.accept(new RestVerifyRepositoryAction());
763+
registerHandler.accept(new RestVerifyRepositoryIntegrityAction());
760764
registerHandler.accept(new RestCleanupRepositoryAction());
761765
registerHandler.accept(new RestGetSnapshotsAction());
762766
registerHandler.accept(new RestCreateSnapshotAction());

0 commit comments

Comments
 (0)