From 33195c946424397109fe828329a39a0e023e9708 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 15 Oct 2024 11:13:52 +0100 Subject: [PATCH 1/3] Retry `S3BlobContainer#getRegister` on all exceptions S3 register reads are subject to the regular client retry policy, but in practice we see failures of these reads sometimes for errors that are transient but for which the SDK does not retry. This commit adds another layer of retries to these reads. Relates ES-9721 --- .../snapshot-restore/repository-s3.asciidoc | 5 ++ .../repositories/s3/S3BlobContainer.java | 51 +++++++++---- .../repositories/s3/S3BlobStore.java | 8 +- .../repositories/s3/S3Repository.java | 10 +++ .../s3/S3BlobContainerRetriesTests.java | 76 ++++++++++++++++++- 5 files changed, 134 insertions(+), 16 deletions(-) diff --git a/docs/reference/snapshot-restore/repository-s3.asciidoc b/docs/reference/snapshot-restore/repository-s3.asciidoc index 71a9fd8b87c96..658c0fb293a2d 100644 --- a/docs/reference/snapshot-restore/repository-s3.asciidoc +++ b/docs/reference/snapshot-restore/repository-s3.asciidoc @@ -329,6 +329,11 @@ include::repository-shared-settings.asciidoc[] `1000` which is the maximum number supported by the https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html[AWS ListMultipartUploads API]. If set to `0`, {es} will not attempt to clean up dangling multipart uploads. +`get_register_retry_delay` + + (<<time-units,time value>>) Sets the time to wait before trying again if an attempt to read a + <<repository-s3-linearizable-registers,linearizable register>> fails. Defaults to `5s`. + NOTE: The option of defining client settings in the repository settings as documented below is considered deprecated, and will be removed in a future version. 
diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 49df078453327..10420ec9b3d38 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -40,6 +40,7 @@ import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.BackoffPolicy; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobContainer; @@ -910,21 +911,43 @@ public void compareAndExchangeRegister( @Override public void getRegister(OperationPurpose purpose, String key, ActionListener listener) { ActionListener.completeWith(listener, () -> { - final var getObjectRequest = new GetObjectRequest(blobStore.bucket(), buildKey(key)); - S3BlobStore.configureRequestForMetrics(getObjectRequest, blobStore, Operation.GET_OBJECT, purpose); - try ( - var clientReference = blobStore.clientReference(); - var s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest)); - var stream = s3Object.getObjectContent() - ) { - return OptionalBytesReference.of(getRegisterUsingConsistentRead(stream, keyPath, key)); - } catch (AmazonS3Exception e) { - logger.trace(() -> Strings.format("[%s]: getRegister failed", key), e); - if (e.getStatusCode() == 404) { - return OptionalBytesReference.EMPTY; - } else { - throw e; + final var backoffPolicy = purpose == OperationPurpose.REPOSITORY_ANALYSIS + ? 
BackoffPolicy.noBackoff() + : BackoffPolicy.constantBackoff(blobStore.getGetRegisterRetryDelay(), blobStore.getMaxRetries()); + final var retryDelayIterator = backoffPolicy.iterator(); + + Exception finalException = null; + while (true) { + final var getObjectRequest = new GetObjectRequest(blobStore.bucket(), buildKey(key)); + S3BlobStore.configureRequestForMetrics(getObjectRequest, blobStore, Operation.GET_OBJECT, purpose); + try ( + var clientReference = blobStore.clientReference(); + var s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest)); + var stream = s3Object.getObjectContent() + ) { + return OptionalBytesReference.of(getRegisterUsingConsistentRead(stream, keyPath, key)); + } catch (Exception attemptException) { + logger.trace(() -> Strings.format("[%s]: getRegister failed", key), attemptException); + if (attemptException instanceof AmazonS3Exception amazonS3Exception && amazonS3Exception.getStatusCode() == 404) { + return OptionalBytesReference.EMPTY; + } else if (finalException == null) { + finalException = attemptException; + } else if (finalException != attemptException) { + finalException.addSuppressed(attemptException); + } } + if (retryDelayIterator.hasNext()) { + try { + // noinspection BusyWait + Thread.sleep(retryDelayIterator.next().millis()); + continue; + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + finalException.addSuppressed(interruptedException); + } + } + + throw finalException; } }); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index 3e6b7c356cb11..bb8250b28d1ff 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -92,6 +92,8 @@ class S3BlobStore implements 
BlobStore { private final int bulkDeletionBatchSize; + private final TimeValue getRegisterRetryDelay; + S3BlobStore( S3Service service, String bucket, @@ -116,7 +118,7 @@ class S3BlobStore implements BlobStore { this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT); this.s3RepositoriesMetrics = s3RepositoriesMetrics; this.bulkDeletionBatchSize = S3Repository.DELETION_BATCH_SIZE_SETTING.get(repositoryMetadata.settings()); - + this.getRegisterRetryDelay = S3Repository.GET_REGISTER_RETRY_DELAY.get(repositoryMetadata.settings()); } RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) { @@ -409,6 +411,10 @@ public StorageClass getStorageClass() { return storageClass; } + public TimeValue getGetRegisterRetryDelay() { + return getRegisterRetryDelay; + } + public static StorageClass initStorageClass(String storageClass) { if ((storageClass == null) || storageClass.equals("")) { return StorageClass.Standard; diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index af385eeac6a5b..e7a6db475ffa4 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -202,6 +202,16 @@ class S3Repository extends MeteredBlobStoreRepository { Setting.Property.Dynamic ); + /** + * Time to wait before trying again if getRegister fails. 
+ */ + static final Setting GET_REGISTER_RETRY_DELAY = Setting.timeSetting( + "get_register_retry_delay", + new TimeValue(5, TimeUnit.SECONDS), + new TimeValue(0, TimeUnit.MILLISECONDS), + Setting.Property.Dynamic + ); + private final S3Service service; private final String bucket; diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 1443ff704efd1..323921a7b6021 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -14,6 +14,7 @@ import com.amazonaws.SdkClientException; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream; +import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.util.Base16; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; @@ -24,6 +25,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.common.blobstore.OptionalBytesReference; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; @@ -183,7 +185,10 @@ protected BlobContainer createBlobContainer( final RepositoryMetadata repositoryMetadata = new RepositoryMetadata( "repository", S3Repository.TYPE, - Settings.builder().put(S3Repository.CLIENT_NAME.getKey(), clientName).build() + Settings.builder() + .put(S3Repository.CLIENT_NAME.getKey(), clientName) + .put(S3Repository.GET_REGISTER_RETRY_DELAY.getKey(), TimeValue.ZERO) + .build() ); final S3BlobStore s3BlobStore = new 
S3BlobStore( @@ -771,6 +776,75 @@ public void handle(HttpExchange exchange) throws IOException { assertThat(getRetryHistogramMeasurements(), empty()); } + public void testGetRegisterRetries() { + final var maxRetries = between(0, 3); + final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null); + + interface FailingHandlerFactory { + void addHandler(String blobName, Integer... responseCodes); + } + + final var requestCounter = new AtomicInteger(); + final FailingHandlerFactory countingFailingHandlerFactory = (blobName, responseCodes) -> httpServer.createContext( + downloadStorageEndpoint(blobContainer, blobName), + exchange -> { + requestCounter.incrementAndGet(); + try (exchange) { + exchange.sendResponseHeaders(randomFrom(responseCodes), -1); + } + } + ); + + countingFailingHandlerFactory.addHandler("test_register_no_internal_retries", HttpStatus.SC_UNPROCESSABLE_ENTITY); + countingFailingHandlerFactory.addHandler( + "test_register_internal_retries", + HttpStatus.SC_INTERNAL_SERVER_ERROR, + HttpStatus.SC_SERVICE_UNAVAILABLE + ); + countingFailingHandlerFactory.addHandler("test_register_not_found", HttpStatus.SC_NOT_FOUND); + + { + final var exceptionWithInternalRetries = safeAwaitFailure( + OptionalBytesReference.class, + l -> blobContainer.getRegister(randomRetryingPurpose(), "test_register_internal_retries", l) + ); + assertThat(exceptionWithInternalRetries, instanceOf(AmazonS3Exception.class)); + assertEquals((maxRetries + 1) * (maxRetries + 1), requestCounter.get()); + assertEquals(maxRetries, exceptionWithInternalRetries.getSuppressed().length); + } + + { + requestCounter.set(0); + final var exceptionWithoutInternalRetries = safeAwaitFailure( + OptionalBytesReference.class, + l -> blobContainer.getRegister(randomRetryingPurpose(), "test_register_no_internal_retries", l) + ); + assertThat(exceptionWithoutInternalRetries, instanceOf(AmazonS3Exception.class)); + assertEquals(maxRetries + 1, requestCounter.get()); + 
assertEquals(maxRetries, exceptionWithoutInternalRetries.getSuppressed().length); + } + + { + requestCounter.set(0); + final var repoAnalysisException = safeAwaitFailure( + OptionalBytesReference.class, + l -> blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, "test_register_no_internal_retries", l) + ); + assertThat(repoAnalysisException, instanceOf(AmazonS3Exception.class)); + assertEquals(1, requestCounter.get()); + assertEquals(0, repoAnalysisException.getSuppressed().length); + } + + { + requestCounter.set(0); + final OptionalBytesReference expectEmpty = safeAwait( + l -> blobContainer.getRegister(randomPurpose(), "test_register_not_found", l) + ); + assertEquals(OptionalBytesReference.EMPTY, expectEmpty); + assertEquals(1, requestCounter.get()); + } + } + @Override protected Matcher getMaxRetriesMatcher(int maxRetries) { // some attempts make meaningful progress and do not count towards the max retry limit From 0158c9b47a32c36b00aa84653ab793cf2880bce9 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 15 Oct 2024 12:31:07 +0100 Subject: [PATCH 2/3] Update docs/changelog/114813.yaml --- docs/changelog/114813.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/114813.yaml diff --git a/docs/changelog/114813.yaml b/docs/changelog/114813.yaml new file mode 100644 index 0000000000000..1595b004178c4 --- /dev/null +++ b/docs/changelog/114813.yaml @@ -0,0 +1,5 @@ +pr: 114813 +summary: Retry `S3BlobContainer#getRegister` on all exceptions +area: Snapshot/Restore +type: enhancement +issues: [] From b4cc1c989288a90473ee30855c60a1c632dfc416 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 16 Oct 2024 06:59:14 +0100 Subject: [PATCH 3/3] Fall through comment --- .../java/org/elasticsearch/repositories/s3/S3BlobContainer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java 
b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 10420ec9b3d38..902dcb42fc0cb 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -944,6 +944,7 @@ public void getRegister(OperationPurpose purpose, String key, ActionListener