diff --git a/docs/changelog/114813.yaml b/docs/changelog/114813.yaml new file mode 100644 index 0000000000000..1595b004178c4 --- /dev/null +++ b/docs/changelog/114813.yaml @@ -0,0 +1,5 @@ +pr: 114813 +summary: Retry `S3BlobContainer#getRegister` on all exceptions +area: Snapshot/Restore +type: enhancement +issues: [] diff --git a/docs/reference/snapshot-restore/repository-s3.asciidoc b/docs/reference/snapshot-restore/repository-s3.asciidoc index b48bb5c4f059a..36f311b1cdd97 100644 --- a/docs/reference/snapshot-restore/repository-s3.asciidoc +++ b/docs/reference/snapshot-restore/repository-s3.asciidoc @@ -343,6 +343,11 @@ include::repository-shared-settings.asciidoc[] will disable retries altogether. Note that if retries are enabled in the Azure client, each of these retries comprises that many client-level retries. +`get_register_retry_delay` + + (<<time-units,time value>>) Sets the time to wait before trying again if an attempt to read a + <<repository-s3-linearizable-registers,linearizable register>> fails. Defaults to `5s`. + NOTE: The option of defining client settings in the repository settings as documented below is considered deprecated, and will be removed in a future version. 
diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 49df078453327..902dcb42fc0cb 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -40,6 +40,7 @@ import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.BackoffPolicy; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobContainer; @@ -910,21 +911,44 @@ public void compareAndExchangeRegister( @Override public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) { ActionListener.completeWith(listener, () -> { - final var getObjectRequest = new GetObjectRequest(blobStore.bucket(), buildKey(key)); - S3BlobStore.configureRequestForMetrics(getObjectRequest, blobStore, Operation.GET_OBJECT, purpose); - try ( - var clientReference = blobStore.clientReference(); - var s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest)); - var stream = s3Object.getObjectContent() - ) { - return OptionalBytesReference.of(getRegisterUsingConsistentRead(stream, keyPath, key)); - } catch (AmazonS3Exception e) { - logger.trace(() -> Strings.format("[%s]: getRegister failed", key), e); - if (e.getStatusCode() == 404) { - return OptionalBytesReference.EMPTY; - } else { - throw e; + final var backoffPolicy = purpose == OperationPurpose.REPOSITORY_ANALYSIS + ? 
BackoffPolicy.noBackoff() + : BackoffPolicy.constantBackoff(blobStore.getGetRegisterRetryDelay(), blobStore.getMaxRetries()); + final var retryDelayIterator = backoffPolicy.iterator(); + + Exception finalException = null; + while (true) { + final var getObjectRequest = new GetObjectRequest(blobStore.bucket(), buildKey(key)); + S3BlobStore.configureRequestForMetrics(getObjectRequest, blobStore, Operation.GET_OBJECT, purpose); + try ( + var clientReference = blobStore.clientReference(); + var s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest)); + var stream = s3Object.getObjectContent() + ) { + return OptionalBytesReference.of(getRegisterUsingConsistentRead(stream, keyPath, key)); + } catch (Exception attemptException) { + logger.trace(() -> Strings.format("[%s]: getRegister failed", key), attemptException); + if (attemptException instanceof AmazonS3Exception amazonS3Exception && amazonS3Exception.getStatusCode() == 404) { + return OptionalBytesReference.EMPTY; + } else if (finalException == null) { + finalException = attemptException; + } else if (finalException != attemptException) { + finalException.addSuppressed(attemptException); + } } + if (retryDelayIterator.hasNext()) { + try { + // noinspection BusyWait + Thread.sleep(retryDelayIterator.next().millis()); + continue; + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + finalException.addSuppressed(interruptedException); + // fall through and throw the exception + } + } + + throw finalException; } }); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index e2efc926f7e3a..5fb3254df819b 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -95,6 
+95,8 @@ class S3BlobStore implements BlobStore { private final int bulkDeletionBatchSize; private final BackoffPolicy retryThrottledDeleteBackoffPolicy; + private final TimeValue getRegisterRetryDelay; + S3BlobStore( S3Service service, String bucket, @@ -121,6 +123,7 @@ class S3BlobStore implements BlobStore { this.s3RepositoriesMetrics = s3RepositoriesMetrics; this.bulkDeletionBatchSize = S3Repository.DELETION_BATCH_SIZE_SETTING.get(repositoryMetadata.settings()); this.retryThrottledDeleteBackoffPolicy = retryThrottledDeleteBackoffPolicy; + this.getRegisterRetryDelay = S3Repository.GET_REGISTER_RETRY_DELAY.get(repositoryMetadata.settings()); } RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) { @@ -468,6 +471,10 @@ public StorageClass getStorageClass() { return storageClass; } + public TimeValue getGetRegisterRetryDelay() { + return getRegisterRetryDelay; + } + public static StorageClass initStorageClass(String storageClass) { if ((storageClass == null) || storageClass.equals("")) { return StorageClass.Standard; diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 0750f6ab59d57..fde15d5d6e6bc 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -223,6 +223,16 @@ class S3Repository extends MeteredBlobStoreRepository { 0 ); + /** + * Time to wait before trying again if getRegister fails. 
+ */ + static final Setting<TimeValue> GET_REGISTER_RETRY_DELAY = Setting.timeSetting( + "get_register_retry_delay", + new TimeValue(5, TimeUnit.SECONDS), + new TimeValue(0, TimeUnit.MILLISECONDS), + Setting.Property.Dynamic + ); + private final S3Service service; private final String bucket; diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 76d980c222a96..2eb2ed26153f9 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.common.blobstore.OptionalBytesReference; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; @@ -191,7 +192,10 @@ protected BlobContainer createBlobContainer( final RepositoryMetadata repositoryMetadata = new RepositoryMetadata( "repository", S3Repository.TYPE, - Settings.builder().put(S3Repository.CLIENT_NAME.getKey(), clientName).build() + Settings.builder() + .put(S3Repository.CLIENT_NAME.getKey(), clientName) + .put(S3Repository.GET_REGISTER_RETRY_DELAY.getKey(), TimeValue.ZERO) + .build() ); final S3BlobStore s3BlobStore = new S3BlobStore( @@ -945,6 +949,75 @@ private Set<OperationPurpose> operationPurposesThatRetryOnDelete() { return Set.of(OperationPurpose.SNAPSHOT_DATA, OperationPurpose.SNAPSHOT_METADATA); } + public void testGetRegisterRetries() { + final var maxRetries = between(0, 3); + final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null); + + interface 
FailingHandlerFactory { + void addHandler(String blobName, Integer... responseCodes); + } + + final var requestCounter = new AtomicInteger(); + final FailingHandlerFactory countingFailingHandlerFactory = (blobName, responseCodes) -> httpServer.createContext( + downloadStorageEndpoint(blobContainer, blobName), + exchange -> { + requestCounter.incrementAndGet(); + try (exchange) { + exchange.sendResponseHeaders(randomFrom(responseCodes), -1); + } + } + ); + + countingFailingHandlerFactory.addHandler("test_register_no_internal_retries", HttpStatus.SC_UNPROCESSABLE_ENTITY); + countingFailingHandlerFactory.addHandler( + "test_register_internal_retries", + HttpStatus.SC_INTERNAL_SERVER_ERROR, + HttpStatus.SC_SERVICE_UNAVAILABLE + ); + countingFailingHandlerFactory.addHandler("test_register_not_found", HttpStatus.SC_NOT_FOUND); + + { + final var exceptionWithInternalRetries = safeAwaitFailure( + OptionalBytesReference.class, + l -> blobContainer.getRegister(randomRetryingPurpose(), "test_register_internal_retries", l) + ); + assertThat(exceptionWithInternalRetries, instanceOf(AmazonS3Exception.class)); + assertEquals((maxRetries + 1) * (maxRetries + 1), requestCounter.get()); + assertEquals(maxRetries, exceptionWithInternalRetries.getSuppressed().length); + } + + { + requestCounter.set(0); + final var exceptionWithoutInternalRetries = safeAwaitFailure( + OptionalBytesReference.class, + l -> blobContainer.getRegister(randomRetryingPurpose(), "test_register_no_internal_retries", l) + ); + assertThat(exceptionWithoutInternalRetries, instanceOf(AmazonS3Exception.class)); + assertEquals(maxRetries + 1, requestCounter.get()); + assertEquals(maxRetries, exceptionWithoutInternalRetries.getSuppressed().length); + } + + { + requestCounter.set(0); + final var repoAnalysisException = safeAwaitFailure( + OptionalBytesReference.class, + l -> blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, "test_register_no_internal_retries", l) + ); + assertThat(repoAnalysisException, 
instanceOf(AmazonS3Exception.class)); + assertEquals(1, requestCounter.get()); + assertEquals(0, repoAnalysisException.getSuppressed().length); + } + + { + requestCounter.set(0); + final OptionalBytesReference expectEmpty = safeAwait( + l -> blobContainer.getRegister(randomPurpose(), "test_register_not_found", l) + ); + assertEquals(OptionalBytesReference.EMPTY, expectEmpty); + assertEquals(1, requestCounter.get()); + } + } + @Override protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) { // some attempts make meaningful progress and do not count towards the max retry limit