From 1ebbae2f796179dac23e989f8975014f2b6e621c Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 25 Nov 2025 20:03:15 +1100 Subject: [PATCH 01/15] Use common retry logic for GCS --- ...eCloudStorageBlobStoreRepositoryTests.java | 24 +- .../GoogleCloudStorageThirdPartyTests.java | 2 +- .../gcs/GoogleCloudStorageBlobStore.java | 32 +- .../gcs/GoogleCloudStorageClientSettings.java | 24 +- .../gcs/GoogleCloudStoragePlugin.java | 3 +- ...GoogleCloudStorageRetryingInputStream.java | 285 +++++++----------- .../gcs/GoogleCloudStorageService.java | 108 +++++-- ...CloudStorageBlobContainerRetriesTests.java | 25 +- ...leCloudStorageBlobContainerStatsTests.java | 4 +- ...leCloudStorageBlobStoreContainerTests.java | 11 +- ...GoogleCloudStorageClientSettingsTests.java | 17 +- ...GoogleCloudStorageClientsManagerTests.java | 70 +++-- .../gcs/GoogleCloudStoragePluginTests.java | 3 +- ...eCloudStorageRetryingInputStreamTests.java | 11 +- .../gcs/GoogleCloudStorageServiceTests.java | 35 ++- .../s3/S3RetryingInputStream.java | 13 +- .../common/blobstore/RetryingInputStream.java | 38 ++- .../blobstore/RetryingInputStreamTests.java | 100 ++++-- .../AbstractThirdPartyRepositoryTestCase.java | 6 + 19 files changed, 501 insertions(+), 310 deletions(-) diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 2532a9212f9e2..b2d55169a7372 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -13,7 +13,6 @@ import fixture.gcs.GoogleCloudStorageHttpHandler; import fixture.gcs.TestUtils; -import 
com.google.api.gax.retrying.RetrySettings; import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.StorageRetryStrategy; @@ -60,9 +59,11 @@ import java.util.Map; import static org.elasticsearch.common.io.Streams.readFully; +import static org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase.randomRetryingPurpose; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BASE_PATH; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET; @@ -118,6 +119,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { settings.put(super.nodeSettings(nodeOrdinal, otherSettings)); settings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl()); settings.put(TOKEN_URI_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl() + "/token"); + settings.put(MAX_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), 6); final MockSecureSettings secureSettings = new MockSecureSettings(); final byte[] serviceAccount = TestUtils.createServiceAccount(random()); @@ -195,7 +197,7 @@ public void testWriteReadLarge() throws IOException { random().nextBytes(data); writeBlob(container, "foobar", new BytesArray(data), false); } - try (InputStream stream = container.readBlob(randomPurpose(), "foobar")) { + try (InputStream stream = container.readBlob(randomRetryingPurpose(), "foobar")) { 
BytesRefBuilder target = new BytesRefBuilder(); while (target.length() < data.length) { byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())]; @@ -218,7 +220,7 @@ public void testWriteFileMultipleOfChunkSize() throws IOException { byte[] initialValue = randomByteArrayOfLength(uploadSize); container.writeBlob(randomPurpose(), key, new BytesArray(initialValue), true); - BytesReference reference = readFully(container.readBlob(randomPurpose(), key)); + BytesReference reference = readFully(container.readBlob(randomRetryingPurpose(), key)); assertEquals(new BytesArray(initialValue), reference); container.deleteBlobsIgnoringIfNotExists(randomPurpose(), Iterators.single(key)); @@ -242,24 +244,18 @@ protected GoogleCloudStorageService createStorageService(ClusterService clusterS @Override StorageOptions createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, - final HttpTransportOptions httpTransportOptions + final HttpTransportOptions httpTransportOptions, + final RetryBehaviour retryBehaviour ) { - StorageOptions options = super.createStorageOptions(gcsClientSettings, httpTransportOptions); + StorageOptions options = super.createStorageOptions(gcsClientSettings, httpTransportOptions, retryBehaviour); return options.toBuilder() .setStorageRetryStrategy(StorageRetryStrategy.getLegacyStorageRetryStrategy()) - .setHost(options.getHost()) - .setCredentials(options.getCredentials()) .setRetrySettings( - RetrySettings.newBuilder() - .setTotalTimeout(options.getRetrySettings().getTotalTimeout()) + options.getRetrySettings() + .toBuilder() .setInitialRetryDelay(Duration.ofMillis(10L)) - .setRetryDelayMultiplier(options.getRetrySettings().getRetryDelayMultiplier()) .setMaxRetryDelay(Duration.ofSeconds(1L)) - .setMaxAttempts(0) .setJittered(false) - .setInitialRpcTimeout(options.getRetrySettings().getInitialRpcTimeout()) - .setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier()) - 
.setMaxRpcTimeout(options.getRetrySettings().getMaxRpcTimeout()) .build() ) .build(); diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java index 520f922280eb0..b85ddde004568 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java @@ -115,7 +115,7 @@ public void testResumeAfterUpdate() { executeOnBlobStore(repo, container -> { container.writeBlob(randomPurpose(), blobKey, new BytesArray(initialValue), true); - try (InputStream inputStream = container.readBlob(randomPurpose(), blobKey)) { + try (InputStream inputStream = container.readBlob(randomRetryingPurpose(), blobKey)) { // Trigger the first request for the blob, partially read it int read = inputStream.read(); assert read != -1; diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 2af940c19bbac..f02692a742a29 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -139,8 +139,32 @@ class GoogleCloudStorageBlobStore implements BlobStore { this.casBackoffPolicy = casBackoffPolicy; } - private MeteredStorage client() throws IOException { - return storageService.client(projectId, clientName, repositoryName, statsCollector); + /** + * Get a client that will retry according to its configured settings + * + * @return A client + */ + MeteredStorage 
client() throws IOException { + return storageService.client( + projectId, + clientName, + repositoryName, + statsCollector, + GoogleCloudStorageService.RetryBehaviour.ClientConfigured + ); + } + + /** + * Get a client that will not retry on failure + * + * @return A client with max retries configured to zero + */ + MeteredStorage clientNoRetries() throws IOException { + return storageService.client(projectId, clientName, repositoryName, statsCollector, GoogleCloudStorageService.RetryBehaviour.None); + } + + int getMaxRetries() { + return storageService.clientSettings(projectId, clientName).getMaxRetries(); } @Override @@ -229,7 +253,7 @@ boolean blobExists(OperationPurpose purpose, String blobName) throws IOException * @return the InputStream used to read the blob's content */ InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException { - return new GoogleCloudStorageRetryingInputStream(purpose, client(), BlobId.of(bucketName, blobName)); + return new GoogleCloudStorageRetryingInputStream(this, purpose, BlobId.of(bucketName, blobName)); } /** @@ -252,8 +276,8 @@ InputStream readBlob(OperationPurpose purpose, String blobName, long position, l return new ByteArrayInputStream(new byte[0]); } else { return new GoogleCloudStorageRetryingInputStream( + this, purpose, - client(), BlobId.of(bucketName, blobName), position, Math.addExact(position, length - 1) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettings.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettings.java index 582e0b48121fa..10d557ce2a8aa 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettings.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettings.java @@ -120,6 +120,17 @@ public class GoogleCloudStorageClientSettings { () -> 
PROXY_HOST_SETTING ); + /** + * The maximum number of retries to use when a GCS request fails. + *

+ * Default to 5 to match {@link com.google.cloud.ServiceOptions#getDefaultRetrySettings()} + */ + static final Setting.AffixSetting MAX_RETRIES_SETTING = Setting.affixKeySetting( + PREFIX, + "max_retries", + (key) -> Setting.intSetting(key, 5, 0, Setting.Property.NodeScope) + ); + /** The credentials used by the client to connect to the Storage endpoint. */ private final ServiceAccountCredentials credential; @@ -144,6 +155,8 @@ public class GoogleCloudStorageClientSettings { @Nullable private final Proxy proxy; + private final int maxRetries; + GoogleCloudStorageClientSettings( final ServiceAccountCredentials credential, final String endpoint, @@ -152,7 +165,8 @@ public class GoogleCloudStorageClientSettings { final TimeValue readTimeout, final String applicationName, final URI tokenUri, - final Proxy proxy + final Proxy proxy, + final int maxRetries ) { this.credential = credential; this.endpoint = endpoint; @@ -162,6 +176,7 @@ public class GoogleCloudStorageClientSettings { this.applicationName = applicationName; this.tokenUri = tokenUri; this.proxy = proxy; + this.maxRetries = maxRetries; } public ServiceAccountCredentials getCredential() { @@ -197,6 +212,10 @@ public Proxy getProxy() { return proxy; } + public int getMaxRetries() { + return maxRetries; + } + @Override public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; @@ -249,7 +268,8 @@ static GoogleCloudStorageClientSettings getClientSettings(final Settings setting getConfigValue(settings, clientName, READ_TIMEOUT_SETTING), getConfigValue(settings, clientName, APPLICATION_NAME_SETTING), getConfigValue(settings, clientName, TOKEN_URI_SETTING), - proxy + proxy, + getConfigValue(settings, clientName, MAX_RETRIES_SETTING) ); } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java index 
2c35aac1a46e8..5b930337bc447 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java @@ -93,7 +93,8 @@ public List> getSettings() { GoogleCloudStorageClientSettings.TOKEN_URI_SETTING, GoogleCloudStorageClientSettings.PROXY_TYPE_SETTING, GoogleCloudStorageClientSettings.PROXY_HOST_SETTING, - GoogleCloudStorageClientSettings.PROXY_PORT_SETTING + GoogleCloudStorageClientSettings.PROXY_PORT_SETTING, + GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING ); } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java index a74e86d8ee677..f5c06b998ee18 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java @@ -9,145 +9,145 @@ package org.elasticsearch.repositories.gcs; import com.google.api.client.http.HttpResponse; -import com.google.cloud.BaseService; -import com.google.cloud.RetryHelper; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.StorageException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.blobstore.OperationPurpose; -import org.elasticsearch.core.IOUtils; +import org.elasticsearch.common.blobstore.RetryingInputStream; +import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; import org.elasticsearch.rest.RestStatus; -import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; -import 
java.util.ArrayList; -import java.util.List; - -import static org.elasticsearch.core.Strings.format; /** * Wrapper around reads from GCS that will retry blob downloads that fail part-way through, resuming from where the failure occurred. * This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing * the {@link org.elasticsearch.Version#V_7_0_0} version constant) and removed if the SDK handles retries itself in the future. */ -class GoogleCloudStorageRetryingInputStream extends InputStream { +class GoogleCloudStorageRetryingInputStream extends RetryingInputStream { private static final Logger logger = LogManager.getLogger(GoogleCloudStorageRetryingInputStream.class); - static final int MAX_SUPPRESSED_EXCEPTIONS = 10; - - private final OperationPurpose purpose; - private final MeteredStorage client; - private final BlobId blobId; - private final long start; - private final long end; - private final int maxAttempts; - private InputStream currentStream; - private int attempt = 1; - private List failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS); - private long currentOffset; - private boolean closed; - private Long lastGeneration; - // Used for testing only - GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId) throws IOException { - this(purpose, client, blobId, 0, Long.MAX_VALUE - 1); + GoogleCloudStorageRetryingInputStream(GoogleCloudStorageBlobStore blobStore, OperationPurpose purpose, BlobId blobId) + throws IOException { + this(blobStore, purpose, blobId, 0, Long.MAX_VALUE - 1); } // Used for testing only - GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId, long start, long end) - throws IOException { - if (start < 0L) { - throw new IllegalArgumentException("start must be non-negative"); - } - if (end < start || end == Long.MAX_VALUE) { - throw new IllegalArgumentException("end must be >= start and not 
Long.MAX_VALUE"); - } - this.purpose = purpose; - this.client = client; - this.blobId = blobId; - this.start = start; - this.end = end; - this.maxAttempts = client.getOptions().getRetrySettings().getMaxAttempts(); - this.currentStream = openStream(); + GoogleCloudStorageRetryingInputStream( + GoogleCloudStorageBlobStore blobStore, + OperationPurpose purpose, + BlobId blobId, + long start, + long end + ) throws IOException { + super(new GoogleCloudStorageBlobStoreServices(blobStore, purpose, blobId), purpose, start, end); } - private InputStream openStream() throws IOException { - try { + private static class GoogleCloudStorageBlobStoreServices implements BlobStoreServices { + + private final GoogleCloudStorageBlobStore blobStore; + private final OperationPurpose purpose; + private final BlobId blobId; + + private GoogleCloudStorageBlobStoreServices(GoogleCloudStorageBlobStore blobStore, OperationPurpose purpose, BlobId blobId) { + this.blobStore = blobStore; + this.purpose = purpose; + this.blobId = blobId; + } + + @Override + public SingleAttemptInputStream getInputStream(@Nullable Long lastGeneration, long start, long end) throws IOException { + final MeteredStorage client = blobStore.clientNoRetries(); try { - return RetryHelper.runWithRetries(() -> { - try { - final var meteredGet = client.meteredObjectsGet(purpose, blobId.getBucket(), blobId.getName()); - meteredGet.setReturnRawInputStream(true); - if (lastGeneration != null) { - meteredGet.setGeneration(lastGeneration); - } + try { + final var meteredGet = client.meteredObjectsGet(purpose, blobId.getBucket(), blobId.getName()); + meteredGet.setReturnRawInputStream(true); + if (lastGeneration != null) { + meteredGet.setGeneration(lastGeneration); + } - if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) { - if (meteredGet.getRequestHeaders() != null) { - meteredGet.getRequestHeaders().setRange("bytes=" + Math.addExact(start, currentOffset) + "-" + end); - } - } - final HttpResponse resp = 
meteredGet.executeMedia(); - // Store the generation of the first response we received, so we can detect - // if the file has changed if we need to resume - if (lastGeneration == null) { - lastGeneration = parseGenerationHeader(resp); + if (start > 0 || end < Long.MAX_VALUE - 1) { + if (meteredGet.getRequestHeaders() != null) { + meteredGet.getRequestHeaders().setRange("bytes=" + start + "-" + end); } + } + final HttpResponse resp = meteredGet.executeMedia(); + // Store the generation of the first response we received, so we can detect + // if the file has changed if we need to resume + if (lastGeneration == null) { + lastGeneration = parseGenerationHeader(resp); + } - final Long contentLength = resp.getHeaders().getContentLength(); - InputStream content = resp.getContent(); - if (contentLength != null) { - content = new ContentLengthValidatingInputStream(content, contentLength); - } - return content; - } catch (IOException e) { - throw StorageException.translate(e); + final Long contentLength = resp.getHeaders().getContentLength(); + InputStream content = resp.getContent(); + if (contentLength != null) { + return new ContentLengthValidatingInputStream(content, start, lastGeneration, contentLength); } - }, client.getOptions().getRetrySettings(), BaseService.EXCEPTION_HANDLER, client.getOptions().getClock()); - } catch (RetryHelper.RetryHelperException e) { - throw StorageException.translateAndThrow(e); - } - } catch (StorageException storageException) { - if (storageException.getCode() == RestStatus.NOT_FOUND.getStatus()) { - if (lastGeneration != null) { - throw addSuppressedExceptions( - new NoSuchFileException( + return new ContentLengthValidatingInputStream(content, start, lastGeneration); + } catch (IOException e) { + throw StorageException.translate(e); + } + } catch (StorageException storageException) { + if (storageException.getCode() == RestStatus.NOT_FOUND.getStatus()) { + if (lastGeneration != null) { + throw new NoSuchFileException( "Blob object [" + 
blobId.getName() + "] generation [" + lastGeneration + "] unavailable on resume (contents changed, or object deleted): " + storageException.getMessage() - ) - ); - } else { - throw addSuppressedExceptions( - new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage()) - ); + ); + } else { + throw new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage()); + } } - } - if (storageException.getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) { - long currentPosition = Math.addExact(start, currentOffset); - throw addSuppressedExceptions( - new RequestedRangeNotSatisfiedException( + if (storageException.getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) { + throw new RequestedRangeNotSatisfiedException( blobId.getName(), - currentPosition, - (end < Long.MAX_VALUE - 1) ? end - currentPosition + 1 : end, + start, + (end < Long.MAX_VALUE - 1) ? end - start + 1 : end, storageException - ) - ); + ); + } + throw storageException; } - throw addSuppressedExceptions(storageException); + } + + @Override + public void onRetryStarted(String action) { + // No retry metrics for GCS + } + + @Override + public void onRetrySucceeded(String action, long numberOfRetries) { + // No retry metrics for GCS + } + + @Override + public long getMeaningfulProgressSize() { + return Math.max(1L, GoogleCloudStorageBlobStore.SDK_DEFAULT_CHUNK_SIZE / 100L); + } + + @Override + public int getMaxRetries() { + return blobStore.getMaxRetries(); + } + + @Override + public String getBlobDescription() { + return blobId.toString(); } } - private Long parseGenerationHeader(HttpResponse response) { + private static Long parseGenerationHeader(HttpResponse response) { final String generationHeader = response.getHeaders().getFirstHeaderStringValue("x-goog-generation"); if (generationHeader != null) { try { @@ -167,13 +167,23 @@ private Long parseGenerationHeader(HttpResponse response) { 
// Google's SDK ignores the Content-Length header when no bytes are sent, see NetHttpResponse.SizeValidatingInputStream // We have to implement our own validation logic here - static final class ContentLengthValidatingInputStream extends FilterInputStream { + static final class ContentLengthValidatingInputStream extends SingleAttemptInputStream { + + private final InputStream in; + private final long firstOffset; + private final long generation; private final long contentLength; private long read = 0L; - ContentLengthValidatingInputStream(InputStream in, long contentLength) { - super(in); + ContentLengthValidatingInputStream(InputStream in, long firstOffset, Long generation) { + this(in, firstOffset, generation, -1L); + } + + ContentLengthValidatingInputStream(InputStream in, long firstOffset, Long generation, long contentLength) { + this.in = in; + this.firstOffset = firstOffset; + this.generation = generation; this.contentLength = contentLength; } @@ -211,36 +221,15 @@ private void checkContentLengthOnEOF() throws IOException { throw new IOException("Connection closed prematurely: read = " + read + ", Content-Length = " + contentLength); } } - } - @Override - public int read() throws IOException { - ensureOpen(); - while (true) { - try { - final int result = currentStream.read(); - currentOffset += 1; - return result; - } catch (IOException e) { - reopenStreamOrFail(StorageException.translate(e)); - } + @Override + protected long getFirstOffset() { + return firstOffset; } - } - @Override - public int read(byte[] b, int off, int len) throws IOException { - ensureOpen(); - while (true) { - try { - final int bytesRead = currentStream.read(b, off, len); - if (bytesRead == -1) { - return -1; - } - currentOffset += bytesRead; - return bytesRead; - } catch (IOException e) { - reopenStreamOrFail(StorageException.translate(e)); - } + @Override + protected Long getVersion() { + return generation; } } @@ -251,52 +240,4 @@ public int read(byte[] b, int off, int len) throws 
IOException { void closeCurrentStream() throws IOException { currentStream.close(); } - - private void ensureOpen() { - if (closed) { - assert false : "using GoogleCloudStorageRetryingInputStream after close"; - throw new IllegalStateException("using GoogleCloudStorageRetryingInputStream after close"); - } - } - - private void reopenStreamOrFail(StorageException e) throws IOException { - if (attempt >= maxAttempts) { - throw addSuppressedExceptions(e); - } - logger.debug( - () -> format("failed reading [%s] at offset [%s], attempt [%s] of [%s], retrying", blobId, currentOffset, attempt, maxAttempts), - e - ); - attempt += 1; - if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) { - failures.add(e); - } - IOUtils.closeWhileHandlingException(currentStream); - currentStream = openStream(); - } - - @Override - public void close() throws IOException { - currentStream.close(); - closed = true; - } - - @Override - public long skip(long n) throws IOException { - // This could be optimized on a failure by re-opening stream directly to the preferred location. However, it is rarely called, - // so for now we will rely on the default implementation which just discards bytes by reading. 
- return super.skip(n); - } - - @Override - public void reset() { - throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking"); - } - - private T addSuppressedExceptions(T e) { - for (StorageException failure : failures) { - e.addSuppressed(failure); - } - return e; - } } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java index cb6b8b334181e..b7f8423f53c2a 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java @@ -15,6 +15,7 @@ import com.google.api.client.http.javanet.NetHttpTransport; import com.google.api.client.util.SecurityUtils; import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.retrying.RetrySettings; import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.ServiceOptions; @@ -94,6 +95,23 @@ public void refreshAndClearCache(Map c clientsManager.refreshAndClearCacheForClusterClients(clientsSettings); } + public enum RetryBehaviour { + ClientConfigured { + @Override + public RetrySettings createRetrySettings(GoogleCloudStorageClientSettings settings) { + return ServiceOptions.getDefaultRetrySettings().toBuilder().setMaxAttempts(settings.getMaxRetries() + 1).build(); + } + }, + None { + @Override + public RetrySettings createRetrySettings(GoogleCloudStorageClientSettings settings) { + return ServiceOptions.getNoRetrySettings(); + } + }; + + public abstract RetrySettings createRetrySettings(GoogleCloudStorageClientSettings settings); + } + /** * Attempts to retrieve a client from the cache. 
If the client does not exist it * will be created from the latest settings and will populate the cache. The @@ -111,9 +129,14 @@ public MeteredStorage client( @Nullable final ProjectId projectId, final String clientName, final String repositoryName, - final GcsRepositoryStatsCollector statsCollector + final GcsRepositoryStatsCollector statsCollector, + final RetryBehaviour retryBehaviour ) throws IOException { - return clientsManager.client(projectId, clientName, repositoryName, statsCollector); + return clientsManager.client(projectId, clientName, repositoryName, statsCollector, retryBehaviour); + } + + GoogleCloudStorageClientSettings clientSettings(@Nullable ProjectId projectId, final String clientName) { + return clientsManager.clientSettings(projectId, clientName); } /** @@ -138,8 +161,11 @@ GoogleCloudStorageClientsManager getClientsManager() { * @return a new client storage instance that can be used to manage objects * (blobs) */ - private MeteredStorage createClient(GoogleCloudStorageClientSettings gcsClientSettings, GcsRepositoryStatsCollector statsCollector) - throws IOException { + private MeteredStorage createClient( + GoogleCloudStorageClientSettings gcsClientSettings, + GcsRepositoryStatsCollector statsCollector, + RetryBehaviour retryBehaviour + ) throws IOException { final NetHttpTransport.Builder builder = new NetHttpTransport.Builder(); // requires java.lang.RuntimePermission "setFactory" @@ -183,13 +209,14 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser } }; - final StorageOptions storageOptions = createStorageOptions(gcsClientSettings, httpTransportOptions); + final StorageOptions storageOptions = createStorageOptions(gcsClientSettings, httpTransportOptions, retryBehaviour); return new MeteredStorage(storageOptions.getService(), statsCollector); } StorageOptions createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, - final HttpTransportOptions httpTransportOptions + final 
HttpTransportOptions httpTransportOptions, + final RetryBehaviour retryBehaviour ) { final StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder() .setStorageRetryStrategy(getRetryStrategy()) @@ -198,7 +225,8 @@ StorageOptions createStorageOptions( return Strings.hasLength(gcsClientSettings.getApplicationName()) ? Map.of("user-agent", gcsClientSettings.getApplicationName()) : Map.of(); - }); + }) + .setRetrySettings(retryBehaviour.createRetrySettings(gcsClientSettings)); if (Strings.hasLength(gcsClientSettings.getHost())) { storageOptionsBuilder.setHost(gcsClientSettings.getHost()); } @@ -403,12 +431,25 @@ void refreshAndClearCacheForClusterClients(Map clientCache = emptyMap(); + protected volatile Map clientCache = emptyMap(); /** * Get the current client settings for all clients in this holder. @@ -476,46 +519,57 @@ abstract class ClientsHolder { * @return a cached client storage instance that can be used to manage objects * (blobs) */ - MeteredStorage client(final String clientName, final String repositoryName, final GcsRepositoryStatsCollector statsCollector) - throws IOException { + MeteredStorage client( + final String clientName, + final String repositoryName, + final GcsRepositoryStatsCollector statsCollector, + final RetryBehaviour retryBehaviour + ) throws IOException { + final var cacheKey = new CacheKey(repositoryName, retryBehaviour); { - final MeteredStorage storage = clientCache.get(repositoryName); + final MeteredStorage storage = clientCache.get(cacheKey); if (storage != null) { return storage; } } synchronized (this) { - final MeteredStorage existing = clientCache.get(repositoryName); + final MeteredStorage existing = clientCache.get(cacheKey); if (existing != null) { return existing; } - final GoogleCloudStorageClientSettings settings = allClientSettings().get(clientName); - - if (settings == null) { - throw new IllegalArgumentException( - "Unknown client name [" + clientName + "]. 
Existing client configs: " + allClientSettings().keySet() - ); - } + final GoogleCloudStorageClientSettings settings = clientSettings(clientName); logger.debug(() -> format("creating GCS client with client_name [%s], endpoint [%s]", clientName, settings.getHost())); - final MeteredStorage storage = createClient(settings, statsCollector); - clientCache = Maps.copyMapWithAddedEntry(clientCache, repositoryName, storage); + final MeteredStorage storage = createClient(settings, statsCollector, retryBehaviour); + clientCache = Maps.copyMapWithAddedEntry(clientCache, cacheKey, storage); return storage; } } + public GoogleCloudStorageClientSettings clientSettings(String clientName) { + final GoogleCloudStorageClientSettings settings = allClientSettings().get(clientName); + + if (settings == null) { + throw new IllegalArgumentException( + "Unknown client name [" + clientName + "]. Existing client configs: " + allClientSettings().keySet() + ); + } + + return settings; + } + synchronized void closeRepositoryClients(String repositoryName) { clientCache = clientCache.entrySet() .stream() - .filter(entry -> entry.getKey().equals(repositoryName) == false) + .filter(entry -> entry.getKey().repositoryName().equals(repositoryName) == false) .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); } // package private for tests final boolean hasCachedClientForRepository(String repositoryName) { - return clientCache.containsKey(repositoryName); + return clientCache.keySet().stream().anyMatch(key -> key.repositoryName().equals(repositoryName)); } } diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index a09aa8142ad63..48cf6b5d96d90 100644 --- 
a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -85,6 +85,7 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageBlobStore.MAX_DELETES_PER_BATCH; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING; import static org.hamcrest.Matchers.anyOf; @@ -146,6 +147,9 @@ protected BlobContainer createBlobContainer( if (readTimeout != null) { clientSettings.put(READ_TIMEOUT_SETTING.getConcreteSettingForNamespace(client).getKey(), readTimeout); } + if (maxRetries != null) { + clientSettings.put(MAX_RETRIES_SETTING.getConcreteSettingForNamespace(client).getKey(), maxRetries); + } final MockSecureSettings secureSettings = new MockSecureSettings(); secureSettings.setFile(CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace(client).getKey(), createServiceAccount(random())); @@ -155,7 +159,8 @@ protected BlobContainer createBlobContainer( @Override StorageOptions createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, - final HttpTransportOptions httpTransportOptions + final HttpTransportOptions httpTransportOptions, + final RetryBehaviour retryBehaviour ) { final HttpTransportOptions requestCountingHttpTransportOptions = new HttpTransportOptions( HttpTransportOptions.newBuilder() @@ -182,7 +187,11 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser }; } }; 
- final StorageOptions options = super.createStorageOptions(gcsClientSettings, requestCountingHttpTransportOptions); + final StorageOptions options = super.createStorageOptions( + gcsClientSettings, + requestCountingHttpTransportOptions, + retryBehaviour + ); final RetrySettings.Builder retrySettingsBuilder = RetrySettings.newBuilder() .setTotalTimeout(options.getRetrySettings().getTotalTimeout()) .setInitialRetryDelay(Duration.ofMillis(10L)) @@ -191,10 +200,8 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser .setJittered(false) .setInitialRpcTimeout(Duration.ofSeconds(1)) .setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier()) - .setMaxRpcTimeout(Duration.ofSeconds(1)); - if (maxRetries != null) { - retrySettingsBuilder.setMaxAttempts(maxRetries + 1); - } + .setMaxRpcTimeout(Duration.ofSeconds(1)) + .setMaxAttempts(options.getRetrySettings().getMaxAttempts()); return options.toBuilder() .setStorageRetryStrategy(getRetryStrategy()) .setHost(options.getHost()) @@ -273,7 +280,7 @@ public void testReadLargeBlobWithRetries() throws Exception { exchange.close(); }); - try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "large_blob_retries")) { + try (InputStream inputStream = blobContainer.readBlob(randomRetryingPurpose(), "large_blob_retries")) { assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream))); } } @@ -600,10 +607,10 @@ public void testContentsChangeWhileStreaming() throws IOException { byte[] initialValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered); container.writeBlob(randomPurpose(), key, new BytesArray(initialValue), true); - BytesReference reference = readFully(container.readBlob(randomPurpose(), key)); + BytesReference reference = readFully(container.readBlob(randomRetryingPurpose(), key)); assertEquals(new BytesArray(initialValue), reference); - try (InputStream inputStream = container.readBlob(randomPurpose(), key)) { + try (InputStream 
inputStream = container.readBlob(randomRetryingPurpose(), key)) { // Trigger the first chunk to load int read = inputStream.read(); assert read != -1; diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java index 7e2b4348823fd..6938393077b0b 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java @@ -53,6 +53,7 @@ import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROJECT_ID_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING; import static org.elasticsearch.repositories.gcs.StorageOperation.GET; @@ -253,7 +254,8 @@ private ContainerAndBlobStore createBlobContainer(final String repositoryName) t READ_TIMEOUT_SETTING.getDefault(Settings.EMPTY), APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY), new URI(getEndpointForServer(httpServer) + "/token"), - null + null, + MAX_RETRIES_SETTING.getDefault(Settings.EMPTY) ); googleCloudStorageService.refreshAndClearCache(Map.of(clientName, clientSettings)); final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore( diff --git 
a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java index 849c09c506078..e6c854c23be9d 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java @@ -82,8 +82,15 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Excepti final MeteredStorage meteredStorage = new MeteredStorage(storage, storageRpc, new GcsRepositoryStatsCollector()); final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class); - when(storageService.client(eq(ProjectId.DEFAULT), any(String.class), any(String.class), any(GcsRepositoryStatsCollector.class))) - .thenReturn(meteredStorage); + when( + storageService.client( + eq(ProjectId.DEFAULT), + any(String.class), + any(String.class), + any(GcsRepositoryStatsCollector.class), + any(GoogleCloudStorageService.RetryBehaviour.class) + ) + ).thenReturn(meteredStorage); try ( BlobStore store = new GoogleCloudStorageBlobStore( diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettingsTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettingsTests.java index ac69afe7def6a..e0dcb5040ff8d 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettingsTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettingsTests.java @@ -40,6 +40,7 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING; import static 
org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROJECT_ID_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.getClientSettings; @@ -123,7 +124,8 @@ public void testProjectIdDefaultsToCredentials() throws Exception { READ_TIMEOUT_SETTING.getDefault(Settings.EMPTY), APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY), new URI(""), - null + null, + MAX_RETRIES_SETTING.getDefault(Settings.EMPTY) ); assertEquals(credential.getProjectId(), googleCloudStorageClientSettings.getProjectId()); } @@ -140,7 +142,8 @@ public void testLoadsProxySettings() throws Exception { READ_TIMEOUT_SETTING.getDefault(Settings.EMPTY), APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY), new URI(""), - proxy + proxy, + MAX_RETRIES_SETTING.getDefault(Settings.EMPTY) ); assertEquals(proxy, googleCloudStorageClientSettings.getProxy()); } @@ -278,6 +281,13 @@ private static GoogleCloudStorageClientSettings randomClient( applicationName = APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY); } + int maxRetries; + if (randomBoolean()) { + maxRetries = randomIntBetween(0, 5); + } else { + maxRetries = MAX_RETRIES_SETTING.getDefault(Settings.EMPTY); + } + return new GoogleCloudStorageClientSettings( credential, endpoint, @@ -286,7 +296,8 @@ private static GoogleCloudStorageClientSettings randomClient( readTimeout, applicationName, new URI(""), - null + null, + maxRetries ); } diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientsManagerTests.java 
b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientsManagerTests.java index a3a538b82aa6f..b1c9e85834f3e 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientsManagerTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientsManagerTests.java @@ -147,32 +147,42 @@ public void testClientsLifeCycleForSingleProject() throws Exception { final ProjectId projectId = randomUniqueProjectId(); final String clientName = randomFrom(clientNames); final String anotherClientName = randomValueOtherThan(clientName, () -> randomFrom(clientNames)); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); + final GoogleCloudStorageService.RetryBehaviour otherRetryBehaviour = randomValueOtherThan( + retryBehaviour, + GoogleCloudStorageClientsManagerTests::randomRetryBehaviour + ); // Configure project secrets for one client - assertClientNotFound(projectId, clientName); + assertClientNotFound(projectId, clientName, retryBehaviour); updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientName)); { assertProjectClientSettings(projectId, clientName); // Retrieve client for the 1st time - final var initialClient = getClientFromManager(projectId, clientName); + final var initialClient = getClientFromManager(projectId, clientName, retryBehaviour); assertClientCredentials(projectId, clientName, initialClient); // Client is cached when retrieved again - assertThat(initialClient, sameInstance(getClientFromManager(projectId, clientName))); + assertThat(initialClient, sameInstance(getClientFromManager(projectId, clientName, retryBehaviour))); // Client not configured cannot be accessed, - assertClientNotFound(projectId, anotherClientName); + assertClientNotFound(projectId, anotherClientName, retryBehaviour); // Update client secrets should release and recreate the client 
updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientName, anotherClientName)); assertProjectClientSettings(projectId, clientName, anotherClientName); - final var clientUpdated = getClientFromManager(projectId, clientName); + final var clientUpdated = getClientFromManager(projectId, clientName, retryBehaviour); assertThat(clientUpdated, not(sameInstance(initialClient))); // A different client for a different client name - final var anotherClient = getClientFromManager(projectId, anotherClientName); + final var anotherClient = getClientFromManager(projectId, anotherClientName, retryBehaviour); assertClientCredentials(projectId, anotherClientName, anotherClient); assertThat(anotherClient, not(sameInstance(clientUpdated))); + + // A different client for a different retry behaviour + final var anotherRetryBehaviour = getClientFromManager(projectId, clientName, otherRetryBehaviour); + assertClientCredentials(projectId, clientName, anotherRetryBehaviour); + assertThat(anotherRetryBehaviour, not(sameInstance(clientUpdated))); } // Remove project secrets or the entire project @@ -181,16 +191,17 @@ public void testClientsLifeCycleForSingleProject() throws Exception { } else { removeProjectFromClusterState(projectId); } - assertClientNotFound(projectId, clientName); + assertClientNotFound(projectId, clientName, retryBehaviour); assertThat(gcsClientsManager.getPerProjectClientsHolders(), not(hasKey(projectId))); } public void testClientsWithNoCredentialsAreFilteredOut() throws IOException { final ProjectId projectId = randomUniqueProjectId(); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientNames.toArray(String[]::new))); for (var clientName : clientNames) { - assertNotNull(getClientFromManager(projectId, clientName)); + assertNotNull(getClientFromManager(projectId, clientName, retryBehaviour)); } final List 
clientsWithIncorrectSecretsConfig = randomNonEmptySubsetOf(clientNames); @@ -205,15 +216,16 @@ public void testClientsWithNoCredentialsAreFilteredOut() throws IOException { for (var clientName : clientNames) { if (clientsWithIncorrectSecretsConfig.contains(clientName)) { - assertClientNotFound(projectId, clientName); + assertClientNotFound(projectId, clientName, retryBehaviour); } else { - assertNotNull(getClientFromManager(projectId, clientName)); + assertNotNull(getClientFromManager(projectId, clientName, retryBehaviour)); } } } public void testClientsForMultipleProjects() throws InterruptedException { final List projectIds = randomList(2, 8, ESTestCase::randomUniqueProjectId); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); final List threads = projectIds.stream().map(projectId -> new Thread(() -> { final int iterations = between(1, 3); @@ -224,7 +236,7 @@ public void testClientsForMultipleProjects() throws InterruptedException { assertProjectClientSettings(projectId, clientNames.toArray(String[]::new)); for (var clientName : shuffledList(clientNames)) { try { - final var meteredStorage = getClientFromManager(projectId, clientName); + final var meteredStorage = getClientFromManager(projectId, clientName, retryBehaviour); assertClientCredentials(projectId, clientName, meteredStorage); } catch (IOException e) { fail(e); @@ -237,7 +249,7 @@ public void testClientsForMultipleProjects() throws InterruptedException { removeProjectFromClusterState(projectId); } assertThat(gcsClientsManager.getPerProjectClientsHolders(), not(hasKey(projectId))); - clientNames.forEach(clientName -> assertClientNotFound(projectId, clientName)); + clientNames.forEach(clientName -> assertClientNotFound(projectId, clientName, retryBehaviour)); } })).toList(); @@ -251,16 +263,17 @@ public void testClusterAndProjectClients() throws IOException { final ProjectId projectId = randomUniqueProjectId(); final String clientName = randomFrom(clientNames); 
final boolean configureProjectClientsFirst = randomBoolean(); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); if (configureProjectClientsFirst) { updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientName)); } - final var clusterClient = getClientFromService(projectIdForClusterClient(), clientName); + final var clusterClient = getClientFromService(projectIdForClusterClient(), clientName, retryBehaviour); if (configureProjectClientsFirst == false) { assertThat(gcsClientsManager.getPerProjectClientsHolders(), anEmptyMap()); updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientName)); } - final var projectClient = getClientFromService(projectId, clientName); + final var projectClient = getClientFromService(projectId, clientName, retryBehaviour); assertThat(projectClient, not(sameInstance(clusterClient))); // Release the cluster client @@ -290,15 +303,27 @@ public void testProjectClientsDisabled() throws IOException { // Cluster client still works final String clientName = randomFrom(clientNames); - assertNotNull(getClientFromService(projectIdForClusterClient(), clientName)); + assertNotNull(getClientFromService(projectIdForClusterClient(), clientName, randomRetryBehaviour())); } - private MeteredStorage getClientFromManager(ProjectId projectId, String clientName) throws IOException { - return gcsClientsManager.client(projectId, clientName, repoNameForClient(clientName), statsCollector); + private MeteredStorage getClientFromManager( + ProjectId projectId, + String clientName, + GoogleCloudStorageService.RetryBehaviour retryBehaviour + ) throws IOException { + return gcsClientsManager.client(projectId, clientName, repoNameForClient(clientName), statsCollector, retryBehaviour); } - private MeteredStorage getClientFromService(ProjectId projectId, String clientName) throws IOException { - return googleCloudStorageService.client(projectId, clientName, 
repoNameForClient(clientName), statsCollector); + private static GoogleCloudStorageService.RetryBehaviour randomRetryBehaviour() { + return randomFrom(GoogleCloudStorageService.RetryBehaviour.values()); + } + + private MeteredStorage getClientFromService( + ProjectId projectId, + String clientName, + GoogleCloudStorageService.RetryBehaviour retryBehaviour + ) throws IOException { + return googleCloudStorageService.client(projectId, clientName, repoNameForClient(clientName), statsCollector, retryBehaviour); } private ProjectId projectIdForClusterClient() { @@ -332,8 +357,11 @@ private void assertClientCredentials(ProjectId projectId, String clientName, Met assertThat(credentials.getPrivateKeyId(), equalTo(projectClientPrivateKeyId(projectId, clientName))); } - private void assertClientNotFound(ProjectId projectId, String clientName) { - final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> getClientFromManager(projectId, clientName)); + private void assertClientNotFound(ProjectId projectId, String clientName, GoogleCloudStorageService.RetryBehaviour retryBehaviour) { + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> getClientFromManager(projectId, clientName, retryBehaviour) + ); assertThat( e.getMessage(), anyOf( diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePluginTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePluginTests.java index 7b9ef189b7459..e96746dabb536 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePluginTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePluginTests.java @@ -42,7 +42,8 @@ public void testExposedSettings() { "gcs.client.*.token_uri", "gcs.client.*.proxy.type", "gcs.client.*.proxy.host", - "gcs.client.*.proxy.port" + "gcs.client.*.proxy.port", + 
"gcs.client.*.max_retries" ), settings.stream().map(Setting::getKey).toList() ); diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java index 2b223c7a16342..5908f29bb194d 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java @@ -131,8 +131,11 @@ private GoogleCloudStorageRetryingInputStream createRetryingInputStream(byte[] d HttpRequest httpRequest = transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL); HttpResponse httpResponse = httpRequest.execute(); when(get.executeMedia()).thenReturn(httpResponse); + final GoogleCloudStorageBlobStore mockBlobStore = mock(GoogleCloudStorageBlobStore.class); + when(mockBlobStore.clientNoRetries()).thenReturn(meteredStorage); + when(mockBlobStore.getMaxRetries()).thenReturn(6); - return new GoogleCloudStorageRetryingInputStream(OperationPurpose.SNAPSHOT_DATA, meteredStorage, blobId); + return new GoogleCloudStorageRetryingInputStream(mockBlobStore, OperationPurpose.SNAPSHOT_DATA, blobId); } private GoogleCloudStorageRetryingInputStream createRetryingInputStream(byte[] data, int position, int length) throws IOException { @@ -148,9 +151,13 @@ private GoogleCloudStorageRetryingInputStream createRetryingInputStream(byte[] d when(get.executeMedia()).thenReturn(httpResponse); } + final GoogleCloudStorageBlobStore mockBlobStore = mock(GoogleCloudStorageBlobStore.class); + when(mockBlobStore.clientNoRetries()).thenReturn(meteredStorage); + when(mockBlobStore.getMaxRetries()).thenReturn(6); + return new GoogleCloudStorageRetryingInputStream( + mockBlobStore, OperationPurpose.SNAPSHOT_DATA, - meteredStorage, 
blobId, position, position + length - 1 diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java index bf4745a16b41c..369d820a638a0 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java @@ -40,6 +40,7 @@ import java.util.Locale; import java.util.UUID; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageService.RetryBehaviour.ClientConfigured; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -58,6 +59,7 @@ public void testClientInitializer() throws Exception { + ":" + randomIntBetween(1, 65535); final String projectIdName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT); + final int maxRetries = randomIntBetween(0, 10); final Settings settings = Settings.builder() .put( GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), @@ -76,6 +78,7 @@ public void testClientInitializer() throws Exception { .put(GoogleCloudStorageClientSettings.PROXY_TYPE_SETTING.getConcreteSettingForNamespace(clientName).getKey(), "HTTP") .put(GoogleCloudStorageClientSettings.PROXY_HOST_SETTING.getConcreteSettingForNamespace(clientName).getKey(), "192.168.52.15") .put(GoogleCloudStorageClientSettings.PROXY_PORT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), 8080) + .put(GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING.getConcreteSettingForNamespace(clientName).getKey(), maxRetries) .build(); SetOnce proxy = new SetOnce<>(); final var clusterService = ClusterServiceUtils.createClusterService(new 
DeterministicTaskQueue().getThreadPool()); @@ -89,17 +92,18 @@ void notifyProxyIsSet(Proxy p) { var statsCollector = new GcsRepositoryStatsCollector(); final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> service.client(projectIdForClusterClient(), "another_client", "repo", statsCollector) + () -> service.client(projectIdForClusterClient(), "another_client", "repo", statsCollector, randomRetryBehaviour()) ); assertThat(e.getMessage(), Matchers.startsWith("Unknown client name")); assertSettingDeprecationsAndWarnings( new Setting[] { GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING.getConcreteSettingForNamespace(clientName) } ); - final var storage = service.client(projectIdForClusterClient(), clientName, "repo", statsCollector); + final var storage = service.client(projectIdForClusterClient(), clientName, "repo", statsCollector, ClientConfigured); assertThat(storage.getOptions().getApplicationName(), Matchers.containsString(applicationName)); assertThat(storage.getOptions().getHost(), Matchers.is(endpoint)); assertThat(storage.getOptions().getProjectId(), Matchers.is(projectIdName)); assertThat(storage.getOptions().getTransportOptions(), Matchers.instanceOf(HttpTransportOptions.class)); + assertThat(storage.getOptions().getRetrySettings().getMaxAttempts(), equalTo(maxRetries + 1)); assertThat( ((HttpTransportOptions) storage.getOptions().getTransportOptions()).getConnectTimeout(), Matchers.is((int) connectTimeValue.millis()) @@ -125,18 +129,19 @@ public void testReinitClientSettings() throws Exception { ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()) ); when(pluginServices.projectResolver()).thenReturn(TestProjectResolvers.DEFAULT_PROJECT_ONLY); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings1)) { plugin.createComponents(pluginServices); final 
GoogleCloudStorageService storageService = plugin.storageService.get(); var statsCollector = new GcsRepositoryStatsCollector(); - final var client11 = storageService.client(projectIdForClusterClient(), "gcs1", "repo1", statsCollector); + final var client11 = storageService.client(projectIdForClusterClient(), "gcs1", "repo1", statsCollector, retryBehaviour); assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11")); - final var client12 = storageService.client(projectIdForClusterClient(), "gcs2", "repo2", statsCollector); + final var client12 = storageService.client(projectIdForClusterClient(), "gcs2", "repo2", statsCollector, retryBehaviour); assertThat(client12.getOptions().getProjectId(), equalTo("project_gcs12")); // client 3 is missing final IllegalArgumentException e1 = expectThrows( IllegalArgumentException.class, - () -> storageService.client(projectIdForClusterClient(), "gcs3", "repo3", statsCollector) + () -> storageService.client(projectIdForClusterClient(), "gcs3", "repo3", statsCollector, retryBehaviour) ); assertThat(e1.getMessage(), containsString("Unknown client name [gcs3].")); // update client settings @@ -144,18 +149,18 @@ public void testReinitClientSettings() throws Exception { // old client 1 not changed assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11")); // new client 1 is changed - final var client21 = storageService.client(projectIdForClusterClient(), "gcs1", "repo1", statsCollector); + final var client21 = storageService.client(projectIdForClusterClient(), "gcs1", "repo1", statsCollector, retryBehaviour); assertThat(client21.getOptions().getProjectId(), equalTo("project_gcs21")); // old client 2 not changed assertThat(client12.getOptions().getProjectId(), equalTo("project_gcs12")); // new client2 is gone final IllegalArgumentException e2 = expectThrows( IllegalArgumentException.class, - () -> storageService.client(projectIdForClusterClient(), "gcs2", "repo2", statsCollector) + () -> 
storageService.client(projectIdForClusterClient(), "gcs2", "repo2", statsCollector, retryBehaviour) ); assertThat(e2.getMessage(), containsString("Unknown client name [gcs2].")); // client 3 emerged - final var client23 = storageService.client(projectIdForClusterClient(), "gcs3", "repo3", statsCollector); + final var client23 = storageService.client(projectIdForClusterClient(), "gcs3", "repo3", statsCollector, retryBehaviour); assertThat(client23.getOptions().getProjectId(), equalTo("project_gcs23")); } } @@ -173,23 +178,27 @@ public void testClientsAreNotSharedAcrossRepositories() throws Exception { plugin.createComponents(pluginServices); final GoogleCloudStorageService storageService = plugin.storageService.get(); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); final MeteredStorage repo1Client = storageService.client( projectIdForClusterClient(), "gcs1", "repo1", - new GcsRepositoryStatsCollector() + new GcsRepositoryStatsCollector(), + retryBehaviour ); final MeteredStorage repo2Client = storageService.client( projectIdForClusterClient(), "gcs1", "repo2", - new GcsRepositoryStatsCollector() + new GcsRepositoryStatsCollector(), + retryBehaviour ); final MeteredStorage repo1ClientSecondInstance = storageService.client( projectIdForClusterClient(), "gcs1", "repo1", - new GcsRepositoryStatsCollector() + new GcsRepositoryStatsCollector(), + retryBehaviour ); assertNotSame(repo1Client, repo2Client); @@ -240,4 +249,8 @@ public void handle(HttpRequest request, HttpResponse response, HttpContext conte private ProjectId projectIdForClusterClient() { return randomBoolean() ? 
ProjectId.DEFAULT : null; } + + private static GoogleCloudStorageService.RetryBehaviour randomRetryBehaviour() { + return randomFrom(GoogleCloudStorageService.RetryBehaviour.values()); + } } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 18c3f2d6b194b..0d35365e9f011 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -36,7 +36,7 @@ * * See https://github.com/aws/aws-sdk-java/issues/856 for the related SDK issue */ -class S3RetryingInputStream extends RetryingInputStream { +class S3RetryingInputStream extends RetryingInputStream { private static final Logger logger = LogManager.getLogger(S3RetryingInputStream.class); @@ -49,10 +49,10 @@ class S3RetryingInputStream extends RetryingInputStream { super(new S3BlobStoreServices(blobStore, blobKey, purpose), purpose, start, end); } - private record S3BlobStoreServices(S3BlobStore blobStore, String blobKey, OperationPurpose purpose) implements BlobStoreServices { + private record S3BlobStoreServices(S3BlobStore blobStore, String blobKey, OperationPurpose purpose) implements BlobStoreServices { @Override - public S3SingleAttemptInputStream getInputStream(long start, long end) throws IOException { + public S3SingleAttemptInputStream getInputStream(Void version, long start, long end) throws IOException { try (AmazonS3Reference clientReference = blobStore.clientReference()) { final var getObjectRequestBuilder = GetObjectRequest.builder().bucket(blobStore.bucket()).key(blobKey); configureRequestForMetrics(getObjectRequestBuilder, blobStore, Operation.GET_OBJECT, purpose); @@ -171,7 +171,7 @@ private static long tryGetStreamLength(GetObjectResponse getObjectResponse, long return 
getObjectResponse.contentLength(); } - private static class S3SingleAttemptInputStream extends SingleAttemptInputStream { + private static class S3SingleAttemptInputStream extends SingleAttemptInputStream { private final ResponseInputStream responseStream; private final long start; @@ -281,6 +281,11 @@ private long tryGetStreamLength(GetObjectResponse response) { protected long getFirstOffset() { return start; } + + @Override + protected Void getVersion() { + return null; + } } // exposed for testing diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java index 483c63b34079f..0042e5fed4d95 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.core.IOUtils; +import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; import java.io.IOException; @@ -23,30 +24,31 @@ import static org.elasticsearch.core.Strings.format; -public abstract class RetryingInputStream extends InputStream { +public abstract class RetryingInputStream extends InputStream { private static final Logger logger = LogManager.getLogger(RetryingInputStream.class); public static final int MAX_SUPPRESSED_EXCEPTIONS = 10; - private final BlobStoreServices blobStoreServices; + private final BlobStoreServices blobStoreServices; private final OperationPurpose purpose; private final long start; private final long end; private final List failures; - protected SingleAttemptInputStream currentStream; + protected SingleAttemptInputStream currentStream; private long offset = 0; private int attempt = 1; private int failuresAfterMeaningfulProgress = 0; private boolean closed = false; 
- protected RetryingInputStream(BlobStoreServices blobStoreServices, OperationPurpose purpose) throws IOException { + protected RetryingInputStream(BlobStoreServices blobStoreServices, OperationPurpose purpose) throws IOException { this(blobStoreServices, purpose, 0L, Long.MAX_VALUE - 1L); } @SuppressWarnings("this-escape") // TODO: We can do better than this but I don't want to touch the tests for the first implementation - protected RetryingInputStream(BlobStoreServices blobStoreServices, OperationPurpose purpose, long start, long end) throws IOException { + protected RetryingInputStream(BlobStoreServices blobStoreServices, OperationPurpose purpose, long start, long end) + throws IOException { if (start < 0L) { throw new IllegalArgumentException("start must be non-negative"); } @@ -69,7 +71,11 @@ private void openStreamWithRetry() throws IOException { assert start + offset <= end : "requesting beyond end, start = " + start + " offset=" + offset + " end=" + end; } try { - currentStream = blobStoreServices.getInputStream(Math.addExact(start, offset), end); + currentStream = blobStoreServices.getInputStream( + currentStream != null ? 
currentStream.getVersion() : null, + Math.addExact(start, offset), + end + ); return; } catch (NoSuchFileException | RequestedRangeNotSatisfiedException e) { throw e; @@ -279,12 +285,15 @@ private T addSuppressedExceptions(T e) { /** * This implements all the behavior that is blob-store-specific + * + * @param The type of the version used */ - protected interface BlobStoreServices { + protected interface BlobStoreServices { /** - * Get an input stream for the blob at the given position + * Get an input stream for the given version * + * @param version The version to request, or null if the latest version should be used * @param start The start of the range to read, inclusive * @param end The end of the range to read, exclusive, or {@code Long.MAX_VALUE - 1} if the end of the blob should be used * @return An input stream for the given version @@ -292,7 +301,7 @@ protected interface BlobStoreServices { * @throws NoSuchFileException if the blob does not exist, this is not retry-able * @throws RequestedRangeNotSatisfiedException if the requested range is not valid, this is not retry-able */ - SingleAttemptInputStream getInputStream(long start, long end) throws IOException; + SingleAttemptInputStream getInputStream(@Nullable V version, long start, long end) throws IOException; void onRetryStarted(String action); @@ -309,11 +318,20 @@ protected interface BlobStoreServices { * Represents an {@link InputStream} for a single attempt to read a blob. Each retry * will attempt to create a new one of these. If reading from it fails, it should not retry. 
*/ - protected abstract static class SingleAttemptInputStream extends InputStream { + protected abstract static class SingleAttemptInputStream extends InputStream { /** * @return the offset of the first byte returned by this input stream */ protected abstract long getFirstOffset(); + + /** + * Get the version of this input stream + */ + protected abstract V getVersion(); + } + + public static boolean willRetry(OperationPurpose purpose) { + return purpose != OperationPurpose.REPOSITORY_ANALYSIS; } } diff --git a/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java b/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java index 93f7acdb94996..34cb5b996ee33 100644 --- a/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java +++ b/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Streams; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; @@ -34,15 +35,17 @@ public void testRetryableErrorsWhenReadingAreRetried() throws IOException { final var retryableFailures = randomIntBetween(1, 5); final var failureCounter = new AtomicInteger(retryableFailures); final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(5, 200)).getBytes()); + final var eTag = randomUUID(); final var services = new BlobStoreServicesAdapter(retryableFailures * 2) { @Override - public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, long end) throws IOException { - return new FailureAtIndexInputStream(resourceBytes, (int) start, failureCounter.getAndDecrement() > 0); + public 
RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) + throws IOException { + return new FailureAtIndexInputStream(resourceBytes, eTag, (int) start, failureCounter.getAndDecrement() > 0); } }; - byte[] results = copyToBytes(new RetryingInputStream(services, randomRetryingPurpose()) { + byte[] results = copyToBytes(new RetryingInputStream<>(services, randomRetryingPurpose()) { }); assertEquals(resourceBytes.length(), results.length); assertEquals(resourceBytes, new BytesArray(results)); @@ -53,17 +56,19 @@ public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, public void testReadWillFailWhenRetryableErrorsExceedMaxRetries() { final var maxRetries = randomIntBetween(1, 5); final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(10, 100)).getBytes()); + final var eTag = randomUUID(); final var services = new BlobStoreServicesAdapter(maxRetries) { @Override - public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, long end) throws IOException { - return new FailureAtIndexInputStream(resourceBytes, (int) start, true); + public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) + throws IOException { + return new FailureAtIndexInputStream(resourceBytes, eTag, (int) start, true); } }; final var ioException = assertThrows( IOException.class, - () -> copyToBytes(new RetryingInputStream(services, randomFiniteRetryingPurpose()) { + () -> copyToBytes(new RetryingInputStream<>(services, randomFiniteRetryingPurpose()) { }) ); assertEquals("This is retry-able", ioException.getMessage()); @@ -74,17 +79,19 @@ public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, public void testReadWillFailWhenRetryableErrorsOccurDuringRepositoryAnalysis() { final var maxRetries = randomIntBetween(2, 5); final var resourceBytes = randomBytesReference((int) 
ByteSizeValue.ofKb(randomIntBetween(5, 200)).getBytes()); + final var eTag = randomUUID(); final var services = new BlobStoreServicesAdapter(maxRetries) { @Override - public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, long end) throws IOException { - return new FailureAtIndexInputStream(resourceBytes, (int) start, true); + public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) + throws IOException { + return new FailureAtIndexInputStream(resourceBytes, eTag, (int) start, true); } }; final var ioException = assertThrows( IOException.class, - () -> copyToBytes(new RetryingInputStream(services, OperationPurpose.REPOSITORY_ANALYSIS) { + () -> copyToBytes(new RetryingInputStream<>(services, OperationPurpose.REPOSITORY_ANALYSIS) { }) ); assertEquals("This is retry-able", ioException.getMessage()); @@ -96,15 +103,17 @@ public void testReadWillRetryIndefinitelyWhenErrorsOccurDuringIndicesOperation() final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(5, 200)).getBytes()); final int numberOfFailures = randomIntBetween(1, 10); final AtomicInteger failureCounter = new AtomicInteger(numberOfFailures); + final var eTag = randomUUID(); final var services = new BlobStoreServicesAdapter(0) { @Override - public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, long end) throws IOException { - return new FailureAtIndexInputStream(resourceBytes, (int) start, failureCounter.getAndDecrement() > 0); + public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) + throws IOException { + return new FailureAtIndexInputStream(resourceBytes, eTag, (int) start, failureCounter.getAndDecrement() > 0); } }; - byte[] result = copyToBytes(new RetryingInputStream(services, OperationPurpose.INDICES) { + byte[] result = copyToBytes(new RetryingInputStream<>(services, OperationPurpose.INDICES) { 
}); assertEquals(resourceBytes, new BytesArray(result)); assertEquals(numberOfFailures + 1, services.getAttempts()); @@ -117,13 +126,15 @@ public void testRetriesWillBeExtendedWhenMeaningfulProgressIsMade() { final var meaningfulProgressSize = randomIntBetween(1024, 4096); final var meaningfulProgressAttempts = randomIntBetween(1, 3); final var meaningfulProgressAttemptsCounter = new AtomicInteger(meaningfulProgressAttempts); + final var eTag = randomUUID(); final var services = new BlobStoreServicesAdapter(maxRetries) { @Override - public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, long end) throws IOException { + public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) + throws IOException { final var inputStream = meaningfulProgressAttemptsCounter.decrementAndGet() > 0 - ? new FailureAtIndexInputStream(resourceBytes, (int) start, true, meaningfulProgressSize, Integer.MAX_VALUE) - : new FailureAtIndexInputStream(resourceBytes, (int) start, true, 1, meaningfulProgressSize - 1); + ? 
new FailureAtIndexInputStream(resourceBytes, eTag, (int) start, true, meaningfulProgressSize, Integer.MAX_VALUE) + : new FailureAtIndexInputStream(resourceBytes, eTag, (int) start, true, 1, meaningfulProgressSize - 1); return inputStream; } @@ -135,7 +146,7 @@ public long getMeaningfulProgressSize() { final var ioException = assertThrows( IOException.class, - () -> copyToBytes(new RetryingInputStream(services, randomFiniteRetryingPurpose()) { + () -> copyToBytes(new RetryingInputStream<>(services, randomFiniteRetryingPurpose()) { }) ); assertEquals("This is retry-able", ioException.getMessage()); @@ -154,7 +165,8 @@ public void testNoSuchFileExceptionAndRangeNotSatisfiedTerminatesWithoutRetry() final var services = new BlobStoreServicesAdapter(retryableFailures * 2) { @Override - public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, long end) throws IOException { + public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) + throws IOException { if (failureCounter.getAndDecrement() > 0) { throw new RuntimeException("This is retry-able"); } @@ -163,7 +175,7 @@ public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, }; final IOException ioException = assertThrows( IOException.class, - () -> copyToBytes(new RetryingInputStream(services, randomRetryingPurpose()) { + () -> copyToBytes(new RetryingInputStream<>(services, randomRetryingPurpose()) { }) ); assertSame(notRetriableException, ioException); @@ -172,6 +184,29 @@ public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, assertThat(services.getRetrySucceeded(), empty()); } + public void testBlobVersionIsRequestedForSecondAndSubsequentAttempts() throws IOException { + final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(5, 200)).getBytes()); + final int numberOfFailures = randomIntBetween(1, 10); + final AtomicInteger failureCounter = new 
AtomicInteger(numberOfFailures); + final var eTag = randomUUID(); + + final var services = new BlobStoreServicesAdapter(0) { + @Override + public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) + throws IOException { + if (getAttempts() > 1) { + assertEquals(eTag, version); + } else { + assertNull(version); + } + return new FailureAtIndexInputStream(resourceBytes, eTag, (int) start, failureCounter.getAndDecrement() > 0); + } + }; + + copyToBytes(new RetryingInputStream<>(services, OperationPurpose.INDICES) { + }); + } + private static byte[] copyToBytes(InputStream inputStream) throws IOException { final var outputStream = new ByteArrayOutputStream(); if (randomBoolean()) { @@ -188,7 +223,7 @@ private static byte[] copyToBytes(InputStream inputStream) throws IOException { return outputStream.toByteArray(); } - private abstract static class BlobStoreServicesAdapter implements RetryingInputStream.BlobStoreServices { + private abstract static class BlobStoreServicesAdapter implements RetryingInputStream.BlobStoreServices { private final AtomicInteger attemptCounter = new AtomicInteger(); private final List retryStarted = new ArrayList<>(); @@ -199,12 +234,18 @@ private BlobStoreServicesAdapter(int maxRetries) { this.maxRetries = maxRetries; } - public final RetryingInputStream.SingleAttemptInputStream getInputStream(long start, long end) throws IOException { + @Override + public final RetryingInputStream.SingleAttemptInputStream getInputStream(@Nullable String version, long start, long end) + throws IOException { attemptCounter.incrementAndGet(); - return doGetInputStream(start, end); + return doGetInputStream(version, start, end); } - protected abstract RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, long end) throws IOException; + protected abstract RetryingInputStream.SingleAttemptInputStream doGetInputStream( + @Nullable String version, + long start, + long end + ) throws 
IOException; @Override public void onRetryStarted(String action) { @@ -246,18 +287,21 @@ public List getRetrySucceeded() { } } - private static class FailureAtIndexInputStream extends RetryingInputStream.SingleAttemptInputStream { + private static class FailureAtIndexInputStream extends RetryingInputStream.SingleAttemptInputStream { private final long firstOffset; + private final String version; private final InputStream delegate; private int readRemaining; - private FailureAtIndexInputStream(BytesReference bytesReference, int startIndex, boolean failBeforeEnd) throws IOException { - this(bytesReference, startIndex, failBeforeEnd, 1, Integer.MAX_VALUE); + private FailureAtIndexInputStream(BytesReference bytesReference, String version, int startIndex, boolean failBeforeEnd) + throws IOException { + this(bytesReference, version, startIndex, failBeforeEnd, 1, Integer.MAX_VALUE); } private FailureAtIndexInputStream( BytesReference bytesReference, + String version, int startIndex, boolean failBeforeEnd, int minimumSuccess, @@ -271,6 +315,7 @@ private FailureAtIndexInputStream( this.readRemaining = Integer.MAX_VALUE; } this.firstOffset = startIndex; + this.version = version; } @Override @@ -292,6 +337,11 @@ public String toString() { protected long getFirstOffset() { return firstOffset; } + + @Override + protected String getVersion() { + return version; + } } public static OperationPurpose randomRetryingPurpose() { diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java index 8cd3aa1b15dfc..8cb984e289fbd 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java @@ -16,6 +16,8 @@ import org.elasticsearch.common.blobstore.BlobContainer; import 
org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.common.blobstore.RetryingInputStream; import org.elasticsearch.common.blobstore.support.BlobMetadata; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -524,4 +526,8 @@ private Set listChildren(BlobPath path) { protected BlobStoreRepository getRepository() { return (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository(TEST_REPO_NAME); } + + public static OperationPurpose randomRetryingPurpose() { + return randomFrom(Arrays.stream(OperationPurpose.values()).filter(RetryingInputStream::willRetry).toArray(OperationPurpose[]::new)); + } } From 709fdf12685b95bed5b7aba0699b20d3f1a2436c Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 25 Nov 2025 21:51:26 +1100 Subject: [PATCH 02/15] Use FilterInputStream for SingleAttemptInputStream (delegate close etc.) 
--- ...GoogleCloudStorageRetryingInputStream.java | 3 +-- .../s3/S3RetryingInputStream.java | 24 +++++++++---------- .../common/blobstore/RetryingInputStream.java | 7 +++++- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java index f5c06b998ee18..5ab37fc832787 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java @@ -169,7 +169,6 @@ private static Long parseGenerationHeader(HttpResponse response) { // We have to implement our own validation logic here static final class ContentLengthValidatingInputStream extends SingleAttemptInputStream { - private final InputStream in; private final long firstOffset; private final long generation; private final long contentLength; @@ -181,7 +180,7 @@ static final class ContentLengthValidatingInputStream extends SingleAttemptInput } ContentLengthValidatingInputStream(InputStream in, long firstOffset, Long generation, long contentLength) { - this.in = in; + super(in); this.firstOffset = firstOffset; this.generation = generation; this.contentLength = contentLength; diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 0d35365e9f011..780dc20bbadd3 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -8,12 +8,6 @@ */ package org.elasticsearch.repositories.s3; -import 
software.amazon.awssdk.core.ResponseInputStream; -import software.amazon.awssdk.core.exception.SdkException; -import software.amazon.awssdk.core.exception.SdkServiceException; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; @@ -23,6 +17,12 @@ import org.elasticsearch.repositories.s3.S3BlobStore.Operation; import org.elasticsearch.rest.RestStatus; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.core.exception.SdkServiceException; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; + import java.io.IOException; import java.nio.file.NoSuchFileException; import java.util.Map; @@ -173,7 +173,6 @@ private static long tryGetStreamLength(GetObjectResponse getObjectResponse, long private static class S3SingleAttemptInputStream extends SingleAttemptInputStream { - private final ResponseInputStream responseStream; private final long start; private final long end; private final long lastOffset; @@ -183,7 +182,7 @@ private static class S3SingleAttemptInputStream extends SingleAttemptInputStream private boolean aborted; private S3SingleAttemptInputStream(ResponseInputStream responseStream, long start, long end) { - this.responseStream = responseStream; + super(responseStream); this.start = start; this.end = end; lastOffset = getStreamLength(responseStream.response(), start, end); @@ -192,7 +191,7 @@ private S3SingleAttemptInputStream(ResponseInputStream respon @Override public int read() throws IOException { ensureOpen(); - int result = responseStream.read(); + int result = in.read(); if (result == -1) { eof = true; } else { @@ -204,7 +203,7 @@ public int read() throws IOException { @Override 
public int read(byte[] b, int off, int len) throws IOException { ensureOpen(); - final int bytesRead = responseStream.read(b, off, len); + final int bytesRead = in.read(b, off, len); if (bytesRead == -1) { eof = true; } else { @@ -221,11 +220,12 @@ private void ensureOpen() { } } + @SuppressWarnings("unchecked") @Override public void close() throws IOException { - maybeAbort(responseStream); + maybeAbort((ResponseInputStream) in); try { - responseStream.close(); + in.close(); } finally { closed = true; } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java index 0042e5fed4d95..2f5ad92ccf4da 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java @@ -16,6 +16,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; @@ -318,7 +319,11 @@ protected interface BlobStoreServices { * Represents an {@link InputStream} for a single attempt to read a blob. Each retry * will attempt to create a new one of these. If reading from it fails, it should not retry. 
*/ - protected abstract static class SingleAttemptInputStream extends InputStream { + protected abstract static class SingleAttemptInputStream extends FilterInputStream { + + protected SingleAttemptInputStream(InputStream in) { + super(in); + } /** * @return the offset of the first byte returned by this input stream From f0fca997ceea1f1a5994c91e66b5b5fe95476e20 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 25 Nov 2025 22:12:57 +1100 Subject: [PATCH 03/15] Account for bytes skipped in offset --- .../s3/S3RetryingInputStream.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 780dc20bbadd3..b73d351aee075 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -8,6 +8,12 @@ */ package org.elasticsearch.repositories.s3; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.core.exception.SdkServiceException; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; @@ -17,12 +23,6 @@ import org.elasticsearch.repositories.s3.S3BlobStore.Operation; import org.elasticsearch.rest.RestStatus; -import software.amazon.awssdk.core.ResponseInputStream; -import software.amazon.awssdk.core.exception.SdkException; -import software.amazon.awssdk.core.exception.SdkServiceException; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import 
software.amazon.awssdk.services.s3.model.GetObjectResponse; - import java.io.IOException; import java.nio.file.NoSuchFileException; import java.util.Map; @@ -250,10 +250,12 @@ private void maybeAbort(ResponseInputStream stream) { } @Override - public long skip(long n) throws IOException { + public long skip(long len) throws IOException { // This could be optimized on a failure by re-opening stream directly to the preferred location. However, it is rarely called, - // so for now we will rely on the default implementation which just discards bytes by reading. - return super.skip(n); + // so for now we will just delegate to the underlying stream. + final long n = in.skip(len); + offset += n; + return n; } @Override From a078ff8be95125eda743ad7292792a83b322267f Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 26 Nov 2025 12:22:14 +1100 Subject: [PATCH 04/15] Make SingleAttemptInputStream a decorator --- ...GoogleCloudStorageRetryingInputStream.java | 28 ++---- .../s3/S3RetryingInputStream.java | 43 ++++------ .../common/blobstore/RetryingInputStream.java | 28 +++++- .../blobstore/RetryingInputStreamTests.java | 85 ++++++++++--------- 4 files changed, 89 insertions(+), 95 deletions(-) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java index 5ab37fc832787..6f5f774a1de52 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java @@ -20,6 +20,7 @@ import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; import org.elasticsearch.rest.RestStatus; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import 
java.nio.file.NoSuchFileException; @@ -88,9 +89,9 @@ public SingleAttemptInputStream getInputStream(@Nullable Long lastGenerati final Long contentLength = resp.getHeaders().getContentLength(); InputStream content = resp.getContent(); if (contentLength != null) { - return new ContentLengthValidatingInputStream(content, start, lastGeneration, contentLength); + content = new ContentLengthValidatingInputStream(content, contentLength); } - return new ContentLengthValidatingInputStream(content, start, lastGeneration); + return new SingleAttemptInputStream<>(content, start, lastGeneration); } catch (IOException e) { throw StorageException.translate(e); } @@ -167,22 +168,13 @@ private static Long parseGenerationHeader(HttpResponse response) { // Google's SDK ignores the Content-Length header when no bytes are sent, see NetHttpResponse.SizeValidatingInputStream // We have to implement our own validation logic here - static final class ContentLengthValidatingInputStream extends SingleAttemptInputStream { - - private final long firstOffset; - private final long generation; + static final class ContentLengthValidatingInputStream extends FilterInputStream { private final long contentLength; private long read = 0L; - ContentLengthValidatingInputStream(InputStream in, long firstOffset, Long generation) { - this(in, firstOffset, generation, -1L); - } - - ContentLengthValidatingInputStream(InputStream in, long firstOffset, Long generation, long contentLength) { + ContentLengthValidatingInputStream(InputStream in, long contentLength) { super(in); - this.firstOffset = firstOffset; - this.generation = generation; this.contentLength = contentLength; } @@ -220,16 +212,6 @@ private void checkContentLengthOnEOF() throws IOException { throw new IOException("Connection closed prematurely: read = " + read + ", Content-Length = " + contentLength); } } - - @Override - protected long getFirstOffset() { - return firstOffset; - } - - @Override - protected Long getVersion() { - return generation; 
- } } /** diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index b73d351aee075..c21a62ecf7ae8 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -24,6 +24,7 @@ import org.elasticsearch.rest.RestStatus; import java.io.IOException; +import java.io.InputStream; import java.nio.file.NoSuchFileException; import java.util.Map; @@ -52,7 +53,7 @@ class S3RetryingInputStream extends RetryingInputStream { private record S3BlobStoreServices(S3BlobStore blobStore, String blobKey, OperationPurpose purpose) implements BlobStoreServices { @Override - public S3SingleAttemptInputStream getInputStream(Void version, long start, long end) throws IOException { + public SingleAttemptInputStream getInputStream(Void version, long start, long end) throws IOException { try (AmazonS3Reference clientReference = blobStore.clientReference()) { final var getObjectRequestBuilder = GetObjectRequest.builder().bucket(blobStore.bucket()).key(blobKey); configureRequestForMetrics(getObjectRequestBuilder, blobStore, Operation.GET_OBJECT, purpose); @@ -62,7 +63,7 @@ public S3SingleAttemptInputStream getInputStream(Void version, long start, long } final var getObjectRequest = getObjectRequestBuilder.build(); final var getObjectResponse = clientReference.client().getObject(getObjectRequest); - return new S3SingleAttemptInputStream(getObjectResponse, start, end); + return new SingleAttemptInputStream<>(new S3SingleAttemptInputStream(getObjectResponse, start, end), start, null); } catch (SdkException e) { if (e instanceof SdkServiceException sdkServiceException) { if (sdkServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()) { @@ -171,8 +172,9 @@ private static long 
tryGetStreamLength(GetObjectResponse getObjectResponse, long return getObjectResponse.contentLength(); } - private static class S3SingleAttemptInputStream extends SingleAttemptInputStream { + private static class S3SingleAttemptInputStream extends InputStream { + private final ResponseInputStream responseStream; private final long start; private final long end; private final long lastOffset; @@ -182,7 +184,7 @@ private static class S3SingleAttemptInputStream extends SingleAttemptInputStream private boolean aborted; private S3SingleAttemptInputStream(ResponseInputStream responseStream, long start, long end) { - super(responseStream); + this.responseStream = responseStream; this.start = start; this.end = end; lastOffset = getStreamLength(responseStream.response(), start, end); @@ -191,7 +193,7 @@ private S3SingleAttemptInputStream(ResponseInputStream respon @Override public int read() throws IOException { ensureOpen(); - int result = in.read(); + int result = responseStream.read(); if (result == -1) { eof = true; } else { @@ -203,7 +205,7 @@ public int read() throws IOException { @Override public int read(byte[] b, int off, int len) throws IOException { ensureOpen(); - final int bytesRead = in.read(b, off, len); + final int bytesRead = responseStream.read(b, off, len); if (bytesRead == -1) { eof = true; } else { @@ -220,12 +222,11 @@ private void ensureOpen() { } } - @SuppressWarnings("unchecked") @Override public void close() throws IOException { - maybeAbort((ResponseInputStream) in); + maybeAbort(responseStream); try { - in.close(); + responseStream.close(); } finally { closed = true; } @@ -250,12 +251,10 @@ private void maybeAbort(ResponseInputStream stream) { } @Override - public long skip(long len) throws IOException { + public long skip(long n) throws IOException { // This could be optimized on a failure by re-opening stream directly to the preferred location. However, it is rarely called, - // so for now we will just delegate to the underlying stream. 
- final long n = in.skip(len); - offset += n; - return n; + // so for now we will rely on the default implementation which just discards bytes by reading. + return super.skip(n); } @Override @@ -278,30 +277,20 @@ private boolean isAborted() { private long tryGetStreamLength(GetObjectResponse response) { return S3RetryingInputStream.tryGetStreamLength(response, start, end); } - - @Override - protected long getFirstOffset() { - return start; - } - - @Override - protected Void getVersion() { - return null; - } } // exposed for testing boolean isEof() { - return ((S3SingleAttemptInputStream) currentStream).isEof(); + return currentStream.unwrap(S3SingleAttemptInputStream.class).isEof(); } // exposed for testing boolean isAborted() { - return ((S3SingleAttemptInputStream) currentStream).isAborted(); + return currentStream.unwrap(S3SingleAttemptInputStream.class).isAborted(); } // exposed for testing long tryGetStreamLength(GetObjectResponse getObjectResponse) { - return ((S3SingleAttemptInputStream) currentStream).tryGetStreamLength(getObjectResponse); + return currentStream.unwrap(S3SingleAttemptInputStream.class).tryGetStreamLength(getObjectResponse); } } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java index 2f5ad92ccf4da..bfa19ea712449 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java @@ -319,21 +319,41 @@ protected interface BlobStoreServices { * Represents an {@link InputStream} for a single attempt to read a blob. Each retry * will attempt to create a new one of these. If reading from it fails, it should not retry. 
*/ - protected abstract static class SingleAttemptInputStream extends FilterInputStream { + protected static final class SingleAttemptInputStream extends FilterInputStream { - protected SingleAttemptInputStream(InputStream in) { + private final long firstOffset; + private final V version; + + public SingleAttemptInputStream(InputStream in, long firstOffset, V version) { super(in); + this.firstOffset = firstOffset; + this.version = version; } /** * @return the offset of the first byte returned by this input stream */ - protected abstract long getFirstOffset(); + public long getFirstOffset() { + return firstOffset; + } /** * Get the version of this input stream */ - protected abstract V getVersion(); + public V getVersion() { + return version; + } + + /** + * Unwrap the underlying input stream + * + * @param clazz The expected class of the underlying input stream + * @return The underlying input stream + * @param The type of the underlying input stream + */ + public T unwrap(Class clazz) { + return clazz.cast(in); + } } public static boolean willRetry(OperationPurpose purpose) { diff --git a/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java b/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java index 34cb5b996ee33..91ad66296ba33 100644 --- a/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java +++ b/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java @@ -41,7 +41,7 @@ public void testRetryableErrorsWhenReadingAreRetried() throws IOException { @Override public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) throws IOException { - return new FailureAtIndexInputStream(resourceBytes, eTag, (int) start, failureCounter.getAndDecrement() > 0); + return createSingleAttemptInputStream(resourceBytes, eTag, (int) start, failureCounter.getAndDecrement() > 0); } }; @@ -62,7 +62,7 @@ 
public void testReadWillFailWhenRetryableErrorsExceedMaxRetries() { @Override public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) throws IOException { - return new FailureAtIndexInputStream(resourceBytes, eTag, (int) start, true); + return createSingleAttemptInputStream(resourceBytes, eTag, (int) start, true); } }; @@ -85,7 +85,7 @@ public void testReadWillFailWhenRetryableErrorsOccurDuringRepositoryAnalysis() { @Override public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) throws IOException { - return new FailureAtIndexInputStream(resourceBytes, eTag, (int) start, true); + return createSingleAttemptInputStream(resourceBytes, eTag, (int) start, true); } }; @@ -109,7 +109,7 @@ public void testReadWillRetryIndefinitelyWhenErrorsOccurDuringIndicesOperation() @Override public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) throws IOException { - return new FailureAtIndexInputStream(resourceBytes, eTag, (int) start, failureCounter.getAndDecrement() > 0); + return createSingleAttemptInputStream(resourceBytes, eTag, (int) start, failureCounter.getAndDecrement() > 0); } }; @@ -133,8 +133,8 @@ public void testRetriesWillBeExtendedWhenMeaningfulProgressIsMade() { public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) throws IOException { final var inputStream = meaningfulProgressAttemptsCounter.decrementAndGet() > 0 - ? new FailureAtIndexInputStream(resourceBytes, eTag, (int) start, true, meaningfulProgressSize, Integer.MAX_VALUE) - : new FailureAtIndexInputStream(resourceBytes, eTag, (int) start, true, 1, meaningfulProgressSize - 1); + ? 
createSingleAttemptInputStream(resourceBytes, eTag, (int) start, true, meaningfulProgressSize, Integer.MAX_VALUE) + : createSingleAttemptInputStream(resourceBytes, eTag, (int) start, true, 1, meaningfulProgressSize - 1); return inputStream; } @@ -199,7 +199,7 @@ public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nu } else { assertNull(version); } - return new FailureAtIndexInputStream(resourceBytes, eTag, (int) start, failureCounter.getAndDecrement() > 0); + return createSingleAttemptInputStream(resourceBytes, eTag, (int) start, failureCounter.getAndDecrement() > 0); } }; @@ -287,42 +287,50 @@ public List getRetrySucceeded() { } } - private static class FailureAtIndexInputStream extends RetryingInputStream.SingleAttemptInputStream { + private static RetryingInputStream.SingleAttemptInputStream createSingleAttemptInputStream( + BytesReference bytesReference, + String version, + int startIndex, + boolean failBeforeEnd + ) throws IOException { + return createSingleAttemptInputStream(bytesReference, version, startIndex, failBeforeEnd, 1, Integer.MAX_VALUE); + } + + private static RetryingInputStream.SingleAttemptInputStream createSingleAttemptInputStream( + BytesReference bytesReference, + String version, + int startIndex, + boolean failBeforeEnd, + int minimumSuccess, + int maximumSuccess + ) throws IOException { + if (failBeforeEnd) { + return new RetryingInputStream.SingleAttemptInputStream<>( + new FailureAtIndexInputStream(bytesReference, startIndex, minimumSuccess, maximumSuccess), + startIndex, + version + ); + } + return new RetryingInputStream.SingleAttemptInputStream<>(inputStreamAtIndex(bytesReference, startIndex), startIndex, version); + } - private final long firstOffset; - private final String version; - private final InputStream delegate; + private static class FailureAtIndexInputStream extends InputStream { + + private final InputStream inputStream; private int readRemaining; - private FailureAtIndexInputStream(BytesReference 
bytesReference, String version, int startIndex, boolean failBeforeEnd) + private FailureAtIndexInputStream(BytesReference bytesReference, int startIndex, int minimumSuccess, int maximumSuccess) throws IOException { - this(bytesReference, version, startIndex, failBeforeEnd, 1, Integer.MAX_VALUE); - } - - private FailureAtIndexInputStream( - BytesReference bytesReference, - String version, - int startIndex, - boolean failBeforeEnd, - int minimumSuccess, - int maximumSuccess - ) throws IOException { + this.inputStream = inputStreamAtIndex(bytesReference, startIndex); final int remainingBytes = bytesReference.length() - startIndex; - this.delegate = bytesReference.slice(startIndex, remainingBytes).streamInput(); - if (failBeforeEnd) { - this.readRemaining = randomIntBetween(Math.max(1, minimumSuccess), Math.min(maximumSuccess, remainingBytes / 2)); - } else { - this.readRemaining = Integer.MAX_VALUE; - } - this.firstOffset = startIndex; - this.version = version; + this.readRemaining = randomIntBetween(Math.max(1, minimumSuccess), Math.min(maximumSuccess, remainingBytes / 2)); } @Override public int read() throws IOException { if (readRemaining > 0) { readRemaining--; - return delegate.read(); + return inputStream.read(); } else { throw new IOException("This is retry-able"); } @@ -332,16 +340,11 @@ public int read() throws IOException { public String toString() { return "Failing after " + readRemaining; } + } - @Override - protected long getFirstOffset() { - return firstOffset; - } - - @Override - protected String getVersion() { - return version; - } + private static InputStream inputStreamAtIndex(BytesReference bytesReference, int startIndex) throws IOException { + final int remainingBytes = bytesReference.length() - startIndex; + return bytesReference.slice(startIndex, remainingBytes).streamInput(); } public static OperationPurpose randomRetryingPurpose() { From 39f8db66ad55fef3c83946ded8ed3105cc64f7e5 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 26 Nov 2025 
14:24:37 +1100 Subject: [PATCH 05/15] Allow CSPs to define what is retry-able --- ...GoogleCloudStorageRetryingInputStream.java | 11 ++- .../gcs/GoogleCloudStorageService.java | 4 +- ...CloudStorageBlobContainerRetriesTests.java | 2 +- .../s3/S3RetryingInputStream.java | 16 +++-- .../common/blobstore/RetryingInputStream.java | 68 +++++++++++++------ .../blobstore/RetryingInputStreamTests.java | 32 ++++++--- 6 files changed, 94 insertions(+), 39 deletions(-) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java index 6f5f774a1de52..b58a7ac867c67 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java @@ -11,6 +11,7 @@ import com.google.api.client.http.HttpResponse; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.StorageRetryStrategy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -33,6 +34,7 @@ class GoogleCloudStorageRetryingInputStream extends RetryingInputStream { private static final Logger logger = LogManager.getLogger(GoogleCloudStorageRetryingInputStream.class); + private static final StorageRetryStrategy STORAGE_RETRY_STRATEGY = GoogleCloudStorageService.createStorageRetryStrategy(); // Used for testing only GoogleCloudStorageRetryingInputStream(GoogleCloudStorageBlobStore blobStore, OperationPurpose purpose, BlobId blobId) @@ -123,12 +125,12 @@ public SingleAttemptInputStream getInputStream(@Nullable Long lastGenerati } @Override - public void onRetryStarted(String action) { + public void onRetryStarted(StreamAction action) { // No retry metrics for GCS } @Override - 
public void onRetrySucceeded(String action, long numberOfRetries) { + public void onRetrySucceeded(StreamAction action, long numberOfRetries) { // No retry metrics for GCS } @@ -146,6 +148,11 @@ public int getMaxRetries() { public String getBlobDescription() { return blobId.toString(); } + + @Override + public boolean isRetryableException(StreamAction action, Exception e) { + return STORAGE_RETRY_STRATEGY.getIdempotentHandler().shouldRetry(e, null); + } } private static Long parseGenerationHeader(HttpResponse response) { diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java index b7f8423f53c2a..c175a991e5fa9 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java @@ -219,7 +219,7 @@ StorageOptions createStorageOptions( final RetryBehaviour retryBehaviour ) { final StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder() - .setStorageRetryStrategy(getRetryStrategy()) + .setStorageRetryStrategy(createStorageRetryStrategy()) .setTransportOptions(httpTransportOptions) .setHeaderProvider(() -> { return Strings.hasLength(gcsClientSettings.getApplicationName()) @@ -275,7 +275,7 @@ StorageOptions createStorageOptions( return storageOptionsBuilder.build(); } - protected StorageRetryStrategy getRetryStrategy() { + static StorageRetryStrategy createStorageRetryStrategy() { return ShouldRetryDecorator.decorate( StorageRetryStrategy.getLegacyStorageRetryStrategy(), (Throwable prevThrowable, Object prevResponse, ResultRetryAlgorithm delegate) -> { diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java 
b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index 48cf6b5d96d90..b3beba4c74d36 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -203,7 +203,7 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser .setMaxRpcTimeout(Duration.ofSeconds(1)) .setMaxAttempts(options.getRetrySettings().getMaxAttempts()); return options.toBuilder() - .setStorageRetryStrategy(getRetryStrategy()) + .setStorageRetryStrategy(createStorageRetryStrategy()) .setHost(options.getHost()) .setCredentials(options.getCredentials()) .setRetrySettings(retrySettingsBuilder.build()) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index c21a62ecf7ae8..5508fb3a03149 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -83,12 +83,12 @@ public SingleAttemptInputStream getInputStream(Void version, long start, l } @Override - public void onRetryStarted(String action) { + public void onRetryStarted(StreamAction action) { blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes(action)); } @Override - public void onRetrySucceeded(String action, long numberOfRetries) { + public void onRetrySucceeded(StreamAction action, long numberOfRetries) { final Map attributes = metricAttributes(action); blobStore.getS3RepositoriesMetrics().retryCompletedCounter().incrementBy(1, attributes); 
blobStore.getS3RepositoriesMetrics().retryHistogram().record(numberOfRetries, attributes); @@ -109,7 +109,15 @@ public String getBlobDescription() { return blobStore.bucket() + "/" + blobKey; } - private Map metricAttributes(String action) { + @Override + public boolean isRetryableException(StreamAction action, Exception e) { + return switch (action) { + case OPEN -> e instanceof RuntimeException; + case READ -> e instanceof IOException; + }; + } + + private Map metricAttributes(StreamAction action) { return Map.of( "repo_type", S3Repository.TYPE, @@ -120,7 +128,7 @@ private Map metricAttributes(String action) { "purpose", purpose.getKey(), "action", - action + action.toString() ); } } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java index bfa19ea712449..b6ede3748e1a9 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java @@ -23,6 +23,8 @@ import java.util.ArrayList; import java.util.List; +import static org.elasticsearch.common.blobstore.RetryingInputStream.StreamAction.OPEN; +import static org.elasticsearch.common.blobstore.RetryingInputStream.StreamAction.READ; import static org.elasticsearch.core.Strings.format; public abstract class RetryingInputStream extends InputStream { @@ -31,6 +33,25 @@ public abstract class RetryingInputStream extends InputStream { public static final int MAX_SUPPRESSED_EXCEPTIONS = 10; + public enum StreamAction { + OPEN("opening"), + READ("reading"); + + private final String verb; + + StreamAction(String verb) { + this.verb = verb; + } + + public String getVerb() { + return verb; + } + + public String toString() { + return name().toLowerCase(); + } + } + private final BlobStoreServices blobStoreServices; private final OperationPurpose purpose; private final long start; @@ -63,7 +84,7 
@@ protected RetryingInputStream(BlobStoreServices blobStoreServices, OperationP this.end = end; final int initialAttempt = attempt; openStreamWithRetry(); - maybeLogAndRecordMetricsForSuccess(initialAttempt, "open"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, OPEN); } private void openStreamWithRetry() throws IOException { @@ -71,6 +92,7 @@ private void openStreamWithRetry() throws IOException { if (offset > 0 || start > 0 || end < Long.MAX_VALUE - 1) { assert start + offset <= end : "requesting beyond end, start = " + start + " offset=" + offset + " end=" + end; } + // noinspection TryWithIdenticalCatches try { currentStream = blobStoreServices.getInputStream( currentStream != null ? currentStream.getVersion() : null, @@ -81,15 +103,21 @@ private void openStreamWithRetry() throws IOException { } catch (NoSuchFileException | RequestedRangeNotSatisfiedException e) { throw e; } catch (RuntimeException e) { - if (attempt == 1) { - blobStoreServices.onRetryStarted("open"); - } - final long delayInMillis = maybeLogAndComputeRetryDelay("opening", e); - delayBeforeRetry(delayInMillis); + retryOrAbortOnOpen(e); + } catch (IOException e) { + retryOrAbortOnOpen(e); } } } + private void retryOrAbortOnOpen(T exception) throws T { + if (attempt == 1) { + blobStoreServices.onRetryStarted(StreamAction.OPEN); + } + final long delayInMillis = maybeLogAndComputeRetryDelay(StreamAction.OPEN, exception); + delayBeforeRetry(delayInMillis); + } + @Override public int read() throws IOException { ensureOpen(); @@ -100,11 +128,11 @@ public int read() throws IOException { if (result != -1) { offset += 1; } - maybeLogAndRecordMetricsForSuccess(initialAttempt, "read"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, READ); return result; } catch (IOException e) { if (attempt == initialAttempt) { - blobStoreServices.onRetryStarted("read"); + blobStoreServices.onRetryStarted(READ); } reopenStreamOrFail(e); } @@ -121,11 +149,11 @@ public int read(byte[] b, int off, int len) 
throws IOException { if (bytesRead != -1) { offset += bytesRead; } - maybeLogAndRecordMetricsForSuccess(initialAttempt, "read"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, READ); return bytesRead; } catch (IOException e) { if (attempt == initialAttempt) { - blobStoreServices.onRetryStarted("read"); + blobStoreServices.onRetryStarted(READ); } reopenStreamOrFail(e); } @@ -144,7 +172,7 @@ private void reopenStreamOrFail(IOException e) throws IOException { if (currentStreamProgress() >= meaningfulProgressSize) { failuresAfterMeaningfulProgress += 1; } - final long delayInMillis = maybeLogAndComputeRetryDelay("reading", e); + final long delayInMillis = maybeLogAndComputeRetryDelay(READ, e); IOUtils.closeWhileHandlingException(currentStream); delayBeforeRetry(delayInMillis); @@ -153,8 +181,8 @@ private void reopenStreamOrFail(IOException e) throws IOException { // The method throws if the operation should *not* be retried. Otherwise, it keeps a record for the attempt and associated failure // and compute the delay before retry. 
- private long maybeLogAndComputeRetryDelay(String action, T e) throws T { - if (shouldRetry(attempt) == false) { + private long maybeLogAndComputeRetryDelay(StreamAction action, T e) throws T { + if (blobStoreServices.isRetryableException(action, e) == false || shouldRetry(attempt) == false) { final var finalException = addSuppressedExceptions(e); logForFailure(action, finalException); throw finalException; @@ -170,11 +198,11 @@ private long maybeLogAndComputeRetryDelay(String action, T return delayInMillis; } - private void logForFailure(String action, Exception e) { + private void logForFailure(StreamAction action, Exception e) { logger.warn( () -> format( "failed %s [%s] at offset [%s] with purpose [%s]", - action, + action.getVerb(), blobStoreServices.getBlobDescription(), start + offset, purpose.getKey() @@ -183,7 +211,7 @@ private void logForFailure(String action, Exception e) { ); } - private void logForRetry(Level level, String action, Exception e) { + private void logForRetry(Level level, StreamAction action, Exception e) { logger.log( level, () -> format( @@ -205,7 +233,7 @@ private void logForRetry(Level level, String action, Exception e) { ); } - private void maybeLogAndRecordMetricsForSuccess(int initialAttempt, String action) { + private void maybeLogAndRecordMetricsForSuccess(int initialAttempt, StreamAction action) { if (attempt > initialAttempt) { final int numberOfRetries = attempt - initialAttempt; logger.info( @@ -304,15 +332,17 @@ protected interface BlobStoreServices { */ SingleAttemptInputStream getInputStream(@Nullable V version, long start, long end) throws IOException; - void onRetryStarted(String action); + void onRetryStarted(StreamAction action); - void onRetrySucceeded(String action, long numberOfRetries); + void onRetrySucceeded(StreamAction action, long numberOfRetries); long getMeaningfulProgressSize(); int getMaxRetries(); String getBlobDescription(); + + boolean isRetryableException(StreamAction action, Exception e); } /** diff 
--git a/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java b/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java index 91ad66296ba33..ca8ea2fc5b71b 100644 --- a/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java +++ b/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java @@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; +import static org.elasticsearch.common.blobstore.RetryingInputStream.StreamAction.OPEN; +import static org.elasticsearch.common.blobstore.RetryingInputStream.StreamAction.READ; import static org.hamcrest.Matchers.empty; public class RetryingInputStreamTests extends ESTestCase { @@ -50,7 +52,7 @@ public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nu assertEquals(resourceBytes.length(), results.length); assertEquals(resourceBytes, new BytesArray(results)); assertEquals(retryableFailures + 1, services.getAttempts()); - assertEquals(Stream.generate(() -> "read").limit(retryableFailures).toList(), services.getRetryStarted()); + assertEquals(Stream.generate(() -> READ).limit(retryableFailures).toList(), services.getRetryStarted()); } public void testReadWillFailWhenRetryableErrorsExceedMaxRetries() { @@ -73,7 +75,7 @@ public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nu ); assertEquals("This is retry-able", ioException.getMessage()); assertEquals(maxRetries + 1, services.getAttempts()); - assertEquals(Stream.generate(() -> "read").limit(maxRetries + 1).toList(), services.getRetryStarted()); + assertEquals(Stream.generate(() -> READ).limit(maxRetries + 1).toList(), services.getRetryStarted()); } public void testReadWillFailWhenRetryableErrorsOccurDuringRepositoryAnalysis() { @@ -96,7 +98,7 @@ public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nu ); assertEquals("This is retry-able", ioException.getMessage()); 
assertEquals(1, services.getAttempts()); - assertEquals(List.of("read"), services.getRetryStarted()); + assertEquals(List.of(READ), services.getRetryStarted()); } public void testReadWillRetryIndefinitelyWhenErrorsOccurDuringIndicesOperation() throws IOException { @@ -117,7 +119,7 @@ public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nu }); assertEquals(resourceBytes, new BytesArray(result)); assertEquals(numberOfFailures + 1, services.getAttempts()); - assertEquals(Stream.generate(() -> "read").limit(numberOfFailures).toList(), services.getRetryStarted()); + assertEquals(Stream.generate(() -> READ).limit(numberOfFailures).toList(), services.getRetryStarted()); } public void testRetriesWillBeExtendedWhenMeaningfulProgressIsMade() { @@ -151,7 +153,7 @@ public long getMeaningfulProgressSize() { ); assertEquals("This is retry-able", ioException.getMessage()); assertEquals(maxRetries + meaningfulProgressAttempts, services.getAttempts()); - assertEquals(Stream.generate(() -> "read").limit(maxRetries + meaningfulProgressAttempts).toList(), services.getRetryStarted()); + assertEquals(Stream.generate(() -> READ).limit(maxRetries + meaningfulProgressAttempts).toList(), services.getRetryStarted()); } public void testNoSuchFileExceptionAndRangeNotSatisfiedTerminatesWithoutRetry() { @@ -180,7 +182,7 @@ public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nu ); assertSame(notRetriableException, ioException); assertEquals(retryableFailures + 1, services.getAttempts()); - assertEquals(List.of("open"), services.getRetryStarted()); + assertEquals(List.of(OPEN), services.getRetryStarted()); assertThat(services.getRetrySucceeded(), empty()); } @@ -226,7 +228,7 @@ private static byte[] copyToBytes(InputStream inputStream) throws IOException { private abstract static class BlobStoreServicesAdapter implements RetryingInputStream.BlobStoreServices { private final AtomicInteger attemptCounter = new AtomicInteger(); - private final List retryStarted = 
new ArrayList<>(); + private final List retryStarted = new ArrayList<>(); private final List retrySucceeded = new ArrayList<>(); private final int maxRetries; @@ -248,15 +250,23 @@ protected abstract RetryingInputStream.SingleAttemptInputStream doGetInp ) throws IOException; @Override - public void onRetryStarted(String action) { + public void onRetryStarted(RetryingInputStream.StreamAction action) { retryStarted.add(action); } @Override - public void onRetrySucceeded(String action, long numberOfRetries) { + public void onRetrySucceeded(RetryingInputStream.StreamAction action, long numberOfRetries) { retrySucceeded.add(new Success(action, numberOfRetries)); } + @Override + public boolean isRetryableException(RetryingInputStream.StreamAction action, Exception e) { + return switch (action) { + case OPEN -> e instanceof RuntimeException; + case READ -> e instanceof IOException; + }; + } + @Override public long getMeaningfulProgressSize() { return Long.MAX_VALUE; @@ -272,13 +282,13 @@ public String getBlobDescription() { return ""; } - record Success(String action, long numberOfRetries) {}; + record Success(RetryingInputStream.StreamAction action, long numberOfRetries) {}; public int getAttempts() { return attemptCounter.get(); } - public List getRetryStarted() { + public List getRetryStarted() { return retryStarted; } From 1f4b905d73f6e1de6e30b70ce9d3db3b8bce3b93 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 26 Nov 2025 14:35:06 +1100 Subject: [PATCH 06/15] Remove forbidden API call (toLowerCase) --- .../common/blobstore/RetryingInputStream.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java index b6ede3748e1a9..b50de93bbaf8e 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java +++ 
b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java @@ -34,21 +34,23 @@ public abstract class RetryingInputStream extends InputStream { public static final int MAX_SUPPRESSED_EXCEPTIONS = 10; public enum StreamAction { - OPEN("opening"), - READ("reading"); + OPEN("open", "opening"), + READ("read", "reading"); - private final String verb; + private final String pastTense; + private final String presentTense; - StreamAction(String verb) { - this.verb = verb; + StreamAction(String pastTense, String presentTense) { + this.pastTense = pastTense; + this.presentTense = presentTense; } - public String getVerb() { - return verb; + public String getPastTense() { + return pastTense; } - public String toString() { - return name().toLowerCase(); + public String getPresentTense() { + return presentTense; } } @@ -202,7 +204,7 @@ private void logForFailure(StreamAction action, Exception e) { logger.warn( () -> format( "failed %s [%s] at offset [%s] with purpose [%s]", - action.getVerb(), + action.getPresentTense(), blobStoreServices.getBlobDescription(), start + offset, purpose.getKey() @@ -220,7 +222,7 @@ private void logForRetry(Level level, StreamAction action, Exception e) { this was attempt [%s] to read this blob which yielded [%s] bytes; in total \ [%s] of the attempts to read this blob have made meaningful progress and do not count towards the maximum number of \ retries; the maximum number of read attempts which do not make meaningful progress is [%s]""", - action, + action.getPresentTense(), blobStoreServices.getBlobDescription(), start + offset, purpose.getKey(), @@ -238,7 +240,7 @@ private void maybeLogAndRecordMetricsForSuccess(int initialAttempt, StreamAction final int numberOfRetries = attempt - initialAttempt; logger.info( "successfully {} input stream for [{}] with purpose [{}] after [{}] retries", - action, + action.getPastTense(), blobStoreServices.getBlobDescription(), purpose.getKey(), numberOfRetries From 
b70d52ff459d11a498b3c734b018bf97d75ea3fd Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 26 Nov 2025 14:47:20 +1100 Subject: [PATCH 07/15] Remove misleading comments --- .../repositories/gcs/GoogleCloudStorageRetryingInputStream.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java index b58a7ac867c67..8bc896a039b7d 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java @@ -36,13 +36,11 @@ class GoogleCloudStorageRetryingInputStream extends RetryingInputStream { private static final Logger logger = LogManager.getLogger(GoogleCloudStorageRetryingInputStream.class); private static final StorageRetryStrategy STORAGE_RETRY_STRATEGY = GoogleCloudStorageService.createStorageRetryStrategy(); - // Used for testing only GoogleCloudStorageRetryingInputStream(GoogleCloudStorageBlobStore blobStore, OperationPurpose purpose, BlobId blobId) throws IOException { this(blobStore, purpose, blobId, 0, Long.MAX_VALUE - 1); } - // Used for testing only GoogleCloudStorageRetryingInputStream( GoogleCloudStorageBlobStore blobStore, OperationPurpose purpose, From 5c03349142fbb04d4423c35f9ac20c0d45dd8389 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 26 Nov 2025 14:50:38 +1100 Subject: [PATCH 08/15] Update main javadoc to indicate why this class exists --- .../gcs/GoogleCloudStorageRetryingInputStream.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java 
b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java index 8bc896a039b7d..7de6d05febb40 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java @@ -28,8 +28,9 @@ /** * Wrapper around reads from GCS that will retry blob downloads that fail part-way through, resuming from where the failure occurred. - * This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing - * the {@link org.elasticsearch.Version#V_7_0_0} version constant) and removed if the SDK handles retries itself in the future. + *
<p>
+ * We make use of the retry logic from {@link RetryingInputStream}, which is slightly more sophisticated and tailored to our needs than + * the retry logic the GCS SDK provides by default. */ class GoogleCloudStorageRetryingInputStream extends RetryingInputStream { From 4981c7ed43c44d85351b3c13825d4847acaf1c93 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 26 Nov 2025 14:55:24 +1100 Subject: [PATCH 09/15] Improve Javadoc for RetryingInputStream --- .../common/blobstore/RetryingInputStream.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java index b50de93bbaf8e..a3f950e473d1f 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java @@ -27,6 +27,19 @@ import static org.elasticsearch.common.blobstore.RetryingInputStream.StreamAction.READ; import static org.elasticsearch.core.Strings.format; +/** + * An {@link InputStream} that resumes downloads from the point at which they failed when a retry-able error occurs. + *

<p>
+ * This class implements some Elasticsearch-specific retry behavior, including:
+ * <ul>
+ *     <li>Retrying indefinitely for {@link OperationPurpose#INDICES} operations</li>
+ *     <li>Not retrying at all for {@link OperationPurpose#REPOSITORY_ANALYSIS} operations</li>
+ *     <li>Extending retries if "meaningful" progress was made on the last attempt</li>
+ *     <li>More detailed logging about the status of operations being retried</li>
+ * </ul>
+ * + * @param The type used to represent the version of a blob + */ public abstract class RetryingInputStream extends InputStream { private static final Logger logger = LogManager.getLogger(RetryingInputStream.class); From fabe579155fd6dca58baed49889aa3761d9c9ecc Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 26 Nov 2025 15:53:19 +1100 Subject: [PATCH 10/15] Remove copied config, configure randomXXXPurpose correctly for GCP, translate read exceptions correctly --- ...eCloudStorageBlobStoreRepositoryTests.java | 2 +- .../GoogleCloudStorageThirdPartyTests.java | 1 + ...GoogleCloudStorageRetryingInputStream.java | 54 ++++++++++++------- ...CloudStorageBlobContainerRetriesTests.java | 20 +++++-- .../s3/S3BlobContainerRetriesTests.java | 7 +-- .../common/blobstore/RetryingInputStream.java | 29 ++++++---- .../AbstractThirdPartyRepositoryTestCase.java | 6 --- .../blobstore/BlobStoreTestUtil.java | 16 ++++++ 8 files changed, 90 insertions(+), 45 deletions(-) diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index b2d55169a7372..5aaab1c1aeaa9 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -59,8 +59,8 @@ import java.util.Map; import static org.elasticsearch.common.io.Streams.readFully; -import static org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase.randomRetryingPurpose; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; +import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomRetryingPurpose; import static 
org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING; diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java index b85ddde004568..7dfce9afeca0b 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java @@ -34,6 +34,7 @@ import static org.elasticsearch.common.io.Streams.readFully; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; +import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomRetryingPurpose; import static org.hamcrest.Matchers.blankOrNullString; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java index 7de6d05febb40..defa2e385be10 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java @@ -9,6 +9,7 @@ package org.elasticsearch.repositories.gcs; import com.google.api.client.http.HttpResponse; +import com.google.cloud.BaseService; import 
com.google.cloud.storage.BlobId; import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageRetryStrategy; @@ -150,7 +151,10 @@ public String getBlobDescription() { @Override public boolean isRetryableException(StreamAction action, Exception e) { - return STORAGE_RETRY_STRATEGY.getIdempotentHandler().shouldRetry(e, null); + return switch (action) { + case OPEN -> BaseService.EXCEPTION_HANDLER.shouldRetry(e, null); + case READ -> STORAGE_RETRY_STRATEGY.getIdempotentHandler().shouldRetry(e, null); + }; } } @@ -185,32 +189,44 @@ static final class ContentLengthValidatingInputStream extends FilterInputStream } @Override - public int read(byte[] b, int off, int len) throws IOException { - final int n = in.read(b, off, len); - if (n == -1) { - checkContentLengthOnEOF(); - } else { - read += n; + public int read(byte[] b, int off, int len) { + try { + final int n = in.read(b, off, len); + if (n == -1) { + checkContentLengthOnEOF(); + } else { + read += n; + } + return n; + } catch (IOException e) { + throw StorageException.translate(e); } - return n; } @Override - public int read() throws IOException { - final int n = in.read(); - if (n == -1) { - checkContentLengthOnEOF(); - } else { - read++; + public int read() { + try { + final int n = in.read(); + if (n == -1) { + checkContentLengthOnEOF(); + } else { + read++; + } + return n; + } catch (IOException e) { + throw StorageException.translate(e); } - return n; } @Override - public long skip(long len) throws IOException { - final long n = in.skip(len); - read += n; - return n; + public long skip(long len) { + try { + final long n = in.skip(len); + read += n; + return n; + } catch (IOException e) { + throw StorageException.translate(e); + } } private void checkContentLengthOnEOF() throws IOException { diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java 
b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index b3beba4c74d36..f04d37bf5c7cb 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.blobstore.OptionalBytesReference; import org.elasticsearch.common.blobstore.support.BlobContainerUtils; import org.elasticsearch.common.bytes.BytesArray; @@ -52,6 +53,7 @@ import org.elasticsearch.http.ResponseInjectingHttpHandler; import org.elasticsearch.mocksocket.MockHttpServer; import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; @@ -192,16 +194,14 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser requestCountingHttpTransportOptions, retryBehaviour ); - final RetrySettings.Builder retrySettingsBuilder = RetrySettings.newBuilder() - .setTotalTimeout(options.getRetrySettings().getTotalTimeout()) + final RetrySettings.Builder retrySettingsBuilder = options.getRetrySettings() + .toBuilder() .setInitialRetryDelay(Duration.ofMillis(10L)) .setRetryDelayMultiplier(1.0d) .setMaxRetryDelay(Duration.ofSeconds(1L)) .setJittered(false) .setInitialRpcTimeout(Duration.ofSeconds(1)) - .setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier()) - .setMaxRpcTimeout(Duration.ofSeconds(1)) - 
.setMaxAttempts(options.getRetrySettings().getMaxAttempts()); + .setMaxRpcTimeout(Duration.ofSeconds(1)); return options.toBuilder() .setStorageRetryStrategy(createStorageRetryStrategy()) .setHost(options.getHost()) @@ -654,4 +654,14 @@ private HttpHandler safeHandler(HttpHandler handler) { } }; } + + @Override + protected OperationPurpose randomRetryingPurpose() { + return BlobStoreTestUtil.randomRetryingPurpose(); + } + + @Override + protected OperationPurpose randomFiniteRetryingPurpose() { + return BlobStoreTestUtil.randomFiniteRetryingPurpose(); + } } diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index d07fa80551a24..27f6ef63fd7f3 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -1389,15 +1389,12 @@ protected Matcher getMaxRetriesMatcher(int maxRetries) { @Override protected OperationPurpose randomRetryingPurpose() { - return randomValueOtherThan(OperationPurpose.REPOSITORY_ANALYSIS, BlobStoreTestUtil::randomPurpose); + return BlobStoreTestUtil.randomRetryingPurpose(); } @Override protected OperationPurpose randomFiniteRetryingPurpose() { - return randomValueOtherThanMany( - purpose -> purpose == OperationPurpose.REPOSITORY_ANALYSIS || purpose == OperationPurpose.INDICES, - BlobStoreTestUtil::randomPurpose - ); + return BlobStoreTestUtil.randomFiniteRetryingPurpose(); } private void assertMetricsForOpeningStream() { diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java index a3f950e473d1f..dc982b7521380 100644 --- 
a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java @@ -138,6 +138,7 @@ public int read() throws IOException { ensureOpen(); final int initialAttempt = attempt; while (true) { + // noinspection TryWithIdenticalCatches try { final int result = currentStream.read(); if (result != -1) { @@ -146,10 +147,9 @@ public int read() throws IOException { maybeLogAndRecordMetricsForSuccess(initialAttempt, READ); return result; } catch (IOException e) { - if (attempt == initialAttempt) { - blobStoreServices.onRetryStarted(READ); - } - reopenStreamOrFail(e); + retryOrAbortOnRead(initialAttempt, e); + } catch (RuntimeException e) { + retryOrAbortOnRead(initialAttempt, e); } } } @@ -159,6 +159,7 @@ public int read(byte[] b, int off, int len) throws IOException { ensureOpen(); final int initialAttempt = attempt; while (true) { + // noinspection TryWithIdenticalCatches try { final int bytesRead = currentStream.read(b, off, len); if (bytesRead != -1) { @@ -167,14 +168,20 @@ public int read(byte[] b, int off, int len) throws IOException { maybeLogAndRecordMetricsForSuccess(initialAttempt, READ); return bytesRead; } catch (IOException e) { - if (attempt == initialAttempt) { - blobStoreServices.onRetryStarted(READ); - } - reopenStreamOrFail(e); + retryOrAbortOnRead(initialAttempt, e); + } catch (RuntimeException e) { + retryOrAbortOnRead(initialAttempt, e); } } } + private void retryOrAbortOnRead(int initialAttempt, T exception) throws T, IOException { + if (attempt == initialAttempt) { + blobStoreServices.onRetryStarted(READ); + } + reopenStreamOrFail(exception); + } + private void ensureOpen() { if (closed) { assert false : "using RetryingInputStream after close"; @@ -182,7 +189,7 @@ private void ensureOpen() { } } - private void reopenStreamOrFail(IOException e) throws IOException { + private void reopenStreamOrFail(T e) throws T, IOException { final long 
meaningfulProgressSize = blobStoreServices.getMeaningfulProgressSize(); if (currentStreamProgress() >= meaningfulProgressSize) { failuresAfterMeaningfulProgress += 1; @@ -404,4 +411,8 @@ public T unwrap(Class clazz) { public static boolean willRetry(OperationPurpose purpose) { return purpose != OperationPurpose.REPOSITORY_ANALYSIS; } + + public static boolean willRetryForever(OperationPurpose purpose) { + return purpose == OperationPurpose.INDICES; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java index 8cb984e289fbd..8cd3aa1b15dfc 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java @@ -16,8 +16,6 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.blobstore.OperationPurpose; -import org.elasticsearch.common.blobstore.RetryingInputStream; import org.elasticsearch.common.blobstore.support.BlobMetadata; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -526,8 +524,4 @@ private Set listChildren(BlobPath path) { protected BlobStoreRepository getRepository() { return (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository(TEST_REPO_NAME); } - - public static OperationPurpose randomRetryingPurpose() { - return randomFrom(Arrays.stream(OperationPurpose.values()).filter(RetryingInputStream::willRetry).toArray(OperationPurpose[]::new)); - } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java 
b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index 9d09f53e05b15..cfc4015b2d27e 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.common.blobstore.RetryingInputStream; import org.elasticsearch.common.blobstore.support.BlobMetadata; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -76,6 +77,7 @@ import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.ESTestCase.randomIntBetween; import static org.elasticsearch.test.ESTestCase.randomValueOtherThan; +import static org.elasticsearch.test.ESTestCase.randomValueOtherThanMany; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasKey; @@ -471,6 +473,20 @@ public static OperationPurpose randomPurpose() { return randomFrom(OperationPurpose.values()); } + /** + * Random {@link OperationPurpose} that will be retried by {@link RetryingInputStream} + */ + public static OperationPurpose randomRetryingPurpose() { + return randomValueOtherThanMany(purpose -> RetryingInputStream.willRetry(purpose) == false, BlobStoreTestUtil::randomPurpose); + } + + /** + * Random {@link OperationPurpose} that will be retried a finite number of times by {@link RetryingInputStream} + */ + public static OperationPurpose randomFiniteRetryingPurpose() { + return randomValueOtherThanMany(RetryingInputStream::willRetryForever, BlobStoreTestUtil::randomRetryingPurpose); + } + public static OperationPurpose randomNonDataPurpose() { return 
randomValueOtherThan(OperationPurpose.SNAPSHOT_DATA, BlobStoreTestUtil::randomPurpose); } From d137a49e76258c82cf1e92115d53bf2f9cfb8675 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 26 Nov 2025 16:53:21 +1100 Subject: [PATCH 11/15] Fix metric value --- .../elasticsearch/repositories/s3/S3RetryingInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 5508fb3a03149..26fd48761075c 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -128,7 +128,7 @@ private Map metricAttributes(StreamAction action) { "purpose", purpose.getKey(), "action", - action.toString() + action.getPastTense() ); } } From 55c98eb22cf39c7519bd10dd920fcef5116338f1 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 26 Nov 2025 17:21:51 +1100 Subject: [PATCH 12/15] Override retry delay for retry tests --- .../gcs/GoogleCloudStorageBlobStore.java | 2 +- ...CloudStorageBlobContainerRetriesTests.java | 41 ++++++++++++++++++- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index f02692a742a29..012cf450049bb 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -108,7 +108,7 @@ class GoogleCloudStorageBlobStore implements BlobStore { @Nullable // for cluster level object store in MP private final ProjectId 
projectId; - private final String bucketName; + protected final String bucketName; private final String clientName; private final String repositoryName; private final GoogleCloudStorageService storageService; diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index f04d37bf5c7cb..6b1692bba1cf2 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -17,6 +17,7 @@ import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.ServiceOptions; import com.google.cloud.http.HttpTransportOptions; +import com.google.cloud.storage.BlobId; import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; import com.sun.net.httpserver.HttpExchange; @@ -61,6 +62,7 @@ import org.elasticsearch.test.fixture.HttpHeaderParser; import org.threeten.bp.Duration; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; @@ -223,7 +225,44 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser randomIntBetween(1, 8) * 1024, BackoffPolicy.linearBackoff(TimeValue.timeValueMillis(1), 3, TimeValue.timeValueSeconds(1)), new GcsRepositoryStatsCollector() - ); + ) { + + @Override + InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException { + return new GoogleCloudStorageRetryingInputStream(this, purpose, BlobId.of(bucketName, blobName)) { + @Override + protected long getRetryDelayInMillis() { + return 10L; + } + }; + } + + @Override + InputStream readBlob(OperationPurpose purpose, String blobName, long position, long length) 
throws IOException { + if (position < 0L) { + throw new IllegalArgumentException("position must be non-negative"); + } + if (length < 0) { + throw new IllegalArgumentException("length must be non-negative"); + } + if (length == 0) { + return new ByteArrayInputStream(new byte[0]); + } else { + return new GoogleCloudStorageRetryingInputStream( + this, + purpose, + BlobId.of(bucketName, blobName), + position, + Math.addExact(position, length - 1) + ) { + @Override + protected long getRetryDelayInMillis() { + return 10L; + } + }; + } + } + }; return new GoogleCloudStorageBlobContainer( Objects.requireNonNullElse(blobContainerPath, randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo")), From 732121f2239e0585b588a6c98b154f84445c2367 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 26 Nov 2025 18:57:21 +1100 Subject: [PATCH 13/15] Align behaviour with current --- .../gcs/GoogleCloudStorageRetryingInputStream.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java index defa2e385be10..e233af493422d 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java @@ -9,7 +9,6 @@ package org.elasticsearch.repositories.gcs; import com.google.api.client.http.HttpResponse; -import com.google.cloud.BaseService; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageRetryStrategy; @@ -152,8 +151,8 @@ public String getBlobDescription() { @Override public boolean isRetryableException(StreamAction action, Exception e) { return switch (action) { - case OPEN -> 
BaseService.EXCEPTION_HANDLER.shouldRetry(e, null); - case READ -> STORAGE_RETRY_STRATEGY.getIdempotentHandler().shouldRetry(e, null); + case OPEN -> STORAGE_RETRY_STRATEGY.getIdempotentHandler().shouldRetry(e, null); + case READ -> true; }; } } From 63d60182e49fe3da955fe6a6c2868d7a0303365e Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 26 Nov 2025 21:58:00 +1100 Subject: [PATCH 14/15] Use retrying purpose when we expect reads to succeed --- .../blobstore/ESBlobStoreRepositoryIntegTestCase.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index 8870c79218780..d9fa6ee50f1c8 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -65,6 +65,7 @@ import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_NAME_FORMAT; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomNonDataPurpose; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; +import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomRetryingPurpose; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; @@ -157,7 +158,7 @@ public void testWriteMaybeCopyRead() throws IOException { readBlobName = destinationBlobName; } catch (UnsupportedOperationException ignored) {} } - try (InputStream stream = container.readBlob(randomPurpose(), readBlobName)) { + try (InputStream stream = 
container.readBlob(randomRetryingPurpose(), readBlobName)) { BytesRefBuilder target = new BytesRefBuilder(); while (target.length() < data.length) { byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())]; @@ -285,7 +286,7 @@ public static byte[] writeRandomBlob(BlobContainer container, String name, int l public static byte[] readBlobFully(BlobContainer container, String name, int length) throws IOException { byte[] data = new byte[length]; - try (InputStream inputStream = container.readBlob(randomPurpose(), name)) { + try (InputStream inputStream = container.readBlob(randomRetryingPurpose(), name)) { assertThat(Streams.readFully(inputStream, data), CoreMatchers.equalTo(length)); assertThat(inputStream.read(), CoreMatchers.equalTo(-1)); } From 4636d9ac037b04188784a1e8d6d30c780b7a4c3b Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 28 Nov 2025 12:32:46 +1100 Subject: [PATCH 15/15] Update docs/changelog/138553.yaml --- docs/changelog/138553.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/138553.yaml diff --git a/docs/changelog/138553.yaml b/docs/changelog/138553.yaml new file mode 100644 index 0000000000000..e1d6f2c392dbb --- /dev/null +++ b/docs/changelog/138553.yaml @@ -0,0 +1,5 @@ +pr: 138553 +summary: Use common retry logic for GCS +area: Snapshot/Restore +type: enhancement +issues: []