Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@

public abstract class AbstractRepositoryS3RestTestCase extends ESRestTestCase {

public record TestRepository(String repositoryName, String clientName, String bucketName, String basePath) {

public Closeable register() throws IOException {
return register(UnaryOperator.identity());
}

public record TestRepository(
String repositoryName,
String clientName,
String bucketName,
String basePath,
Settings extraRepositorySettings
) {
public Closeable register(UnaryOperator<Settings> settingsUnaryOperator) throws IOException {
assertOK(client().performRequest(getRegisterRequest(settingsUnaryOperator)));
return () -> assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + repositoryName())));
Expand All @@ -65,6 +66,7 @@ private Request getRegisterRequest(UnaryOperator<Settings> settingsUnaryOperator
Settings.builder().put("add_purpose_custom_query_parameter", randomBoolean()).build()
)
)
.put(extraRepositorySettings)
.build()
)
)
Expand All @@ -73,6 +75,10 @@ private Request getRegisterRequest(UnaryOperator<Settings> settingsUnaryOperator
}
}

protected Settings extraRepositorySettings() {
return Settings.EMPTY;
}

protected abstract String getBucketName();

protected abstract String getBasePath();
Expand All @@ -84,7 +90,7 @@ protected static String getIdentifierPrefix(String testSuiteName) {
}

private TestRepository newTestRepository() {
return new TestRepository(randomIdentifier(), getClientName(), getBucketName(), getBasePath());
return new TestRepository(randomIdentifier(), getClientName(), getBucketName(), getBasePath(), extraRepositorySettings());
}

