Commit 9082e02
Retry S3BlobContainer#getRegister on all exceptions (#114813)
S3 register reads are subject to the regular client retry policy, but in practice these reads sometimes fail with errors that are transient yet not retried by the SDK. This commit adds another layer of retries around these reads.

Relates ES-9721
1 parent a7e62f5

File tree

6 files changed: 139 additions, 15 deletions

docs/changelog/114813.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 114813
+summary: Retry `S3BlobContainer#getRegister` on all exceptions
+area: Snapshot/Restore
+type: enhancement
+issues: []

docs/reference/snapshot-restore/repository-s3.asciidoc

Lines changed: 5 additions & 0 deletions
@@ -343,6 +343,11 @@ include::repository-shared-settings.asciidoc[]
 will disable retries altogether. Note that if retries are enabled in the Azure client, each of these retries
 comprises that many client-level retries.
 
+`get_register_retry_delay`
+
+    (<<time-units,time value>>) Sets the time to wait before trying again if an attempt to read a
+    <<repository-s3-linearizable-registers,linearizable register>> fails. Defaults to `5s`.
+
 NOTE: The option of defining client settings in the repository settings as
 documented below is considered deprecated, and will be removed in a future
 version.
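Since this is a repository-level setting, it can be supplied when the repository is registered. A minimal sketch using the standard create-repository API; the repository name, bucket, and the 10s value are hypothetical:

PUT _snapshot/my_s3_repository
{
  "type": "s3",
  "settings": {
    "bucket": "my-bucket",
    "get_register_retry_delay": "10s"
  }
}

The setting is declared dynamic (see S3Repository.java below), so re-registering the repository with a new value updates it without recreating the repository.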

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 38 additions & 14 deletions
@@ -40,6 +40,7 @@
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.cluster.service.MasterService;
+import org.elasticsearch.common.BackoffPolicy;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -910,21 +911,44 @@ public void compareAndExchangeRegister(
     @Override
     public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
         ActionListener.completeWith(listener, () -> {
-            final var getObjectRequest = new GetObjectRequest(blobStore.bucket(), buildKey(key));
-            S3BlobStore.configureRequestForMetrics(getObjectRequest, blobStore, Operation.GET_OBJECT, purpose);
-            try (
-                var clientReference = blobStore.clientReference();
-                var s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
-                var stream = s3Object.getObjectContent()
-            ) {
-                return OptionalBytesReference.of(getRegisterUsingConsistentRead(stream, keyPath, key));
-            } catch (AmazonS3Exception e) {
-                logger.trace(() -> Strings.format("[%s]: getRegister failed", key), e);
-                if (e.getStatusCode() == 404) {
-                    return OptionalBytesReference.EMPTY;
-                } else {
-                    throw e;
+            final var backoffPolicy = purpose == OperationPurpose.REPOSITORY_ANALYSIS
+                ? BackoffPolicy.noBackoff()
+                : BackoffPolicy.constantBackoff(blobStore.getGetRegisterRetryDelay(), blobStore.getMaxRetries());
+            final var retryDelayIterator = backoffPolicy.iterator();
+
+            Exception finalException = null;
+            while (true) {
+                final var getObjectRequest = new GetObjectRequest(blobStore.bucket(), buildKey(key));
+                S3BlobStore.configureRequestForMetrics(getObjectRequest, blobStore, Operation.GET_OBJECT, purpose);
+                try (
+                    var clientReference = blobStore.clientReference();
+                    var s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
+                    var stream = s3Object.getObjectContent()
+                ) {
+                    return OptionalBytesReference.of(getRegisterUsingConsistentRead(stream, keyPath, key));
+                } catch (Exception attemptException) {
+                    logger.trace(() -> Strings.format("[%s]: getRegister failed", key), attemptException);
+                    if (attemptException instanceof AmazonS3Exception amazonS3Exception && amazonS3Exception.getStatusCode() == 404) {
+                        return OptionalBytesReference.EMPTY;
+                    } else if (finalException == null) {
+                        finalException = attemptException;
+                    } else if (finalException != attemptException) {
+                        finalException.addSuppressed(attemptException);
+                    }
                 }
+                if (retryDelayIterator.hasNext()) {
+                    try {
+                        // noinspection BusyWait
+                        Thread.sleep(retryDelayIterator.next().millis());
+                        continue;
+                    } catch (InterruptedException interruptedException) {
+                        Thread.currentThread().interrupt();
+                        finalException.addSuppressed(interruptedException);
+                        // fall through and throw the exception
+                    }
+                }
+
+                throw finalException;
             }
         });
     }
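Three details of the new loop are easy to miss: a 404 still short-circuits to an empty register instead of retrying, later distinct failures are attached to the first as suppressed exceptions so no diagnostics are lost, and OperationPurpose.REPOSITORY_ANALYSIS skips the extra layer entirely via BackoffPolicy.noBackoff(). The loop shape itself, reduced to a minimal standalone sketch: BackoffPolicy and TimeValue are the real Elasticsearch classes used above, while RetryLoopSketch, Attempt, and withRetries are hypothetical names for illustration.

import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.core.TimeValue;

import java.util.Iterator;

class RetryLoopSketch {

    // Hypothetical single attempt, standing in for the GetObject + consistent read above.
    interface Attempt<T> {
        T run() throws Exception;
    }

    static <T> T withRetries(Attempt<T> attempt, TimeValue delay, int maxRetries) throws Exception {
        final Iterator<TimeValue> delays = BackoffPolicy.constantBackoff(delay, maxRetries).iterator();
        Exception finalException = null;
        while (true) {
            try {
                return attempt.run();
            } catch (Exception e) {
                if (finalException == null) {
                    finalException = e; // remember the first failure
                } else if (finalException != e) {
                    finalException.addSuppressed(e); // keep each later distinct failure for diagnostics
                }
            }
            if (delays.hasNext()) {
                Thread.sleep(delays.next().millis()); // constant pause between attempts
                continue;
            }
            throw finalException; // retries exhausted: throw the first failure with the rest suppressed
        }
    }
}

Calling withRetries(() -> readRegister(), TimeValue.timeValueSeconds(5), 3) would make up to four attempts with a constant five-second pause, mirroring the default get_register_retry_delay.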

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

Lines changed: 7 additions & 0 deletions
@@ -95,6 +95,8 @@ class S3BlobStore implements BlobStore {
     private final int bulkDeletionBatchSize;
     private final BackoffPolicy retryThrottledDeleteBackoffPolicy;
 
+    private final TimeValue getRegisterRetryDelay;
+
     S3BlobStore(
         S3Service service,
         String bucket,
@@ -121,6 +123,7 @@ class S3BlobStore implements BlobStore {
         this.s3RepositoriesMetrics = s3RepositoriesMetrics;
         this.bulkDeletionBatchSize = S3Repository.DELETION_BATCH_SIZE_SETTING.get(repositoryMetadata.settings());
         this.retryThrottledDeleteBackoffPolicy = retryThrottledDeleteBackoffPolicy;
+        this.getRegisterRetryDelay = S3Repository.GET_REGISTER_RETRY_DELAY.get(repositoryMetadata.settings());
     }
 
     RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) {
@@ -468,6 +471,10 @@ public StorageClass getStorageClass() {
         return storageClass;
     }
 
+    public TimeValue getGetRegisterRetryDelay() {
+        return getRegisterRetryDelay;
+    }
+
     public static StorageClass initStorageClass(String storageClass) {
         if ((storageClass == null) || storageClass.equals("")) {
             return StorageClass.Standard;
modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

Lines changed: 10 additions & 0 deletions
@@ -223,6 +223,16 @@ class S3Repository extends MeteredBlobStoreRepository {
         0
     );
 
+    /**
+     * Time to wait before trying again if getRegister fails.
+     */
+    static final Setting<TimeValue> GET_REGISTER_RETRY_DELAY = Setting.timeSetting(
+        "get_register_retry_delay",
+        new TimeValue(5, TimeUnit.SECONDS),
+        new TimeValue(0, TimeUnit.MILLISECONDS),
+        Setting.Property.Dynamic
+    );
+
     private final S3Service service;
 
     private final String bucket;
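The declaration pins down the behaviour documented above: key get_register_retry_delay, default 5s, floor 0ms, dynamically updatable. A hedged sketch of what the floor means in practice; GetRegisterRetryDelaySketch is hypothetical and assumes same-package access, since the constant is package-private:

package org.elasticsearch.repositories.s3;

import org.elasticsearch.common.settings.Settings;

class GetRegisterRetryDelaySketch {
    public static void main(String[] args) {
        // Zero is allowed and disables the wait, which is how the tests below
        // keep themselves fast (they pass TimeValue.ZERO).
        var ok = S3Repository.GET_REGISTER_RETRY_DELAY.get(
            Settings.builder().put("get_register_retry_delay", "0ms").build()
        );
        System.out.println(ok); // 0ms

        // A negative value falls below the declared 0ms minimum and is rejected
        // at parse time with an IllegalArgumentException.
        try {
            S3Repository.GET_REGISTER_RETRY_DELAY.get(
                Settings.builder().put("get_register_retry_delay", "-1s").build()
            );
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}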

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 74 additions & 1 deletion
@@ -28,6 +28,7 @@
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.OperationPurpose;
+import org.elasticsearch.common.blobstore.OptionalBytesReference;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
@@ -191,7 +192,10 @@ protected BlobContainer createBlobContainer(
         final RepositoryMetadata repositoryMetadata = new RepositoryMetadata(
             "repository",
             S3Repository.TYPE,
-            Settings.builder().put(S3Repository.CLIENT_NAME.getKey(), clientName).build()
+            Settings.builder()
+                .put(S3Repository.CLIENT_NAME.getKey(), clientName)
+                .put(S3Repository.GET_REGISTER_RETRY_DELAY.getKey(), TimeValue.ZERO)
+                .build()
         );
 
         final S3BlobStore s3BlobStore = new S3BlobStore(
@@ -945,6 +949,75 @@ private Set<OperationPurpose> operationPurposesThatRetryOnDelete() {
         return Set.of(OperationPurpose.SNAPSHOT_DATA, OperationPurpose.SNAPSHOT_METADATA);
     }
 
+    public void testGetRegisterRetries() {
+        final var maxRetries = between(0, 3);
+        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
+
+        interface FailingHandlerFactory {
+            void addHandler(String blobName, Integer... responseCodes);
+        }
+
+        final var requestCounter = new AtomicInteger();
+        final FailingHandlerFactory countingFailingHandlerFactory = (blobName, responseCodes) -> httpServer.createContext(
+            downloadStorageEndpoint(blobContainer, blobName),
+            exchange -> {
+                requestCounter.incrementAndGet();
+                try (exchange) {
+                    exchange.sendResponseHeaders(randomFrom(responseCodes), -1);
+                }
+            }
+        );
+
+        countingFailingHandlerFactory.addHandler("test_register_no_internal_retries", HttpStatus.SC_UNPROCESSABLE_ENTITY);
+        countingFailingHandlerFactory.addHandler(
+            "test_register_internal_retries",
+            HttpStatus.SC_INTERNAL_SERVER_ERROR,
+            HttpStatus.SC_SERVICE_UNAVAILABLE
+        );
+        countingFailingHandlerFactory.addHandler("test_register_not_found", HttpStatus.SC_NOT_FOUND);
+
+        {
+            final var exceptionWithInternalRetries = safeAwaitFailure(
+                OptionalBytesReference.class,
+                l -> blobContainer.getRegister(randomRetryingPurpose(), "test_register_internal_retries", l)
+            );
+            assertThat(exceptionWithInternalRetries, instanceOf(AmazonS3Exception.class));
+            assertEquals((maxRetries + 1) * (maxRetries + 1), requestCounter.get());
+            assertEquals(maxRetries, exceptionWithInternalRetries.getSuppressed().length);
+        }
+
+        {
+            requestCounter.set(0);
+            final var exceptionWithoutInternalRetries = safeAwaitFailure(
+                OptionalBytesReference.class,
+                l -> blobContainer.getRegister(randomRetryingPurpose(), "test_register_no_internal_retries", l)
+            );
+            assertThat(exceptionWithoutInternalRetries, instanceOf(AmazonS3Exception.class));
+            assertEquals(maxRetries + 1, requestCounter.get());
+            assertEquals(maxRetries, exceptionWithoutInternalRetries.getSuppressed().length);
+        }
+
+        {
+            requestCounter.set(0);
+            final var repoAnalysisException = safeAwaitFailure(
+                OptionalBytesReference.class,
+                l -> blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, "test_register_no_internal_retries", l)
+            );
+            assertThat(repoAnalysisException, instanceOf(AmazonS3Exception.class));
+            assertEquals(1, requestCounter.get());
+            assertEquals(0, repoAnalysisException.getSuppressed().length);
+        }
+
+        {
+            requestCounter.set(0);
+            final OptionalBytesReference expectEmpty = safeAwait(
+                l -> blobContainer.getRegister(randomPurpose(), "test_register_not_found", l)
+            );
+            assertEquals(OptionalBytesReference.EMPTY, expectEmpty);
+            assertEquals(1, requestCounter.get());
+        }
+    }
+
     @Override
     protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
         // some attempts make meaningful progress and do not count towards the max retry limit
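Worked through for the first block: 500 and 503 responses are retried inside the SDK, so each of the maxRetries + 1 outer attempts expands into maxRetries + 1 HTTP requests; with maxRetries = 2 that is 3 × 3 = 9 requests, and the outer layer records maxRetries suppressed exceptions on the one it finally throws. The 422 in the second block is not retried by the SDK, so only the outer layer acts and the counter reads maxRetries + 1 = 3.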
