Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@

public abstract class AbstractRepositoryS3RestTestCase extends ESRestTestCase {

public record TestRepository(String repositoryName, String clientName, String bucketName, String basePath) {

public Closeable register() throws IOException {
return register(UnaryOperator.identity());
}

public record TestRepository(
String repositoryName,
String clientName,
String bucketName,
String basePath,
Settings extraRepositorySettings
) {
public Closeable register(UnaryOperator<Settings> settingsUnaryOperator) throws IOException {
assertOK(client().performRequest(getRegisterRequest(settingsUnaryOperator)));
return () -> assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + repositoryName())));
Expand All @@ -65,6 +66,7 @@ private Request getRegisterRequest(UnaryOperator<Settings> settingsUnaryOperator
Settings.builder().put("add_purpose_custom_query_parameter", randomBoolean()).build()
)
)
.put(extraRepositorySettings)
.build()
)
)
Expand All @@ -73,6 +75,10 @@ private Request getRegisterRequest(UnaryOperator<Settings> settingsUnaryOperator
}
}

/**
 * Repository settings merged into every repository registered by this test suite, in addition to
 * the standard bucket/base-path/client settings. Subclasses override this to exercise additional
 * repository settings (e.g. disabling conditional writes); the default is no extra settings.
 */
protected Settings extraRepositorySettings() {
return Settings.EMPTY;
}

protected abstract String getBucketName();

protected abstract String getBasePath();
Expand All @@ -84,7 +90,7 @@ protected static String getIdentifierPrefix(String testSuiteName) {
}

/**
 * Creates a {@link TestRepository} with a random name targeting this suite's bucket, base path and
 * client, carrying any {@link #extraRepositorySettings()} supplied by the subclass.
 */
private TestRepository newTestRepository() {
    // Single 5-component construction: the stale 4-argument call (which no longer matched the
    // record after extraRepositorySettings was added as a fifth component) is removed.
    return new TestRepository(randomIdentifier(), getClientName(), getBucketName(), getBasePath(), extraRepositorySettings());
}

