Skip to content

Commit 06cdd11

Browse files
authored
Retry on 403 for S3 put in certain environments (#115486)
This PR configures a new retry condition for the S3 client so that it retries on 403 (with the `InvalidAccessKeyId` error code) for operations such as PUT in certain environments, namely stateless deployments. Note that 403 is already retried for GET by S3RetryingInputStream. Resolves: ES-9321
1 parent 2f2ddad commit 06cdd11

File tree

5 files changed

+118
-13
lines changed

5 files changed

+118
-13
lines changed

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryMetricsTests.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.bytes.BytesArray;
2020
import org.elasticsearch.common.collect.Iterators;
2121
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.core.Strings;
2223
import org.elasticsearch.core.SuppressForbidden;
2324
import org.elasticsearch.core.TimeValue;
2425
import org.elasticsearch.plugins.PluginsService;
@@ -53,11 +54,13 @@
5354
import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_THROTTLES_TOTAL;
5455
import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_UNSUCCESSFUL_OPERATIONS_TOTAL;
5556
import static org.elasticsearch.repositories.s3.S3RepositoriesMetrics.METRIC_DELETE_RETRIES_HISTOGRAM;
57+
import static org.elasticsearch.rest.RestStatus.FORBIDDEN;
5658
import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
5759
import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
5860
import static org.elasticsearch.rest.RestStatus.REQUESTED_RANGE_NOT_SATISFIED;
5961
import static org.elasticsearch.rest.RestStatus.SERVICE_UNAVAILABLE;
6062
import static org.elasticsearch.rest.RestStatus.TOO_MANY_REQUESTS;
63+
import static org.hamcrest.Matchers.containsString;
6164
import static org.hamcrest.Matchers.equalTo;
6265
import static org.hamcrest.Matchers.instanceOf;
6366
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -320,6 +323,51 @@ public void testRetrySnapshotDeleteMetricsWhenRetriesExhausted() {
320323
assertThat(longHistogramMeasurement.get(0).getLong(), equalTo(3L));
321324
}
322325

326+
public void testPutDoesNotRetryOn403InStateful() {
327+
final Settings settings = internalCluster().getInstance(Settings.class);
328+
assertThat(DiscoveryNode.isStateless(settings), equalTo(false));
329+
330+
final String repository = createRepository(randomRepositoryName());
331+
final String dataNodeName = internalCluster().getNodeNameThat(DiscoveryNode::canContainData);
332+
final TestTelemetryPlugin plugin = getPlugin(dataNodeName);
333+
// Exclude snapshot related purpose to avoid trigger assertions for cross-checking purpose and blob names
334+
final OperationPurpose purpose = randomFrom(
335+
OperationPurpose.REPOSITORY_ANALYSIS,
336+
OperationPurpose.CLUSTER_STATE,
337+
OperationPurpose.INDICES,
338+
OperationPurpose.TRANSLOG
339+
);
340+
final BlobContainer blobContainer = getBlobContainer(dataNodeName, repository);
341+
final String blobName = randomIdentifier();
342+
343+
plugin.resetMeter();
344+
addErrorStatus(new S3ErrorResponse(FORBIDDEN, Strings.format("""
345+
<?xml version="1.0" encoding="UTF-8"?>
346+
<Error>
347+
<Code>InvalidAccessKeyId</Code>
348+
<Message>The AWS Access Key Id you provided does not exist in our records.</Message>
349+
<RequestId>%s</RequestId>
350+
</Error>""", randomUUID())));
351+
352+
final var exception = expectThrows(IOException.class, () -> {
353+
if (randomBoolean()) {
354+
blobContainer.writeBlob(purpose, blobName, new BytesArray("blob"), randomBoolean());
355+
} else {
356+
blobContainer.writeMetadataBlob(
357+
purpose,
358+
blobName,
359+
randomBoolean(),
360+
randomBoolean(),
361+
outputStream -> outputStream.write("blob".getBytes())
362+
);
363+
}
364+
});
365+
assertThat(exception.getCause().getMessage(), containsString("InvalidAccessKeyId"));
366+
367+
assertThat(getLongCounterValue(plugin, METRIC_REQUESTS_TOTAL, Operation.PUT_OBJECT), equalTo(1L));
368+
assertThat(getLongCounterValue(plugin, METRIC_EXCEPTIONS_TOTAL, Operation.PUT_OBJECT), equalTo(1L));
369+
}
370+
323371
private void addErrorStatus(RestStatus... statuses) {
324372
errorResponseQueue.addAll(Arrays.stream(statuses).map(S3ErrorResponse::new).toList());
325373
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.repositories.s3;
1111

12+
import com.amazonaws.AmazonServiceException;
1213
import com.amazonaws.ClientConfiguration;
1314
import com.amazonaws.SDKGlobalConfiguration;
1415
import com.amazonaws.auth.AWSCredentials;
@@ -20,13 +21,16 @@
2021
import com.amazonaws.auth.STSAssumeRoleWithWebIdentitySessionCredentialsProvider;
2122
import com.amazonaws.client.builder.AwsClientBuilder;
2223
import com.amazonaws.http.IdleConnectionReaper;
24+
import com.amazonaws.retry.PredefinedRetryPolicies;
25+
import com.amazonaws.retry.RetryPolicy;
2326
import com.amazonaws.services.s3.AmazonS3;
2427
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
2528
import com.amazonaws.services.s3.internal.Constants;
2629
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
2730
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
2831
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
2932

33+
import org.apache.http.HttpStatus;
3034
import org.apache.logging.log4j.LogManager;
3135
import org.apache.logging.log4j.Logger;
3236
import org.elasticsearch.ElasticsearchException;
@@ -193,7 +197,10 @@ AmazonS3 buildClient(final S3ClientSettings clientSettings) {
193197
protected AmazonS3ClientBuilder buildClientBuilder(S3ClientSettings clientSettings) {
194198
final AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
195199
builder.withCredentials(buildCredentials(LOGGER, clientSettings, webIdentityTokenCredentialsProvider));
196-
builder.withClientConfiguration(buildConfiguration(clientSettings));
200+
final ClientConfiguration clientConfiguration = buildConfiguration(clientSettings, isStateless);
201+
assert (isStateless == false && clientConfiguration.getRetryPolicy() == PredefinedRetryPolicies.DEFAULT)
202+
|| (isStateless && clientConfiguration.getRetryPolicy() == RETRYABLE_403_RETRY_POLICY) : "invalid retry policy configuration";
203+
builder.withClientConfiguration(clientConfiguration);
197204

198205
String endpoint = Strings.hasLength(clientSettings.endpoint) ? clientSettings.endpoint : Constants.S3_HOSTNAME;
199206
if ((endpoint.startsWith("http://") || endpoint.startsWith("https://")) == false) {
@@ -223,7 +230,7 @@ protected AmazonS3ClientBuilder buildClientBuilder(S3ClientSettings clientSettin
223230
}
224231

225232
// pkg private for tests
226-
static ClientConfiguration buildConfiguration(S3ClientSettings clientSettings) {
233+
static ClientConfiguration buildConfiguration(S3ClientSettings clientSettings, boolean isStateless) {
227234
final ClientConfiguration clientConfiguration = new ClientConfiguration();
228235
// the response metadata cache is only there for diagnostics purposes,
229236
// but can force objects from every response to the old generation.
@@ -248,6 +255,10 @@ static ClientConfiguration buildConfiguration(S3ClientSettings clientSettings) {
248255
clientConfiguration.setUseThrottleRetries(clientSettings.throttleRetries);
249256
clientConfiguration.setSocketTimeout(clientSettings.readTimeoutMillis);
250257

258+
if (isStateless) {
259+
clientConfiguration.setRetryPolicy(RETRYABLE_403_RETRY_POLICY);
260+
}
261+
251262
return clientConfiguration;
252263
}
253264

@@ -504,4 +515,21 @@ interface SystemEnvironment {
504515
interface JvmEnvironment {
505516
String getProperty(String key, String defaultValue);
506517
}
518+
519+
static final RetryPolicy RETRYABLE_403_RETRY_POLICY = RetryPolicy.builder()
520+
.withRetryCondition((originalRequest, exception, retriesAttempted) -> {
521+
if (PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION.shouldRetry(originalRequest, exception, retriesAttempted)) {
522+
return true;
523+
}
524+
if (exception instanceof AmazonServiceException ase) {
525+
return ase.getStatusCode() == HttpStatus.SC_FORBIDDEN && "InvalidAccessKeyId".equals(ase.getErrorCode());
526+
}
527+
return false;
528+
})
529+
.withBackoffStrategy(PredefinedRetryPolicies.DEFAULT_BACKOFF_STRATEGY)
530+
.withMaxErrorRetry(PredefinedRetryPolicies.DEFAULT_MAX_ERROR_RETRY)
531+
.withHonorMaxErrorRetryInClientConfig(true)
532+
.withHonorDefaultMaxErrorRetryInRetryMode(true)
533+
.withHonorDefaultBackoffStrategyInRetryMode(true)
534+
.build();
507535
}

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AwsS3ServiceImplTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.amazonaws.auth.AWSStaticCredentialsProvider;
1818
import com.amazonaws.auth.BasicAWSCredentials;
1919
import com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper;
20+
import com.amazonaws.retry.PredefinedRetryPolicies;
2021

2122
import org.apache.logging.log4j.Logger;
2223
import org.apache.logging.log4j.util.Supplier;
@@ -211,7 +212,7 @@ private void launchAWSConfigurationTest(
211212
) {
212213

213214
final S3ClientSettings clientSettings = S3ClientSettings.getClientSettings(settings, "default");
214-
final ClientConfiguration configuration = S3Service.buildConfiguration(clientSettings);
215+
final ClientConfiguration configuration = S3Service.buildConfiguration(clientSettings, false);
215216

216217
assertThat(configuration.getResponseMetadataCacheSize(), is(0));
217218
assertThat(configuration.getProtocol(), is(expectedProtocol));
@@ -222,6 +223,7 @@ private void launchAWSConfigurationTest(
222223
assertThat(configuration.getMaxErrorRetry(), is(expectedMaxRetries));
223224
assertThat(configuration.useThrottledRetries(), is(expectedUseThrottleRetries));
224225
assertThat(configuration.getSocketTimeout(), is(expectedReadTimeout));
226+
assertThat(configuration.getRetryPolicy(), is(PredefinedRetryPolicies.DEFAULT));
225227
}
226228

227229
public void testEndpointSetting() {

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ClientSettingsTests.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,9 +194,9 @@ public void testSignerOverrideCanBeSet() {
194194
);
195195
assertThat(settings.get("default").region, is(""));
196196
assertThat(settings.get("other").signerOverride, is(signerOverride));
197-
ClientConfiguration defaultConfiguration = S3Service.buildConfiguration(settings.get("default"));
197+
ClientConfiguration defaultConfiguration = S3Service.buildConfiguration(settings.get("default"), false);
198198
assertThat(defaultConfiguration.getSignerOverride(), nullValue());
199-
ClientConfiguration configuration = S3Service.buildConfiguration(settings.get("other"));
199+
ClientConfiguration configuration = S3Service.buildConfiguration(settings.get("other"), false);
200200
assertThat(configuration.getSignerOverride(), is(signerOverride));
201201
}
202202

@@ -207,12 +207,18 @@ public void testMaxConnectionsCanBeSet() {
207207
);
208208
assertThat(settings.get("default").maxConnections, is(ClientConfiguration.DEFAULT_MAX_CONNECTIONS));
209209
assertThat(settings.get("other").maxConnections, is(maxConnections));
210-
ClientConfiguration defaultConfiguration = S3Service.buildConfiguration(settings.get("default"));
210+
ClientConfiguration defaultConfiguration = S3Service.buildConfiguration(settings.get("default"), false);
211211
assertThat(defaultConfiguration.getMaxConnections(), is(ClientConfiguration.DEFAULT_MAX_CONNECTIONS));
212-
ClientConfiguration configuration = S3Service.buildConfiguration(settings.get("other"));
212+
ClientConfiguration configuration = S3Service.buildConfiguration(settings.get("other"), false);
213213
assertThat(configuration.getMaxConnections(), is(maxConnections));
214214

215215
// the default appears in the docs so let's make sure it doesn't change:
216216
assertEquals(50, ClientConfiguration.DEFAULT_MAX_CONNECTIONS);
217217
}
218+
219+
public void testStatelessDefaultRetryPolicy() {
220+
final var s3ClientSettings = S3ClientSettings.load(Settings.EMPTY).get("default");
221+
final var clientConfiguration = S3Service.buildConfiguration(s3ClientSettings, true);
222+
assertThat(clientConfiguration.getRetryPolicy(), is(S3Service.RETRYABLE_403_RETRY_POLICY));
223+
}
218224
}

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ServiceTests.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,23 @@
88
*/
99
package org.elasticsearch.repositories.s3;
1010

11+
import com.amazonaws.AmazonWebServiceRequest;
12+
import com.amazonaws.services.s3.model.AmazonS3Exception;
13+
1114
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
1215
import org.elasticsearch.common.settings.Settings;
1316
import org.elasticsearch.env.Environment;
1417
import org.elasticsearch.test.ESTestCase;
1518
import org.elasticsearch.watcher.ResourceWatcherService;
16-
import org.mockito.Mockito;
1719

1820
import java.io.IOException;
1921

22+
import static org.mockito.Mockito.mock;
23+
2024
public class S3ServiceTests extends ESTestCase {
2125

2226
public void testCachedClientsAreReleased() throws IOException {
23-
final S3Service s3Service = new S3Service(
24-
Mockito.mock(Environment.class),
25-
Settings.EMPTY,
26-
Mockito.mock(ResourceWatcherService.class)
27-
);
27+
final S3Service s3Service = new S3Service(mock(Environment.class), Settings.EMPTY, mock(ResourceWatcherService.class));
2828
final Settings settings = Settings.builder().put("endpoint", "http://first").build();
2929
final RepositoryMetadata metadata1 = new RepositoryMetadata("first", "s3", settings);
3030
final RepositoryMetadata metadata2 = new RepositoryMetadata("second", "s3", settings);
@@ -41,4 +41,25 @@ public void testCachedClientsAreReleased() throws IOException {
4141
final S3ClientSettings clientSettingsReloaded = s3Service.settings(metadata1);
4242
assertNotSame(clientSettings, clientSettingsReloaded);
4343
}
44+
45+
public void testRetryOn403RetryPolicy() {
46+
final AmazonS3Exception e = new AmazonS3Exception("error");
47+
e.setStatusCode(403);
48+
e.setErrorCode("InvalidAccessKeyId");
49+
50+
// Retry on 403 invalid access key id
51+
assertTrue(
52+
S3Service.RETRYABLE_403_RETRY_POLICY.getRetryCondition().shouldRetry(mock(AmazonWebServiceRequest.class), e, between(0, 9))
53+
);
54+
55+
// Not retry if not 403 or not invalid access key id
56+
if (randomBoolean()) {
57+
e.setStatusCode(randomValueOtherThan(403, () -> between(0, 600)));
58+
} else {
59+
e.setErrorCode(randomAlphaOfLength(10));
60+
}
61+
assertFalse(
62+
S3Service.RETRYABLE_403_RETRY_POLICY.getRetryCondition().shouldRetry(mock(AmazonWebServiceRequest.class), e, between(0, 9))
63+
);
64+
}
4465
}

0 commit comments

Comments
 (0)