diff --git a/modules/repository-s3/src/javaRestTest/java/org/elasticsearch/repositories/s3/AbstractRepositoryS3RestTestCase.java b/modules/repository-s3/src/javaRestTest/java/org/elasticsearch/repositories/s3/AbstractRepositoryS3RestTestCase.java index fb7e0215003fb..43c91e5b08042 100644 --- a/modules/repository-s3/src/javaRestTest/java/org/elasticsearch/repositories/s3/AbstractRepositoryS3RestTestCase.java +++ b/modules/repository-s3/src/javaRestTest/java/org/elasticsearch/repositories/s3/AbstractRepositoryS3RestTestCase.java @@ -33,12 +33,13 @@ public abstract class AbstractRepositoryS3RestTestCase extends ESRestTestCase { - public record TestRepository(String repositoryName, String clientName, String bucketName, String basePath) { - - public Closeable register() throws IOException { - return register(UnaryOperator.identity()); - } - + public record TestRepository( + String repositoryName, + String clientName, + String bucketName, + String basePath, + Settings extraRepositorySettings + ) { public Closeable register(UnaryOperator settingsUnaryOperator) throws IOException { assertOK(client().performRequest(getRegisterRequest(settingsUnaryOperator))); return () -> assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + repositoryName()))); @@ -65,6 +66,7 @@ private Request getRegisterRequest(UnaryOperator settingsUnaryOperator Settings.builder().put("add_purpose_custom_query_parameter", randomBoolean()).build() ) ) + .put(extraRepositorySettings) .build() ) ) @@ -73,6 +75,10 @@ private Request getRegisterRequest(UnaryOperator settingsUnaryOperator } } + protected Settings extraRepositorySettings() { + return Settings.EMPTY; + } + protected abstract String getBucketName(); protected abstract String getBasePath(); @@ -84,7 +90,7 @@ protected static String getIdentifierPrefix(String testSuiteName) { } private TestRepository newTestRepository() { - return new TestRepository(randomIdentifier(), getClientName(), getBucketName(), getBasePath()); + return new TestRepository(randomIdentifier(), getClientName(), getBucketName(), getBasePath(), extraRepositorySettings()); } private static UnaryOperator readonlyOperator(Boolean readonly) { @@ -152,7 +158,8 @@ private void testNonexistentBucket(Boolean readonly) throws Exception { randomIdentifier(), getClientName(), randomValueOtherThan(getBucketName(), ESTestCase::randomIdentifier), - getBasePath() + getBasePath(), + extraRepositorySettings() ); final var registerRequest = repository.getRegisterRequest(readonlyOperator(readonly)); @@ -180,7 +187,8 @@ private void testNonexistentClient(Boolean readonly) throws Exception { randomIdentifier(), randomValueOtherThanMany(c -> c.equals(getClientName()) || c.equals("default"), ESTestCase::randomIdentifier), getBucketName(), - getBasePath() + getBasePath(), + extraRepositorySettings() ); final var registerRequest = repository.getRegisterRequest(readonlyOperator(readonly)); @@ -267,7 +275,7 @@ private void testUsageStats(Boolean readonly) throws Exception { public void testSnapshotAndRestore() throws Exception { final var repository = newTestRepository(); - try (var ignored = repository.register()) { + try (var ignored = repository.register(UnaryOperator.identity())) { final var repositoryName = repository.repositoryName(); final var indexName = randomIdentifier(); final var snapshotsToDelete = new ArrayList(2); diff --git a/modules/repository-s3/src/javaRestTest/java/org/elasticsearch/repositories/s3/RepositoryS3ConditionalWritesUnsupportedRestIT.java b/modules/repository-s3/src/javaRestTest/java/org/elasticsearch/repositories/s3/RepositoryS3ConditionalWritesUnsupportedRestIT.java new file mode 100644 index 0000000000000..1ac7435dc7750 --- /dev/null +++ b/modules/repository-s3/src/javaRestTest/java/org/elasticsearch/repositories/s3/RepositoryS3ConditionalWritesUnsupportedRestIT.java @@ -0,0 +1,155 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.repositories.s3; + +import fixture.aws.DynamicRegionSupplier; +import fixture.s3.S3HttpFixture; +import fixture.s3.S3HttpHandler; + +import com.carrotsearch.randomizedtesting.annotations.SuppressForbidden; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.ReferenceDocs; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.LogType; +import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.IOException; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; + +import static fixture.aws.AwsCredentialsUtils.fixedAccessKey; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItem; + +@ThreadLeakFilters(filters = { TestContainersThreadFilter.class }) +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) // https://github.com/elastic/elasticsearch/issues/102482 +@SuppressForbidden("HttpExchange and Headers are ok here") +public class RepositoryS3ConditionalWritesUnsupportedRestIT extends AbstractRepositoryS3RestTestCase { + + private static final String PREFIX = getIdentifierPrefix("RepositoryS3BasicCredentialsRestIT"); + private static final String BUCKET = PREFIX + "bucket"; + private static final String BASE_PATH = PREFIX + "base_path"; + private static final String ACCESS_KEY = PREFIX + "access-key"; + private static final String SECRET_KEY = PREFIX + "secret-key"; + private static final String CLIENT = "no_conditional_writes_client"; + + private static final Supplier regionSupplier = new DynamicRegionSupplier(); + + private static final S3HttpFixture s3Fixture = new S3HttpFixture( + true, + BUCKET, + BASE_PATH, + fixedAccessKey(ACCESS_KEY, regionSupplier, "s3") + ) { + @Override + @SuppressForbidden("HttpExchange and Headers are ok here") + protected HttpHandler createHandler() { + return new AssertNoConditionalWritesHandler(asInstanceOf(S3HttpHandler.class, super.createHandler())); + } + }; + + @SuppressForbidden("HttpExchange and Headers are ok here") + private static class AssertNoConditionalWritesHandler implements HttpHandler { + + private final S3HttpHandler delegateHandler; + + private AssertNoConditionalWritesHandler(S3HttpHandler delegateHandler) { + this.delegateHandler = delegateHandler; + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + if (exchange.getRequestHeaders().containsKey("if-match") || exchange.getRequestHeaders().containsKey("if-none-match")) { + final var exception = new AssertionError( + Strings.format( + "unsupported conditional write: [%s] with headers [%s]", + delegateHandler.parseRequest(exchange), + exchange.getRequestHeaders() + ) + ); + ExceptionsHelper.maybeDieOnAnotherThread(exception); + throw exception; + } + delegateHandler.handle(exchange); + } + } + + @Override + protected Settings extraRepositorySettings() { + return Settings.builder() + .put(super.extraRepositorySettings()) + .put(S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getKey(), true) + .build(); + } + + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .module("repository-s3") + .systemProperty("aws.region", regionSupplier) + .keystore("s3.client." + CLIENT + ".access_key", ACCESS_KEY) + .keystore("s3.client." + CLIENT + ".secret_key", SECRET_KEY) + .setting("s3.client." + CLIENT + ".endpoint", s3Fixture::getAddress) + .build(); + + @ClassRule + public static TestRule ruleChain = RuleChain.outerRule(s3Fixture).around(cluster); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + @Override + protected String getBucketName() { + return BUCKET; + } + + @Override + protected String getBasePath() { + return BASE_PATH; + } + + @Override + protected String getClientName() { + return CLIENT; + } + + public void testWarningLog() throws IOException { + final var repoName = randomIdentifier(); + final var testRepository = new TestRepository(repoName, getClientName(), getBucketName(), getBasePath(), extraRepositorySettings()); + try (var ignored = testRepository.register(UnaryOperator.identity()); var logStream = cluster.getNodeLog(0, LogType.SERVER)) { + assertThat( + Streams.readAllLines(logStream), + hasItem( + allOf( + containsString("WARN"), + containsString(repoName), + containsString(""" + is configured to unsafely avoid conditional writes which may lead to repository corruption; to resolve this \ + warning, upgrade your storage to a system that is fully compatible with AWS S3 and then remove the \ + [unsafely_incompatible_with_s3_conditional_writes] repository setting"""), + containsString(ReferenceDocs.S3_COMPATIBLE_REPOSITORIES.toString()) + ) + ) + ); + } + } +} diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index dcd3a7dbe6533..4ff6c9291079f 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -563,7 +563,7 @@ void executeSingleUpload( if (s3BlobStore.serverSideEncryption()) { putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256); } - if (failIfAlreadyExists) { + if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) { putRequestBuilder.ifNoneMatch("*"); } S3BlobStore.configureRequestForMetrics(putRequestBuilder, blobStore, Operation.PUT_OBJECT, purpose); @@ -642,7 +642,7 @@ private void executeMultipart( .uploadId(uploadId) .multipartUpload(b -> b.parts(parts)); - if (failIfAlreadyExists) { + if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) { completeMultipartUploadRequestBuilder.ifNoneMatch("*"); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index 8e5f7939ae396..1b2ce77c2a7b6 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -94,6 +94,8 @@ class S3BlobStore implements BlobStore { private final StorageClass storageClass; + private final boolean supportsConditionalWrites; + private final RepositoryMetadata repositoryMetadata; private final ThreadPool threadPool; @@ -118,6 +120,7 @@ class S3BlobStore implements BlobStore { ByteSizeValue maxCopySizeBeforeMultipart, String cannedACL, String storageClass, + boolean supportConditionalWrites, RepositoryMetadata repositoryMetadata, BigArrays bigArrays, ThreadPool threadPool, @@ -133,6 +136,7 @@ class S3BlobStore implements BlobStore { this.maxCopySizeBeforeMultipart = maxCopySizeBeforeMultipart; this.cannedACL = initCannedACL(cannedACL); this.storageClass = initStorageClass(storageClass); + this.supportsConditionalWrites = supportConditionalWrites; this.repositoryMetadata = repositoryMetadata; this.threadPool = threadPool; this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT); @@ -623,4 +627,13 @@ public void addPurposeQueryParameter(OperationPurpose purpose, AwsRequestOverrid } } + /** + * Some storage claims S3-compatibility despite failing to support the {@code If-Match} and {@code If-None-Match} functionality + * properly. We allow to disable the use of this functionality, making all writes unconditional, using the + * {@link S3Repository#UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES} setting. + */ + public boolean supportsConditionalWrites(OperationPurpose purpose) { + // REPOSITORY_ANALYSIS is a strict check for 100% S3 compatibility, including conditional write support + return supportsConditionalWrites || purpose == OperationPurpose.REPOSITORY_ANALYSIS; + } } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 5d008b4bd375e..5cc4eaa22e3bb 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -249,6 +249,11 @@ class S3Repository extends MeteredBlobStoreRepository { Setting.Property.Dynamic ); + static final Setting UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES = Setting.boolSetting( + "unsafely_incompatible_with_s3_conditional_writes", + false + ); + private final S3Service service; private final String bucket; @@ -271,6 +276,13 @@ class S3Repository extends MeteredBlobStoreRepository { */ private final TimeValue coolDown; + /** + * Some storage claims S3-compatibility despite failing to support the {@code If-Match} and {@code If-None-Match} functionality + * properly. We allow to disable the use of this functionality, making all writes unconditional, using the + * {@link #UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES} setting. + */ + private final boolean supportsConditionalWrites; + private final Executor snapshotExecutor; private final S3RepositoriesMetrics s3RepositoriesMetrics; @@ -347,6 +359,19 @@ class S3Repository extends MeteredBlobStoreRepository { } coolDown = COOLDOWN_PERIOD.get(metadata.settings()); + supportsConditionalWrites = UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.get(metadata.settings()) == Boolean.FALSE; + + if (supportsConditionalWrites == false) { + logger.warn( + """ + repository [{}] is configured to unsafely avoid conditional writes which may lead to repository corruption; to resolve \ + this warning, upgrade your storage to a system that is fully compatible with AWS S3 and then remove the [{}] \ + repository setting; for more information, see [{}]""", + metadata.name(), + UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getKey(), + ReferenceDocs.S3_COMPATIBLE_REPOSITORIES + ); + } logger.debug( "using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], " @@ -485,6 +510,7 @@ protected S3BlobStore createBlobStore() { maxCopySizeBeforeMultipart, cannedACL, storageClass, + supportsConditionalWrites, metadata, bigArrays, threadPool, diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 4da5cb43e7f44..e217cffc6d3aa 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -249,6 +249,7 @@ protected BlobContainer createBlobContainer( S3Repository.MAX_COPY_SIZE_BEFORE_MULTIPART.getDefault(Settings.EMPTY), S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY), S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY), + S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getDefault(Settings.EMPTY), repositoryMetadata, BigArrays.NON_RECYCLING_INSTANCE, new DeterministicTaskQueue().getThreadPool(), diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java index 90c9793921f95..662c2137b79e2 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java @@ -117,6 +117,7 @@ public void testExecuteSingleUpload() throws IOException { final S3BlobStore blobStore = mock(S3BlobStore.class); when(blobStore.bucket()).thenReturn(bucketName); when(blobStore.bufferSizeInBytes()).thenReturn((long) bufferSize); + when(blobStore.supportsConditionalWrites(any())).thenReturn(true); final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); @@ -238,6 +239,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException { final S3BlobStore blobStore = mock(S3BlobStore.class); when(blobStore.bucket()).thenReturn(bucketName); when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize); + when(blobStore.supportsConditionalWrites(any())).thenReturn(true); final S3BlobStore sourceBlobStore = mock(S3BlobStore.class); when(sourceBlobStore.bucket()).thenReturn(sourceBucketName); diff --git a/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java b/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java index 130c094050bc4..7c4049eff20af 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java @@ -112,7 +112,14 @@ protected Settings repositorySettings() { .put("delete_objects_max_size", between(1, 1000)) .put("buffer_size", ByteSizeValue.ofMb(5)) // so some uploads are multipart ones .put("max_copy_size_before_multipart", ByteSizeValue.ofMb(5)) - .put(randomFrom(Settings.EMPTY, Settings.builder().put("add_purpose_custom_query_parameter", randomBoolean()).build())) + // verify we always set the x-purpose header even if disabled for other repository operations + .put(randomBooleanSetting("add_purpose_custom_query_parameter")) + // this parameter is ignored for repo analysis + .put(randomBooleanSetting("unsafely_incompatible_with_s3_conditional_writes")) .build(); } + + private Settings randomBooleanSetting(String settingKey) { + return randomFrom(Settings.EMPTY, Settings.builder().put(settingKey, randomBoolean()).build()); + } }