Skip to content

Commit 6f0437a

Browse files
Move software.amazon.awssdk:retries from runtimeOnly to implementation
Upgrade code in S3BlobStoreRepositoryTests.java and S3RepositoryThirdPartyTests.java to compile Move the S3Service from RetryCondition/Policy to RetryStrategy: the former are deprecated
1 parent c308493 commit 6f0437a

File tree

6 files changed

+70
-65
lines changed

6 files changed

+70
-65
lines changed

modules/repository-s3/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies {
2929
implementation "software.amazon.awssdk:identity-spi:${versions.awsv2sdk}"
3030
implementation "software.amazon.awssdk:metrics-spi:${versions.awsv2sdk}"
3131
implementation "software.amazon.awssdk:regions:${versions.awsv2sdk}"
32+
implementation "software.amazon.awssdk:retries:${versions.awsv2sdk}"
3233
implementation "software.amazon.awssdk:s3:${versions.awsv2sdk}"
3334
implementation "software.amazon.awssdk:sdk-core:${versions.awsv2sdk}"
3435
implementation "software.amazon.awssdk:services:${versions.awsv2sdk}"
@@ -56,7 +57,6 @@ dependencies {
5657
runtimeOnly "software.amazon.awssdk:json-utils:${versions.awsv2sdk}"
5758
runtimeOnly "software.amazon.awssdk:profiles:${versions.awsv2sdk}"
5859
runtimeOnly "software.amazon.awssdk:protocol-core:${versions.awsv2sdk}"
59-
runtimeOnly "software.amazon.awssdk:retries:${versions.awsv2sdk}"
6060
runtimeOnly "software.amazon.awssdk:retries-spi:${versions.awsv2sdk}"
6161
runtimeOnly "software.amazon.awssdk:third-party-jackson-core:${versions.awsv2sdk}"
6262

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
package org.elasticsearch.repositories.s3;
1010

1111
import fixture.s3.S3HttpHandler;
12+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
13+
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
14+
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest;
15+
import software.amazon.awssdk.services.s3.model.MultipartUpload;
1216

13-
import com.amazonaws.http.AmazonHttpClient;
14-
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
15-
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
16-
import com.amazonaws.services.s3.model.MultipartUpload;
1717
import com.sun.net.httpserver.Headers;
1818
import com.sun.net.httpserver.HttpExchange;
1919
import com.sun.net.httpserver.HttpHandler;
@@ -285,7 +285,7 @@ public void testMetrics() throws Exception {
285285
final BlobStore blobStore = blobStoreRepository.blobStore();
286286
final BlobStore delegateBlobStore = ((BlobStoreWrapper) blobStore).delegate();
287287
final S3BlobStore s3BlobStore = (S3BlobStore) delegateBlobStore;
288-
final Map<S3BlobStore.StatsKey, S3BlobStore.IgnoreNoResponseMetricsCollector> statsCollectors = s3BlobStore
288+
final Map<S3BlobStore.StatsKey, S3BlobStore.IgnoreNoResponseMetricsPublisher> statsCollectors = s3BlobStore
289289
.getStatsCollectors().collectors;
290290

291291
final var plugins = internalCluster().getInstance(PluginsService.class, nodeName)
@@ -512,30 +512,36 @@ public void testMultipartUploadCleanup() {
512512

513513
try (var clientRef = blobStore.clientReference()) {
514514
final var danglingBlobName = randomIdentifier();
515-
final var initiateMultipartUploadRequest = new InitiateMultipartUploadRequest(
516-
blobStore.bucket(),
517-
blobStore.blobContainer(repository.basePath().add("test-multipart-upload")).path().buildAsString() + danglingBlobName
518-
);
519-
initiateMultipartUploadRequest.putCustomQueryParameter(
520-
S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE,
521-
OperationPurpose.SNAPSHOT_DATA.getKey()
522-
);
523-
final var multipartUploadResult = clientRef.client().initiateMultipartUpload(initiateMultipartUploadRequest);
524-
525-
final var listMultipartUploadsRequest = new ListMultipartUploadsRequest(blobStore.bucket()).withPrefix(
526-
repository.basePath().buildAsString()
527-
);
528-
listMultipartUploadsRequest.putCustomQueryParameter(
529-
S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE,
530-
OperationPurpose.SNAPSHOT_DATA.getKey()
531-
);
515+
final var initiateMultipartUploadRequest = CreateMultipartUploadRequest.builder()
516+
.bucket(blobStore.bucket())
517+
.key(blobStore.blobContainer(repository.basePath().add("test-multipart-upload")).path().buildAsString() + danglingBlobName)
518+
.overrideConfiguration(
519+
// NOMERGE: check this conversion makes sense.
520+
AwsRequestOverrideConfiguration.builder()
521+
.putRawQueryParameter(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE, OperationPurpose.SNAPSHOT_DATA.getKey())
522+
.build()
523+
)
524+
.build();
525+
526+
final var multipartUploadResult = clientRef.client().createMultipartUpload(initiateMultipartUploadRequest);
527+
528+
final var listMultipartUploadsRequest = ListMultipartUploadsRequest.builder()
529+
.bucket(blobStore.bucket())
530+
.prefix(repository.basePath().buildAsString())
531+
.overrideConfiguration(
532+
// NOMERGE: check this conversion makes sense.
533+
AwsRequestOverrideConfiguration.builder()
534+
.putRawQueryParameter(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE, OperationPurpose.SNAPSHOT_DATA.getKey())
535+
.build()
536+
)
537+
.build();
532538
assertEquals(
533-
List.of(multipartUploadResult.getUploadId()),
539+
List.of(multipartUploadResult.uploadId()),
534540
clientRef.client()
535541
.listMultipartUploads(listMultipartUploadsRequest)
536-
.getMultipartUploads()
542+
.uploads()
537543
.stream()
538-
.map(MultipartUpload::getUploadId)
544+
.map(MultipartUpload::uploadId)
539545
.toList()
540546
);
541547

@@ -557,7 +563,7 @@ public void testMultipartUploadCleanup() {
557563
Level.INFO,
558564
Strings.format(
559565
"cleaned up dangling multipart upload [%s] of blob [%s]*test-multipart-upload/%s]",
560-
multipartUploadResult.getUploadId(),
566+
multipartUploadResult.uploadId(),
561567
repoName,
562568
danglingBlobName
563569
)
@@ -575,9 +581,9 @@ public void match(LogEvent event) {
575581
assertThat(
576582
clientRef.client()
577583
.listMultipartUploads(listMultipartUploadsRequest)
578-
.getMultipartUploads()
584+
.uploads()
579585
.stream()
580-
.map(MultipartUpload::getUploadId)
586+
.map(MultipartUpload::uploadId)
581587
.toList(),
582588
empty()
583589
);
@@ -676,7 +682,8 @@ protected static class S3ErroneousHttpHandler extends ErroneousHttpHandler {
676682
@Override
677683
protected String requestUniqueId(final HttpExchange exchange) {
678684
// Amazon SDK client provides a unique ID per request
679-
return exchange.getRequestHeaders().getFirst(AmazonHttpClient.HEADER_SDK_TRANSACTION_ID);
685+
// TODO NOMERGE: make "x-amz-request-id" into a constant someplace.
686+
return exchange.getRequestHeaders().getFirst("x-amz-request-id");
680687
}
681688
}
682689

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@
88
*/
99
package org.elasticsearch.repositories.s3;
1010

11-
import com.amazonaws.services.s3.model.AmazonS3Exception;
12-
import com.amazonaws.services.s3.model.GetObjectRequest;
13-
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
14-
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
15-
import com.amazonaws.services.s3.model.MultipartUpload;
11+
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
12+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
13+
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest;
14+
import software.amazon.awssdk.services.s3.model.MultipartUpload;
15+
import software.amazon.awssdk.services.s3.model.S3Exception;
16+
1617
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
1718
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
1819

@@ -176,8 +177,9 @@ BytesReference readRegister() {
176177
}
177178

178179
List<MultipartUpload> listMultipartUploads() {
179-
return client.listMultipartUploads(new ListMultipartUploadsRequest(bucketName).withPrefix(registerBlobPath))
180-
.getMultipartUploads();
180+
return client.listMultipartUploads(
181+
ListMultipartUploadsRequest.builder().bucket(bucketName).prefix(registerBlobPath).build()
182+
).uploads();
181183
}
182184
}
183185

@@ -191,22 +193,19 @@ List<MultipartUpload> listMultipartUploads() {
191193
assertEquals(bytes1, testHarness.readRegister());
192194
assertArrayEquals(
193195
bytes1.array(),
194-
client.getObject(new GetObjectRequest(bucketName, registerBlobPath)).getObjectContent().readAllBytes()
196+
client.getObject(GetObjectRequest.builder().bucket(bucketName).key(registerBlobPath).build()).readAllBytes()
195197
);
196198

197199
// a fresh ongoing upload blocks other CAS attempts
198-
client.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, registerBlobPath));
200+
client.createMultipartUpload(CreateMultipartUploadRequest.builder().bucket(bucketName).key(registerBlobPath).build());
199201
assertThat(testHarness.listMultipartUploads(), hasSize(1));
200202

201203
assertFalse(testHarness.tryCompareAndSet(bytes1, bytes2));
202204
final var multipartUploads = testHarness.listMultipartUploads();
203205
assertThat(multipartUploads, hasSize(1));
204206

205207
// repo clock may not be exactly aligned with ours, but it should be close
206-
final var age = blobStore.getThreadPool().absoluteTimeInMillis() - multipartUploads.get(0)
207-
.getInitiated()
208-
.toInstant()
209-
.toEpochMilli();
208+
final var age = blobStore.getThreadPool().absoluteTimeInMillis() - multipartUploads.get(0).initiated().toEpochMilli();
210209
final var ageRangeMillis = TimeValue.timeValueMinutes(1).millis();
211210
assertThat(age, allOf(greaterThanOrEqualTo(-ageRangeMillis), lessThanOrEqualTo(ageRangeMillis)));
212211

@@ -225,7 +224,9 @@ List<MultipartUpload> listMultipartUploads() {
225224

226225
public void testReadFromPositionLargerThanBlobLength() {
227226
testReadFromPositionLargerThanBlobLength(
228-
e -> asInstanceOf(AmazonS3Exception.class, e.getCause()).getStatusCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()
227+
e -> Integer.parseInt(
228+
asInstanceOf(S3Exception.class, e.getCause()).awsErrorDetails().errorCode()
229+
) == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()
229230
);
230231
}
231232
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import software.amazon.awssdk.awscore.exception.AwsServiceException;
2020
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
2121
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
22-
import software.amazon.awssdk.core.retry.RetryPolicy;
23-
import software.amazon.awssdk.core.retry.conditions.RetryCondition;
2422
import software.amazon.awssdk.http.SdkHttpClient;
2523
import software.amazon.awssdk.http.apache.ApacheHttpClient;
2624
import software.amazon.awssdk.http.apache.ProxyConfiguration;
@@ -246,28 +244,25 @@ static SdkHttpClient buildHttpClient(S3ClientSettings clientSettings) {
246244
return httpClientBuilder.build();
247245
}
248246

249-
static final RetryCondition RETRYABLE_403_RETRY_POLICY = (retryPolicyContext) -> {
250-
if (RetryCondition.defaultRetryCondition().shouldRetry(retryPolicyContext)) {
251-
return true;
252-
}
253-
if (retryPolicyContext.exception() instanceof AwsServiceException ase) {
247+
static boolean RETRYABLE_403_RETRY_PREDICATE(Throwable e) {
248+
if (e instanceof AwsServiceException ase) {
254249
return ase.statusCode() == RestStatus.FORBIDDEN.getStatus() && "InvalidAccessKeyId".equals(ase.awsErrorDetails().errorCode());
255250
}
256251
return false;
257-
};
252+
}
258253

259254
static ClientOverrideConfiguration buildConfiguration(S3ClientSettings clientSettings, boolean isStateless) {
260255
ClientOverrideConfiguration.Builder clientOverrideConfiguration = ClientOverrideConfiguration.builder();
261256

262-
// TODO: revisit this, does it still make sense to specially retry?
263-
RetryPolicy.Builder retryPolicy = RetryPolicy.builder();
264-
retryPolicy.numRetries(clientSettings.maxRetries);
265-
if (isStateless) {
266-
// Create a 403 error retyable policy.
267-
retryPolicy.retryCondition(RETRYABLE_403_RETRY_POLICY);
268-
}
257+
clientOverrideConfiguration.retryStrategy(builder -> {
258+
builder.maxAttempts(clientSettings.maxRetries);
259+
// TODO NOMERGE: revisit this, does it still make sense to specially retry?
260+
if (isStateless) {
261+
// Create a 403 error retyable policy.
262+
builder.retryOnException(S3Service::RETRYABLE_403_RETRY_PREDICATE);
269263

270-
clientOverrideConfiguration.retryPolicy(retryPolicy.build());
264+
}
265+
});
271266
clientOverrideConfiguration.putAdvancedOption(SdkAdvancedClientOption.SIGNER, clientSettings.signerOverride.signerFactory.get());
272267
return clientOverrideConfiguration.build();
273268
}
@@ -312,6 +307,7 @@ static AwsCredentialsProvider buildCredentials(
312307
.addCredentialsProvider(new ErrorLoggingCredentialsProvider(webIdentityTokenCredentialsProvider, LOGGER))
313308
// TODO NOMERGE: revisit whether this conversion makes sense
314309
// Consider using DefaultCredentialsProvider rather than these two particular providers.
310+
// .addCredentialsProvider(new ErrorLoggingCredentialsProvider(DefaultCredentialsProvider.create(), LOGGER))
315311
.addCredentialsProvider(new ErrorLoggingCredentialsProvider(ContainerCredentialsProvider.create(), LOGGER))
316312
.addCredentialsProvider(new ErrorLoggingCredentialsProvider(InstanceProfileCredentialsProvider.create(), LOGGER))
317313
.build()

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/CustomWebIdentityTokenCredentialsProviderTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,8 @@ public void testSupportRegionalizedEndpoints() throws Exception {
247247
// endpoint in a unit test. The client depends on hardcoded RegionalEndpointsOptionResolver that in turn depends
248248
// on the system environment that we can't change in the test. So we just verify we that we called `withRegion`
249249
// on stsClientBuilder which should internally correctly configure the endpoint when the STS client is built.
250-
assertEquals("us-west-2", webIdentityTokenCredentialsProvider.getSecurityTokenServiceRegion());
250+
// TODO NOMERGE: can't access region anymore, need to rethink this.
251+
// assertEquals("us-west-2", webIdentityTokenCredentialsProvider.getStsRegion());
251252

252253
webIdentityTokenCredentialsProvider.close();
253254
}

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ServiceTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ public void testRetryOn403RetryPolicy() {
6262

6363
// The retryable 403 condition retries on 403 invalid access key id
6464
assertTrue(
65-
S3Service.RETRYABLE_403_RETRY_POLICY.shouldRetry(
66-
RetryPolicyContext.builder().retriesAttempted(between(0, 9)).exception(s3Exception).build()
65+
S3Service.RETRYABLE_403_RETRY_PREDICATE(
66+
RetryPolicyContext.builder().retriesAttempted(between(0, 9)).exception(s3Exception).build().exception()
6767
)
6868
);
6969

@@ -74,15 +74,15 @@ public void testRetryOn403RetryPolicy() {
7474
var retryPolicyContext = RetryPolicyContext.builder().retriesAttempted(between(0, 9)).exception(non403Exception).build();
7575
// Retryable 403 condition delegates to the AWS default retry condition. Its result must be consistent with the decision
7676
// by the AWS default, e.g. some error status like 429 is retryable by default, the retryable 403 condition respects it.
77-
boolean actual = S3Service.RETRYABLE_403_RETRY_POLICY.shouldRetry(retryPolicyContext);
77+
boolean actual = S3Service.RETRYABLE_403_RETRY_PREDICATE(retryPolicyContext.exception());
7878
boolean expected = RetryCondition.defaultRetryCondition().shouldRetry(retryPolicyContext);
7979
assertThat(actual, equalTo(expected));
8080
} else {
8181
// Not retry for 403 with error code that is not invalid access key id
8282
String errorCode = randomAlphaOfLength(10);
8383
var exception = S3Exception.builder().awsErrorDetails(AwsErrorDetails.builder().errorCode(errorCode).build()).build();
8484
var retryPolicyContext = RetryPolicyContext.builder().retriesAttempted(between(0, 9)).exception(exception).build();
85-
assertFalse(S3Service.RETRYABLE_403_RETRY_POLICY.shouldRetry(retryPolicyContext));
85+
assertFalse(S3Service.RETRYABLE_403_RETRY_PREDICATE(retryPolicyContext.exception()));
8686
}
8787
}
8888
}

0 commit comments

Comments
 (0)