Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/138072.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 138072
summary: Add a new setting for s3 API call timeout
area: Snapshot/Restore
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.repositories.s3;

import fixture.s3.S3HttpHandler;
import software.amazon.awssdk.core.exception.ApiCallTimeoutException;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.containsString;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
// Need to set up a new cluster for each test because cluster settings use randomized authentication settings
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class S3BlobStoreRepositoryTimeoutTests extends ESMockAPIBasedRepositoryIntegTestCase {

private S3StallingHttpHandler s3StallingHttpHandler;

@Override
public void setUp() throws Exception {
super.setUp();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(S3RepositoryPlugin.class);
}

@Override
protected String repositoryType() {
return S3Repository.TYPE;
}

@Override
protected Settings repositorySettings(String repoName) {
Settings.Builder settingsBuilder = Settings.builder()
.put(super.repositorySettings(repoName))
.put(S3Repository.BUCKET_SETTING.getKey(), "bucket")
.put(S3Repository.CLIENT_NAME.getKey(), "test");
if (randomBoolean()) {
settingsBuilder.put(S3Repository.BASE_PATH_SETTING.getKey(), randomFrom("test", "test/1"));
}
return settingsBuilder.build();
}

@Override
protected Map<String, HttpHandler> createHttpHandlers() {
this.s3StallingHttpHandler = new S3StallingHttpHandler("bucket");
return Collections.singletonMap("/bucket", this.s3StallingHttpHandler);
}

@Override
protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) {
return delegate;
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
final MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString(S3ClientSettings.ACCESS_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "test_access_key");
secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "test_secret_key");

final Settings.Builder builder = Settings.builder()
.put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that verify an exact wait time
.put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl())
.put(S3ClientSettings.READ_TIMEOUT_SETTING.getConcreteSettingForNamespace("test").getKey(), "1s")
.put(S3ClientSettings.MAX_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), "0")
.put(S3ClientSettings.API_CALL_TIMEOUT_SETTING.getConcreteSettingForNamespace("test").getKey(), "5s")
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.setSecureSettings(secureSettings);

return builder.build();
}

