Skip to content

Commit cdc1b4c

Browse files
authored
Optionally make S3 writes unconditional (#138735)
As of #133030 Elasticsearch uses S3's support for conditional writes to protect against repository corruption. It is possible that some other storage system may claim to be fully S3-compatible despite rejecting such requests. This commit adds a repository setting that allows to make all writes unconditional, unsafely disabling the corruption protection, but allowing users some time to work with their storage supplier to address the incompatibility. Backport of #137185 and #138406 to 9.2
1 parent ffb689d commit cdc1b4c

File tree

8 files changed

+225
-13
lines changed

8 files changed

+225
-13
lines changed

modules/repository-s3/src/javaRestTest/java/org/elasticsearch/repositories/s3/AbstractRepositoryS3RestTestCase.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@
3333

3434
public abstract class AbstractRepositoryS3RestTestCase extends ESRestTestCase {
3535

36-
public record TestRepository(String repositoryName, String clientName, String bucketName, String basePath) {
37-
38-
public Closeable register() throws IOException {
39-
return register(UnaryOperator.identity());
40-
}
41-
36+
public record TestRepository(
37+
String repositoryName,
38+
String clientName,
39+
String bucketName,
40+
String basePath,
41+
Settings extraRepositorySettings
42+
) {
4243
public Closeable register(UnaryOperator<Settings> settingsUnaryOperator) throws IOException {
4344
assertOK(client().performRequest(getRegisterRequest(settingsUnaryOperator)));
4445
return () -> assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + repositoryName())));
@@ -65,6 +66,7 @@ private Request getRegisterRequest(UnaryOperator<Settings> settingsUnaryOperator
6566
Settings.builder().put("add_purpose_custom_query_parameter", randomBoolean()).build()
6667
)
6768
)
69+
.put(extraRepositorySettings)
6870
.build()
6971
)
7072
)
@@ -73,6 +75,10 @@ private Request getRegisterRequest(UnaryOperator<Settings> settingsUnaryOperator
7375
}
7476
}
7577

78+
protected Settings extraRepositorySettings() {
79+
return Settings.EMPTY;
80+
}
81+
7682
protected abstract String getBucketName();
7783

7884
protected abstract String getBasePath();
@@ -84,7 +90,7 @@ protected static String getIdentifierPrefix(String testSuiteName) {
8490
}
8591

8692
private TestRepository newTestRepository() {
87-
return new TestRepository(randomIdentifier(), getClientName(), getBucketName(), getBasePath());
93+
return new TestRepository(randomIdentifier(), getClientName(), getBucketName(), getBasePath(), extraRepositorySettings());
8894
}
8995

