Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1ebbae2
Use common retry logic for GCS
nicktindall Nov 25, 2025
915be8a
Merge branch 'main' into use_common_retry_logic_gcs
nicktindall Nov 25, 2025
709fdf1
Use FilterInputStream for SingleAttemptInputStream (delegate close etc.)
nicktindall Nov 25, 2025
f0fca99
Account for bytes skipped in offset
nicktindall Nov 25, 2025
a078ff8
Make SingleAttemptInputStream a decorator
nicktindall Nov 26, 2025
39f8db6
Allow CSPs to define what is retry-able
nicktindall Nov 26, 2025
f5502d5
Merge remote-tracking branch 'origin/main' into use_common_retry_logi…
nicktindall Nov 26, 2025
1f4b905
Remove forbidden API call (toLowerCase)
nicktindall Nov 26, 2025
b70d52f
Remove misleading comments
nicktindall Nov 26, 2025
5c03349
Update main javadoc to indicate why this class exists
nicktindall Nov 26, 2025
4981c7e
Improve Javadoc for RetryingInputStream
nicktindall Nov 26, 2025
fabe579
Remove copied config, configure randomXXXPurpose correctly for GCP, t…
nicktindall Nov 26, 2025
d137a49
Fix metric value
nicktindall Nov 26, 2025
55c98eb
Override retry delay for retry tests
nicktindall Nov 26, 2025
732121f
Align behaviour with current
nicktindall Nov 26, 2025
9da242f
Merge branch 'main' into use_common_retry_logic_gcs
nicktindall Nov 26, 2025
63d6018
Use retrying purpose when we expect reads to succeed
nicktindall Nov 26, 2025
f6c2497
Merge branch 'main' into use_common_retry_logic_gcs
nicktindall Nov 26, 2025
9d0e15b
Merge remote-tracking branch 'origin/main' into use_common_retry_logi…
nicktindall Nov 26, 2025
36a5821
Merge remote-tracking branch 'origin/use_common_retry_logic_gcs' into…
nicktindall Nov 26, 2025
4636d9a
Update docs/changelog/138553.yaml
nicktindall Nov 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import fixture.gcs.GoogleCloudStorageHttpHandler;
import fixture.gcs.TestUtils;

import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.StorageOptions;
import com.google.cloud.storage.StorageRetryStrategy;
Expand Down Expand Up @@ -60,9 +59,11 @@
import java.util.Map;

import static org.elasticsearch.common.io.Streams.readFully;
import static org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase.randomRetryingPurpose;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BASE_PATH;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET;
Expand Down Expand Up @@ -118,6 +119,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
settings.put(super.nodeSettings(nodeOrdinal, otherSettings));
settings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl());
settings.put(TOKEN_URI_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl() + "/token");
settings.put(MAX_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), 6);

