Skip to content

Commit dc8afd2

Browse files
authored
Optionally make S3 writes unconditional (#137185)
As of #133030 Elasticsearch uses S3's support for conditional writes to protect against repository corruption. It is possible that some other storage system may claim to be fully S3-compatible despite rejecting such requests. This commit adds a repository setting that allows to make all writes unconditional, unsafely disabling the corruption protection, but allowing users some time to work with their storage supplier to address the incompatibility.
1 parent aebedd6 commit dc8afd2

File tree

8 files changed

+225
-13
lines changed

8 files changed

+225
-13
lines changed

modules/repository-s3/src/javaRestTest/java/org/elasticsearch/repositories/s3/AbstractRepositoryS3RestTestCase.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@
3333

3434
public abstract class AbstractRepositoryS3RestTestCase extends ESRestTestCase {
3535

36-
public record TestRepository(String repositoryName, String clientName, String bucketName, String basePath) {
37-
38-
public Closeable register() throws IOException {
39-
return register(UnaryOperator.identity());
40-
}
41-
36+
public record TestRepository(
37+
String repositoryName,
38+
String clientName,
39+
String bucketName,
40+
String basePath,
41+
Settings extraRepositorySettings
42+
) {
4243
public Closeable register(UnaryOperator<Settings> settingsUnaryOperator) throws IOException {
4344
assertOK(client().performRequest(getRegisterRequest(settingsUnaryOperator)));
4445
return () -> assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + repositoryName())));
@@ -65,6 +66,7 @@ private Request getRegisterRequest(UnaryOperator<Settings> settingsUnaryOperator
6566
Settings.builder().put("add_purpose_custom_query_parameter", randomBoolean()).build()
6667
)
6768
)
69+
.put(extraRepositorySettings)
6870
.build()
6971
)
7072
)
@@ -73,6 +75,10 @@ private Request getRegisterRequest(UnaryOperator<Settings> settingsUnaryOperator
7375
}
7476
}
7577

78+
protected Settings extraRepositorySettings() {
79+
return Settings.EMPTY;
80+
}
81+
7682
protected abstract String getBucketName();
7783

7884
protected abstract String getBasePath();
@@ -84,7 +90,7 @@ protected static String getIdentifierPrefix(String testSuiteName) {
8490
}
8591

8692
private TestRepository newTestRepository() {
87-
return new TestRepository(randomIdentifier(), getClientName(), getBucketName(), getBasePath());
93+
return new TestRepository(randomIdentifier(), getClientName(), getBucketName(), getBasePath(), extraRepositorySettings());
8894
}
8995

