Skip to content

Commit c7a176f

Browse files
committed
Make S3 custom query parameter optional
Today Elasticsearch will record the purpose for each request to S3 using a custom query parameter. This isn't believed to be necessary outside of the ECH/ECE/ECK/... managed services, and it adds rather a lot to the request logs, so with this commit we make the feature optional and disabled by default.
1 parent 989032b commit c7a176f

File tree

7 files changed

+191
-14
lines changed

7 files changed

+191
-14
lines changed

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ protected Settings repositorySettings(String repoName) {
135135
.put(super.repositorySettings(repoName))
136136
.put(S3Repository.BUCKET_SETTING.getKey(), "bucket")
137137
.put(S3Repository.CLIENT_NAME.getKey(), "test")
138+
.put(S3Repository.ADD_PURPOSE_CUSTOM_QUERY_PARAMETER_SETTING.getKey(), true)
138139
// Don't cache repository data because some tests manually modify the repository data
139140
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false);
140141
if (randomBoolean()) {

modules/repository-s3/src/javaRestTest/java/org/elasticsearch/repositories/s3/AbstractRepositoryS3RestTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ private Request getRegisterRequest(UnaryOperator<Settings> settingsUnaryOperator
5959
.put("canned_acl", "private")
6060
.put("storage_class", "standard")
6161
.put("disable_chunked_encoding", randomBoolean())
62+
.put("add_purpose_custom_query_parameter", randomBoolean())
6263
.build()
6364
)
6465
)

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1153,9 +1153,7 @@ ActionListener<Void> getMultipartUploadCleanupListener(int maxUploads, RefCounti
11531153
.prefix(keyPath)
11541154
.maxUploads(maxUploads)
11551155
// TODO adjust to use S3BlobStore.configureRequestForMetrics, adding metrics collection
1156-
.overrideConfiguration(
1157-
b -> b.putRawQueryParameter(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE, OperationPurpose.SNAPSHOT_DATA.getKey())
1158-
)
1156+
.overrideConfiguration(b -> blobStore.addPurposeQueryParameter(OperationPurpose.SNAPSHOT_DATA, b))
11591157
.build();
11601158
final var multipartUploadListing = clientReference.client().listMultipartUploads(listMultipartUploadsRequest);
11611159
final var multipartUploads = multipartUploadListing.uploads();
@@ -1184,12 +1182,7 @@ ActionListener<Void> getMultipartUploadCleanupListener(int maxUploads, RefCounti
11841182
.key(u.key())
11851183
.uploadId(u.uploadId())
11861184
// TODO adjust to use S3BlobStore.configureRequestForMetrics, adding metrics collection
1187-
.overrideConfiguration(
1188-
b -> b.putRawQueryParameter(
1189-
S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE,
1190-
OperationPurpose.SNAPSHOT_DATA.getKey()
1191-
)
1192-
)
1185+
.overrideConfiguration(b -> blobStore.addPurposeQueryParameter(OperationPurpose.SNAPSHOT_DATA, b))
11931186
.build()
11941187
)
11951188
);

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.repositories.s3;
1111

1212
import software.amazon.awssdk.awscore.AwsRequest;
13+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
1314
import software.amazon.awssdk.core.exception.SdkException;
1415
import software.amazon.awssdk.core.metrics.CoreMetric;
1516
import software.amazon.awssdk.core.retry.RetryUtils;
@@ -58,6 +59,8 @@
5859
import java.util.function.Predicate;
5960
import java.util.stream.Collectors;
6061

