Skip to content

Commit f663886

Browse files
authored
Split searchable snapshot into multiple repo operations (#116987)
* Split searchable snapshot into multiple repo operations Each operation on a snapshot repository uses the same `Repository`, `BlobStore`, etc. instances throughout, in order to avoid the complexity arising from handling metadata updates that occur while an operation is running. Today we model the entire lifetime of a searchable snapshot shard as a single repository operation since there should be no metadata updates that matter in this context (other than those that are handled dynamically via other mechanisms) and some metadata updates might be positively harmful to a searchable snapshot shard. It turns out that there are some undocumented legacy settings which _do_ matter to searchable snapshots, and which are still in use, so with this commit we move to a finer-grained model of repository operations within a searchable snapshot. Backport of #116918 to 8.16 * Add end-to-end test for reloading S3 credentials We don't seem to have a test that completely verifies that a S3 repository can reload credentials from an updated keystore. This commit adds such a test. Backport of #116762 to 8.16.
1 parent 33b8fb1 commit f663886

File tree

9 files changed

+505
-79
lines changed

9 files changed

+505
-79
lines changed

docs/changelog/116918.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 116918
2+
summary: Split searchable snapshot into multiple repo operations
3+
area: Snapshot/Restore
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,11 @@ public class RepositoryMetadata implements Writeable {
4646
* @param settings repository settings
4747
*/
4848
public RepositoryMetadata(String name, String type, Settings settings) {
49-
this(name, RepositoryData.MISSING_UUID, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN);
49+
this(name, RepositoryData.MISSING_UUID, type, settings);
50+
}
51+
52+
public RepositoryMetadata(String name, String uuid, String type, Settings settings) {
53+
this(name, uuid, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN);
5054
}
5155

5256
public RepositoryMetadata(RepositoryMetadata metadata, long generation, long pendingGeneration) {

server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,12 +283,22 @@ public RegisterRepositoryTask(final RepositoriesService repositoriesService, fin
283283

284284
@Override
285285
public ClusterState execute(ClusterState currentState) {
286-
RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings());
287286
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
288287
RepositoriesMetadata repositories = RepositoriesMetadata.get(currentState);
289288
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1);
290289
for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
291-
if (repositoryMetadata.name().equals(newRepositoryMetadata.name())) {
290+
if (repositoryMetadata.name().equals(request.name())) {
291+
final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(
292+
request.name(),
293+
// Copy the UUID from the existing instance rather than resetting it back to MISSING_UUID which would force us to
294+
// re-read the RepositoryData to get it again. In principle the new RepositoryMetadata might point to a different
295+
// underlying repository at this point, but if so that'll cause things to fail in clear ways and eventually (before
296+
// writing anything) we'll read the RepositoryData again and update the UUID in the RepositoryMetadata to match. See
297+
// also #109936.
298+
repositoryMetadata.uuid(),
299+
request.type(),
300+
request.settings()
301+
);
292302
Repository existing = repositoriesService.repositories.get(request.name());
293303
if (existing == null) {
294304
existing = repositoriesService.internalRepositories.get(request.name());

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
200200
public static final String STATELESS_SHARD_WRITE_THREAD_NAME = "stateless_shard_write";
201201
public static final String STATELESS_CLUSTER_STATE_READ_WRITE_THREAD_NAME = "stateless_cluster_state";
202202
public static final String STATELESS_SHARD_PREWARMING_THREAD_NAME = "stateless_prewarm";
203+
public static final String SEARCHABLE_SNAPSHOTS_CACHE_FETCH_ASYNC_THREAD_NAME = "searchable_snapshots_cache_fetch_async";
204+
public static final String SEARCHABLE_SNAPSHOTS_CACHE_PREWARMING_THREAD_NAME = "searchable_snapshots_cache_prewarming";
203205

204206
/**
205207
* Prefix for the name of the root {@link RepositoryData} blob.
@@ -2183,7 +2185,9 @@ private void assertSnapshotOrStatelessPermittedThreadPool() {
21832185
STATELESS_TRANSLOG_THREAD_NAME,
21842186
STATELESS_SHARD_WRITE_THREAD_NAME,
21852187
STATELESS_CLUSTER_STATE_READ_WRITE_THREAD_NAME,
2186-
STATELESS_SHARD_PREWARMING_THREAD_NAME
2188+
STATELESS_SHARD_PREWARMING_THREAD_NAME,
2189+
SEARCHABLE_SNAPSHOTS_CACHE_FETCH_ASYNC_THREAD_NAME,
2190+
SEARCHABLE_SNAPSHOTS_CACHE_PREWARMING_THREAD_NAME
21872191
);
21882192
}
21892193

Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.searchablesnapshots.s3;
9+
10+
import fixture.s3.S3HttpFixture;
11+
import io.netty.handler.codec.http.HttpMethod;
12+
13+
import org.apache.http.client.methods.HttpPut;
14+
import org.apache.http.entity.ByteArrayEntity;
15+
import org.apache.http.entity.ContentType;
16+
import org.elasticsearch.client.Request;
17+
import org.elasticsearch.client.RequestOptions;
18+
import org.elasticsearch.client.ResponseException;
19+
import org.elasticsearch.client.WarningsHandler;
20+
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.core.Nullable;
22+
import org.elasticsearch.test.ESTestCase;
23+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
24+
import org.elasticsearch.test.cluster.MutableSettingsProvider;
25+
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
26+
import org.elasticsearch.test.rest.ESRestTestCase;
27+
import org.elasticsearch.test.rest.ObjectPath;
28+
import org.elasticsearch.xcontent.XContentBuilder;
29+
import org.elasticsearch.xcontent.XContentType;
30+
import org.junit.Before;
31+
import org.junit.ClassRule;
32+
import org.junit.rules.RuleChain;
33+
import org.junit.rules.TestRule;
34+
35+
import java.io.ByteArrayOutputStream;
36+
import java.io.IOException;
37+
import java.util.function.UnaryOperator;
38+
39+
import static org.hamcrest.CoreMatchers.containsString;
40+
import static org.hamcrest.Matchers.allOf;
41+
42+
public class S3SearchableSnapshotsCredentialsReloadIT extends ESRestTestCase {
43+
44+
private static final String BUCKET = "S3SearchableSnapshotsCredentialsReloadIT-bucket";
45+
private static final String BASE_PATH = "S3SearchableSnapshotsCredentialsReloadIT-base-path";
46+
47+
public static final S3HttpFixture s3Fixture = new S3HttpFixture(true, BUCKET, BASE_PATH, "ignored");
48+
49+
private static final MutableSettingsProvider keystoreSettings = new MutableSettingsProvider();
50+
51+
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
52+
.distribution(DistributionType.DEFAULT)
53+
.setting("xpack.license.self_generated.type", "trial")
54+
.keystore(keystoreSettings)
55+
.setting("xpack.searchable.snapshot.shared_cache.size", "4kB")
56+
.setting("xpack.searchable.snapshot.shared_cache.region_size", "4kB")
57+
.setting("xpack.searchable_snapshots.cache_fetch_async_thread_pool.keep_alive", "0ms")
58+
.setting("xpack.security.enabled", "false")
59+
.systemProperty("es.allow_insecure_settings", "true")
60+
.build();
61+
62+
@ClassRule
63+
public static TestRule ruleChain = RuleChain.outerRule(s3Fixture).around(cluster);
64+
65+
@Override
66+
protected String getTestRestCluster() {
67+
return cluster.getHttpAddresses();
68+
}
69+
70+
@Before
71+
public void skipFips() {
72+
assumeFalse("getting these tests to run in a FIPS JVM is kinda fiddly and we don't really need the extra coverage", inFipsJvm());
73+
}
74+
75+
public void testReloadCredentialsFromKeystore() throws IOException {
76+
final TestHarness testHarness = new TestHarness();
77+
testHarness.putRepository();
78+
79+
// Set up initial credentials
80+
final String accessKey1 = randomIdentifier();
81+
s3Fixture.setAccessKey(accessKey1);
82+
keystoreSettings.put("s3.client.default.access_key", accessKey1);
83+
keystoreSettings.put("s3.client.default.secret_key", randomIdentifier());
84+
cluster.updateStoredSecureSettings();
85+
assertOK(client().performRequest(new Request("POST", "/_nodes/reload_secure_settings")));
86+
87+
testHarness.createFrozenSearchableSnapshotIndex();
88+
89+
// Verify searchable snapshot functionality
90+
testHarness.ensureSearchSuccess();
91+
92+
// Rotate credentials in blob store
93+
logger.info("--> rotate credentials");
94+
final String accessKey2 = randomValueOtherThan(accessKey1, ESTestCase::randomIdentifier);
95+
s3Fixture.setAccessKey(accessKey2);
96+
97+
// Ensure searchable snapshot now does not work due to invalid credentials
98+
logger.info("--> expect failure");
99+
testHarness.ensureSearchFailure();
100+
101+
// Set up refreshed credentials
102+
logger.info("--> update keystore contents");
103+
keystoreSettings.put("s3.client.default.access_key", accessKey2);
104+
cluster.updateStoredSecureSettings();
105+
assertOK(client().performRequest(new Request("POST", "/_nodes/reload_secure_settings")));
106+
107+
// Check access using refreshed credentials
108+
logger.info("--> expect success");
109+
testHarness.ensureSearchSuccess();
110+
}
111+
112+
public void testReloadCredentialsFromAlternativeClient() throws IOException {
113+
final TestHarness testHarness = new TestHarness();
114+
testHarness.putRepository();
115+
116+
// Set up credentials
117+
final String accessKey1 = randomIdentifier();
118+
final String accessKey2 = randomValueOtherThan(accessKey1, ESTestCase::randomIdentifier);
119+
final String alternativeClient = randomValueOtherThan("default", ESTestCase::randomIdentifier);
120+
121+
s3Fixture.setAccessKey(accessKey1);
122+
keystoreSettings.put("s3.client.default.access_key", accessKey1);
123+
keystoreSettings.put("s3.client.default.secret_key", randomIdentifier());
124+
keystoreSettings.put("s3.client." + alternativeClient + ".access_key", accessKey2);
125+
keystoreSettings.put("s3.client." + alternativeClient + ".secret_key", randomIdentifier());
126+
cluster.updateStoredSecureSettings();
127+
assertOK(client().performRequest(new Request("POST", "/_nodes/reload_secure_settings")));
128+
129+
testHarness.createFrozenSearchableSnapshotIndex();
130+
131+
// Verify searchable snapshot functionality
132+
testHarness.ensureSearchSuccess();
133+
134+
// Rotate credentials in blob store
135+
logger.info("--> rotate credentials");
136+
s3Fixture.setAccessKey(accessKey2);
137+
138+
// Ensure searchable snapshot now does not work due to invalid credentials
139+
logger.info("--> expect failure");
140+
testHarness.ensureSearchFailure();
141+
142+
// Adjust repository to use new client
143+
logger.info("--> update repository metadata");
144+
testHarness.putRepository(b -> b.put("client", alternativeClient));
145+
146+
// Check access using refreshed credentials
147+
logger.info("--> expect success");
148+
testHarness.ensureSearchSuccess();
149+
}
150+
151+
public void testReloadCredentialsFromMetadata() throws IOException {
152+
final TestHarness testHarness = new TestHarness();
153+
testHarness.warningsHandler = WarningsHandler.PERMISSIVE;
154+
155+
// Set up credentials
156+
final String accessKey1 = randomIdentifier();
157+
final String accessKey2 = randomValueOtherThan(accessKey1, ESTestCase::randomIdentifier);
158+
159+
testHarness.putRepository(b -> b.put("access_key", accessKey1).put("secret_key", randomIdentifier()));
160+
s3Fixture.setAccessKey(accessKey1);
161+
162+
testHarness.createFrozenSearchableSnapshotIndex();
163+
164+
// Verify searchable snapshot functionality
165+
testHarness.ensureSearchSuccess();
166+
167+
// Rotate credentials in blob store
168+
logger.info("--> rotate credentials");
169+
s3Fixture.setAccessKey(accessKey2);
170+
171+
// Ensure searchable snapshot now does not work due to invalid credentials
172+
logger.info("--> expect failure");
173+
testHarness.ensureSearchFailure();
174+
175+
// Adjust repository to use new client
176+
logger.info("--> update repository metadata");
177+
testHarness.putRepository(b -> b.put("access_key", accessKey2).put("secret_key", randomIdentifier()));
178+
179+
// Check access using refreshed credentials
180+
logger.info("--> expect success");
181+
testHarness.ensureSearchSuccess();
182+
}
183+
184+
private class TestHarness {
185+
private final String mountedIndexName = randomIdentifier();
186+
private final String repositoryName = randomIdentifier();
187+
188+
@Nullable // to use the default
189+
WarningsHandler warningsHandler;
190+
191+
void putRepository() throws IOException {
192+
putRepository(UnaryOperator.identity());
193+
}
194+
195+
void putRepository(UnaryOperator<Settings.Builder> settingsOperator) throws IOException {
196+
// Register repository
197+
final Request request = newXContentRequest(
198+
HttpMethod.PUT,
199+
"/_snapshot/" + repositoryName,
200+
(b, p) -> b.field("type", "s3")
201+
.startObject("settings")
202+
.value(
203+
settingsOperator.apply(
204+
Settings.builder().put("bucket", BUCKET).put("base_path", BASE_PATH).put("endpoint", s3Fixture.getAddress())
205+
).build()
206+
)
207+
.endObject()
208+
);
209+
request.addParameter("verify", "false"); // because we don't have access to the blob store yet
210+
request.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warningsHandler));
211+
assertOK(client().performRequest(request));
212+
}
213+
214+
void createFrozenSearchableSnapshotIndex() throws IOException {
215+
// Create an index, large enough that its data is not all captured in the file headers
216+
final String indexName = randomValueOtherThan(mountedIndexName, ESTestCase::randomIdentifier);
217+
createIndex(indexName, indexSettings(1, 0).build());
218+
try (var bodyStream = new ByteArrayOutputStream()) {
219+
for (int i = 0; i < 1024; i++) {
220+
try (XContentBuilder bodyLineBuilder = new XContentBuilder(XContentType.JSON.xContent(), bodyStream)) {
221+
bodyLineBuilder.startObject().startObject("index").endObject().endObject();
222+
}
223+
bodyStream.write(0x0a);
224+
try (XContentBuilder bodyLineBuilder = new XContentBuilder(XContentType.JSON.xContent(), bodyStream)) {
225+
bodyLineBuilder.startObject().field("foo", "bar").endObject();
226+
}
227+
bodyStream.write(0x0a);
228+
}
229+
bodyStream.flush();
230+
final Request request = new Request("PUT", indexName + "/_bulk");
231+
request.setEntity(new ByteArrayEntity(bodyStream.toByteArray(), ContentType.APPLICATION_JSON));
232+
client().performRequest(request);
233+
}
234+
235+
// Take a snapshot and delete the original index
236+
final String snapshotName = randomIdentifier();
237+
final Request createSnapshotRequest = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repositoryName + '/' + snapshotName);
238+
createSnapshotRequest.addParameter("wait_for_completion", "true");
239+
createSnapshotRequest.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warningsHandler));
240+
assertOK(client().performRequest(createSnapshotRequest));
241+
242+
deleteIndex(indexName);
243+
244+
// Mount the snapshotted index as a searchable snapshot
245+
final Request mountRequest = newXContentRequest(
246+
HttpMethod.POST,
247+
"/_snapshot/" + repositoryName + "/" + snapshotName + "/_mount",
248+
(b, p) -> b.field("index", indexName).field("renamed_index", mountedIndexName)
249+
);
250+
mountRequest.addParameter("wait_for_completion", "true");
251+
mountRequest.addParameter("storage", "shared_cache");
252+
assertOK(client().performRequest(mountRequest));
253+
ensureGreen(mountedIndexName);
254+
}
255+
256+
void ensureSearchSuccess() throws IOException {
257+
final Request searchRequest = new Request("GET", mountedIndexName + "/_search");
258+
searchRequest.addParameter("size", "10000");
259+
assertEquals(
260+
"bar",
261+
ObjectPath.createFromResponse(assertOK(client().performRequest(searchRequest))).evaluate("hits.hits.0._source.foo")
262+
);
263+
}
264+
265+
void ensureSearchFailure() throws IOException {
266+
assertOK(client().performRequest(new Request("POST", "/_searchable_snapshots/cache/clear")));
267+
final Request searchRequest = new Request("GET", mountedIndexName + "/_search");
268+
searchRequest.addParameter("size", "10000");
269+
assertThat(
270+
expectThrows(ResponseException.class, () -> client().performRequest(searchRequest)).getMessage(),
271+
allOf(
272+
containsString("Bad access key"),
273+
containsString("Status Code: 403"),
274+
containsString("Error Code: AccessDenied"),
275+
containsString("failed to read data from cache")
276+
)
277+
);
278+
}
279+
}
280+
281+
}

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -547,9 +547,9 @@ public Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
547547
return Map.of(SNAPSHOT_RECOVERY_STATE_FACTORY_KEY, SearchableSnapshotRecoveryState::new);
548548
}
549549

550-
public static final String CACHE_FETCH_ASYNC_THREAD_POOL_NAME = "searchable_snapshots_cache_fetch_async";
550+
public static final String CACHE_FETCH_ASYNC_THREAD_POOL_NAME = BlobStoreRepository.SEARCHABLE_SNAPSHOTS_CACHE_FETCH_ASYNC_THREAD_NAME;
551551
public static final String CACHE_FETCH_ASYNC_THREAD_POOL_SETTING = "xpack.searchable_snapshots.cache_fetch_async_thread_pool";
552-
public static final String CACHE_PREWARMING_THREAD_POOL_NAME = "searchable_snapshots_cache_prewarming";
552+
public static final String CACHE_PREWARMING_THREAD_POOL_NAME = BlobStoreRepository.SEARCHABLE_SNAPSHOTS_CACHE_PREWARMING_THREAD_NAME;
553553
public static final String CACHE_PREWARMING_THREAD_POOL_SETTING = "xpack.searchable_snapshots.cache_prewarming_thread_pool";
554554

555555
public static ScalingExecutorBuilder[] executorBuilders(Settings settings) {

0 commit comments

Comments
 (0)