9096
private static UnaryOperator<Settings> readonlyOperator(Boolean readonly) {
@@ -152,7 +158,8 @@ private void testNonexistentBucket(Boolean readonly) throws Exception {
152158
randomIdentifier(),
153159
getClientName(),
154160
randomValueOtherThan(getBucketName(), ESTestCase::randomIdentifier),
155-
getBasePath()
161+
getBasePath(),
162+
extraRepositorySettings()
156163
);
157164
final var registerRequest = repository.getRegisterRequest(readonlyOperator(readonly));
158165

@@ -180,7 +187,8 @@ private void testNonexistentClient(Boolean readonly) throws Exception {
180187
randomIdentifier(),
181188
randomValueOtherThanMany(c -> c.equals(getClientName()) || c.equals("default"), ESTestCase::randomIdentifier),
182189
getBucketName(),
183-
getBasePath()
190+
getBasePath(),
191+
extraRepositorySettings()
184192
);
185193
final var registerRequest = repository.getRegisterRequest(readonlyOperator(readonly));
186194

@@ -267,7 +275,7 @@ private void testUsageStats(Boolean readonly) throws Exception {
267275

268276
public void testSnapshotAndRestore() throws Exception {
269277
final var repository = newTestRepository();
270-
try (var ignored = repository.register()) {
278+
try (var ignored = repository.register(UnaryOperator.identity())) {
271279
final var repositoryName = repository.repositoryName();
272280
final var indexName = randomIdentifier();
273281
final var snapshotsToDelete = new ArrayList<String>(2);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.repositories.s3;
11+
12+
import fixture.aws.DynamicRegionSupplier;
13+
import fixture.s3.S3HttpFixture;
14+
import fixture.s3.S3HttpHandler;
15+
16+
import com.carrotsearch.randomizedtesting.annotations.SuppressForbidden;
17+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
18+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
19+
import com.sun.net.httpserver.HttpExchange;
20+
import com.sun.net.httpserver.HttpHandler;
21+
22+
import org.elasticsearch.ExceptionsHelper;
23+
import org.elasticsearch.common.ReferenceDocs;
24+
import org.elasticsearch.common.Strings;
25+
import org.elasticsearch.common.io.Streams;
26+
import org.elasticsearch.common.settings.Settings;
27+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
28+
import org.elasticsearch.test.cluster.LogType;
29+
import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter;
30+
import org.junit.ClassRule;
31+
import org.junit.rules.RuleChain;
32+
import org.junit.rules.TestRule;
33+
34+
import java.io.IOException;
35+
import java.util.function.Supplier;
36+
import java.util.function.UnaryOperator;
37+
38+
import static fixture.aws.AwsCredentialsUtils.fixedAccessKey;
39+
import static org.hamcrest.Matchers.allOf;
40+
import static org.hamcrest.Matchers.containsString;
41+
import static org.hamcrest.Matchers.hasItem;
42+
43+
@ThreadLeakFilters(filters = { TestContainersThreadFilter.class })
44+
@ThreadLeakScope(ThreadLeakScope.Scope.NONE) // https://github.com/elastic/elasticsearch/issues/102482
45+
@SuppressForbidden("HttpExchange and Headers are ok here")
46+
public class RepositoryS3ConditionalWritesUnsupportedRestIT extends AbstractRepositoryS3RestTestCase {
47+
48+
private static final String PREFIX = getIdentifierPrefix("RepositoryS3BasicCredentialsRestIT");
49+
private static final String BUCKET = PREFIX + "bucket";
50+
private static final String BASE_PATH = PREFIX + "base_path";
51+
private static final String ACCESS_KEY = PREFIX + "access-key";
52+
private static final String SECRET_KEY = PREFIX + "secret-key";
53+
private static final String CLIENT = "no_conditional_writes_client";
54+
55+
private static final Supplier<String> regionSupplier = new DynamicRegionSupplier();
56+
57+
private static final S3HttpFixture s3Fixture = new S3HttpFixture(
58+
true,
59+
BUCKET,
60+
BASE_PATH,
61+
fixedAccessKey(ACCESS_KEY, regionSupplier, "s3")
62+
) {
63+
@Override
64+
@SuppressForbidden("HttpExchange and Headers are ok here")
65+
protected HttpHandler createHandler() {
66+
return new AssertNoConditionalWritesHandler(asInstanceOf(S3HttpHandler.class, super.createHandler()));
67+
}
68+
};
69+
70+
@SuppressForbidden("HttpExchange and Headers are ok here")
71+
private static class AssertNoConditionalWritesHandler implements HttpHandler {
72+
73+
private final S3HttpHandler delegateHandler;
74+
75+
private AssertNoConditionalWritesHandler(S3HttpHandler delegateHandler) {
76+
this.delegateHandler = delegateHandler;
77+
}
78+
79+
@Override
80+
public void handle(HttpExchange exchange) throws IOException {
81+
if (exchange.getRequestHeaders().containsKey("if-match") || exchange.getRequestHeaders().containsKey("if-none-match")) {
82+
final var exception = new AssertionError(
83+
Strings.format(
84+
"unsupported conditional write: [%s] with headers [%s]",
85+
delegateHandler.parseRequest(exchange),
86+
exchange.getRequestHeaders()
87+
)
88+
);
89+
ExceptionsHelper.maybeDieOnAnotherThread(exception);
90+
throw exception;
91+
}
92+
delegateHandler.handle(exchange);
93+
}
94+
}
95+
96+
@Override
97+
protected Settings extraRepositorySettings() {
98+
return Settings.builder()
99+
.put(super.extraRepositorySettings())
100+
.put(S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_WRITES.getKey(), true)
101+
.build();
102+
}
103+
104+
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
105+
.module("repository-s3")
106+
.systemProperty("aws.region", regionSupplier)
107+
.keystore("s3.client." + CLIENT + ".access_key", ACCESS_KEY)
108+
.keystore("s3.client." + CLIENT + ".secret_key", SECRET_KEY)
109+
.setting("s3.client." + CLIENT + ".endpoint", s3Fixture::getAddress)
110+
.build();
111+
112+
@ClassRule
113+
public static TestRule ruleChain = RuleChain.outerRule(s3Fixture).around(cluster);
114+
115+
@Override
116+
protected String getTestRestCluster() {
117+
return cluster.getHttpAddresses();
118+
}
119+
120+
@Override
121+
protected String getBucketName() {
122+
return BUCKET;
123+
}
124+
125+
@Override
126+
protected String getBasePath() {
127+
return BASE_PATH;
128+
}
129+
130+
@Override
131+
protected String getClientName() {
132+
return CLIENT;
133+
}
134+
135+
public void testWarningLog() throws IOException {
136+
final var repoName = randomIdentifier();
137+
final var testRepository = new TestRepository(repoName, getClientName(), getBucketName(), getBasePath(), extraRepositorySettings());
138+
try (var ignored = testRepository.register(UnaryOperator.identity()); var logStream = cluster.getNodeLog(0, LogType.SERVER)) {
139+
assertThat(
140+
Streams.readAllLines(logStream),
141+
hasItem(
142+
allOf(
143+
containsString("WARN"),
144+
containsString(repoName),
145+
containsString("""
146+
is configured to unsafely avoid conditional writes which may lead to repository corruption; to resolve this \
147+
warning, upgrade your storage to a system that is fully compatible with AWS S3 and then remove the \
148+
[unsafely_incompatible_with_s3_conditional_writes] repository setting"""),
149+
containsString(ReferenceDocs.S3_COMPATIBLE_REPOSITORIES.toString())
150+
)
151+
)
152+
);
153+
}
154+
}
155+
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ void executeSingleUpload(
563563
if (s3BlobStore.serverSideEncryption()) {
564564
putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
565565
}
566-
if (failIfAlreadyExists) {
566+
if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) {
567567
putRequestBuilder.ifNoneMatch("*");
568568
}
569569
S3BlobStore.configureRequestForMetrics(putRequestBuilder, blobStore, Operation.PUT_OBJECT, purpose);
@@ -642,7 +642,7 @@ private void executeMultipart(
642642
.uploadId(uploadId)
643643
.multipartUpload(b -> b.parts(parts));
644644

645-
if (failIfAlreadyExists) {
645+
if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) {
646646
completeMultipartUploadRequestBuilder.ifNoneMatch("*");
647647
}
648648

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ class S3BlobStore implements BlobStore {
9494

9595
private final StorageClass storageClass;
9696

97+
private final boolean supportsConditionalWrites;
98+
9799
private final RepositoryMetadata repositoryMetadata;
98100

99101
private final ThreadPool threadPool;
@@ -118,6 +120,7 @@ class S3BlobStore implements BlobStore {
118120
ByteSizeValue maxCopySizeBeforeMultipart,
119121
String cannedACL,
120122
String storageClass,
123+
boolean supportConditionalWrites,
121124
RepositoryMetadata repositoryMetadata,
122125
BigArrays bigArrays,
123126
ThreadPool threadPool,
@@ -133,6 +136,7 @@ class S3BlobStore implements BlobStore {
133136
this.maxCopySizeBeforeMultipart = maxCopySizeBeforeMultipart;
134137
this.cannedACL = initCannedACL(cannedACL);
135138
this.storageClass = initStorageClass(storageClass);
139+
this.supportsConditionalWrites = supportConditionalWrites;
136140
this.repositoryMetadata = repositoryMetadata;
137141
this.threadPool = threadPool;
138142
this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
@@ -623,4 +627,13 @@ public void addPurposeQueryParameter(OperationPurpose purpose, AwsRequestOverrid
623627
}
624628
}
625629

630+
/**
631+
* Some storage claims S3-compatibility despite failing to support the {@code If-Match} and {@code If-None-Match} functionality
632+
* properly. We allow to disable the use of this functionality, making all writes unconditional, using the
633+
* {@link S3Repository#UNSAFELY_INCOMPATIBLE_WITH_S3_WRITES} setting.
634+
*/
635+
public boolean supportsConditionalWrites(OperationPurpose purpose) {
636+
// REPOSITORY_ANALYSIS is a strict check for 100% S3 compatibility, including conditional write support
637+
return supportsConditionalWrites || purpose == OperationPurpose.REPOSITORY_ANALYSIS;
638+
}
626639
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,11 @@ class S3Repository extends MeteredBlobStoreRepository {
249249
Setting.Property.Dynamic
250250
);
251251

252+
static final Setting<Boolean> UNSAFELY_INCOMPATIBLE_WITH_S3_WRITES = Setting.boolSetting(
253+
"unsafely_incompatible_with_s3_conditional_writes",
254+
false
255+
);
256+
252257
private final S3Service service;
253258

254259
private final String bucket;
@@ -271,6 +276,13 @@ class S3Repository extends MeteredBlobStoreRepository {
271276
*/
272277
private final TimeValue coolDown;
273278

279+
/**
280+
* Some storage claims S3-compatibility despite failing to support the {@code If-Match} and {@code If-None-Match} functionality
281+
* properly. We allow to disable the use of this functionality, making all writes unconditional, using the
282+
* {@link #UNSAFELY_INCOMPATIBLE_WITH_S3_WRITES} setting.
283+
*/
284+
private final boolean supportsConditionalWrites;
285+
274286
private final Executor snapshotExecutor;
275287

276288
private final S3RepositoriesMetrics s3RepositoriesMetrics;
@@ -347,6 +359,19 @@ class S3Repository extends MeteredBlobStoreRepository {
347359
}
348360

349361
coolDown = COOLDOWN_PERIOD.get(metadata.settings());
362+
supportsConditionalWrites = UNSAFELY_INCOMPATIBLE_WITH_S3_WRITES.get(metadata.settings()) == Boolean.FALSE;
363+
364+
if (supportsConditionalWrites == false) {
365+
logger.warn(
366+
"""
367+
repository [{}] is configured to unsafely avoid conditional writes which may lead to repository corruption; to resolve \
368+
this warning, upgrade your storage to a system that is fully compatible with AWS S3 and then remove the [{}] \
369+
repository setting; for more information, see [{}]""",
370+
metadata.name(),
371+
UNSAFELY_INCOMPATIBLE_WITH_S3_WRITES.getKey(),
372+
ReferenceDocs.S3_COMPATIBLE_REPOSITORIES
373+
);
374+
}
350375

351376
logger.debug(
352377
"using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], "
@@ -485,6 +510,7 @@ protected S3BlobStore createBlobStore() {
485510
maxCopySizeBeforeMultipart,
486511
cannedACL,
487512
storageClass,
513+
supportsConditionalWrites,
488514
metadata,
489515
bigArrays,
490516
threadPool,

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ protected BlobContainer createBlobContainer(
249249
S3Repository.MAX_COPY_SIZE_BEFORE_MULTIPART.getDefault(Settings.EMPTY),
250250
S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
251251
S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
252+
S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_WRITES.getDefault(Settings.EMPTY),
252253
repositoryMetadata,
253254
BigArrays.NON_RECYCLING_INSTANCE,
254255
new DeterministicTaskQueue().getThreadPool(),

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public void testExecuteSingleUpload() throws IOException {
117117
final S3BlobStore blobStore = mock(S3BlobStore.class);
118118
when(blobStore.bucket()).thenReturn(bucketName);
119119
when(blobStore.bufferSizeInBytes()).thenReturn((long) bufferSize);
120+
when(blobStore.supportsConditionalWrites(any())).thenReturn(true);
120121

121122
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
122123

@@ -238,6 +239,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
238239
final S3BlobStore blobStore = mock(S3BlobStore.class);
239240
when(blobStore.bucket()).thenReturn(bucketName);
240241
when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize);
242+
when(blobStore.supportsConditionalWrites(any())).thenReturn(true);
241243

242244
final S3BlobStore sourceBlobStore = mock(S3BlobStore.class);
243245
when(sourceBlobStore.bucket()).thenReturn(sourceBucketName);

x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,14 @@ protected Settings repositorySettings() {
112112
.put("delete_objects_max_size", between(1, 1000))
113113
.put("buffer_size", ByteSizeValue.ofMb(5)) // so some uploads are multipart ones
114114
.put("max_copy_size_before_multipart", ByteSizeValue.ofMb(5))
115-
.put(randomFrom(Settings.EMPTY, Settings.builder().put("add_purpose_custom_query_parameter", randomBoolean()).build()))
115+
// verify we always set the x-purpose header even if disabled for other repository operations
116+
.put(randomBooleanSetting("add_purpose_custom_query_parameter"))
117+
// this parameter is ignored for repo analysis
118+
.put(randomBooleanSetting("unsafely_incompatible_with_s3_conditional_writes"))
116119
.build();
117120
}
121+
122+
private Settings randomBooleanSetting(String settingKey) {
123+
return randomFrom(Settings.EMPTY, Settings.builder().put(settingKey, randomBoolean()).build());
124+
}
118125
}

0 commit comments

Comments
 (0)