62+
import static org.elasticsearch.repositories.s3.S3Repository.ADD_PURPOSE_CUSTOM_QUERY_PARAMETER_SETTING;
63+
6164
class S3BlobStore implements BlobStore {
6265

6366
public static final String CUSTOM_QUERY_PARAMETER_COPY_SOURCE = "x-amz-copy-source";
@@ -102,6 +105,8 @@ class S3BlobStore implements BlobStore {
102105

103106
private final TimeValue getRegisterRetryDelay;
104107

108+
private final boolean addPurposeCustomQueryParameter;
109+
105110
S3BlobStore(
106111
S3Service service,
107112
String bucket,
@@ -131,6 +136,7 @@ class S3BlobStore implements BlobStore {
131136
this.bulkDeletionBatchSize = S3Repository.DELETION_BATCH_SIZE_SETTING.get(repositoryMetadata.settings());
132137
this.retryThrottledDeleteBackoffPolicy = retryThrottledDeleteBackoffPolicy;
133138
this.getRegisterRetryDelay = S3Repository.GET_REGISTER_RETRY_DELAY.get(repositoryMetadata.settings());
139+
this.addPurposeCustomQueryParameter = ADD_PURPOSE_CUSTOM_QUERY_PARAMETER_SETTING.get(repositoryMetadata.settings());
134140
}
135141

136142
MetricPublisher getMetricPublisher(Operation operation, OperationPurpose purpose) {
@@ -600,9 +606,17 @@ static void configureRequestForMetrics(
600606
Operation operation,
601607
OperationPurpose purpose
602608
) {
603-
request.overrideConfiguration(
604-
builder -> builder.metricPublishers(List.of(blobStore.getMetricPublisher(operation, purpose)))
605-
.putRawQueryParameter(CUSTOM_QUERY_PARAMETER_PURPOSE, purpose.getKey())
606-
);
609+
request.overrideConfiguration(builder -> {
610+
builder.metricPublishers(List.of(blobStore.getMetricPublisher(operation, purpose)));
611+
blobStore.addPurposeQueryParameter(purpose, builder);
612+
});
613+
}
614+
615+
public void addPurposeQueryParameter(OperationPurpose purpose, AwsRequestOverrideConfiguration.Builder builder) {
616+
if (addPurposeCustomQueryParameter || purpose == OperationPurpose.REPOSITORY_ANALYSIS) {
617+
// REPOSITORY_ANALYSIS is a strict check for 100% S3 compatibility, including custom query parameter support, so is always added
618+
builder.putRawQueryParameter(CUSTOM_QUERY_PARAMETER_PURPOSE, purpose.getKey());
619+
}
607620
}
621+
608622
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.Strings;
2222
import org.elasticsearch.common.blobstore.BlobPath;
2323
import org.elasticsearch.common.blobstore.BlobStore;
24+
import org.elasticsearch.common.blobstore.OperationPurpose;
2425
import org.elasticsearch.common.logging.DeprecationCategory;
2526
import org.elasticsearch.common.logging.DeprecationLogger;
2627
import org.elasticsearch.common.settings.SecureSetting;
@@ -245,6 +246,14 @@ class S3Repository extends MeteredBlobStoreRepository {
245246
Setting.Property.Dynamic
246247
);
247248

249+
/**
250+
* Whether to include {@link OperationPurpose#getKey()} as a custom query parameter on all operations.
251+
*/
252+
static final Setting<Boolean> ADD_PURPOSE_CUSTOM_QUERY_PARAMETER_SETTING = Setting.boolSetting(
253+
"add_purpose_custom_query_parameter",
254+
false
255+
);
256+
248257
private final S3Service service;
249258

250259
private final String bucket;
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.repositories.s3;
11+
12+
import fixture.s3.S3HttpFixture;
13+
import fixture.s3.S3HttpHandler;
14+
15+
import com.sun.net.httpserver.HttpExchange;
16+
import com.sun.net.httpserver.HttpHandler;
17+
18+
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
19+
import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
20+
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
21+
import org.elasticsearch.action.admin.cluster.snapshots.create.TransportCreateSnapshotAction;
22+
import org.elasticsearch.common.settings.MockSecureSettings;
23+
import org.elasticsearch.common.settings.Settings;
24+
import org.elasticsearch.common.util.CollectionUtils;
25+
import org.elasticsearch.plugins.Plugin;
26+
import org.elasticsearch.snapshots.SnapshotState;
27+
import org.elasticsearch.test.ESSingleNodeTestCase;
28+
import org.hamcrest.Matcher;
29+
import org.junit.runner.Description;
30+
import org.junit.runners.model.Statement;
31+
32+
import java.io.IOException;
33+
import java.util.Collection;
34+
35+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
36+
import static org.hamcrest.Matchers.hasItem;
37+
import static org.hamcrest.Matchers.not;
38+
39+
public class AddPurposeCustomQueryParameterTests extends ESSingleNodeTestCase {
40+
41+
@Override
42+
protected Collection<Class<? extends Plugin>> getPlugins() {
43+
return CollectionUtils.appendToCopyNoNullElements(super.getPlugins(), S3RepositoryPlugin.class);
44+
}
45+
46+
@Override
47+
protected Settings nodeSettings() {
48+
final var secureSettings = new MockSecureSettings();
49+
secureSettings.setString(S3ClientSettings.ACCESS_KEY_SETTING.getConcreteSettingForNamespace("default").getKey(), "test_access_key");
50+
secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("default").getKey(), "test_secret_key");
51+
52+
return Settings.builder()
53+
.put(super.nodeSettings())
54+
.put(S3ClientSettings.REGION.getConcreteSettingForNamespace("default").getKey(), "es-test-region")
55+
.setSecureSettings(secureSettings)
56+
.build();
57+
}
58+
59+
public void testCustomQueryParameterConfiguration() throws Throwable {
60+
final var indexName = randomIdentifier();
61+
createIndex(indexName);
62+
prepareIndex(indexName).setSource("foo", "bar").get();
63+
64+
final var bucket = randomIdentifier();
65+
final var basePath = randomIdentifier();
66+
67+
runWithFixture(
68+
Settings.builder().put("bucket", bucket).put("base_path", basePath).build(), // defaults to omitting the custom params
69+
not(hasItem(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE))
70+
);
71+
72+
runWithFixture(
73+
Settings.builder().put("bucket", bucket).put("base_path", basePath).put("add_purpose_custom_query_parameter", false).build(),
74+
not(hasItem(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE))
75+
);
76+
77+
runWithFixture(
78+
Settings.builder().put("bucket", bucket).put("base_path", basePath).put("add_purpose_custom_query_parameter", true).build(),
79+
hasItem(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE)
80+
);
81+
}
82+
83+
private void runWithFixture(Settings repositorySettings, Matcher<Iterable<? super String>> queryParamMatcher) throws Throwable {
84+
final var bucket = repositorySettings.get("bucket");
85+
final var basePath = repositorySettings.get("base_path");
86+
final var httpFixture = new S3HttpFixture(true, bucket, basePath, (key, token) -> true) {
87+
@Override
88+
protected HttpHandler createHandler() {
89+
return new S3HttpHandler(bucket, basePath) {
90+
@Override
91+
public void handle(HttpExchange exchange) throws IOException {
92+
assertThat(parseRequest(exchange).queryParameters().keySet(), queryParamMatcher);
93+
super.handle(exchange);
94+
}
95+
};
96+
}
97+
};
98+
httpFixture.apply(new Statement() {
99+
@Override
100+
public void evaluate() {
101+
final var repoName = randomIdentifier();
102+
assertAcked(
103+
client().execute(
104+
TransportPutRepositoryAction.TYPE,
105+
new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(S3Repository.TYPE)
106+
.settings(Settings.builder().put("endpoint", httpFixture.getAddress()).put(repositorySettings))
107+
)
108+
);
109+
110+
assertEquals(
111+
SnapshotState.SUCCESS,
112+
client().execute(
113+
TransportCreateSnapshotAction.TYPE,
114+
new CreateSnapshotRequest(TEST_REQUEST_TIMEOUT, repoName, randomIdentifier()).waitForCompletion(true)
115+
).actionGet(SAFE_AWAIT_TIMEOUT).getSnapshotInfo().state()
116+
);
117+
}
118+
}, Description.EMPTY).evaluate();
119+
}
120+
121+
}

x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@
88

99
import fixture.aws.DynamicRegionSupplier;
1010
import fixture.s3.S3HttpFixture;
11+
import fixture.s3.S3HttpHandler;
1112

13+
import com.sun.net.httpserver.HttpHandler;
14+
15+
import org.elasticsearch.common.regex.Regex;
1216
import org.elasticsearch.common.settings.Settings;
1317
import org.elasticsearch.common.unit.ByteSizeValue;
1418
import org.elasticsearch.test.cluster.ElasticsearchCluster;
@@ -17,6 +21,7 @@
1721
import org.junit.rules.RuleChain;
1822
import org.junit.rules.TestRule;
1923

24+
import java.util.concurrent.atomic.AtomicBoolean;
2025
import java.util.function.Supplier;
2126

2227
import static fixture.aws.AwsCredentialsUtils.fixedAccessKey;
@@ -33,7 +38,39 @@ public class S3RepositoryAnalysisRestIT extends AbstractRepositoryAnalysisRestTe
3338
"bucket",
3439
"base_path_integration_tests",
3540
fixedAccessKey("s3_test_access_key", regionSupplier, "s3")
36-
);
41+
) {
42+
@Override
43+
protected HttpHandler createHandler() {
44+
final var delegateHandler = asInstanceOf(S3HttpHandler.class, super.createHandler());
45+
final var repoAnalysisStarted = new AtomicBoolean();
46+
return exchange -> {
47+
ensurePurposeParameterPresent(delegateHandler.parseRequest(exchange), repoAnalysisStarted);
48+
delegateHandler.handle(exchange);
49+
};
50+
}
51+
};
52+
53+
private static void ensurePurposeParameterPresent(S3HttpHandler.S3Request request, AtomicBoolean repoAnalysisStarted) {
54+
if (request.path().startsWith("/bucket/base_path_integration_tests/temp-analysis-")) {
55+
repoAnalysisStarted.set(true);
56+
}
57+
if (repoAnalysisStarted.get() == false) {
58+
if (Regex.simpleMatch("/bucket/base_path_integration_tests/tests-*/master.dat", request.path())
59+
|| Regex.simpleMatch("/bucket/base_path_integration_tests/tests-*/data-*.dat", request.path())
60+
|| (request.isListObjectsRequest() && request.getQueryParamOnce("prefix").startsWith("base_path_integration_tests/tests-"))
61+
|| (request.isMultiObjectDeleteRequest())) {
62+
// verify repository is not part of repo analysis so will have different/missing x-purpose parameter
63+
return;
64+
}
65+
if (request.isListObjectsRequest() && request.getQueryParamOnce("prefix").equals("base_path_integration_tests/index-")) {
66+
// getRepositoryData looking for root index-N blob will have different/missing x-purpose parameter
67+
return;
68+
}
69+
}
70+
repoAnalysisStarted.set(true);
71+
assertTrue(request.toString(), request.hasQueryParamOnce("x-purpose"));
72+
assertEquals(request.toString(), "RepositoryAnalysis", request.getQueryParamOnce("x-purpose"));
73+
}
3774

3875
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
3976
.distribution(DistributionType.DEFAULT)
@@ -73,6 +110,7 @@ protected Settings repositorySettings() {
73110
.put("delete_objects_max_size", between(1, 1000))
74111
.put("buffer_size", ByteSizeValue.ofMb(5)) // so some uploads are multipart ones
75112
.put("max_copy_size_before_multipart", ByteSizeValue.ofMb(5))
113+
.put("add_purpose_custom_query_parameter", randomBoolean())
76114
.build();
77115
}
78116
}

0 commit comments

Comments
 (0)