private static UnaryOperator<Settings> readonlyOperator(Boolean readonly) {
Expand Down Expand Up @@ -152,7 +158,8 @@ private void testNonexistentBucket(Boolean readonly) throws Exception {
randomIdentifier(),
getClientName(),
randomValueOtherThan(getBucketName(), ESTestCase::randomIdentifier),
getBasePath()
getBasePath(),
extraRepositorySettings()
);
final var registerRequest = repository.getRegisterRequest(readonlyOperator(readonly));

Expand Down Expand Up @@ -180,7 +187,8 @@ private void testNonexistentClient(Boolean readonly) throws Exception {
randomIdentifier(),
randomValueOtherThanMany(c -> c.equals(getClientName()) || c.equals("default"), ESTestCase::randomIdentifier),
getBucketName(),
getBasePath()
getBasePath(),
extraRepositorySettings()
);
final var registerRequest = repository.getRegisterRequest(readonlyOperator(readonly));

Expand Down Expand Up @@ -267,7 +275,7 @@ private void testUsageStats(Boolean readonly) throws Exception {

public void testSnapshotAndRestore() throws Exception {
final var repository = newTestRepository();
try (var ignored = repository.register()) {
try (var ignored = repository.register(UnaryOperator.identity())) {
final var repositoryName = repository.repositoryName();
final var indexName = randomIdentifier();
final var snapshotsToDelete = new ArrayList<String>(2);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.repositories.s3;

import fixture.aws.DynamicRegionSupplier;
import fixture.s3.S3HttpFixture;
import fixture.s3.S3HttpHandler;

import com.carrotsearch.randomizedtesting.annotations.SuppressForbidden;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.LogType;
import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

import java.io.IOException;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

import static fixture.aws.AwsCredentialsUtils.fixedAccessKey;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;

@ThreadLeakFilters(filters = { TestContainersThreadFilter.class })
@ThreadLeakScope(ThreadLeakScope.Scope.NONE) // https://github.com/elastic/elasticsearch/issues/102482
@SuppressForbidden("HttpExchange and Headers are ok here")
public class RepositoryS3ConditionalWritesUnsupportedRestIT extends AbstractRepositoryS3RestTestCase {

private static final String PREFIX = getIdentifierPrefix("RepositoryS3BasicCredentialsRestIT");
private static final String BUCKET = PREFIX + "bucket";
private static final String BASE_PATH = PREFIX + "base_path";
private static final String ACCESS_KEY = PREFIX + "access-key";
private static final String SECRET_KEY = PREFIX + "secret-key";
private static final String CLIENT = "no_conditional_writes_client";

private static final Supplier<String> regionSupplier = new DynamicRegionSupplier();

private static final S3HttpFixture s3Fixture = new S3HttpFixture(
true,
BUCKET,
BASE_PATH,
fixedAccessKey(ACCESS_KEY, regionSupplier, "s3")
) {
@Override
@SuppressForbidden("HttpExchange and Headers are ok here")
protected HttpHandler createHandler() {
return new AssertNoConditionalWritesHandler(asInstanceOf(S3HttpHandler.class, super.createHandler()));
}
};

@SuppressForbidden("HttpExchange and Headers are ok here")
private static class AssertNoConditionalWritesHandler implements HttpHandler {

private final S3HttpHandler delegateHandler;

private AssertNoConditionalWritesHandler(S3HttpHandler delegateHandler) {
this.delegateHandler = delegateHandler;
}

@Override
public void handle(HttpExchange exchange) throws IOException {
if (exchange.getRequestHeaders().containsKey("if-match") || exchange.getRequestHeaders().containsKey("if-none-match")) {
final var exception = new AssertionError(
Strings.format(
"unsupported conditional write: [%s] with headers [%s]",
delegateHandler.parseRequest(exchange),
exchange.getRequestHeaders()
)
);
ExceptionsHelper.maybeDieOnAnotherThread(exception);
throw exception;
}
delegateHandler.handle(exchange);
}
}

@Override
protected Settings extraRepositorySettings() {
return Settings.builder()
.put(super.extraRepositorySettings())
.put(S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getKey(), true)
.build();
}

public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.module("repository-s3")
.systemProperty("aws.region", regionSupplier)
.keystore("s3.client." + CLIENT + ".access_key", ACCESS_KEY)
.keystore("s3.client." + CLIENT + ".secret_key", SECRET_KEY)
.setting("s3.client." + CLIENT + ".endpoint", s3Fixture::getAddress)
.build();

@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(s3Fixture).around(cluster);

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

@Override
protected String getBucketName() {
return BUCKET;
}

@Override
protected String getBasePath() {
return BASE_PATH;
}

@Override
protected String getClientName() {
return CLIENT;
}

public void testWarningLog() throws IOException {
final var repoName = randomIdentifier();
final var testRepository = new TestRepository(repoName, getClientName(), getBucketName(), getBasePath(), extraRepositorySettings());
try (var ignored = testRepository.register(UnaryOperator.identity()); var logStream = cluster.getNodeLog(0, LogType.SERVER)) {
assertThat(
Streams.readAllLines(logStream),
hasItem(
allOf(
containsString("WARN"),
containsString(repoName),
containsString("""
is configured to unsafely avoid conditional writes which may lead to repository corruption; to resolve this \
warning, upgrade your storage to a system that is fully compatible with AWS S3 and then remove the \
[unsafely_incompatible_with_s3_conditional_writes] repository setting"""),
containsString(ReferenceDocs.S3_COMPATIBLE_REPOSITORIES.toString())
)
)
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ void executeSingleUpload(
if (s3BlobStore.serverSideEncryption()) {
putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
if (failIfAlreadyExists) {
if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) {
putRequestBuilder.ifNoneMatch("*");
}
S3BlobStore.configureRequestForMetrics(putRequestBuilder, blobStore, Operation.PUT_OBJECT, purpose);
Expand Down Expand Up @@ -642,7 +642,7 @@ private void executeMultipart(
.uploadId(uploadId)
.multipartUpload(b -> b.parts(parts));

if (failIfAlreadyExists) {
if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) {
completeMultipartUploadRequestBuilder.ifNoneMatch("*");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class S3BlobStore implements BlobStore {

private final StorageClass storageClass;

private final boolean supportsConditionalWrites;

private final RepositoryMetadata repositoryMetadata;

private final ThreadPool threadPool;
Expand All @@ -118,6 +120,7 @@ class S3BlobStore implements BlobStore {
ByteSizeValue maxCopySizeBeforeMultipart,
String cannedACL,
String storageClass,
boolean supportConditionalWrites,
RepositoryMetadata repositoryMetadata,
BigArrays bigArrays,
ThreadPool threadPool,
Expand All @@ -133,6 +136,7 @@ class S3BlobStore implements BlobStore {
this.maxCopySizeBeforeMultipart = maxCopySizeBeforeMultipart;
this.cannedACL = initCannedACL(cannedACL);
this.storageClass = initStorageClass(storageClass);
this.supportsConditionalWrites = supportConditionalWrites;
this.repositoryMetadata = repositoryMetadata;
this.threadPool = threadPool;
this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
Expand Down Expand Up @@ -623,4 +627,13 @@ public void addPurposeQueryParameter(OperationPurpose purpose, AwsRequestOverrid
}
}

/**
 * Whether writes for the given {@code purpose} may use the S3 conditional-write headers
 * ({@code If-Match} / {@code If-None-Match}). Some storage systems claim S3-compatibility but
 * mishandle these headers, so the
 * {@link S3Repository#UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES} setting can disable
 * them, making all writes unconditional.
 */
public boolean supportsConditionalWrites(OperationPurpose purpose) {
    if (purpose == OperationPurpose.REPOSITORY_ANALYSIS) {
        // repository analysis is a strict check for 100% S3 compatibility, which
        // includes conditional-write support, so it ignores the opt-out setting
        return true;
    }
    return supportsConditionalWrites;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ class S3Repository extends MeteredBlobStoreRepository {
Setting.Property.Dynamic
);

static final Setting<Boolean> UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES = Setting.boolSetting(
"unsafely_incompatible_with_s3_conditional_writes",
false
);

private final S3Service service;

private final String bucket;
Expand All @@ -271,6 +276,13 @@ class S3Repository extends MeteredBlobStoreRepository {
*/
private final TimeValue coolDown;

/**
* Some storage claims S3-compatibility despite failing to support the {@code If-Match} and {@code If-None-Match} functionality
* properly. We allow to disable the use of this functionality, making all writes unconditional, using the
* {@link #UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES} setting.
*/
private final boolean supportsConditionalWrites;

private final Executor snapshotExecutor;

private final S3RepositoriesMetrics s3RepositoriesMetrics;
Expand Down Expand Up @@ -347,6 +359,19 @@ class S3Repository extends MeteredBlobStoreRepository {
}

coolDown = COOLDOWN_PERIOD.get(metadata.settings());
supportsConditionalWrites = UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.get(metadata.settings()) == Boolean.FALSE;

if (supportsConditionalWrites == false) {
logger.warn(
"""
repository [{}] is configured to unsafely avoid conditional writes which may lead to repository corruption; to resolve \
this warning, upgrade your storage to a system that is fully compatible with AWS S3 and then remove the [{}] \
repository setting; for more information, see [{}]""",
metadata.name(),
UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getKey(),
ReferenceDocs.S3_COMPATIBLE_REPOSITORIES
);
}

logger.debug(
"using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], "
Expand Down Expand Up @@ -485,6 +510,7 @@ protected S3BlobStore createBlobStore() {
maxCopySizeBeforeMultipart,
cannedACL,
storageClass,
supportsConditionalWrites,
metadata,
bigArrays,
threadPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ protected BlobContainer createBlobContainer(
S3Repository.MAX_COPY_SIZE_BEFORE_MULTIPART.getDefault(Settings.EMPTY),
S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getDefault(Settings.EMPTY),
repositoryMetadata,
BigArrays.NON_RECYCLING_INSTANCE,
new DeterministicTaskQueue().getThreadPool(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public void testExecuteSingleUpload() throws IOException {
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.bufferSizeInBytes()).thenReturn((long) bufferSize);
when(blobStore.supportsConditionalWrites(any())).thenReturn(true);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

Expand Down Expand Up @@ -238,6 +239,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize);
when(blobStore.supportsConditionalWrites(any())).thenReturn(true);

final S3BlobStore sourceBlobStore = mock(S3BlobStore.class);
when(sourceBlobStore.bucket()).thenReturn(sourceBucketName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,14 @@ protected Settings repositorySettings() {
.put("delete_objects_max_size", between(1, 1000))
.put("buffer_size", ByteSizeValue.ofMb(5)) // so some uploads are multipart ones
.put("max_copy_size_before_multipart", ByteSizeValue.ofMb(5))
.put(randomFrom(Settings.EMPTY, Settings.builder().put("add_purpose_custom_query_parameter", randomBoolean()).build()))
// verify we always set the x-purpose header even if disabled for other repository operations
.put(randomBooleanSetting("add_purpose_custom_query_parameter"))
// this parameter is ignored for repo analysis
.put(randomBooleanSetting("unsafely_incompatible_with_s3_conditional_writes"))
.build();
}

// Half the time omit the setting entirely (exercising its default), otherwise set it explicitly
// to a random boolean value.
private Settings randomBooleanSetting(String settingKey) {
    final var explicitSetting = Settings.builder().put(settingKey, randomBoolean()).build();
    return randomFrom(Settings.EMPTY, explicitSetting);
}
}