9096
private static UnaryOperator<Settings> readonlyOperator(Boolean readonly) {
@@ -152,7 +158,8 @@ private void testNonexistentBucket(Boolean readonly) throws Exception {
152158
randomIdentifier(),
153159
getClientName(),
154160
randomValueOtherThan(getBucketName(), ESTestCase::randomIdentifier),
155-
getBasePath()
161+
getBasePath(),
162+
extraRepositorySettings()
156163
);
157164
final var registerRequest = repository.getRegisterRequest(readonlyOperator(readonly));
158165

@@ -180,7 +187,8 @@ private void testNonexistentClient(Boolean readonly) throws Exception {
180187
randomIdentifier(),
181188
randomValueOtherThanMany(c -> c.equals(getClientName()) || c.equals("default"), ESTestCase::randomIdentifier),
182189
getBucketName(),
183-
getBasePath()
190+
getBasePath(),
191+
extraRepositorySettings()
184192
);
185193
final var registerRequest = repository.getRegisterRequest(readonlyOperator(readonly));
186194

@@ -267,7 +275,7 @@ private void testUsageStats(Boolean readonly) throws Exception {
267275

268276
public void testSnapshotAndRestore() throws Exception {
269277
final var repository = newTestRepository();
270-
try (var ignored = repository.register()) {
278+
try (var ignored = repository.register(UnaryOperator.identity())) {
271279
final var repositoryName = repository.repositoryName();
272280
final var indexName = randomIdentifier();
273281
final var snapshotsToDelete = new ArrayList<String>(2);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.repositories.s3;
11+
12+
import fixture.aws.DynamicRegionSupplier;
13+
import fixture.s3.S3HttpFixture;
14+
import fixture.s3.S3HttpHandler;
15+
16+
import com.carrotsearch.randomizedtesting.annotations.SuppressForbidden;
17+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
18+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
19+
import com.sun.net.httpserver.HttpExchange;
20+
import com.sun.net.httpserver.HttpHandler;
21+
22+
import org.elasticsearch.ExceptionsHelper;
23+
import org.elasticsearch.common.ReferenceDocs;
24+
import org.elasticsearch.common.Strings;
25+
import org.elasticsearch.common.io.Streams;
26+
import org.elasticsearch.common.settings.Settings;
27+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
28+
import org.elasticsearch.test.cluster.LogType;
29+
import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter;
30+
import org.junit.ClassRule;
31+
import org.junit.rules.RuleChain;
32+
import org.junit.rules.TestRule;
33+
34+
import java.io.IOException;
35+
import java.util.function.Supplier;
36+
import java.util.function.UnaryOperator;
37+
38+
import static fixture.aws.AwsCredentialsUtils.fixedAccessKey;
39+
import static org.hamcrest.Matchers.allOf;
40+
import static org.hamcrest.Matchers.containsString;
41+
import static org.hamcrest.Matchers.hasItem;
42+
43+
@ThreadLeakFilters(filters = { TestContainersThreadFilter.class })
44+
@ThreadLeakScope(ThreadLeakScope.Scope.NONE) // https://github.com/elastic/elasticsearch/issues/102482
45+
@SuppressForbidden("HttpExchange and Headers are ok here")
46+
public class RepositoryS3ConditionalWritesUnsupportedRestIT extends AbstractRepositoryS3RestTestCase {
47+
48+
private static final String PREFIX = getIdentifierPrefix("RepositoryS3BasicCredentialsRestIT");
49+
private static final String BUCKET = PREFIX + "bucket";
50+
private static final String BASE_PATH = PREFIX + "base_path";
51+
private static final String ACCESS_KEY = PREFIX + "access-key";
52+
private static final String SECRET_KEY = PREFIX + "secret-key";
53+
private static final String CLIENT = "no_conditional_writes_client";
54+
55+
private static final Supplier<String> regionSupplier = new DynamicRegionSupplier();
56+
57+
private static final S3HttpFixture s3Fixture = new S3HttpFixture(
58+
true,
59+
BUCKET,
60+
BASE_PATH,
61+
fixedAccessKey(ACCESS_KEY, regionSupplier, "s3")
62+
) {
63+
@Override
64+
@SuppressForbidden("HttpExchange and Headers are ok here")
65+
protected HttpHandler createHandler() {
66+
return new AssertNoConditionalWritesHandler(asInstanceOf(S3HttpHandler.class, super.createHandler()));
67+
}
68+
};
69+
70+
@SuppressForbidden("HttpExchange and Headers are ok here")
71+
private static class AssertNoConditionalWritesHandler implements HttpHandler {
72+
73+
private final S3HttpHandler delegateHandler;
74+
75+
private AssertNoConditionalWritesHandler(S3HttpHandler delegateHandler) {
76+
this.delegateHandler = delegateHandler;
77+
}
78+
79+
@Override
80+
public void handle(HttpExchange exchange) throws IOException {
81+
if (exchange.getRequestHeaders().containsKey("if-match") || exchange.getRequestHeaders().containsKey("if-none-match")) {
82+
final var exception = new AssertionError(
83+
Strings.format(
84+
"unsupported conditional write: [%s] with headers [%s]",
85+
delegateHandler.parseRequest(exchange),
86+
exchange.getRequestHeaders()
87+
)
88+
);
89+
ExceptionsHelper.maybeDieOnAnotherThread(exception);
90+
throw exception;
91+
}
92+
delegateHandler.handle(exchange);
93+
}
94+
}
95+
96+
@Override
97+
protected Settings extraRepositorySettings() {
98+
return Settings.builder()
99+
.put(super.extraRepositorySettings())
100+
.put(S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getKey(), true)
101+
.build();
102+
}
103+
104+
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
105+
.module("repository-s3")
106+
.systemProperty("aws.region", regionSupplier)
107+
.keystore("s3.client." + CLIENT + ".access_key", ACCESS_KEY)
108+
.keystore("s3.client." + CLIENT + ".secret_key", SECRET_KEY)
109+
.setting("s3.client." + CLIENT + ".endpoint", s3Fixture::getAddress)
110+
.build();
111+
112+
@ClassRule
113+
public static TestRule ruleChain = RuleChain.outerRule(s3Fixture).around(cluster);
114+
115+
@Override
116+
protected String getTestRestCluster() {
117+
return cluster.getHttpAddresses();
118+
}
119+
120+
@Override
121+
protected String getBucketName() {
122+
return BUCKET;
123+
}
124+
125+
@Override
126+
protected String getBasePath() {
127+
return BASE_PATH;
128+
}
129+
130+
@Override
131+
protected String getClientName() {
132+
return CLIENT;
133+
}
134+
135+
public void testWarningLog() throws IOException {
136+
final var repoName = randomIdentifier();
137+
final var testRepository = new TestRepository(repoName, getClientName(), getBucketName(), getBasePath(), extraRepositorySettings());
138+
try (var ignored = testRepository.register(UnaryOperator.identity()); var logStream = cluster.getNodeLog(0, LogType.SERVER)) {
139+
assertThat(
140+
Streams.readAllLines(logStream),
141+
hasItem(
142+
allOf(
143+
containsString("WARN"),
144+
containsString(repoName),
145+
containsString("""
146+
is configured to unsafely avoid conditional writes which may lead to repository corruption; to resolve this \
147+
warning, upgrade your storage to a system that is fully compatible with AWS S3 and then remove the \
148+
[unsafely_incompatible_with_s3_conditional_writes] repository setting"""),
149+
containsString(ReferenceDocs.S3_COMPATIBLE_REPOSITORIES.toString())
150+
)
151+
)
152+
);
153+
}
154+
}
155+
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ void executeSingleUpload(
563563
if (s3BlobStore.serverSideEncryption()) {
564564
putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
565565
}
566-
if (failIfAlreadyExists) {
566+
if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) {
567567
putRequestBuilder.ifNoneMatch("*");
568568
}
569569
S3BlobStore.configureRequestForMetrics(putRequestBuilder, blobStore, Operation.PUT_OBJECT, purpose);
@@ -642,7 +642,7 @@ private void executeMultipart(
642642
.uploadId(uploadId)
643643
.multipartUpload(b -> b.parts(parts));
644644

645-
if (failIfAlreadyExists) {
645+
if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) {
646646
completeMultipartUploadRequestBuilder.ifNoneMatch("*");
647647
}
648648

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ class S3BlobStore implements BlobStore {
9494

9595
private final StorageClass storageClass;
9696

97+
private final boolean supportsConditionalWrites;
98+
9799
private final RepositoryMetadata repositoryMetadata;
98100

99101
private final ThreadPool threadPool;
@@ -118,6 +120,7 @@ class S3BlobStore implements BlobStore {
118120
ByteSizeValue maxCopySizeBeforeMultipart,
119121
String cannedACL,
120122
String storageClass,
123+
boolean supportConditionalWrites,
121124
RepositoryMetadata repositoryMetadata,
122125
BigArrays bigArrays,
123126
ThreadPool threadPool,
@@ -133,6 +136,7 @@ class S3BlobStore implements BlobStore {
133136
this.maxCopySizeBeforeMultipart = maxCopySizeBeforeMultipart;
134137
this.cannedACL = initCannedACL(cannedACL);
135138
this.storageClass = initStorageClass(storageClass);
139+
this.supportsConditionalWrites = supportConditionalWrites;
136140
this.repositoryMetadata = repositoryMetadata;
137141
this.threadPool = threadPool;
138142
this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
@@ -623,4 +627,13 @@ public void addPurposeQueryParameter(OperationPurpose purpose, AwsRequestOverrid
623627
}
624628
}
625629

630+
/**
631+
* Some storage claims S3-compatibility despite failing to support the {@code If-Match} and {@code If-None-Match} functionality
632+
* properly. We allow to disable the use of this functionality, making all writes unconditional, using the
633+
* {@link S3Repository#UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES} setting.
634+
*/
635+
public boolean supportsConditionalWrites(OperationPurpose purpose) {
636+
// REPOSITORY_ANALYSIS is a strict check for 100% S3 compatibility, including conditional write support
637+
return supportsConditionalWrites || purpose == OperationPurpose.REPOSITORY_ANALYSIS;
638+
}
626639
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,11 @@ class S3Repository extends MeteredBlobStoreRepository {
249249
Setting.Property.Dynamic
250250
);
251251

252+
static final Setting<Boolean> UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES = Setting.boolSetting(
253+
"unsafely_incompatible_with_s3_conditional_writes",
254+
false
255+
);
256+
252257
private final S3Service service;
253258

254259
private final String bucket;
@@ -271,6 +276,13 @@ class S3Repository extends MeteredBlobStoreRepository {
271276
*/
272277
private final TimeValue coolDown;
273278

279+
/**
280+
* Some storage claims S3-compatibility despite failing to support the {@code If-Match} and {@code If-None-Match} functionality
281+
* properly. We allow to disable the use of this functionality, making all writes unconditional, using the
282+
* {@link #UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES} setting.
283+
*/
284+
private final boolean supportsConditionalWrites;
285+
274286
private final Executor snapshotExecutor;
275287

276288
private final S3RepositoriesMetrics s3RepositoriesMetrics;
@@ -347,6 +359,19 @@ class S3Repository extends MeteredBlobStoreRepository {
347359
}
348360

349361
coolDown = COOLDOWN_PERIOD.get(metadata.settings());
362+
supportsConditionalWrites = UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.get(metadata.settings()) == Boolean.FALSE;
363+
364+
if (supportsConditionalWrites == false) {
365+
logger.warn(
366+
"""
367+
repository [{}] is configured to unsafely avoid conditional writes which may lead to repository corruption; to resolve \
368+
this warning, upgrade your storage to a system that is fully compatible with AWS S3 and then remove the [{}] \
369+
repository setting; for more information, see [{}]""",
370+
metadata.name(),
371+
UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getKey(),
372+
ReferenceDocs.S3_COMPATIBLE_REPOSITORIES
373+
);
374+
}
350375

351376
logger.debug(
352377
"using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], "
@@ -485,6 +510,7 @@ protected S3BlobStore createBlobStore() {
485510
maxCopySizeBeforeMultipart,
486511
cannedACL,
487512
storageClass,
513+
supportsConditionalWrites,
488514
metadata,
489515
bigArrays,
490516
threadPool,

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ protected BlobContainer createBlobContainer(
249249
S3Repository.MAX_COPY_SIZE_BEFORE_MULTIPART.getDefault(Settings.EMPTY),
250250
S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
251251
S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
252+
S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getDefault(Settings.EMPTY),
252253
repositoryMetadata,
253254
BigArrays.NON_RECYCLING_INSTANCE,
254255
new DeterministicTaskQueue().getThreadPool(),

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public void testExecuteSingleUpload() throws IOException {
117117
final S3BlobStore blobStore = mock(S3BlobStore.class);
118118
when(blobStore.bucket()).thenReturn(bucketName);
119119
when(blobStore.bufferSizeInBytes()).thenReturn((long) bufferSize);
120+
when(blobStore.supportsConditionalWrites(any())).thenReturn(true);
120121

121122
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
122123

@@ -238,6 +239,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
238239
final S3BlobStore blobStore = mock(S3BlobStore.class);
239240
when(blobStore.bucket()).thenReturn(bucketName);
240241
when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize);
242+
when(blobStore.supportsConditionalWrites(any())).thenReturn(true);
241243

242244
final S3BlobStore sourceBlobStore = mock(S3BlobStore.class);
243245
when(sourceBlobStore.bucket()).thenReturn(sourceBucketName);

x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,14 @@ protected Settings repositorySettings() {
112112
.put("delete_objects_max_size", between(1, 1000))
113113
.put("buffer_size", ByteSizeValue.ofMb(5)) // so some uploads are multipart ones
114114
.put("max_copy_size_before_multipart", ByteSizeValue.ofMb(5))
115-
.put(randomFrom(Settings.EMPTY, Settings.builder().put("add_purpose_custom_query_parameter", randomBoolean()).build()))
115+
// verify we always set the x-purpose header even if disabled for other repository operations
116+
.put(randomBooleanSetting("add_purpose_custom_query_parameter"))
117+
// this parameter is ignored for repo analysis
118+
.put(randomBooleanSetting("unsafely_incompatible_with_s3_conditional_writes"))
116119
.build();
117120
}
121+
122+
private Settings randomBooleanSetting(String settingKey) {
123+
return randomFrom(Settings.EMPTY, Settings.builder().put(settingKey, randomBoolean()).build());
124+
}
118125
}

0 commit comments

Comments
 (0)