final MockSecureSettings secureSettings = new MockSecureSettings();
final byte[] serviceAccount = TestUtils.createServiceAccount(random());
Expand Down Expand Up @@ -195,7 +197,7 @@ public void testWriteReadLarge() throws IOException {
random().nextBytes(data);
writeBlob(container, "foobar", new BytesArray(data), false);
}
try (InputStream stream = container.readBlob(randomPurpose(), "foobar")) {
try (InputStream stream = container.readBlob(randomRetryingPurpose(), "foobar")) {
BytesRefBuilder target = new BytesRefBuilder();
while (target.length() < data.length) {
byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
Expand All @@ -218,7 +220,7 @@ public void testWriteFileMultipleOfChunkSize() throws IOException {
byte[] initialValue = randomByteArrayOfLength(uploadSize);
container.writeBlob(randomPurpose(), key, new BytesArray(initialValue), true);

BytesReference reference = readFully(container.readBlob(randomPurpose(), key));
BytesReference reference = readFully(container.readBlob(randomRetryingPurpose(), key));
assertEquals(new BytesArray(initialValue), reference);

container.deleteBlobsIgnoringIfNotExists(randomPurpose(), Iterators.single(key));
Expand All @@ -242,24 +244,18 @@ protected GoogleCloudStorageService createStorageService(ClusterService clusterS
@Override
StorageOptions createStorageOptions(
final GoogleCloudStorageClientSettings gcsClientSettings,
final HttpTransportOptions httpTransportOptions
final HttpTransportOptions httpTransportOptions,
final RetryBehaviour retryBehaviour
) {
StorageOptions options = super.createStorageOptions(gcsClientSettings, httpTransportOptions);
StorageOptions options = super.createStorageOptions(gcsClientSettings, httpTransportOptions, retryBehaviour);
return options.toBuilder()
.setStorageRetryStrategy(StorageRetryStrategy.getLegacyStorageRetryStrategy())
.setHost(options.getHost())
.setCredentials(options.getCredentials())
.setRetrySettings(
RetrySettings.newBuilder()
.setTotalTimeout(options.getRetrySettings().getTotalTimeout())
options.getRetrySettings()
.toBuilder()
.setInitialRetryDelay(Duration.ofMillis(10L))
.setRetryDelayMultiplier(options.getRetrySettings().getRetryDelayMultiplier())
.setMaxRetryDelay(Duration.ofSeconds(1L))
.setMaxAttempts(0)
.setJittered(false)
Copy link
Contributor Author

@nicktindall nicktindall Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test originally configured retries to be time-based (i.e. no limit on the attempts, just keep retrying for some amount of time). I changed it to just make the retry intervals small and depend on the configured retry limits because we don't support time-based retries anymore.

.setInitialRpcTimeout(options.getRetrySettings().getInitialRpcTimeout())
.setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier())
.setMaxRpcTimeout(options.getRetrySettings().getMaxRpcTimeout())
.build()
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testResumeAfterUpdate() {
executeOnBlobStore(repo, container -> {
container.writeBlob(randomPurpose(), blobKey, new BytesArray(initialValue), true);

try (InputStream inputStream = container.readBlob(randomPurpose(), blobKey)) {
try (InputStream inputStream = container.readBlob(randomRetryingPurpose(), blobKey)) {
Copy link
Contributor Author

@nicktindall nicktindall Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to be careful where we use randomPurpose() now because some purposes no longer retry (e.g. REPOSITORY_ANALYSIS)

// Trigger the first request for the blob, partially read it
int read = inputStream.read();
assert read != -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,32 @@ class GoogleCloudStorageBlobStore implements BlobStore {
this.casBackoffPolicy = casBackoffPolicy;
}

private MeteredStorage client() throws IOException {
return storageService.client(projectId, clientName, repositoryName, statsCollector);
/**
* Get a client that will retry according to its configured settings
*
* @return A client
*/
MeteredStorage client() throws IOException {
return storageService.client(
projectId,
clientName,
repositoryName,
statsCollector,
GoogleCloudStorageService.RetryBehaviour.ClientConfigured
);
}

/**
* Get a client that will not retry on failure
*
* @return A client with max retries configured to zero
*/
MeteredStorage clientNoRetries() throws IOException {
return storageService.client(projectId, clientName, repositoryName, statsCollector, GoogleCloudStorageService.RetryBehaviour.None);
}

int getMaxRetries() {
return storageService.clientSettings(projectId, clientName).getMaxRetries();
}

@Override
Expand Down Expand Up @@ -229,7 +253,7 @@ boolean blobExists(OperationPurpose purpose, String blobName) throws IOException
* @return the InputStream used to read the blob's content
*/
InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException {
return new GoogleCloudStorageRetryingInputStream(purpose, client(), BlobId.of(bucketName, blobName));
return new GoogleCloudStorageRetryingInputStream(this, purpose, BlobId.of(bucketName, blobName));
}

/**
Expand All @@ -252,8 +276,8 @@ InputStream readBlob(OperationPurpose purpose, String blobName, long position, l
return new ByteArrayInputStream(new byte[0]);
} else {
return new GoogleCloudStorageRetryingInputStream(
this,
purpose,
client(),
BlobId.of(bucketName, blobName),
position,
Math.addExact(position, length - 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ public class GoogleCloudStorageClientSettings {
() -> PROXY_HOST_SETTING
);

/**
* The maximum number of retries to use when a GCS request fails.
* <p>
* Default to 5 to match {@link com.google.cloud.ServiceOptions#getDefaultRetrySettings()}
*/
static final Setting.AffixSetting<Integer> MAX_RETRIES_SETTING = Setting.affixKeySetting(
PREFIX,
"max_retries",
(key) -> Setting.intSetting(key, 5, 0, Setting.Property.NodeScope)
);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We never configured number of retries for GCS previously. The default settings were for 6 attempts (aka 5 retries)


/** The credentials used by the client to connect to the Storage endpoint. */
private final ServiceAccountCredentials credential;

Expand All @@ -144,6 +155,8 @@ public class GoogleCloudStorageClientSettings {
@Nullable
private final Proxy proxy;

private final int maxRetries;

GoogleCloudStorageClientSettings(
final ServiceAccountCredentials credential,
final String endpoint,
Expand All @@ -152,7 +165,8 @@ public class GoogleCloudStorageClientSettings {
final TimeValue readTimeout,
final String applicationName,
final URI tokenUri,
final Proxy proxy
final Proxy proxy,
final int maxRetries
) {
this.credential = credential;
this.endpoint = endpoint;
Expand All @@ -162,6 +176,7 @@ public class GoogleCloudStorageClientSettings {
this.applicationName = applicationName;
this.tokenUri = tokenUri;
this.proxy = proxy;
this.maxRetries = maxRetries;
}

public ServiceAccountCredentials getCredential() {
Expand Down Expand Up @@ -197,6 +212,10 @@ public Proxy getProxy() {
return proxy;
}

public int getMaxRetries() {
return maxRetries;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Expand Down Expand Up @@ -249,7 +268,8 @@ static GoogleCloudStorageClientSettings getClientSettings(final Settings setting
getConfigValue(settings, clientName, READ_TIMEOUT_SETTING),
getConfigValue(settings, clientName, APPLICATION_NAME_SETTING),
getConfigValue(settings, clientName, TOKEN_URI_SETTING),
proxy
proxy,
getConfigValue(settings, clientName, MAX_RETRIES_SETTING)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public List<Setting<?>> getSettings() {
GoogleCloudStorageClientSettings.TOKEN_URI_SETTING,
GoogleCloudStorageClientSettings.PROXY_TYPE_SETTING,
GoogleCloudStorageClientSettings.PROXY_HOST_SETTING,
GoogleCloudStorageClientSettings.PROXY_PORT_SETTING
GoogleCloudStorageClientSettings.PROXY_PORT_SETTING,
GoogleCloudStorageClientSettings.MAX_RETRIES_SETTING
);
}

Expand Down
Loading