private static UnaryOperator<Settings> readonlyOperator(Boolean readonly) {
Expand Down Expand Up @@ -152,7 +158,8 @@ private void testNonexistentBucket(Boolean readonly) throws Exception {
randomIdentifier(),
getClientName(),
randomValueOtherThan(getBucketName(), ESTestCase::randomIdentifier),
getBasePath()
getBasePath(),
extraRepositorySettings()
);
final var registerRequest = repository.getRegisterRequest(readonlyOperator(readonly));

Expand Down Expand Up @@ -180,7 +187,8 @@ private void testNonexistentClient(Boolean readonly) throws Exception {
randomIdentifier(),
randomValueOtherThanMany(c -> c.equals(getClientName()) || c.equals("default"), ESTestCase::randomIdentifier),
getBucketName(),
getBasePath()
getBasePath(),
extraRepositorySettings()
);
final var registerRequest = repository.getRegisterRequest(readonlyOperator(readonly));

Expand Down Expand Up @@ -267,7 +275,7 @@ private void testUsageStats(Boolean readonly) throws Exception {

public void testSnapshotAndRestore() throws Exception {
final var repository = newTestRepository();
try (var ignored = repository.register()) {
try (var ignored = repository.register(UnaryOperator.identity())) {
final var repositoryName = repository.repositoryName();
final var indexName = randomIdentifier();
final var snapshotsToDelete = new ArrayList<String>(2);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.repositories.s3;

import fixture.aws.DynamicRegionSupplier;
import fixture.s3.S3HttpFixture;
import fixture.s3.S3HttpHandler;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import com.sun.net.httpserver.HttpHandler;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.LogType;
import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

import java.io.IOException;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

import static fixture.aws.AwsCredentialsUtils.fixedAccessKey;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;

/**
 * Integration test for registering an S3 repository against storage that does not support conditional
 * writes ({@code If-Match}/{@code If-None-Match}). The fixture asserts that no conditional-write
 * headers are ever sent when the {@code unsafely_incompatible_with_s3_conditional_writes} repository
 * setting is enabled, and {@link #testWarningLog()} verifies the corresponding warning is logged.
 */
@ThreadLeakFilters(filters = { TestContainersThreadFilter.class })
@ThreadLeakScope(ThreadLeakScope.Scope.NONE) // https://github.com/elastic/elasticsearch/issues/102482
public class RepositoryS3ConditionalWritesUnsupportedRestIT extends AbstractRepositoryS3RestTestCase {

    // Use this suite's own name for the identifier prefix. The original used the copy-pasted
    // "RepositoryS3BasicCredentialsRestIT", which risks bucket/key-space collisions with that suite.
    private static final String PREFIX = getIdentifierPrefix("RepositoryS3ConditionalWritesUnsupportedRestIT");
    private static final String BUCKET = PREFIX + "bucket";
    private static final String BASE_PATH = PREFIX + "base_path";
    private static final String ACCESS_KEY = PREFIX + "access-key";
    private static final String SECRET_KEY = PREFIX + "secret-key";
    private static final String CLIENT = "no_conditional_writes_client";

    private static final Supplier<String> regionSupplier = new DynamicRegionSupplier();

    // Fixture which trips an assertion (and kills the test JVM) if any request carries a
    // conditional-write header, proving that the repository setting suppresses them all.
    private static final S3HttpFixture s3Fixture = new S3HttpFixture(
        true,
        BUCKET,
        BASE_PATH,
        fixedAccessKey(ACCESS_KEY, regionSupplier, "s3")
    ) {
        @Override
        protected HttpHandler createHandler() {
            final var delegateHandler = asInstanceOf(S3HttpHandler.class, super.createHandler());
            return exchange -> {
                // Headers#containsKey is case-insensitive, so this matches however the SDK spells them.
                if (exchange.getRequestHeaders().containsKey("if-match") || exchange.getRequestHeaders().containsKey("if-none-match")) {
                    final var exception = new AssertionError(
                        Strings.format(
                            "unsupported conditional write: [%s] with headers [%s]",
                            delegateHandler.parseRequest(exchange),
                            exchange.getRequestHeaders()
                        )
                    );
                    // Propagate the failure out of the HTTP-server thread so the test actually fails.
                    ExceptionsHelper.maybeDieOnAnotherThread(exception);
                    throw exception;
                }
                delegateHandler.handle(exchange);
            };
        }
    };

    @Override
    protected Settings extraRepositorySettings() {
        return Settings.builder()
            .put(super.extraRepositorySettings())
            .put(S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_WRITES.getKey(), true)
            .build();
    }

    public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
        .module("repository-s3")
        .systemProperty("aws.region", regionSupplier)
        .keystore("s3.client." + CLIENT + ".access_key", ACCESS_KEY)
        .keystore("s3.client." + CLIENT + ".secret_key", SECRET_KEY)
        .setting("s3.client." + CLIENT + ".endpoint", s3Fixture::getAddress)
        .build();

    // The fixture must be started before (and stopped after) the cluster that points at it.
    @ClassRule
    public static TestRule ruleChain = RuleChain.outerRule(s3Fixture).around(cluster);

    @Override
    protected String getTestRestCluster() {
        return cluster.getHttpAddresses();
    }

    @Override
    protected String getBucketName() {
        return BUCKET;
    }

    @Override
    protected String getBasePath() {
        return BASE_PATH;
    }

    @Override
    protected String getClientName() {
        return CLIENT;
    }

    /**
     * Registers a repository with conditional writes disabled and verifies that the server log
     * contains the WARN-level message directing the user to upgrade their storage.
     */
    public void testWarningLog() throws IOException {
        final var repoName = randomIdentifier();
        final var testRepository = new TestRepository(repoName, getClientName(), getBucketName(), getBasePath(), extraRepositorySettings());
        try (var ignored = testRepository.register(UnaryOperator.identity()); var logStream = cluster.getNodeLog(0, LogType.SERVER)) {
            assertThat(
                Streams.readAllLines(logStream),
                hasItem(
                    allOf(
                        containsString("WARN"),
                        containsString(repoName),
                        containsString("""
                            is configured to unsafely avoid conditional writes which may lead to repository corruption; to resolve this \
                            warning, upgrade your storage to a system that is fully compatible with AWS S3 and then remove the \
                            [unsafely_incompatible_with_s3_conditional_writes] repository setting"""),
                        containsString(ReferenceDocs.S3_COMPATIBLE_REPOSITORIES.toString())
                    )
                )
            );
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ void executeSingleUpload(
if (s3BlobStore.serverSideEncryption()) {
putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
if (failIfAlreadyExists) {
if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) {
putRequestBuilder.ifNoneMatch("*");
}
S3BlobStore.configureRequestForMetrics(putRequestBuilder, blobStore, Operation.PUT_OBJECT, purpose);
Expand Down Expand Up @@ -642,7 +642,7 @@ private void executeMultipart(
.uploadId(uploadId)
.multipartUpload(b -> b.parts(parts));

if (failIfAlreadyExists) {
if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) {
completeMultipartUploadRequestBuilder.ifNoneMatch("*");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class S3BlobStore implements BlobStore {

private final StorageClass storageClass;

private final boolean supportsConditionalWrites;

private final RepositoryMetadata repositoryMetadata;

private final ThreadPool threadPool;
Expand All @@ -118,6 +120,7 @@ class S3BlobStore implements BlobStore {
ByteSizeValue maxCopySizeBeforeMultipart,
String cannedACL,
String storageClass,
boolean supportConditionalWrites,
RepositoryMetadata repositoryMetadata,
BigArrays bigArrays,
ThreadPool threadPool,
Expand All @@ -133,6 +136,7 @@ class S3BlobStore implements BlobStore {
this.maxCopySizeBeforeMultipart = maxCopySizeBeforeMultipart;
this.cannedACL = initCannedACL(cannedACL);
this.storageClass = initStorageClass(storageClass);
this.supportsConditionalWrites = supportConditionalWrites;
this.repositoryMetadata = repositoryMetadata;
this.threadPool = threadPool;
this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
Expand Down Expand Up @@ -623,4 +627,13 @@ public void addPurposeQueryParameter(OperationPurpose purpose, AwsRequestOverrid
}
}

/**
* Some storage claims S3-compatibility despite failing to support the {@code If-Match} and {@code If-None-Match} functionality
properly. We allow disabling the use of this functionality, making all writes unconditional, via the
* {@link S3Repository#UNSAFELY_INCOMPATIBLE_WITH_S3_WRITES} setting.
*/
public boolean supportsConditionalWrites(OperationPurpose purpose) {
// REPOSITORY_ANALYSIS is a strict check for 100% S3 compatibility, including conditional write support
return supportsConditionalWrites || purpose == OperationPurpose.REPOSITORY_ANALYSIS;
Comment on lines +636 to +637
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NB no tests for the REPOSITORY_ANALYSIS condition here, they'll be added when we resolve #134632.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean repo analysis will always fail for storage services that do not support conditional writes? I assume that's intentional even if this means existing compatible ones could start failing? IIUC, this work has something to do with AWS's MPU guarantee. Do we consider a storage system as passing the analysis if it supports the MPU guarantees but not the conditional writes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's right, this is covered in the docs:

NOTE: Different versions of Elasticsearch may perform different checks for repository compatibility, with newer versions typically being stricter than older ones. A storage system that passes repository analysis with one version of Elasticsearch may fail with a different version. This indicates it behaves incorrectly in ways that the former version did not detect.

This doesn't yet matter for the non-linearizable MPUs but when we move to using conditional writes for CAS operations we will only do so if the storage supports conditional writes. If the user indicates that their storage doesn't support conditional writes yet then we'll fall back to today's MPU-based CAS implementation.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ class S3Repository extends MeteredBlobStoreRepository {
Setting.Property.Dynamic
);

static final Setting<Boolean> UNSAFELY_INCOMPATIBLE_WITH_S3_WRITES = Setting.boolSetting(
"unsafely_incompatible_with_s3_conditional_writes",
Copy link
Contributor

@mhl-b mhl-b Oct 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have other names for the setting in mind? I assume with conditional ETag there would be another parameter, since it's a different feature from existence check, and s3-like-storage might have one of those. Maybe supports_s3_write_existence_check, and later supports_s3_write_etag_check?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be with unsafe_

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't planning on distinguishing conditional-on-object-exists from conditional-on-etag writes. Do you see a need to do that? My thinking is that "conditional writes" covers both cases, and although these things were announced by AWS as separate features they're now documented together.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only argument I have is that these features arrived at different times and use different implementations, so s3-like repos might support only one of them.

But the feature parity is blurry, and it's unclear how much effort we should put into featuring these knobs. I think it's the s3-like repo's responsibility to provide its own s3-like client rather than using a not-fully-compatible AWS client.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be worth naming this setting more precisely. If a storage system supports etag conditions but not the other kind, it is not obvious whether it should set this setting to true or false.

I also wonder whether it is better named more positively, e.g. supports_s3_conditional_writes, which seems more straightforward. But maybe you prefer the more negative form to also serve as a warnng?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a storage does support etag but not the other, it is not obvious

I think it's clear in that case that the storage does not properly support conditional writes so the user needs to disable them until they can upgrade their storage. I don't think it means much to the user to distinguish the different kinds of conditional writes.

I also wonder whether it is better named more positively

Yeah I'd prefer that but it's also important to get the unsafe bit into the name so that users know they need to work on their storage upgrade ASAP.

false
);

private final S3Service service;

private final String bucket;
Expand All @@ -271,6 +276,13 @@ class S3Repository extends MeteredBlobStoreRepository {
*/
private final TimeValue coolDown;

/**
* Some storage claims S3-compatibility despite failing to support the {@code If-Match} and {@code If-None-Match} functionality
properly. We allow disabling the use of this functionality, making all writes unconditional, via the
* {@link #UNSAFELY_INCOMPATIBLE_WITH_S3_WRITES} setting.
*/
private final boolean supportsConditionalWrites;

private final Executor snapshotExecutor;

private final S3RepositoriesMetrics s3RepositoriesMetrics;
Expand Down Expand Up @@ -347,6 +359,19 @@ class S3Repository extends MeteredBlobStoreRepository {
}

coolDown = COOLDOWN_PERIOD.get(metadata.settings());
supportsConditionalWrites = UNSAFELY_INCOMPATIBLE_WITH_S3_WRITES.get(metadata.settings()) == Boolean.FALSE;

if (supportsConditionalWrites == false) {
logger.warn(
"""
repository [{}] is configured to unsafely avoid conditional writes which may lead to repository corruption; to resolve \
this warning, upgrade your storage to a system that is fully compatible with AWS S3 and then remove the [{}] \
repository setting; for more information, see [{}]""",
metadata.name(),
UNSAFELY_INCOMPATIBLE_WITH_S3_WRITES.getKey(),
ReferenceDocs.S3_COMPATIBLE_REPOSITORIES
);
}

logger.debug(
"using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], "
Expand Down Expand Up @@ -485,6 +510,7 @@ protected S3BlobStore createBlobStore() {
maxCopySizeBeforeMultipart,
cannedACL,
storageClass,
supportsConditionalWrites,
metadata,
bigArrays,
threadPool,
Expand Down