public void testWriteTimeout() {
final String repository = createRepository(randomIdentifier());

final var blobStoreRepository = (BlobStoreRepository) internalCluster().getDataNodeInstance(RepositoriesService.class)
.repository(ProjectId.DEFAULT, repository);
final var blobContainer = blobStoreRepository.blobStore().blobContainer(BlobPath.EMPTY.add(randomIdentifier()));

final var latch = new CountDownLatch(1);
s3StallingHttpHandler.setStallLatchRef(latch);
try {
blobContainer.writeBlob(
randomFrom(OperationPurpose.values()),
"index-" + randomIdentifier(),
new BytesArray(randomBytes((int) ByteSizeValue.ofMb(10).getBytes())),
randomBoolean()
);
fail("should have timed out");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use expectThrows() here?

} catch (IOException e) {
final var cause = ExceptionsHelper.unwrap(e, ApiCallTimeoutException.class);
assertNotNull(cause);
assertThat(cause.getMessage(), containsString("Client execution did not complete before the specified timeout configuration"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we assert something about how these outcomes are captured in the metrics? Should we add something specific to S3RepositoriesMetrics to track this case separately from other exceptions?

} finally {
latch.countDown();
}
}

@SuppressForbidden(reason = "this test uses a HttpHandler to emulate an S3 endpoint")
protected class S3StallingHttpHandler extends S3HttpHandler implements BlobStoreHttpHandler {

private final AtomicReference<CountDownLatch> stallLatchRef = new AtomicReference<>(null);

S3StallingHttpHandler(final String bucket) {
super(bucket);
}

@Override
public void handle(final HttpExchange exchange) throws IOException {
final var latch = stallLatchRef.get();
if (latch != null) {
final String headerDecodedContentLength = exchange.getRequestHeaders().getFirst("x-amz-decoded-content-length");
logger.info(
"--> Simulating server unresponsiveness for request [{} {}] with decoded content length [{}]",
exchange.getRequestMethod(),
exchange.getRequestURI(),
headerDecodedContentLength
);
try {
final var released = latch.await(60, TimeUnit.SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use safeAwait()? At least, I'd like us to be asserting that the latch is released in a reasonably timely manner.

logger.info("--> Latch released: {}", released);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
logger.info("--> Done simulating server unresponsiveness");
}
super.handle(exchange);
}

void setStallLatchRef(CountDownLatch latch) {
stallLatchRef.set(latch);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ final class S3ClientSettings {
key -> Setting.intSetting(key, Defaults.RETRY_COUNT, 0, Property.NodeScope)
);

/**
* The maximum time for a single attempt of an API operation. See also
* <a href="https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/timeouts.html">AWS SDK docs on timeout</a>
*/
static final Setting.AffixSetting<TimeValue> API_CALL_TIMEOUT_SETTING = Setting.affixKeySetting(
PREFIX,
"api_call_timeout",
key -> Setting.timeSetting(key, Defaults.API_CALL_TIMEOUT, Property.NodeScope)
);

/** Formerly whether retries should be throttled (ie use backoff), now unused. V2 AWS SDK always uses throttling. */
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION) // no longer used, should be removed in v10
static final Setting.AffixSetting<Boolean> UNUSED_USE_THROTTLE_RETRIES_SETTING = Setting.affixKeySetting(
Expand Down Expand Up @@ -232,6 +242,11 @@ final class S3ClientSettings {
/** The number of retries to use for the s3 client. */
final int maxRetries;

/**
* The maximum time for a single attempt of an API operation
*/
final int apiCallTimeoutMillis;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we keep this as a TimeValue until it's actually used?


/** Whether the s3 client should use path style access. */
final boolean pathStyleAccess;

Expand All @@ -257,6 +272,7 @@ private S3ClientSettings(
long connectionMaxIdleTimeMillis,
int maxConnections,
int maxRetries,
int apiCallTimeoutMillis,
boolean pathStyleAccess,
boolean disableChunkedEncoding,
boolean addPurposeCustomQueryParameter,
Expand All @@ -274,6 +290,7 @@ private S3ClientSettings(
this.connectionMaxIdleTimeMillis = connectionMaxIdleTimeMillis;
this.maxConnections = maxConnections;
this.maxRetries = maxRetries;
this.apiCallTimeoutMillis = apiCallTimeoutMillis;
this.pathStyleAccess = pathStyleAccess;
this.disableChunkedEncoding = disableChunkedEncoding;
this.addPurposeCustomQueryParameter = addPurposeCustomQueryParameter;
Expand Down Expand Up @@ -303,6 +320,9 @@ S3ClientSettings refine(Settings repositorySettings) {
);
final int newMaxConnections = getRepoSettingOrDefault(MAX_CONNECTIONS_SETTING, normalizedSettings, maxConnections);
final int newMaxRetries = getRepoSettingOrDefault(MAX_RETRIES_SETTING, normalizedSettings, maxRetries);
final int newApiCallTimeoutMillis = Math.toIntExact(
getRepoSettingOrDefault(API_CALL_TIMEOUT_SETTING, normalizedSettings, TimeValue.timeValueMillis(apiCallTimeoutMillis)).millis()
);
final boolean newPathStyleAccess = getRepoSettingOrDefault(USE_PATH_STYLE_ACCESS, normalizedSettings, pathStyleAccess);
final boolean newDisableChunkedEncoding = getRepoSettingOrDefault(
DISABLE_CHUNKED_ENCODING,
Expand Down Expand Up @@ -355,6 +375,7 @@ S3ClientSettings refine(Settings repositorySettings) {
newConnectionMaxIdleTimeMillis,
newMaxConnections,
newMaxRetries,
newApiCallTimeoutMillis,
newPathStyleAccess,
newDisableChunkedEncoding,
newAddPurposeCustomQueryParameter,
Expand Down Expand Up @@ -464,6 +485,7 @@ static S3ClientSettings getClientSettings(final Settings settings, final String
getConfigValue(settings, clientName, CONNECTION_MAX_IDLE_TIME_SETTING).millis(),
getConfigValue(settings, clientName, MAX_CONNECTIONS_SETTING),
getConfigValue(settings, clientName, MAX_RETRIES_SETTING),
Math.toIntExact(getConfigValue(settings, clientName, API_CALL_TIMEOUT_SETTING).millis()),
getConfigValue(settings, clientName, USE_PATH_STYLE_ACCESS),
getConfigValue(settings, clientName, DISABLE_CHUNKED_ENCODING),
getConfigValue(settings, clientName, ADD_PURPOSE_CUSTOM_QUERY_PARAMETER),
Expand All @@ -486,6 +508,7 @@ public boolean equals(final Object o) {
&& Objects.equals(connectionMaxIdleTimeMillis, that.connectionMaxIdleTimeMillis)
&& maxConnections == that.maxConnections
&& maxRetries == that.maxRetries
&& apiCallTimeoutMillis == that.apiCallTimeoutMillis
&& Objects.equals(credentials, that.credentials)
&& Objects.equals(protocol, that.protocol)
&& Objects.equals(endpoint, that.endpoint)
Expand All @@ -512,6 +535,7 @@ public int hashCode() {
readTimeoutMillis,
connectionMaxIdleTimeMillis,
maxRetries,
apiCallTimeoutMillis,
maxConnections,
disableChunkedEncoding,
addPurposeCustomQueryParameter,
Expand All @@ -536,5 +560,6 @@ static final class Defaults {
static final TimeValue CONNECTION_MAX_IDLE_TIME = TimeValue.timeValueSeconds(60);
static final int MAX_CONNECTIONS = 50;
static final int RETRY_COUNT = 3;
static final TimeValue API_CALL_TIMEOUT = TimeValue.ZERO; // default to no API call timeout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use TimeValue.MINUS_ONE to mean "no timeout", leaving TimeValue.ZERO to mean "time out immediately"? Or at least to defer the meaning of zero down to the SDK, I haven't followed through to determine whether zero means zero or infinity within the SDK but I'd prefer we had some way to say "don't even invoke clientOverrideConfiguration.apiCallTimeout".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out zero is invalid here:

org.elasticsearch.repositories.RepositoryVerificationException: [repository] path [base_path_integration_tests] is not accessible on master node
Caused by: java.lang.IllegalArgumentException: apiCallTimeout must be positive
        at software.amazon.awssdk.utils.Validate.isPositive(Validate.java:694) ~[?:?]
        at software.amazon.awssdk.utils.Validate.isPositiveOrNull(Validate.java:711) ~[?:?]
        at software.amazon.awssdk.core.client.config.ClientOverrideConfiguration.<init>(ClientOverrideConfiguration.java:172) ~[?:?]
        at software.amazon.awssdk.core.client.config.ClientOverrideConfiguration$DefaultBuilder.build(ClientOverrideConfiguration.java:1126) ~[?:?]
        at software.amazon.awssdk.core.client.config.ClientOverrideConfiguration$DefaultBuilder.build(ClientOverrideConfiguration.java:786) ~[?:?]
        at org.elasticsearch.repositories.s3.S3Service.buildConfiguration(S3Service.java:347) ~[?:?]
        at org.elasticsearch.repositories.s3.S3Service.buildClientBuilder(S3Service.java:200) ~[?:?]
        at org.elasticsearch.repositories.s3.S3Service.buildClient(S3Service.java:193) ~[?:?]
        at org.elasticsearch.repositories.s3.S3Service.buildClientReference(S3Service.java:178) ~[?:?]
        at org.elasticsearch.repositories.s3.S3ClientsManager$ClientsHolder.client(S3ClientsManager.java:326) ~[?:?]
        at org.elasticsearch.repositories.s3.S3ClientsManager.client(S3ClientsManager.java:192) ~[?:?]
        at org.elasticsearch.repositories.s3.S3Service.client(S3Service.java:163) ~[?:?]
        at org.elasticsearch.repositories.s3.S3BlobStore.clientReference(S3BlobStore.java:270) ~[?:?]
        at org.elasticsearch.repositories.s3.S3BlobContainer.executeSingleUpload(S3BlobContainer.java:550) ~[?:?]
        at org.elasticsearch.repositories.s3.S3BlobContainer.writeBlob(S3BlobContainer.java:148) ~[?:?]
        at org.elasticsearch.common.blobstore.BlobContainer.writeBlob(BlobContainer.java:123) ~[elasticsearch-9.3.0-SNAPSHOT.jar:?]
        at org.elasticsearch.repositories.s3.S3BlobContainer.writeBlobAtomic(S3BlobContainer.java:332) ~[?:?]
        at org.elasticsearch.repositories.blobstore.BlobStoreRepository.startVerification(BlobStoreRepository.java:2314) ~[elasticsearch-9.3.0-SNAPSHOT.jar:?]
        at org.elasticsearch.repositories.RepositoriesService.lambda$validatePutRepositoryRequest$11(RepositoriesService.java:403) ~[elasticsearch-9.3.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.ActionRunnable$1.doRun(ActionRunnable.java:37) ~[elasticsearch-9.3.0-SNAPSHOT.jar:?]
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:1113) ~[elasticsearch-9.3.0-SNAPSHOT.jar:?]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27) ~[elasticsearch-9.3.0-SNAPSHOT.jar:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1090) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:614) ~[?:?]
        at java.lang.Thread.run(Thread.java:1474) ~[?:?]

Still I'd prefer to use -1 to mean "missing".

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public List<Setting<?>> getSettings() {
S3ClientSettings.READ_TIMEOUT_SETTING,
S3ClientSettings.MAX_CONNECTIONS_SETTING,
S3ClientSettings.MAX_RETRIES_SETTING,
S3ClientSettings.API_CALL_TIMEOUT_SETTING,
S3ClientSettings.UNUSED_USE_THROTTLE_RETRIES_SETTING,
S3ClientSettings.USE_PATH_STYLE_ACCESS,
S3ClientSettings.UNUSED_SIGNER_OVERRIDE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ static ClientOverrideConfiguration buildConfiguration(S3ClientSettings clientSet
retryStrategyBuilder.retryOnException(S3Service::isInvalidAccessKeyIdException);
}
clientOverrideConfiguration.retryStrategy(retryStrategyBuilder.build());
clientOverrideConfiguration.apiCallTimeout(Duration.ofMillis(clientSettings.apiCallTimeoutMillis));
return clientOverrideConfiguration.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void testThereIsADefaultClientByDefault() {
assertThat(defaultSettings.maxConnections, is(S3ClientSettings.Defaults.MAX_CONNECTIONS));
assertThat(defaultSettings.maxRetries, is(S3ClientSettings.Defaults.RETRY_COUNT));
assertThat(defaultSettings.connectionMaxIdleTimeMillis, is(S3ClientSettings.Defaults.CONNECTION_MAX_IDLE_TIME.millis()));
assertThat(defaultSettings.apiCallTimeoutMillis, is(0));
}

public void testDefaultClientSettingsCanBeSet() {
Expand Down