diff --git a/docs/changelog/138553.yaml b/docs/changelog/138553.yaml new file mode 100644 index 0000000000000..e1d6f2c392dbb --- /dev/null +++ b/docs/changelog/138553.yaml @@ -0,0 +1,5 @@ +pr: 138553 +summary: Use common retry logic for GCS +area: Snapshot/Restore +type: enhancement +issues: [] diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index de5f4569116db..b0f3999a86e2f 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -13,7 +13,6 @@ import fixture.gcs.GoogleCloudStorageHttpHandler; import fixture.gcs.TestUtils; -import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.StorageRetryStrategy; @@ -62,8 +61,10 @@ import static org.elasticsearch.common.bytes.BytesReferenceTestUtils.equalBytes; import static org.elasticsearch.common.io.Streams.readFully; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; +import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomRetryingPurpose; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BASE_PATH; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET; @@ -119,6 +120,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { settings.put(super.nodeSettings(nodeOrdinal, otherSettings)); settings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl()); settings.put(TOKEN_URI_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl() + "/token"); + settings.put(MAX_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), 6); final MockSecureSettings secureSettings = new MockSecureSettings(); final byte[] serviceAccount = TestUtils.createServiceAccount(random()); @@ -196,7 +198,7 @@ public void testWriteReadLarge() throws IOException { random().nextBytes(data); writeBlob(container, "foobar", new BytesArray(data), false); } - try (InputStream stream = container.readBlob(randomPurpose(), "foobar")) { + try (InputStream stream = container.readBlob(randomRetryingPurpose(), "foobar")) { BytesRefBuilder target = new BytesRefBuilder(); while (target.length() < data.length) { byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())]; @@ -219,7 +221,7 @@ public void testWriteFileMultipleOfChunkSize() throws IOException { byte[] initialValue = randomByteArrayOfLength(uploadSize); container.writeBlob(randomPurpose(), key, new BytesArray(initialValue), true); - BytesReference reference = readFully(container.readBlob(randomPurpose(), key)); + BytesReference reference = readFully(container.readBlob(randomRetryingPurpose(), key)); assertThat(reference, equalBytes(new BytesArray(initialValue))); container.deleteBlobsIgnoringIfNotExists(randomPurpose(), Iterators.single(key)); @@ -243,24 +245,18 @@ protected GoogleCloudStorageService createStorageService(ClusterService clusterS @Override StorageOptions createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, - final HttpTransportOptions httpTransportOptions + final HttpTransportOptions httpTransportOptions, + final RetryBehaviour retryBehaviour ) { - StorageOptions options = super.createStorageOptions(gcsClientSettings, httpTransportOptions); + StorageOptions options = super.createStorageOptions(gcsClientSettings, httpTransportOptions, retryBehaviour); return options.toBuilder() .setStorageRetryStrategy(StorageRetryStrategy.getLegacyStorageRetryStrategy()) - .setHost(options.getHost()) - .setCredentials(options.getCredentials()) .setRetrySettings( - RetrySettings.newBuilder() - .setTotalTimeout(options.getRetrySettings().getTotalTimeout()) + options.getRetrySettings() + .toBuilder() .setInitialRetryDelay(Duration.ofMillis(10L)) - .setRetryDelayMultiplier(options.getRetrySettings().getRetryDelayMultiplier()) .setMaxRetryDelay(Duration.ofSeconds(1L)) - .setMaxAttempts(0) .setJittered(false) - .setInitialRpcTimeout(options.getRetrySettings().getInitialRpcTimeout()) - .setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier()) - .setMaxRpcTimeout(options.getRetrySettings().getMaxRpcTimeout()) .build() ) .build(); diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java index 520f922280eb0..7dfce9afeca0b 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java @@ -34,6 +34,7 @@ import static org.elasticsearch.common.io.Streams.readFully; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; +import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomRetryingPurpose; import static org.hamcrest.Matchers.blankOrNullString; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -115,7 +116,7 @@ public void testResumeAfterUpdate() { executeOnBlobStore(repo, container -> { container.writeBlob(randomPurpose(), blobKey, new BytesArray(initialValue), true); - try (InputStream inputStream = container.readBlob(randomPurpose(), blobKey)) { + try (InputStream inputStream = container.readBlob(randomRetryingPurpose(), blobKey)) { // Trigger the first request for the blob, partially read it int read = inputStream.read(); assert read != -1; diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 2af940c19bbac..012cf450049bb 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -108,7 +108,7 @@ class GoogleCloudStorageBlobStore implements BlobStore { @Nullable // for cluster level object store in MP private final ProjectId projectId; - private final String bucketName; + protected final String bucketName; private final String clientName; private final String repositoryName; private final GoogleCloudStorageService storageService; @@ -139,8 +139,32 @@ class GoogleCloudStorageBlobStore implements BlobStore { this.casBackoffPolicy = casBackoffPolicy; } - private MeteredStorage client() throws IOException { - return storageService.client(projectId, clientName, repositoryName, statsCollector); + /** + * Get a client that will retry according to its configured settings + * + * @return A client + */ + MeteredStorage client() throws IOException { + return storageService.client( + projectId, + clientName, + repositoryName, + statsCollector, + GoogleCloudStorageService.RetryBehaviour.ClientConfigured + ); + } + + /** + * Get a client that will not retry on failure + * + * @return A client with max retries configured to zero + */ + MeteredStorage clientNoRetries() throws IOException { + return storageService.client(projectId, clientName, repositoryName, statsCollector, GoogleCloudStorageService.RetryBehaviour.None); + } + + int getMaxRetries() { + return storageService.clientSettings(projectId, clientName).getMaxRetries(); } @Override @@ -229,7 +253,7 @@ boolean blobExists(OperationPurpose purpose, String blobName) throws IOException * @return the InputStream used to read the blob's content */ InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException { - return new GoogleCloudStorageRetryingInputStream(purpose, client(), BlobId.of(bucketName, blobName)); + return new GoogleCloudStorageRetryingInputStream(this, purpose, BlobId.of(bucketName, blobName)); } /** @@ -252,8 +276,8 @@ InputStream readBlob(OperationPurpose purpose, String blobName, long position, l return new ByteArrayInputStream(new byte[0]); } else { return new GoogleCloudStorageRetryingInputStream( + this, purpose, - client(), BlobId.of(bucketName, blobName), position, Math.addExact(position, length - 1) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettings.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettings.java index 582e0b48121fa..10d557ce2a8aa 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettings.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettings.java @@ -120,6 +120,17 @@ public class GoogleCloudStorageClientSettings { () -> PROXY_HOST_SETTING ); + /** + * The maximum number of retries to use when a GCS request fails. + *

+ * Default to 5 to match {@link com.google.cloud.ServiceOptions#getDefaultRetrySettings()} + */ + static final Setting.AffixSetting MAX_RETRIES_SETTING = Setting.affixKeySetting( + PREFIX, + "max_retries", + (key) -> Setting.intSetting(key, 5, 0, Setting.Property.NodeScope) + ); + /** The credentials used by the client to connect to the Storage endpoint. */ private final ServiceAccountCredentials credential; @@ -144,6 +155,8 @@ public class GoogleCloudStorageClientSettings { @Nullable private final Proxy proxy; + private final int maxRetries; + GoogleCloudStorageClientSettings( final ServiceAccountCredentials credential, final String endpoint, @@ -152,7 +165,8 @@ public class GoogleCloudStorageClientSettings { final TimeValue readTimeout, final String applicationName, final URI tokenUri, - final Proxy proxy + final Proxy proxy, + final int maxRetries ) { this.credential = credential; this.endpoint = endpoint; @@ -162,6 +176,7 @@ public class GoogleCloudStorageClientSettings { this.applicationName = applicationName; this.tokenUri = tokenUri; this.proxy = proxy; + this.maxRetries = maxRetries; } public ServiceAccountCredentials getCredential() { @@ -197,6 +212,10 @@ public Proxy getProxy() { return proxy; } + public int getMaxRetries() { + return maxRetries; + } + @Override public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; @@ -249,7 +268,8 @@ static GoogleCloudStorageClientSettings getClientSettings(final Settings setting getConfigValue(settings, clientName, READ_TIMEOUT_SETTING), getConfigValue(settings, clientName, APPLICATION_NAME_SETTING), getConfigValue(settings, clientName, TOKEN_URI_SETTING), - proxy + proxy, + getConfigValue(settings, clientName, MAX_RETRIES_SETTING) ); } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java index 2c35aac1a46e8..5b930337bc447 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java @@ -93,7 +93,8 @@ public List> getSettings() { GoogleCloudStorageClientSettings.TOKEN_URI_SETTING, GoogleCloudStorageClientSettings.PROXY_TYPE_SETTING, GoogleCloudStorageClientSettings.PROXY_HOST_SETTING, - GoogleCloudStorageClientSettings.PROXY_PORT_SETTING + GoogleCloudStorageClientSettings.PROXY_PORT_SETTING, + GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING ); } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java index a74e86d8ee677..e233af493422d 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java @@ -9,15 +9,15 @@ package org.elasticsearch.repositories.gcs; import com.google.api.client.http.HttpResponse; -import com.google.cloud.BaseService; -import com.google.cloud.RetryHelper; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.StorageRetryStrategy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.blobstore.OperationPurpose; -import org.elasticsearch.core.IOUtils; +import org.elasticsearch.common.blobstore.RetryingInputStream; +import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; import org.elasticsearch.rest.RestStatus; @@ -25,129 +25,139 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; -import java.util.ArrayList; -import java.util.List; - -import static org.elasticsearch.core.Strings.format; /** * Wrapper around reads from GCS that will retry blob downloads that fail part-way through, resuming from where the failure occurred. - * This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing - * the {@link org.elasticsearch.Version#V_7_0_0} version constant) and removed if the SDK handles retries itself in the future. + *

+ * We make use of the retry logic from {@link RetryingInputStream}, which is slightly more sophisticated and tailored to our needs than + * the retry logic the GCS SDK provides by default. */ -class GoogleCloudStorageRetryingInputStream extends InputStream { +class GoogleCloudStorageRetryingInputStream extends RetryingInputStream { private static final Logger logger = LogManager.getLogger(GoogleCloudStorageRetryingInputStream.class); + private static final StorageRetryStrategy STORAGE_RETRY_STRATEGY = GoogleCloudStorageService.createStorageRetryStrategy(); - static final int MAX_SUPPRESSED_EXCEPTIONS = 10; - - private final OperationPurpose purpose; - private final MeteredStorage client; - private final BlobId blobId; - private final long start; - private final long end; - private final int maxAttempts; - private InputStream currentStream; - private int attempt = 1; - private List failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS); - private long currentOffset; - private boolean closed; - private Long lastGeneration; + GoogleCloudStorageRetryingInputStream(GoogleCloudStorageBlobStore blobStore, OperationPurpose purpose, BlobId blobId) + throws IOException { + this(blobStore, purpose, blobId, 0, Long.MAX_VALUE - 1); + } - // Used for testing only - GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId) throws IOException { - this(purpose, client, blobId, 0, Long.MAX_VALUE - 1); + GoogleCloudStorageRetryingInputStream( + GoogleCloudStorageBlobStore blobStore, + OperationPurpose purpose, + BlobId blobId, + long start, + long end + ) throws IOException { + super(new GoogleCloudStorageBlobStoreServices(blobStore, purpose, blobId), purpose, start, end); } - // Used for testing only - GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId, long start, long end) - throws IOException { - if (start < 0L) { - throw new IllegalArgumentException("start must be non-negative"); - } - if (end < start || end == Long.MAX_VALUE) { - throw new IllegalArgumentException("end must be >= start and not Long.MAX_VALUE"); + private static class GoogleCloudStorageBlobStoreServices implements BlobStoreServices { + + private final GoogleCloudStorageBlobStore blobStore; + private final OperationPurpose purpose; + private final BlobId blobId; + + private GoogleCloudStorageBlobStoreServices(GoogleCloudStorageBlobStore blobStore, OperationPurpose purpose, BlobId blobId) { + this.blobStore = blobStore; + this.purpose = purpose; + this.blobId = blobId; } - this.purpose = purpose; - this.client = client; - this.blobId = blobId; - this.start = start; - this.end = end; - this.maxAttempts = client.getOptions().getRetrySettings().getMaxAttempts(); - this.currentStream = openStream(); - } - private InputStream openStream() throws IOException { - try { + @Override + public SingleAttemptInputStream getInputStream(@Nullable Long lastGeneration, long start, long end) throws IOException { + final MeteredStorage client = blobStore.clientNoRetries(); try { - return RetryHelper.runWithRetries(() -> { - try { - final var meteredGet = client.meteredObjectsGet(purpose, blobId.getBucket(), blobId.getName()); - meteredGet.setReturnRawInputStream(true); - if (lastGeneration != null) { - meteredGet.setGeneration(lastGeneration); - } + try { + final var meteredGet = client.meteredObjectsGet(purpose, blobId.getBucket(), blobId.getName()); + meteredGet.setReturnRawInputStream(true); + if (lastGeneration != null) { + meteredGet.setGeneration(lastGeneration); + } - if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) { - if (meteredGet.getRequestHeaders() != null) { - meteredGet.getRequestHeaders().setRange("bytes=" + Math.addExact(start, currentOffset) + "-" + end); - } - } - final HttpResponse resp = meteredGet.executeMedia(); - // Store the generation of the first response we received, so we can detect - // if the file has changed if we need to resume - if (lastGeneration == null) { - lastGeneration = parseGenerationHeader(resp); + if (start > 0 || end < Long.MAX_VALUE - 1) { + if (meteredGet.getRequestHeaders() != null) { + meteredGet.getRequestHeaders().setRange("bytes=" + start + "-" + end); } + } + final HttpResponse resp = meteredGet.executeMedia(); + // Store the generation of the first response we received, so we can detect + // if the file has changed if we need to resume + if (lastGeneration == null) { + lastGeneration = parseGenerationHeader(resp); + } - final Long contentLength = resp.getHeaders().getContentLength(); - InputStream content = resp.getContent(); - if (contentLength != null) { - content = new ContentLengthValidatingInputStream(content, contentLength); - } - return content; - } catch (IOException e) { - throw StorageException.translate(e); + final Long contentLength = resp.getHeaders().getContentLength(); + InputStream content = resp.getContent(); + if (contentLength != null) { + content = new ContentLengthValidatingInputStream(content, contentLength); } - }, client.getOptions().getRetrySettings(), BaseService.EXCEPTION_HANDLER, client.getOptions().getClock()); - } catch (RetryHelper.RetryHelperException e) { - throw StorageException.translateAndThrow(e); - } - } catch (StorageException storageException) { - if (storageException.getCode() == RestStatus.NOT_FOUND.getStatus()) { - if (lastGeneration != null) { - throw addSuppressedExceptions( - new NoSuchFileException( + return new SingleAttemptInputStream<>(content, start, lastGeneration); + } catch (IOException e) { + throw StorageException.translate(e); + } + } catch (StorageException storageException) { + if (storageException.getCode() == RestStatus.NOT_FOUND.getStatus()) { + if (lastGeneration != null) { + throw new NoSuchFileException( "Blob object [" + blobId.getName() + "] generation [" + lastGeneration + "] unavailable on resume (contents changed, or object deleted): " + storageException.getMessage() - ) - ); - } else { - throw addSuppressedExceptions( - new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage()) - ); + ); + } else { + throw new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage()); + } } - } - if (storageException.getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) { - long currentPosition = Math.addExact(start, currentOffset); - throw addSuppressedExceptions( - new RequestedRangeNotSatisfiedException( + if (storageException.getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) { + throw new RequestedRangeNotSatisfiedException( blobId.getName(), - currentPosition, - (end < Long.MAX_VALUE - 1) ? end - currentPosition + 1 : end, + start, + (end < Long.MAX_VALUE - 1) ? end - start + 1 : end, storageException - ) - ); + ); + } + throw storageException; } - throw addSuppressedExceptions(storageException); + } + + @Override + public void onRetryStarted(StreamAction action) { + // No retry metrics for GCS + } + + @Override + public void onRetrySucceeded(StreamAction action, long numberOfRetries) { + // No retry metrics for GCS + } + + @Override + public long getMeaningfulProgressSize() { + return Math.max(1L, GoogleCloudStorageBlobStore.SDK_DEFAULT_CHUNK_SIZE / 100L); + } + + @Override + public int getMaxRetries() { + return blobStore.getMaxRetries(); + } + + @Override + public String getBlobDescription() { + return blobId.toString(); + } + + @Override + public boolean isRetryableException(StreamAction action, Exception e) { + return switch (action) { + case OPEN -> STORAGE_RETRY_STRATEGY.getIdempotentHandler().shouldRetry(e, null); + case READ -> true; + }; } } - private Long parseGenerationHeader(HttpResponse response) { + private static Long parseGenerationHeader(HttpResponse response) { final String generationHeader = response.getHeaders().getFirstHeaderStringValue("x-goog-generation"); if (generationHeader != null) { try { @@ -178,68 +188,49 @@ static final class ContentLengthValidatingInputStream extends FilterInputStream } @Override - public int read(byte[] b, int off, int len) throws IOException { - final int n = in.read(b, off, len); - if (n == -1) { - checkContentLengthOnEOF(); - } else { - read += n; + public int read(byte[] b, int off, int len) { + try { + final int n = in.read(b, off, len); + if (n == -1) { + checkContentLengthOnEOF(); + } else { + read += n; + } + return n; + } catch (IOException e) { + throw StorageException.translate(e); } - return n; } @Override - public int read() throws IOException { - final int n = in.read(); - if (n == -1) { - checkContentLengthOnEOF(); - } else { - read++; + public int read() { + try { + final int n = in.read(); + if (n == -1) { + checkContentLengthOnEOF(); + } else { + read++; + } + return n; + } catch (IOException e) { + throw StorageException.translate(e); } - return n; } @Override - public long skip(long len) throws IOException { - final long n = in.skip(len); - read += n; - return n; - } - - private void checkContentLengthOnEOF() throws IOException { - if (read < contentLength) { - throw new IOException("Connection closed prematurely: read = " + read + ", Content-Length = " + contentLength); - } - } - } - - @Override - public int read() throws IOException { - ensureOpen(); - while (true) { + public long skip(long len) { try { - final int result = currentStream.read(); - currentOffset += 1; - return result; + final long n = in.skip(len); + read += n; + return n; } catch (IOException e) { - reopenStreamOrFail(StorageException.translate(e)); + throw StorageException.translate(e); } } - } - @Override - public int read(byte[] b, int off, int len) throws IOException { - ensureOpen(); - while (true) { - try { - final int bytesRead = currentStream.read(b, off, len); - if (bytesRead == -1) { - return -1; - } - currentOffset += bytesRead; - return bytesRead; - } catch (IOException e) { - reopenStreamOrFail(StorageException.translate(e)); + private void checkContentLengthOnEOF() throws IOException { + if (read < contentLength) { + throw new IOException("Connection closed prematurely: read = " + read + ", Content-Length = " + contentLength); } } } @@ -251,52 +242,4 @@ public int read(byte[] b, int off, int len) throws IOException { void closeCurrentStream() throws IOException { currentStream.close(); } - - private void ensureOpen() { - if (closed) { - assert false : "using GoogleCloudStorageRetryingInputStream after close"; - throw new IllegalStateException("using GoogleCloudStorageRetryingInputStream after close"); - } - } - - private void reopenStreamOrFail(StorageException e) throws IOException { - if (attempt >= maxAttempts) { - throw addSuppressedExceptions(e); - } - logger.debug( - () -> format("failed reading [%s] at offset [%s], attempt [%s] of [%s], retrying", blobId, currentOffset, attempt, maxAttempts), - e - ); - attempt += 1; - if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) { - failures.add(e); - } - IOUtils.closeWhileHandlingException(currentStream); - currentStream = openStream(); - } - - @Override - public void close() throws IOException { - currentStream.close(); - closed = true; - } - - @Override - public long skip(long n) throws IOException { - // This could be optimized on a failure by re-opening stream directly to the preferred location. However, it is rarely called, - // so for now we will rely on the default implementation which just discards bytes by reading. - return super.skip(n); - } - - @Override - public void reset() { - throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking"); - } - - private T addSuppressedExceptions(T e) { - for (StorageException failure : failures) { - e.addSuppressed(failure); - } - return e; - } } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java index cb6b8b334181e..c175a991e5fa9 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java @@ -15,6 +15,7 @@ import com.google.api.client.http.javanet.NetHttpTransport; import com.google.api.client.util.SecurityUtils; import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.retrying.RetrySettings; import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.ServiceOptions; @@ -94,6 +95,23 @@ public void refreshAndClearCache(Map c clientsManager.refreshAndClearCacheForClusterClients(clientsSettings); } + public enum RetryBehaviour { + ClientConfigured { + @Override + public RetrySettings createRetrySettings(GoogleCloudStorageClientSettings settings) { + return ServiceOptions.getDefaultRetrySettings().toBuilder().setMaxAttempts(settings.getMaxRetries() + 1).build(); + } + }, + None { + @Override + public RetrySettings createRetrySettings(GoogleCloudStorageClientSettings settings) { + return ServiceOptions.getNoRetrySettings(); + } + }; + + public abstract RetrySettings createRetrySettings(GoogleCloudStorageClientSettings settings); + } + /** * Attempts to retrieve a client from the cache. If the client does not exist it * will be created from the latest settings and will populate the cache. The @@ -111,9 +129,14 @@ public MeteredStorage client( @Nullable final ProjectId projectId, final String clientName, final String repositoryName, - final GcsRepositoryStatsCollector statsCollector + final GcsRepositoryStatsCollector statsCollector, + final RetryBehaviour retryBehaviour ) throws IOException { - return clientsManager.client(projectId, clientName, repositoryName, statsCollector); + return clientsManager.client(projectId, clientName, repositoryName, statsCollector, retryBehaviour); + } + + GoogleCloudStorageClientSettings clientSettings(@Nullable ProjectId projectId, final String clientName) { + return clientsManager.clientSettings(projectId, clientName); } /** @@ -138,8 +161,11 @@ GoogleCloudStorageClientsManager getClientsManager() { * @return a new client storage instance that can be used to manage objects * (blobs) */ - private MeteredStorage createClient(GoogleCloudStorageClientSettings gcsClientSettings, GcsRepositoryStatsCollector statsCollector) - throws IOException { + private MeteredStorage createClient( + GoogleCloudStorageClientSettings gcsClientSettings, + GcsRepositoryStatsCollector statsCollector, + RetryBehaviour retryBehaviour + ) throws IOException { final NetHttpTransport.Builder builder = new NetHttpTransport.Builder(); // requires java.lang.RuntimePermission "setFactory" @@ -183,22 +209,24 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser } }; - final StorageOptions storageOptions = createStorageOptions(gcsClientSettings, httpTransportOptions); + final StorageOptions storageOptions = createStorageOptions(gcsClientSettings, httpTransportOptions, retryBehaviour); return new MeteredStorage(storageOptions.getService(), statsCollector); } StorageOptions createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, - final HttpTransportOptions httpTransportOptions + final HttpTransportOptions httpTransportOptions, + final RetryBehaviour retryBehaviour ) { final StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder() - .setStorageRetryStrategy(getRetryStrategy()) + .setStorageRetryStrategy(createStorageRetryStrategy()) .setTransportOptions(httpTransportOptions) .setHeaderProvider(() -> { return Strings.hasLength(gcsClientSettings.getApplicationName()) ? Map.of("user-agent", gcsClientSettings.getApplicationName()) : Map.of(); - }); + }) + .setRetrySettings(retryBehaviour.createRetrySettings(gcsClientSettings)); if (Strings.hasLength(gcsClientSettings.getHost())) { storageOptionsBuilder.setHost(gcsClientSettings.getHost()); } @@ -247,7 +275,7 @@ StorageOptions createStorageOptions( return storageOptionsBuilder.build(); } - protected StorageRetryStrategy getRetryStrategy() { + static StorageRetryStrategy createStorageRetryStrategy() { return ShouldRetryDecorator.decorate( StorageRetryStrategy.getLegacyStorageRetryStrategy(), (Throwable prevThrowable, Object prevResponse, ResultRetryAlgorithm delegate) -> { @@ -403,12 +431,25 @@ void refreshAndClearCacheForClusterClients(Map clientCache = emptyMap(); + protected volatile Map clientCache = emptyMap(); /** * Get the current client settings for all clients in this holder. @@ -476,46 +519,57 @@ abstract class ClientsHolder { * @return a cached client storage instance that can be used to manage objects * (blobs) */ - MeteredStorage client(final String clientName, final String repositoryName, final GcsRepositoryStatsCollector statsCollector) - throws IOException { + MeteredStorage client( + final String clientName, + final String repositoryName, + final GcsRepositoryStatsCollector statsCollector, + final RetryBehaviour retryBehaviour + ) throws IOException { + final var cacheKey = new CacheKey(repositoryName, retryBehaviour); { - final MeteredStorage storage = clientCache.get(repositoryName); + final MeteredStorage storage = clientCache.get(cacheKey); if (storage != null) { return storage; } } synchronized (this) { - final MeteredStorage existing = clientCache.get(repositoryName); + final MeteredStorage existing = clientCache.get(cacheKey); if (existing != null) { return existing; } - final GoogleCloudStorageClientSettings settings = allClientSettings().get(clientName); - - if (settings == null) { - throw new IllegalArgumentException( - "Unknown client name [" + clientName + "]. Existing client configs: " + allClientSettings().keySet() - ); - } + final GoogleCloudStorageClientSettings settings = clientSettings(clientName); logger.debug(() -> format("creating GCS client with client_name [%s], endpoint [%s]", clientName, settings.getHost())); - final MeteredStorage storage = createClient(settings, statsCollector); - clientCache = Maps.copyMapWithAddedEntry(clientCache, repositoryName, storage); + final MeteredStorage storage = createClient(settings, statsCollector, retryBehaviour); + clientCache = Maps.copyMapWithAddedEntry(clientCache, cacheKey, storage); return storage; } } + public GoogleCloudStorageClientSettings clientSettings(String clientName) { + final GoogleCloudStorageClientSettings settings = allClientSettings().get(clientName); + + if (settings == null) { + throw new IllegalArgumentException( + "Unknown client name [" + clientName + "]. Existing client configs: " + allClientSettings().keySet() + ); + } + + return settings; + } + synchronized void closeRepositoryClients(String repositoryName) { clientCache = clientCache.entrySet() .stream() - .filter(entry -> entry.getKey().equals(repositoryName) == false) + .filter(entry -> entry.getKey().repositoryName().equals(repositoryName) == false) .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); } // package private for tests final boolean hasCachedClientForRepository(String repositoryName) { - return clientCache.containsKey(repositoryName); + return clientCache.keySet().stream().anyMatch(key -> key.repositoryName().equals(repositoryName)); } } diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index 832e5149f92b2..7976887efc43a 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -17,6 +17,7 @@ import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.ServiceOptions; import com.google.cloud.http.HttpTransportOptions; +import com.google.cloud.storage.BlobId; import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; import com.sun.net.httpserver.HttpExchange; @@ -32,6 +33,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.blobstore.OptionalBytesReference; import org.elasticsearch.common.blobstore.support.BlobContainerUtils; import org.elasticsearch.common.bytes.BytesArray; @@ -52,6 +54,7 @@ import org.elasticsearch.http.ResponseInjectingHttpHandler; import org.elasticsearch.mocksocket.MockHttpServer; import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; @@ -59,6 +62,7 @@ import org.elasticsearch.test.fixture.HttpHeaderParser; import org.threeten.bp.Duration; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; @@ -86,6 +90,7 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageBlobStore.MAX_DELETES_PER_BATCH; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING; import static org.hamcrest.Matchers.anyOf; @@ -147,6 +152,9 @@ protected BlobContainer createBlobContainer( if (readTimeout != null) { clientSettings.put(READ_TIMEOUT_SETTING.getConcreteSettingForNamespace(client).getKey(), readTimeout); } + if (maxRetries != null) { + clientSettings.put(MAX_RETRIES_SETTING.getConcreteSettingForNamespace(client).getKey(), maxRetries); + } final MockSecureSettings secureSettings = new MockSecureSettings(); secureSettings.setFile(CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace(client).getKey(), createServiceAccount(random())); @@ -156,7 +164,8 @@ protected BlobContainer createBlobContainer( @Override StorageOptions createStorageOptions( final GoogleCloudStorageClientSettings gcsClientSettings, - final HttpTransportOptions httpTransportOptions + final HttpTransportOptions httpTransportOptions, + final RetryBehaviour retryBehaviour ) { final HttpTransportOptions requestCountingHttpTransportOptions = new HttpTransportOptions( HttpTransportOptions.newBuilder() @@ -183,21 +192,21 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser }; } }; - final StorageOptions options = super.createStorageOptions(gcsClientSettings, requestCountingHttpTransportOptions); - final RetrySettings.Builder retrySettingsBuilder = RetrySettings.newBuilder() - .setTotalTimeout(options.getRetrySettings().getTotalTimeout()) + final StorageOptions options = super.createStorageOptions( + gcsClientSettings, + requestCountingHttpTransportOptions, + retryBehaviour + ); + final RetrySettings.Builder retrySettingsBuilder = options.getRetrySettings() + .toBuilder() .setInitialRetryDelay(Duration.ofMillis(10L)) .setRetryDelayMultiplier(1.0d) .setMaxRetryDelay(Duration.ofSeconds(1L)) .setJittered(false) .setInitialRpcTimeout(Duration.ofSeconds(1)) - .setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier()) .setMaxRpcTimeout(Duration.ofSeconds(1)); - if (maxRetries != null) { - retrySettingsBuilder.setMaxAttempts(maxRetries + 1); - } return options.toBuilder() - .setStorageRetryStrategy(getRetryStrategy()) + .setStorageRetryStrategy(createStorageRetryStrategy()) .setHost(options.getHost()) .setCredentials(options.getCredentials()) .setRetrySettings(retrySettingsBuilder.build()) @@ -217,7 +226,44 @@ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions ser randomIntBetween(1, 8) * 1024, BackoffPolicy.linearBackoff(TimeValue.timeValueMillis(1), 3, TimeValue.timeValueSeconds(1)), new GcsRepositoryStatsCollector() - ); + ) { + + @Override + InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException { + return new GoogleCloudStorageRetryingInputStream(this, purpose, BlobId.of(bucketName, blobName)) { + @Override + protected long getRetryDelayInMillis() { + return 10L; + } + }; + } + + @Override + InputStream readBlob(OperationPurpose purpose, String blobName, long position, long length) throws IOException { + if (position < 0L) { + throw new IllegalArgumentException("position must be non-negative"); + } + if (length < 0) { + throw new IllegalArgumentException("length must be non-negative"); + } + if (length == 0) { + return new ByteArrayInputStream(new byte[0]); + } else { + return new GoogleCloudStorageRetryingInputStream( + this, + purpose, + BlobId.of(bucketName, blobName), + position, + Math.addExact(position, length - 1) + ) { + @Override + protected long getRetryDelayInMillis() { + return 10L; + } + }; + } + } + }; return new GoogleCloudStorageBlobContainer( Objects.requireNonNullElse(blobContainerPath, randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo")), @@ -274,7 +320,7 @@ public void testReadLargeBlobWithRetries() throws Exception { exchange.close(); }); - try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "large_blob_retries")) { + try (InputStream inputStream = blobContainer.readBlob(randomRetryingPurpose(), "large_blob_retries")) { assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream))); } } @@ -601,10 +647,10 @@ public void testContentsChangeWhileStreaming() throws IOException { byte[] initialValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered); container.writeBlob(randomPurpose(), key, new BytesArray(initialValue), true); - BytesReference reference = readFully(container.readBlob(randomPurpose(), key)); + BytesReference reference = readFully(container.readBlob(randomRetryingPurpose(), key)); assertThat(reference, equalBytes(new BytesArray(initialValue))); - try (InputStream inputStream = container.readBlob(randomPurpose(), key)) { + try (InputStream inputStream = container.readBlob(randomRetryingPurpose(), key)) { // Trigger the first chunk to load int read = inputStream.read(); assert read != -1; @@ -648,4 +694,14 @@ private HttpHandler safeHandler(HttpHandler handler) { } }; } + + @Override + protected OperationPurpose randomRetryingPurpose() { + return BlobStoreTestUtil.randomRetryingPurpose(); + } + + @Override + protected OperationPurpose randomFiniteRetryingPurpose() { + return BlobStoreTestUtil.randomFiniteRetryingPurpose(); + } } diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java index 80bb9ae4f8246..bb3cc42ea16c2 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerStatsTests.java @@ -54,6 +54,7 @@ import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROJECT_ID_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING; import static org.elasticsearch.repositories.gcs.StorageOperation.GET; @@ -254,7 +255,8 @@ private ContainerAndBlobStore createBlobContainer(final String repositoryName) t READ_TIMEOUT_SETTING.getDefault(Settings.EMPTY), APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY), new URI(getEndpointForServer(httpServer) + "/token"), - null + null, + MAX_RETRIES_SETTING.getDefault(Settings.EMPTY) ); googleCloudStorageService.refreshAndClearCache(Map.of(clientName, clientSettings)); final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore( diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java index 849c09c506078..e6c854c23be9d 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java @@ -82,8 +82,15 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Excepti final MeteredStorage meteredStorage = new MeteredStorage(storage, storageRpc, new GcsRepositoryStatsCollector()); final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class); - when(storageService.client(eq(ProjectId.DEFAULT), any(String.class), any(String.class), any(GcsRepositoryStatsCollector.class))) - .thenReturn(meteredStorage); + when( + storageService.client( + eq(ProjectId.DEFAULT), + any(String.class), + any(String.class), + any(GcsRepositoryStatsCollector.class), + any(GoogleCloudStorageService.RetryBehaviour.class) + ) + ).thenReturn(meteredStorage); try ( BlobStore store = new GoogleCloudStorageBlobStore( diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettingsTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettingsTests.java index ac69afe7def6a..e0dcb5040ff8d 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettingsTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientSettingsTests.java @@ -40,6 +40,7 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROJECT_ID_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.getClientSettings; @@ -123,7 +124,8 @@ public void testProjectIdDefaultsToCredentials() throws Exception { READ_TIMEOUT_SETTING.getDefault(Settings.EMPTY), APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY), new URI(""), - null + null, + MAX_RETRIES_SETTING.getDefault(Settings.EMPTY) ); assertEquals(credential.getProjectId(), googleCloudStorageClientSettings.getProjectId()); } @@ -140,7 +142,8 @@ public void testLoadsProxySettings() throws Exception { READ_TIMEOUT_SETTING.getDefault(Settings.EMPTY), APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY), new URI(""), - proxy + proxy, + MAX_RETRIES_SETTING.getDefault(Settings.EMPTY) ); assertEquals(proxy, googleCloudStorageClientSettings.getProxy()); } @@ -278,6 +281,13 @@ private static GoogleCloudStorageClientSettings randomClient( applicationName = APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY); } + int maxRetries; + if (randomBoolean()) { + maxRetries = randomIntBetween(0, 5); + } else { + maxRetries = MAX_RETRIES_SETTING.getDefault(Settings.EMPTY); + } + return new GoogleCloudStorageClientSettings( credential, endpoint, @@ -286,7 +296,8 @@ private static GoogleCloudStorageClientSettings randomClient( readTimeout, applicationName, new URI(""), - null + null, + maxRetries ); } diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientsManagerTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientsManagerTests.java index a3a538b82aa6f..b1c9e85834f3e 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientsManagerTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageClientsManagerTests.java @@ -147,32 +147,42 @@ public void testClientsLifeCycleForSingleProject() throws Exception { final ProjectId projectId = randomUniqueProjectId(); final String clientName = randomFrom(clientNames); final String anotherClientName = randomValueOtherThan(clientName, () -> randomFrom(clientNames)); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); + final GoogleCloudStorageService.RetryBehaviour otherRetryBehaviour = randomValueOtherThan( + retryBehaviour, + GoogleCloudStorageClientsManagerTests::randomRetryBehaviour + ); // Configure project secrets for one client - assertClientNotFound(projectId, clientName); + assertClientNotFound(projectId, clientName, retryBehaviour); updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientName)); { assertProjectClientSettings(projectId, clientName); // Retrieve client for the 1st time - final var initialClient = getClientFromManager(projectId, clientName); + final var initialClient = getClientFromManager(projectId, clientName, retryBehaviour); assertClientCredentials(projectId, clientName, initialClient); // Client is cached when retrieved again - assertThat(initialClient, sameInstance(getClientFromManager(projectId, clientName))); + assertThat(initialClient, sameInstance(getClientFromManager(projectId, clientName, retryBehaviour))); // Client not configured cannot be accessed, - assertClientNotFound(projectId, anotherClientName); + assertClientNotFound(projectId, anotherClientName, retryBehaviour); // Update client secrets should release and recreate the client updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientName, anotherClientName)); assertProjectClientSettings(projectId, clientName, anotherClientName); - final var clientUpdated = getClientFromManager(projectId, clientName); + final var clientUpdated = getClientFromManager(projectId, clientName, retryBehaviour); assertThat(clientUpdated, not(sameInstance(initialClient))); // A different client for a different client name - final var anotherClient = getClientFromManager(projectId, anotherClientName); + final var anotherClient = getClientFromManager(projectId, anotherClientName, retryBehaviour); assertClientCredentials(projectId, anotherClientName, anotherClient); assertThat(anotherClient, not(sameInstance(clientUpdated))); + + // A different client for a different retry behaviour + final var anotherRetryBehaviour = getClientFromManager(projectId, clientName, otherRetryBehaviour); + assertClientCredentials(projectId, clientName, anotherRetryBehaviour); + assertThat(anotherRetryBehaviour, not(sameInstance(clientUpdated))); } // Remove project secrets or the entire project @@ -181,16 +191,17 @@ public void testClientsLifeCycleForSingleProject() throws Exception { } else { removeProjectFromClusterState(projectId); } - assertClientNotFound(projectId, clientName); + assertClientNotFound(projectId, clientName, retryBehaviour); assertThat(gcsClientsManager.getPerProjectClientsHolders(), not(hasKey(projectId))); } public void testClientsWithNoCredentialsAreFilteredOut() throws IOException { final ProjectId projectId = randomUniqueProjectId(); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientNames.toArray(String[]::new))); for (var clientName : clientNames) { - assertNotNull(getClientFromManager(projectId, clientName)); + assertNotNull(getClientFromManager(projectId, clientName, retryBehaviour)); } final List clientsWithIncorrectSecretsConfig = randomNonEmptySubsetOf(clientNames); @@ -205,15 +216,16 @@ public void testClientsWithNoCredentialsAreFilteredOut() throws IOException { for (var clientName : clientNames) { if (clientsWithIncorrectSecretsConfig.contains(clientName)) { - assertClientNotFound(projectId, clientName); + assertClientNotFound(projectId, clientName, retryBehaviour); } else { - assertNotNull(getClientFromManager(projectId, clientName)); + assertNotNull(getClientFromManager(projectId, clientName, retryBehaviour)); } } } public void testClientsForMultipleProjects() throws InterruptedException { final List projectIds = randomList(2, 8, ESTestCase::randomUniqueProjectId); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); final List threads = projectIds.stream().map(projectId -> new Thread(() -> { final int iterations = between(1, 3); @@ -224,7 +236,7 @@ public void testClientsForMultipleProjects() throws InterruptedException { assertProjectClientSettings(projectId, clientNames.toArray(String[]::new)); for (var clientName : shuffledList(clientNames)) { try { - final var meteredStorage = getClientFromManager(projectId, clientName); + final var meteredStorage = getClientFromManager(projectId, clientName, retryBehaviour); assertClientCredentials(projectId, clientName, meteredStorage); } catch (IOException e) { fail(e); @@ -237,7 +249,7 @@ public void testClientsForMultipleProjects() throws InterruptedException { removeProjectFromClusterState(projectId); } assertThat(gcsClientsManager.getPerProjectClientsHolders(), not(hasKey(projectId))); - clientNames.forEach(clientName -> assertClientNotFound(projectId, clientName)); + clientNames.forEach(clientName -> assertClientNotFound(projectId, clientName, retryBehaviour)); } })).toList(); @@ -251,16 +263,17 @@ public void testClusterAndProjectClients() throws IOException { final ProjectId projectId = randomUniqueProjectId(); final String clientName = randomFrom(clientNames); final boolean configureProjectClientsFirst = randomBoolean(); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); if (configureProjectClientsFirst) { updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientName)); } - final var clusterClient = getClientFromService(projectIdForClusterClient(), clientName); + final var clusterClient = getClientFromService(projectIdForClusterClient(), clientName, retryBehaviour); if (configureProjectClientsFirst == false) { assertThat(gcsClientsManager.getPerProjectClientsHolders(), anEmptyMap()); updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientName)); } - final var projectClient = getClientFromService(projectId, clientName); + final var projectClient = getClientFromService(projectId, clientName, retryBehaviour); assertThat(projectClient, not(sameInstance(clusterClient))); // Release the cluster client @@ -290,15 +303,27 @@ public void testProjectClientsDisabled() throws IOException { // Cluster client still works final String clientName = randomFrom(clientNames); - assertNotNull(getClientFromService(projectIdForClusterClient(), clientName)); + assertNotNull(getClientFromService(projectIdForClusterClient(), clientName, randomRetryBehaviour())); } - private MeteredStorage getClientFromManager(ProjectId projectId, String clientName) throws IOException { - return gcsClientsManager.client(projectId, clientName, repoNameForClient(clientName), statsCollector); + private MeteredStorage getClientFromManager( + ProjectId projectId, + String clientName, + GoogleCloudStorageService.RetryBehaviour retryBehaviour + ) throws IOException { + return gcsClientsManager.client(projectId, clientName, repoNameForClient(clientName), statsCollector, retryBehaviour); } - private MeteredStorage getClientFromService(ProjectId projectId, String clientName) throws IOException { - return googleCloudStorageService.client(projectId, clientName, repoNameForClient(clientName), statsCollector); + private static GoogleCloudStorageService.RetryBehaviour randomRetryBehaviour() { + return randomFrom(GoogleCloudStorageService.RetryBehaviour.values()); + } + + private MeteredStorage getClientFromService( + ProjectId projectId, + String clientName, + GoogleCloudStorageService.RetryBehaviour retryBehaviour + ) throws IOException { + return googleCloudStorageService.client(projectId, clientName, repoNameForClient(clientName), statsCollector, retryBehaviour); } private ProjectId projectIdForClusterClient() { @@ -332,8 +357,11 @@ private void assertClientCredentials(ProjectId projectId, String clientName, Met assertThat(credentials.getPrivateKeyId(), equalTo(projectClientPrivateKeyId(projectId, clientName))); } - private void assertClientNotFound(ProjectId projectId, String clientName) { - final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> getClientFromManager(projectId, clientName)); + private void assertClientNotFound(ProjectId projectId, String clientName, GoogleCloudStorageService.RetryBehaviour retryBehaviour) { + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> getClientFromManager(projectId, clientName, retryBehaviour) + ); assertThat( e.getMessage(), anyOf( diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePluginTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePluginTests.java index 7b9ef189b7459..e96746dabb536 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePluginTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePluginTests.java @@ -42,7 +42,8 @@ public void testExposedSettings() { "gcs.client.*.token_uri", "gcs.client.*.proxy.type", "gcs.client.*.proxy.host", - "gcs.client.*.proxy.port" + "gcs.client.*.proxy.port", + "gcs.client.*.max_retries" ), settings.stream().map(Setting::getKey).toList() ); diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java index 2b223c7a16342..5908f29bb194d 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java @@ -131,8 +131,11 @@ private GoogleCloudStorageRetryingInputStream createRetryingInputStream(byte[] d HttpRequest httpRequest = transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL); HttpResponse httpResponse = httpRequest.execute(); when(get.executeMedia()).thenReturn(httpResponse); + final GoogleCloudStorageBlobStore mockBlobStore = mock(GoogleCloudStorageBlobStore.class); + when(mockBlobStore.clientNoRetries()).thenReturn(meteredStorage); + when(mockBlobStore.getMaxRetries()).thenReturn(6); - return new GoogleCloudStorageRetryingInputStream(OperationPurpose.SNAPSHOT_DATA, meteredStorage, blobId); + return new GoogleCloudStorageRetryingInputStream(mockBlobStore, OperationPurpose.SNAPSHOT_DATA, blobId); } private GoogleCloudStorageRetryingInputStream createRetryingInputStream(byte[] data, int position, int length) throws IOException { @@ -148,9 +151,13 @@ private GoogleCloudStorageRetryingInputStream createRetryingInputStream(byte[] d when(get.executeMedia()).thenReturn(httpResponse); } + final GoogleCloudStorageBlobStore mockBlobStore = mock(GoogleCloudStorageBlobStore.class); + when(mockBlobStore.clientNoRetries()).thenReturn(meteredStorage); + when(mockBlobStore.getMaxRetries()).thenReturn(6); + return new GoogleCloudStorageRetryingInputStream( + mockBlobStore, OperationPurpose.SNAPSHOT_DATA, - meteredStorage, blobId, position, position + length - 1 diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java index bf4745a16b41c..369d820a638a0 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java @@ -40,6 +40,7 @@ import java.util.Locale; import java.util.UUID; +import static org.elasticsearch.repositories.gcs.GoogleCloudStorageService.RetryBehaviour.ClientConfigured; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -58,6 +59,7 @@ public void testClientInitializer() throws Exception { + ":" + randomIntBetween(1, 65535); final String projectIdName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT); + final int maxRetries = randomIntBetween(0, 10); final Settings settings = Settings.builder() .put( GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), @@ -76,6 +78,7 @@ public void testClientInitializer() throws Exception { .put(GoogleCloudStorageClientSettings.PROXY_TYPE_SETTING.getConcreteSettingForNamespace(clientName).getKey(), "HTTP") .put(GoogleCloudStorageClientSettings.PROXY_HOST_SETTING.getConcreteSettingForNamespace(clientName).getKey(), "192.168.52.15") .put(GoogleCloudStorageClientSettings.PROXY_PORT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), 8080) + .put(GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING.getConcreteSettingForNamespace(clientName).getKey(), maxRetries) .build(); SetOnce proxy = new SetOnce<>(); final var clusterService = ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()); @@ -89,17 +92,18 @@ void notifyProxyIsSet(Proxy p) { var statsCollector = new GcsRepositoryStatsCollector(); final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> service.client(projectIdForClusterClient(), "another_client", "repo", statsCollector) + () -> service.client(projectIdForClusterClient(), "another_client", "repo", statsCollector, randomRetryBehaviour()) ); assertThat(e.getMessage(), Matchers.startsWith("Unknown client name")); assertSettingDeprecationsAndWarnings( new Setting[] { GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING.getConcreteSettingForNamespace(clientName) } ); - final var storage = service.client(projectIdForClusterClient(), clientName, "repo", statsCollector); + final var storage = service.client(projectIdForClusterClient(), clientName, "repo", statsCollector, ClientConfigured); assertThat(storage.getOptions().getApplicationName(), Matchers.containsString(applicationName)); assertThat(storage.getOptions().getHost(), Matchers.is(endpoint)); assertThat(storage.getOptions().getProjectId(), Matchers.is(projectIdName)); assertThat(storage.getOptions().getTransportOptions(), Matchers.instanceOf(HttpTransportOptions.class)); + assertThat(storage.getOptions().getRetrySettings().getMaxAttempts(), equalTo(maxRetries + 1)); assertThat( ((HttpTransportOptions) storage.getOptions().getTransportOptions()).getConnectTimeout(), Matchers.is((int) connectTimeValue.millis()) @@ -125,18 +129,19 @@ public void testReinitClientSettings() throws Exception { ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()) ); when(pluginServices.projectResolver()).thenReturn(TestProjectResolvers.DEFAULT_PROJECT_ONLY); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings1)) { plugin.createComponents(pluginServices); final GoogleCloudStorageService storageService = plugin.storageService.get(); var statsCollector = new GcsRepositoryStatsCollector(); - final var client11 = storageService.client(projectIdForClusterClient(), "gcs1", "repo1", statsCollector); + final var client11 = storageService.client(projectIdForClusterClient(), "gcs1", "repo1", statsCollector, retryBehaviour); assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11")); - final var client12 = storageService.client(projectIdForClusterClient(), "gcs2", "repo2", statsCollector); + final var client12 = storageService.client(projectIdForClusterClient(), "gcs2", "repo2", statsCollector, retryBehaviour); assertThat(client12.getOptions().getProjectId(), equalTo("project_gcs12")); // client 3 is missing final IllegalArgumentException e1 = expectThrows( IllegalArgumentException.class, - () -> storageService.client(projectIdForClusterClient(), "gcs3", "repo3", statsCollector) + () -> storageService.client(projectIdForClusterClient(), "gcs3", "repo3", statsCollector, retryBehaviour) ); assertThat(e1.getMessage(), containsString("Unknown client name [gcs3].")); // update client settings @@ -144,18 +149,18 @@ public void testReinitClientSettings() throws Exception { // old client 1 not changed assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11")); // new client 1 is changed - final var client21 = storageService.client(projectIdForClusterClient(), "gcs1", "repo1", statsCollector); + final var client21 = storageService.client(projectIdForClusterClient(), "gcs1", "repo1", statsCollector, retryBehaviour); assertThat(client21.getOptions().getProjectId(), equalTo("project_gcs21")); // old client 2 not changed assertThat(client12.getOptions().getProjectId(), equalTo("project_gcs12")); // new client2 is gone final IllegalArgumentException e2 = expectThrows( IllegalArgumentException.class, - () -> storageService.client(projectIdForClusterClient(), "gcs2", "repo2", statsCollector) + () -> storageService.client(projectIdForClusterClient(), "gcs2", "repo2", statsCollector, retryBehaviour) ); assertThat(e2.getMessage(), containsString("Unknown client name [gcs2].")); // client 3 emerged - final var client23 = storageService.client(projectIdForClusterClient(), "gcs3", "repo3", statsCollector); + final var client23 = storageService.client(projectIdForClusterClient(), "gcs3", "repo3", statsCollector, retryBehaviour); assertThat(client23.getOptions().getProjectId(), equalTo("project_gcs23")); } } @@ -173,23 +178,27 @@ public void testClientsAreNotSharedAcrossRepositories() throws Exception { plugin.createComponents(pluginServices); final GoogleCloudStorageService storageService = plugin.storageService.get(); + final GoogleCloudStorageService.RetryBehaviour retryBehaviour = randomRetryBehaviour(); final MeteredStorage repo1Client = storageService.client( projectIdForClusterClient(), "gcs1", "repo1", - new GcsRepositoryStatsCollector() + new GcsRepositoryStatsCollector(), + retryBehaviour ); final MeteredStorage repo2Client = storageService.client( projectIdForClusterClient(), "gcs1", "repo2", - new GcsRepositoryStatsCollector() + new GcsRepositoryStatsCollector(), + retryBehaviour ); final MeteredStorage repo1ClientSecondInstance = storageService.client( projectIdForClusterClient(), "gcs1", "repo1", - new GcsRepositoryStatsCollector() + new GcsRepositoryStatsCollector(), + retryBehaviour ); assertNotSame(repo1Client, repo2Client); @@ -240,4 +249,8 @@ public void handle(HttpRequest request, HttpResponse response, HttpContext conte private ProjectId projectIdForClusterClient() { return randomBoolean() ? ProjectId.DEFAULT : null; } + + private static GoogleCloudStorageService.RetryBehaviour randomRetryBehaviour() { + return randomFrom(GoogleCloudStorageService.RetryBehaviour.values()); + } } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 18c3f2d6b194b..26fd48761075c 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -24,6 +24,7 @@ import org.elasticsearch.rest.RestStatus; import java.io.IOException; +import java.io.InputStream; import java.nio.file.NoSuchFileException; import java.util.Map; @@ -36,7 +37,7 @@ * * See https://github.com/aws/aws-sdk-java/issues/856 for the related SDK issue */ -class S3RetryingInputStream extends RetryingInputStream { +class S3RetryingInputStream extends RetryingInputStream { private static final Logger logger = LogManager.getLogger(S3RetryingInputStream.class); @@ -49,10 +50,10 @@ class S3RetryingInputStream extends RetryingInputStream { super(new S3BlobStoreServices(blobStore, blobKey, purpose), purpose, start, end); } - private record S3BlobStoreServices(S3BlobStore blobStore, String blobKey, OperationPurpose purpose) implements BlobStoreServices { + private record S3BlobStoreServices(S3BlobStore blobStore, String blobKey, OperationPurpose purpose) implements BlobStoreServices { @Override - public S3SingleAttemptInputStream getInputStream(long start, long end) throws IOException { + public SingleAttemptInputStream getInputStream(Void version, long start, long end) throws IOException { try (AmazonS3Reference clientReference = blobStore.clientReference()) { final var getObjectRequestBuilder = GetObjectRequest.builder().bucket(blobStore.bucket()).key(blobKey); configureRequestForMetrics(getObjectRequestBuilder, blobStore, Operation.GET_OBJECT, purpose); @@ -62,7 +63,7 @@ public S3SingleAttemptInputStream getInputStream(long start, long end) throws IO } final var getObjectRequest = getObjectRequestBuilder.build(); final var getObjectResponse = clientReference.client().getObject(getObjectRequest); - return new S3SingleAttemptInputStream(getObjectResponse, start, end); + return new SingleAttemptInputStream<>(new S3SingleAttemptInputStream(getObjectResponse, start, end), start, null); } catch (SdkException e) { if (e instanceof SdkServiceException sdkServiceException) { if (sdkServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()) { @@ -82,12 +83,12 @@ public S3SingleAttemptInputStream getInputStream(long start, long end) throws IO } @Override - public void onRetryStarted(String action) { + public void onRetryStarted(StreamAction action) { blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes(action)); } @Override - public void onRetrySucceeded(String action, long numberOfRetries) { + public void onRetrySucceeded(StreamAction action, long numberOfRetries) { final Map attributes = metricAttributes(action); blobStore.getS3RepositoriesMetrics().retryCompletedCounter().incrementBy(1, attributes); blobStore.getS3RepositoriesMetrics().retryHistogram().record(numberOfRetries, attributes); @@ -108,7 +109,15 @@ public String getBlobDescription() { return blobStore.bucket() + "/" + blobKey; } - private Map metricAttributes(String action) { + @Override + public boolean isRetryableException(StreamAction action, Exception e) { + return switch (action) { + case OPEN -> e instanceof RuntimeException; + case READ -> e instanceof IOException; + }; + } + + private Map metricAttributes(StreamAction action) { return Map.of( "repo_type", S3Repository.TYPE, @@ -119,7 +128,7 @@ private Map metricAttributes(String action) { "purpose", purpose.getKey(), "action", - action + action.getPastTense() ); } } @@ -171,7 +180,7 @@ private static long tryGetStreamLength(GetObjectResponse getObjectResponse, long return getObjectResponse.contentLength(); } - private static class S3SingleAttemptInputStream extends SingleAttemptInputStream { + private static class S3SingleAttemptInputStream extends InputStream { private final ResponseInputStream responseStream; private final long start; @@ -276,25 +285,20 @@ private boolean isAborted() { private long tryGetStreamLength(GetObjectResponse response) { return S3RetryingInputStream.tryGetStreamLength(response, start, end); } - - @Override - protected long getFirstOffset() { - return start; - } } // exposed for testing boolean isEof() { - return ((S3SingleAttemptInputStream) currentStream).isEof(); + return currentStream.unwrap(S3SingleAttemptInputStream.class).isEof(); } // exposed for testing boolean isAborted() { - return ((S3SingleAttemptInputStream) currentStream).isAborted(); + return currentStream.unwrap(S3SingleAttemptInputStream.class).isAborted(); } // exposed for testing long tryGetStreamLength(GetObjectResponse getObjectResponse) { - return ((S3SingleAttemptInputStream) currentStream).tryGetStreamLength(getObjectResponse); + return currentStream.unwrap(S3SingleAttemptInputStream.class).tryGetStreamLength(getObjectResponse); } } diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index d07fa80551a24..27f6ef63fd7f3 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -1389,15 +1389,12 @@ protected Matcher getMaxRetriesMatcher(int maxRetries) { @Override protected OperationPurpose randomRetryingPurpose() { - return randomValueOtherThan(OperationPurpose.REPOSITORY_ANALYSIS, BlobStoreTestUtil::randomPurpose); + return BlobStoreTestUtil.randomRetryingPurpose(); } @Override protected OperationPurpose randomFiniteRetryingPurpose() { - return randomValueOtherThanMany( - purpose -> purpose == OperationPurpose.REPOSITORY_ANALYSIS || purpose == OperationPurpose.INDICES, - BlobStoreTestUtil::randomPurpose - ); + return BlobStoreTestUtil.randomFiniteRetryingPurpose(); } private void assertMetricsForOpeningStream() { diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java index 483c63b34079f..dc982b7521380 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/RetryingInputStream.java @@ -13,40 +13,79 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.core.IOUtils; +import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.List; +import static org.elasticsearch.common.blobstore.RetryingInputStream.StreamAction.OPEN; +import static org.elasticsearch.common.blobstore.RetryingInputStream.StreamAction.READ; import static org.elasticsearch.core.Strings.format; -public abstract class RetryingInputStream extends InputStream { +/** + * An {@link InputStream} that resumes downloads from the point at which they failed when a retry-able error occurs. + *

+ * This class implements some Elasticsearch-specific retry behavior, including: + *

    + *
  • Retrying indefinitely for {@link OperationPurpose#INDICES} operations
  • + *
  • Not retrying at all for {@link OperationPurpose#REPOSITORY_ANALYSIS} operations
  • + *
  • Extending retries if "meaningful" progress was made on the last attempt
  • + *
  • More detailed logging about the status of operations being retried
  • + *
+ * + * @param The type used to represent the version of a blob + */ +public abstract class RetryingInputStream extends InputStream { private static final Logger logger = LogManager.getLogger(RetryingInputStream.class); public static final int MAX_SUPPRESSED_EXCEPTIONS = 10; - private final BlobStoreServices blobStoreServices; + public enum StreamAction { + OPEN("open", "opening"), + READ("read", "reading"); + + private final String pastTense; + private final String presentTense; + + StreamAction(String pastTense, String presentTense) { + this.pastTense = pastTense; + this.presentTense = presentTense; + } + + public String getPastTense() { + return pastTense; + } + + public String getPresentTense() { + return presentTense; + } + } + + private final BlobStoreServices blobStoreServices; private final OperationPurpose purpose; private final long start; private final long end; private final List failures; - protected SingleAttemptInputStream currentStream; + protected SingleAttemptInputStream currentStream; private long offset = 0; private int attempt = 1; private int failuresAfterMeaningfulProgress = 0; private boolean closed = false; - protected RetryingInputStream(BlobStoreServices blobStoreServices, OperationPurpose purpose) throws IOException { + protected RetryingInputStream(BlobStoreServices blobStoreServices, OperationPurpose purpose) throws IOException { this(blobStoreServices, purpose, 0L, Long.MAX_VALUE - 1L); } @SuppressWarnings("this-escape") // TODO: We can do better than this but I don't want to touch the tests for the first implementation - protected RetryingInputStream(BlobStoreServices blobStoreServices, OperationPurpose purpose, long start, long end) throws IOException { + protected RetryingInputStream(BlobStoreServices blobStoreServices, OperationPurpose purpose, long start, long end) + throws IOException { if (start < 0L) { throw new IllegalArgumentException("start must be non-negative"); } @@ -60,7 +99,7 @@ protected RetryingInputStream(BlobStoreServices blobStoreServices, OperationPurp this.end = end; final int initialAttempt = attempt; openStreamWithRetry(); - maybeLogAndRecordMetricsForSuccess(initialAttempt, "open"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, OPEN); } private void openStreamWithRetry() throws IOException { @@ -68,38 +107,49 @@ private void openStreamWithRetry() throws IOException { if (offset > 0 || start > 0 || end < Long.MAX_VALUE - 1) { assert start + offset <= end : "requesting beyond end, start = " + start + " offset=" + offset + " end=" + end; } + // noinspection TryWithIdenticalCatches try { - currentStream = blobStoreServices.getInputStream(Math.addExact(start, offset), end); + currentStream = blobStoreServices.getInputStream( + currentStream != null ? currentStream.getVersion() : null, + Math.addExact(start, offset), + end + ); return; } catch (NoSuchFileException | RequestedRangeNotSatisfiedException e) { throw e; } catch (RuntimeException e) { - if (attempt == 1) { - blobStoreServices.onRetryStarted("open"); - } - final long delayInMillis = maybeLogAndComputeRetryDelay("opening", e); - delayBeforeRetry(delayInMillis); + retryOrAbortOnOpen(e); + } catch (IOException e) { + retryOrAbortOnOpen(e); } } } + private void retryOrAbortOnOpen(T exception) throws T { + if (attempt == 1) { + blobStoreServices.onRetryStarted(StreamAction.OPEN); + } + final long delayInMillis = maybeLogAndComputeRetryDelay(StreamAction.OPEN, exception); + delayBeforeRetry(delayInMillis); + } + @Override public int read() throws IOException { ensureOpen(); final int initialAttempt = attempt; while (true) { + // noinspection TryWithIdenticalCatches try { final int result = currentStream.read(); if (result != -1) { offset += 1; } - maybeLogAndRecordMetricsForSuccess(initialAttempt, "read"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, READ); return result; } catch (IOException e) { - if (attempt == initialAttempt) { - blobStoreServices.onRetryStarted("read"); - } - reopenStreamOrFail(e); + retryOrAbortOnRead(initialAttempt, e); + } catch (RuntimeException e) { + retryOrAbortOnRead(initialAttempt, e); } } } @@ -109,22 +159,29 @@ public int read(byte[] b, int off, int len) throws IOException { ensureOpen(); final int initialAttempt = attempt; while (true) { + // noinspection TryWithIdenticalCatches try { final int bytesRead = currentStream.read(b, off, len); if (bytesRead != -1) { offset += bytesRead; } - maybeLogAndRecordMetricsForSuccess(initialAttempt, "read"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, READ); return bytesRead; } catch (IOException e) { - if (attempt == initialAttempt) { - blobStoreServices.onRetryStarted("read"); - } - reopenStreamOrFail(e); + retryOrAbortOnRead(initialAttempt, e); + } catch (RuntimeException e) { + retryOrAbortOnRead(initialAttempt, e); } } } + private void retryOrAbortOnRead(int initialAttempt, T exception) throws T, IOException { + if (attempt == initialAttempt) { + blobStoreServices.onRetryStarted(READ); + } + reopenStreamOrFail(exception); + } + private void ensureOpen() { if (closed) { assert false : "using RetryingInputStream after close"; @@ -132,12 +189,12 @@ private void ensureOpen() { } } - private void reopenStreamOrFail(IOException e) throws IOException { + private void reopenStreamOrFail(T e) throws T, IOException { final long meaningfulProgressSize = blobStoreServices.getMeaningfulProgressSize(); if (currentStreamProgress() >= meaningfulProgressSize) { failuresAfterMeaningfulProgress += 1; } - final long delayInMillis = maybeLogAndComputeRetryDelay("reading", e); + final long delayInMillis = maybeLogAndComputeRetryDelay(READ, e); IOUtils.closeWhileHandlingException(currentStream); delayBeforeRetry(delayInMillis); @@ -146,8 +203,8 @@ private void reopenStreamOrFail(IOException e) throws IOException { // The method throws if the operation should *not* be retried. Otherwise, it keeps a record for the attempt and associated failure // and compute the delay before retry. - private long maybeLogAndComputeRetryDelay(String action, T e) throws T { - if (shouldRetry(attempt) == false) { + private long maybeLogAndComputeRetryDelay(StreamAction action, T e) throws T { + if (blobStoreServices.isRetryableException(action, e) == false || shouldRetry(attempt) == false) { final var finalException = addSuppressedExceptions(e); logForFailure(action, finalException); throw finalException; @@ -163,11 +220,11 @@ private long maybeLogAndComputeRetryDelay(String action, T return delayInMillis; } - private void logForFailure(String action, Exception e) { + private void logForFailure(StreamAction action, Exception e) { logger.warn( () -> format( "failed %s [%s] at offset [%s] with purpose [%s]", - action, + action.getPresentTense(), blobStoreServices.getBlobDescription(), start + offset, purpose.getKey() @@ -176,7 +233,7 @@ private void logForFailure(String action, Exception e) { ); } - private void logForRetry(Level level, String action, Exception e) { + private void logForRetry(Level level, StreamAction action, Exception e) { logger.log( level, () -> format( @@ -185,7 +242,7 @@ private void logForRetry(Level level, String action, Exception e) { this was attempt [%s] to read this blob which yielded [%s] bytes; in total \ [%s] of the attempts to read this blob have made meaningful progress and do not count towards the maximum number of \ retries; the maximum number of read attempts which do not make meaningful progress is [%s]""", - action, + action.getPresentTense(), blobStoreServices.getBlobDescription(), start + offset, purpose.getKey(), @@ -198,12 +255,12 @@ private void logForRetry(Level level, String action, Exception e) { ); } - private void maybeLogAndRecordMetricsForSuccess(int initialAttempt, String action) { + private void maybeLogAndRecordMetricsForSuccess(int initialAttempt, StreamAction action) { if (attempt > initialAttempt) { final int numberOfRetries = attempt - initialAttempt; logger.info( "successfully {} input stream for [{}] with purpose [{}] after [{}] retries", - action, + action.getPastTense(), blobStoreServices.getBlobDescription(), purpose.getKey(), numberOfRetries @@ -279,12 +336,15 @@ private T addSuppressedExceptions(T e) { /** * This implements all the behavior that is blob-store-specific + * + * @param The type of the version used */ - protected interface BlobStoreServices { + protected interface BlobStoreServices { /** - * Get an input stream for the blob at the given position + * Get an input stream for the given version * + * @param version The version to request, or null if the latest version should be used * @param start The start of the range to read, inclusive * @param end The end of the range to read, exclusive, or {@code Long.MAX_VALUE - 1} if the end of the blob should be used * @return An input stream for the given version @@ -292,28 +352,67 @@ protected interface BlobStoreServices { * @throws NoSuchFileException if the blob does not exist, this is not retry-able * @throws RequestedRangeNotSatisfiedException if the requested range is not valid, this is not retry-able */ - SingleAttemptInputStream getInputStream(long start, long end) throws IOException; + SingleAttemptInputStream getInputStream(@Nullable V version, long start, long end) throws IOException; - void onRetryStarted(String action); + void onRetryStarted(StreamAction action); - void onRetrySucceeded(String action, long numberOfRetries); + void onRetrySucceeded(StreamAction action, long numberOfRetries); long getMeaningfulProgressSize(); int getMaxRetries(); String getBlobDescription(); + + boolean isRetryableException(StreamAction action, Exception e); } /** * Represents an {@link InputStream} for a single attempt to read a blob. Each retry * will attempt to create a new one of these. If reading from it fails, it should not retry. */ - protected abstract static class SingleAttemptInputStream extends InputStream { + protected static final class SingleAttemptInputStream extends FilterInputStream { + + private final long firstOffset; + private final V version; + + public SingleAttemptInputStream(InputStream in, long firstOffset, V version) { + super(in); + this.firstOffset = firstOffset; + this.version = version; + } /** * @return the offset of the first byte returned by this input stream */ - protected abstract long getFirstOffset(); + public long getFirstOffset() { + return firstOffset; + } + + /** + * Get the version of this input stream + */ + public V getVersion() { + return version; + } + + /** + * Unwrap the underlying input stream + * + * @param clazz The expected class of the underlying input stream + * @return The underlying input stream + * @param The type of the underlying input stream + */ + public T unwrap(Class clazz) { + return clazz.cast(in); + } + } + + public static boolean willRetry(OperationPurpose purpose) { + return purpose != OperationPurpose.REPOSITORY_ANALYSIS; + } + + public static boolean willRetryForever(OperationPurpose purpose) { + return purpose == OperationPurpose.INDICES; } } diff --git a/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java b/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java index 0f6f590005297..31b33ab13bd52 100644 --- a/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java +++ b/server/src/test/java/org/elasticsearch/common/blobstore/RetryingInputStreamTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Streams; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException; @@ -26,6 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; +import static org.elasticsearch.common.blobstore.RetryingInputStream.StreamAction.OPEN; +import static org.elasticsearch.common.blobstore.RetryingInputStream.StreamAction.READ; import static org.elasticsearch.common.bytes.BytesReferenceTestUtils.equalBytes; import static org.hamcrest.Matchers.empty; @@ -35,81 +38,89 @@ public void testRetryableErrorsWhenReadingAreRetried() throws IOException { final var retryableFailures = randomIntBetween(1, 5); final var failureCounter = new AtomicInteger(retryableFailures); final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(5, 200)).getBytes()); + final var eTag = randomUUID(); final var services = new BlobStoreServicesAdapter(retryableFailures * 2) { @Override - public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, long end) throws IOException { - return new FailureAtIndexInputStream(resourceBytes, (int) start, failureCounter.getAndDecrement() > 0); + public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) + throws IOException { + return createSingleAttemptInputStream(resourceBytes, eTag, (int) start, failureCounter.getAndDecrement() > 0); } }; - byte[] results = copyToBytes(new RetryingInputStream(services, randomRetryingPurpose()) { + byte[] results = copyToBytes(new RetryingInputStream<>(services, randomRetryingPurpose()) { }); assertEquals(resourceBytes.length(), results.length); assertThat(new BytesArray(results), equalBytes(resourceBytes)); assertEquals(retryableFailures + 1, services.getAttempts()); - assertEquals(Stream.generate(() -> "read").limit(retryableFailures).toList(), services.getRetryStarted()); + assertEquals(Stream.generate(() -> READ).limit(retryableFailures).toList(), services.getRetryStarted()); } public void testReadWillFailWhenRetryableErrorsExceedMaxRetries() { final var maxRetries = randomIntBetween(1, 5); final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(10, 100)).getBytes()); + final var eTag = randomUUID(); final var services = new BlobStoreServicesAdapter(maxRetries) { @Override - public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, long end) throws IOException { - return new FailureAtIndexInputStream(resourceBytes, (int) start, true); + public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) + throws IOException { + return createSingleAttemptInputStream(resourceBytes, eTag, (int) start, true); } }; final var ioException = assertThrows( IOException.class, - () -> copyToBytes(new RetryingInputStream(services, randomFiniteRetryingPurpose()) { + () -> copyToBytes(new RetryingInputStream<>(services, randomFiniteRetryingPurpose()) { }) ); assertEquals("This is retry-able", ioException.getMessage()); assertEquals(maxRetries + 1, services.getAttempts()); - assertEquals(Stream.generate(() -> "read").limit(maxRetries + 1).toList(), services.getRetryStarted()); + assertEquals(Stream.generate(() -> READ).limit(maxRetries + 1).toList(), services.getRetryStarted()); } public void testReadWillFailWhenRetryableErrorsOccurDuringRepositoryAnalysis() { final var maxRetries = randomIntBetween(2, 5); final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(5, 200)).getBytes()); + final var eTag = randomUUID(); final var services = new BlobStoreServicesAdapter(maxRetries) { @Override - public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, long end) throws IOException { - return new FailureAtIndexInputStream(resourceBytes, (int) start, true); + public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) + throws IOException { + return createSingleAttemptInputStream(resourceBytes, eTag, (int) start, true); } }; final var ioException = assertThrows( IOException.class, - () -> copyToBytes(new RetryingInputStream(services, OperationPurpose.REPOSITORY_ANALYSIS) { + () -> copyToBytes(new RetryingInputStream<>(services, OperationPurpose.REPOSITORY_ANALYSIS) { }) ); assertEquals("This is retry-able", ioException.getMessage()); assertEquals(1, services.getAttempts()); - assertEquals(List.of("read"), services.getRetryStarted()); + assertEquals(List.of(READ), services.getRetryStarted()); } public void testReadWillRetryIndefinitelyWhenErrorsOccurDuringIndicesOperation() throws IOException { final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(5, 200)).getBytes()); final int numberOfFailures = randomIntBetween(1, 10); final AtomicInteger failureCounter = new AtomicInteger(numberOfFailures); + final var eTag = randomUUID(); final var services = new BlobStoreServicesAdapter(0) { @Override - public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, long end) throws IOException { - return new FailureAtIndexInputStream(resourceBytes, (int) start, failureCounter.getAndDecrement() > 0); + public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) + throws IOException { + return createSingleAttemptInputStream(resourceBytes, eTag, (int) start, failureCounter.getAndDecrement() > 0); } }; - byte[] result = copyToBytes(new RetryingInputStream(services, OperationPurpose.INDICES) { + byte[] result = copyToBytes(new RetryingInputStream<>(services, OperationPurpose.INDICES) { }); assertThat(new BytesArray(result), equalBytes(resourceBytes)); assertEquals(numberOfFailures + 1, services.getAttempts()); - assertEquals(Stream.generate(() -> "read").limit(numberOfFailures).toList(), services.getRetryStarted()); + assertEquals(Stream.generate(() -> READ).limit(numberOfFailures).toList(), services.getRetryStarted()); } public void testRetriesWillBeExtendedWhenMeaningfulProgressIsMade() { @@ -118,13 +129,15 @@ public void testRetriesWillBeExtendedWhenMeaningfulProgressIsMade() { final var meaningfulProgressSize = randomIntBetween(1024, 4096); final var meaningfulProgressAttempts = randomIntBetween(1, 3); final var meaningfulProgressAttemptsCounter = new AtomicInteger(meaningfulProgressAttempts); + final var eTag = randomUUID(); final var services = new BlobStoreServicesAdapter(maxRetries) { @Override - public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, long end) throws IOException { + public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) + throws IOException { final var inputStream = meaningfulProgressAttemptsCounter.decrementAndGet() > 0 - ? new FailureAtIndexInputStream(resourceBytes, (int) start, true, meaningfulProgressSize, Integer.MAX_VALUE) - : new FailureAtIndexInputStream(resourceBytes, (int) start, true, 1, meaningfulProgressSize - 1); + ? createSingleAttemptInputStream(resourceBytes, eTag, (int) start, true, meaningfulProgressSize, Integer.MAX_VALUE) + : createSingleAttemptInputStream(resourceBytes, eTag, (int) start, true, 1, meaningfulProgressSize - 1); return inputStream; } @@ -136,12 +149,12 @@ public long getMeaningfulProgressSize() { final var ioException = assertThrows( IOException.class, - () -> copyToBytes(new RetryingInputStream(services, randomFiniteRetryingPurpose()) { + () -> copyToBytes(new RetryingInputStream<>(services, randomFiniteRetryingPurpose()) { }) ); assertEquals("This is retry-able", ioException.getMessage()); assertEquals(maxRetries + meaningfulProgressAttempts, services.getAttempts()); - assertEquals(Stream.generate(() -> "read").limit(maxRetries + meaningfulProgressAttempts).toList(), services.getRetryStarted()); + assertEquals(Stream.generate(() -> READ).limit(maxRetries + meaningfulProgressAttempts).toList(), services.getRetryStarted()); } public void testNoSuchFileExceptionAndRangeNotSatisfiedTerminatesWithoutRetry() { @@ -155,7 +168,8 @@ public void testNoSuchFileExceptionAndRangeNotSatisfiedTerminatesWithoutRetry() final var services = new BlobStoreServicesAdapter(retryableFailures * 2) { @Override - public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, long end) throws IOException { + public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) + throws IOException { if (failureCounter.getAndDecrement() > 0) { throw new RuntimeException("This is retry-able"); } @@ -164,15 +178,38 @@ public RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, }; final IOException ioException = assertThrows( IOException.class, - () -> copyToBytes(new RetryingInputStream(services, randomRetryingPurpose()) { + () -> copyToBytes(new RetryingInputStream<>(services, randomRetryingPurpose()) { }) ); assertSame(notRetriableException, ioException); assertEquals(retryableFailures + 1, services.getAttempts()); - assertEquals(List.of("open"), services.getRetryStarted()); + assertEquals(List.of(OPEN), services.getRetryStarted()); assertThat(services.getRetrySucceeded(), empty()); } + public void testBlobVersionIsRequestedForSecondAndSubsequentAttempts() throws IOException { + final var resourceBytes = randomBytesReference((int) ByteSizeValue.ofKb(randomIntBetween(5, 200)).getBytes()); + final int numberOfFailures = randomIntBetween(1, 10); + final AtomicInteger failureCounter = new AtomicInteger(numberOfFailures); + final var eTag = randomUUID(); + + final var services = new BlobStoreServicesAdapter(0) { + @Override + public RetryingInputStream.SingleAttemptInputStream doGetInputStream(@Nullable String version, long start, long end) + throws IOException { + if (getAttempts() > 1) { + assertEquals(eTag, version); + } else { + assertNull(version); + } + return createSingleAttemptInputStream(resourceBytes, eTag, (int) start, failureCounter.getAndDecrement() > 0); + } + }; + + copyToBytes(new RetryingInputStream<>(services, OperationPurpose.INDICES) { + }); + } + private static byte[] copyToBytes(InputStream inputStream) throws IOException { final var outputStream = new ByteArrayOutputStream(); if (randomBoolean()) { @@ -189,10 +226,10 @@ private static byte[] copyToBytes(InputStream inputStream) throws IOException { return outputStream.toByteArray(); } - private abstract static class BlobStoreServicesAdapter implements RetryingInputStream.BlobStoreServices { + private abstract static class BlobStoreServicesAdapter implements RetryingInputStream.BlobStoreServices { private final AtomicInteger attemptCounter = new AtomicInteger(); - private final List retryStarted = new ArrayList<>(); + private final List retryStarted = new ArrayList<>(); private final List retrySucceeded = new ArrayList<>(); private final int maxRetries; @@ -200,23 +237,37 @@ private BlobStoreServicesAdapter(int maxRetries) { this.maxRetries = maxRetries; } - public final RetryingInputStream.SingleAttemptInputStream getInputStream(long start, long end) throws IOException { + @Override + public final RetryingInputStream.SingleAttemptInputStream getInputStream(@Nullable String version, long start, long end) + throws IOException { attemptCounter.incrementAndGet(); - return doGetInputStream(start, end); + return doGetInputStream(version, start, end); } - protected abstract RetryingInputStream.SingleAttemptInputStream doGetInputStream(long start, long end) throws IOException; + protected abstract RetryingInputStream.SingleAttemptInputStream doGetInputStream( + @Nullable String version, + long start, + long end + ) throws IOException; @Override - public void onRetryStarted(String action) { + public void onRetryStarted(RetryingInputStream.StreamAction action) { retryStarted.add(action); } @Override - public void onRetrySucceeded(String action, long numberOfRetries) { + public void onRetrySucceeded(RetryingInputStream.StreamAction action, long numberOfRetries) { retrySucceeded.add(new Success(action, numberOfRetries)); } + @Override + public boolean isRetryableException(RetryingInputStream.StreamAction action, Exception e) { + return switch (action) { + case OPEN -> e instanceof RuntimeException; + case READ -> e instanceof IOException; + }; + } + @Override public long getMeaningfulProgressSize() { return Long.MAX_VALUE; @@ -232,13 +283,13 @@ public String getBlobDescription() { return ""; } - record Success(String action, long numberOfRetries) {}; + record Success(RetryingInputStream.StreamAction action, long numberOfRetries) {}; public int getAttempts() { return attemptCounter.get(); } - public List getRetryStarted() { + public List getRetryStarted() { return retryStarted; } @@ -247,38 +298,50 @@ public List getRetrySucceeded() { } } - private static class FailureAtIndexInputStream extends RetryingInputStream.SingleAttemptInputStream { - - private final long firstOffset; - private final InputStream delegate; - private int readRemaining; + private static RetryingInputStream.SingleAttemptInputStream createSingleAttemptInputStream( + BytesReference bytesReference, + String version, + int startIndex, + boolean failBeforeEnd + ) throws IOException { + return createSingleAttemptInputStream(bytesReference, version, startIndex, failBeforeEnd, 1, Integer.MAX_VALUE); + } - private FailureAtIndexInputStream(BytesReference bytesReference, int startIndex, boolean failBeforeEnd) throws IOException { - this(bytesReference, startIndex, failBeforeEnd, 1, Integer.MAX_VALUE); + private static RetryingInputStream.SingleAttemptInputStream createSingleAttemptInputStream( + BytesReference bytesReference, + String version, + int startIndex, + boolean failBeforeEnd, + int minimumSuccess, + int maximumSuccess + ) throws IOException { + if (failBeforeEnd) { + return new RetryingInputStream.SingleAttemptInputStream<>( + new FailureAtIndexInputStream(bytesReference, startIndex, minimumSuccess, maximumSuccess), + startIndex, + version + ); } + return new RetryingInputStream.SingleAttemptInputStream<>(inputStreamAtIndex(bytesReference, startIndex), startIndex, version); + } + + private static class FailureAtIndexInputStream extends InputStream { - private FailureAtIndexInputStream( - BytesReference bytesReference, - int startIndex, - boolean failBeforeEnd, - int minimumSuccess, - int maximumSuccess - ) throws IOException { + private final InputStream inputStream; + private int readRemaining; + + private FailureAtIndexInputStream(BytesReference bytesReference, int startIndex, int minimumSuccess, int maximumSuccess) + throws IOException { + this.inputStream = inputStreamAtIndex(bytesReference, startIndex); final int remainingBytes = bytesReference.length() - startIndex; - this.delegate = bytesReference.slice(startIndex, remainingBytes).streamInput(); - if (failBeforeEnd) { - this.readRemaining = randomIntBetween(Math.max(1, minimumSuccess), Math.min(maximumSuccess, remainingBytes / 2)); - } else { - this.readRemaining = Integer.MAX_VALUE; - } - this.firstOffset = startIndex; + this.readRemaining = randomIntBetween(Math.max(1, minimumSuccess), Math.min(maximumSuccess, remainingBytes / 2)); } @Override public int read() throws IOException { if (readRemaining > 0) { readRemaining--; - return delegate.read(); + return inputStream.read(); } else { throw new IOException("This is retry-able"); } @@ -288,11 +351,11 @@ public int read() throws IOException { public String toString() { return "Failing after " + readRemaining; } + } - @Override - protected long getFirstOffset() { - return firstOffset; - } + private static InputStream inputStreamAtIndex(BytesReference bytesReference, int startIndex) throws IOException { + final int remainingBytes = bytesReference.length() - startIndex; + return bytesReference.slice(startIndex, remainingBytes).streamInput(); } public static OperationPurpose randomRetryingPurpose() { diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index 9d09f53e05b15..cfc4015b2d27e 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.common.blobstore.RetryingInputStream; import org.elasticsearch.common.blobstore.support.BlobMetadata; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -76,6 +77,7 @@ import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.ESTestCase.randomIntBetween; import static org.elasticsearch.test.ESTestCase.randomValueOtherThan; +import static org.elasticsearch.test.ESTestCase.randomValueOtherThanMany; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasKey; @@ -471,6 +473,20 @@ public static OperationPurpose randomPurpose() { return randomFrom(OperationPurpose.values()); } + /** + * Random {@link OperationPurpose} that will be retried by {@link RetryingInputStream} + */ + public static OperationPurpose randomRetryingPurpose() { + return randomValueOtherThanMany(purpose -> RetryingInputStream.willRetry(purpose) == false, BlobStoreTestUtil::randomPurpose); + } + + /** + * Random {@link OperationPurpose} that will be retried a finite number of times by {@link RetryingInputStream} + */ + public static OperationPurpose randomFiniteRetryingPurpose() { + return randomValueOtherThanMany(RetryingInputStream::willRetryForever, BlobStoreTestUtil::randomRetryingPurpose); + } + public static OperationPurpose randomNonDataPurpose() { return randomValueOtherThan(OperationPurpose.SNAPSHOT_DATA, BlobStoreTestUtil::randomPurpose); } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index 8870c79218780..d9fa6ee50f1c8 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -65,6 +65,7 @@ import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_NAME_FORMAT; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomNonDataPurpose; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; +import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomRetryingPurpose; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; @@ -157,7 +158,7 @@ public void testWriteMaybeCopyRead() throws IOException { readBlobName = destinationBlobName; } catch (UnsupportedOperationException ignored) {} } - try (InputStream stream = container.readBlob(randomPurpose(), readBlobName)) { + try (InputStream stream = container.readBlob(randomRetryingPurpose(), readBlobName)) { BytesRefBuilder target = new BytesRefBuilder(); while (target.length() < data.length) { byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())]; @@ -285,7 +286,7 @@ public static byte[] writeRandomBlob(BlobContainer container, String name, int l public static byte[] readBlobFully(BlobContainer container, String name, int length) throws IOException { byte[] data = new byte[length]; - try (InputStream inputStream = container.readBlob(randomPurpose(), name)) { + try (InputStream inputStream = container.readBlob(randomRetryingPurpose(), name)) { assertThat(Streams.readFully(inputStream, data), CoreMatchers.equalTo(length)); assertThat(inputStream.read(), CoreMatchers.equalTo(-1)); }