Commit 33195c9

Retry S3BlobContainer#getRegister on all exceptions
S3 register reads are subject to the regular client retry policy, but in practice we sometimes see these reads fail with errors that are transient yet not retried by the SDK. This commit adds another layer of retries to these reads. Relates ES-9721
1 parent 8240945 commit 33195c9
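
The change wraps each register read in an outer retry loop: the SDK still applies its own retry policy inside every GetObject call, and the new loop re-issues the whole call when it fails, keeping the first exception and attaching later ones as suppressed. A minimal, self-contained sketch of that pattern in plain Java (a simplified, hypothetical helper, not the code from this commit; the real implementation is in the S3BlobContainer diff below):

import java.util.concurrent.Callable;

// Simplified illustration of the retry layering described above: retry the whole
// operation with a constant delay, keep the first failure, and attach later
// failures as suppressed exceptions.
final class ConstantDelayRetry {
    static <T> T retry(Callable<T> operation, int maxRetries, long delayMillis) throws Exception {
        Exception firstFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return operation.call(); // each call may itself be retried internally by the SDK
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else if (firstFailure != e) {
                    firstFailure.addSuppressed(e);
                }
            }
            if (attempt < maxRetries) {
                Thread.sleep(delayMillis); // constant backoff between attempts
            }
        }
        throw firstFailure;
    }
}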

File tree

5 files changed (+134, -16 lines)


docs/reference/snapshot-restore/repository-s3.asciidoc

Lines changed: 5 additions & 0 deletions
@@ -329,6 +329,11 @@ include::repository-shared-settings.asciidoc[]
 `1000` which is the maximum number supported by the https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html[AWS
 ListMultipartUploads API]. If set to `0`, {es} will not attempt to clean up dangling multipart uploads.
 
+`get_register_retry_delay`
+
+(<<time-units,time value>>) Sets the time to wait before trying again if an attempt to read a
+<<repository-s3-linearizable-registers,linearizable register>> fails. Defaults to `5s`.
+
 NOTE: The option of defining client settings in the repository settings as
 documented below is considered deprecated, and will be removed in a future
 version.
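
For illustration only, the new setting is supplied like any other per-repository setting. A hedged Java sketch mirroring how the tests below build repository settings (the bucket name and delay value are placeholders, not taken from this commit):

import org.elasticsearch.common.settings.Settings;

// Hypothetical repository settings including the new retry delay.
public class GetRegisterRetryDelayExample {
    public static void main(String[] args) {
        Settings repositorySettings = Settings.builder()
            .put("bucket", "my-bucket")             // placeholder bucket name
            .put("get_register_retry_delay", "10s") // setting added by this commit; defaults to 5s
            .build();
        System.out.println(repositorySettings.get("get_register_retry_delay")); // prints 10s
    }
}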

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 37 additions & 14 deletions
@@ -40,6 +40,7 @@
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.cluster.service.MasterService;
+import org.elasticsearch.common.BackoffPolicy;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -910,21 +911,43 @@ public void compareAndExchangeRegister(
     @Override
     public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
         ActionListener.completeWith(listener, () -> {
-            final var getObjectRequest = new GetObjectRequest(blobStore.bucket(), buildKey(key));
-            S3BlobStore.configureRequestForMetrics(getObjectRequest, blobStore, Operation.GET_OBJECT, purpose);
-            try (
-                var clientReference = blobStore.clientReference();
-                var s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
-                var stream = s3Object.getObjectContent()
-            ) {
-                return OptionalBytesReference.of(getRegisterUsingConsistentRead(stream, keyPath, key));
-            } catch (AmazonS3Exception e) {
-                logger.trace(() -> Strings.format("[%s]: getRegister failed", key), e);
-                if (e.getStatusCode() == 404) {
-                    return OptionalBytesReference.EMPTY;
-                } else {
-                    throw e;
+            final var backoffPolicy = purpose == OperationPurpose.REPOSITORY_ANALYSIS
+                ? BackoffPolicy.noBackoff()
+                : BackoffPolicy.constantBackoff(blobStore.getGetRegisterRetryDelay(), blobStore.getMaxRetries());
+            final var retryDelayIterator = backoffPolicy.iterator();
+
+            Exception finalException = null;
+            while (true) {
+                final var getObjectRequest = new GetObjectRequest(blobStore.bucket(), buildKey(key));
+                S3BlobStore.configureRequestForMetrics(getObjectRequest, blobStore, Operation.GET_OBJECT, purpose);
+                try (
+                    var clientReference = blobStore.clientReference();
+                    var s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
+                    var stream = s3Object.getObjectContent()
+                ) {
+                    return OptionalBytesReference.of(getRegisterUsingConsistentRead(stream, keyPath, key));
+                } catch (Exception attemptException) {
+                    logger.trace(() -> Strings.format("[%s]: getRegister failed", key), attemptException);
+                    if (attemptException instanceof AmazonS3Exception amazonS3Exception && amazonS3Exception.getStatusCode() == 404) {
+                        return OptionalBytesReference.EMPTY;
+                    } else if (finalException == null) {
+                        finalException = attemptException;
+                    } else if (finalException != attemptException) {
+                        finalException.addSuppressed(attemptException);
+                    }
                 }
+                if (retryDelayIterator.hasNext()) {
+                    try {
+                        // noinspection BusyWait
+                        Thread.sleep(retryDelayIterator.next().millis());
+                        continue;
+                    } catch (InterruptedException interruptedException) {
+                        Thread.currentThread().interrupt();
+                        finalException.addSuppressed(interruptedException);
+                    }
+                }
+
+                throw finalException;
             }
         });
     }
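
The retry cadence in getRegister comes from BackoffPolicy: constantBackoff(delay, maxRetries) yields maxRetries identical delays, so the loop above makes at most maxRetries + 1 GetObject attempts, while noBackoff() yields no delays, so repository-analysis reads are attempted only once. A small sketch of those semantics, assuming it is compiled against the Elasticsearch source tree (TimeValue lives in org.elasticsearch.core in recent versions):

import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.core.TimeValue;

import java.util.Iterator;

public class BackoffPolicySketch {
    public static void main(String[] args) {
        // Three retry delays of 5s each, i.e. up to four attempts in total.
        Iterator<TimeValue> delays = BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(5), 3).iterator();
        int attempts = 1; // the first attempt happens before any delay
        while (delays.hasNext()) {
            System.out.println("retry after " + delays.next());
            attempts++;
        }
        System.out.println("attempts: " + attempts); // 4

        // Repository analysis uses noBackoff(): no delays, so a single attempt.
        System.out.println(BackoffPolicy.noBackoff().iterator().hasNext()); // false
    }
}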

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

Lines changed: 7 additions & 1 deletion
@@ -92,6 +92,8 @@ class S3BlobStore implements BlobStore {
 
     private final int bulkDeletionBatchSize;
 
+    private final TimeValue getRegisterRetryDelay;
+
     S3BlobStore(
         S3Service service,
         String bucket,
@@ -116,7 +118,7 @@ class S3BlobStore implements BlobStore {
         this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
         this.s3RepositoriesMetrics = s3RepositoriesMetrics;
         this.bulkDeletionBatchSize = S3Repository.DELETION_BATCH_SIZE_SETTING.get(repositoryMetadata.settings());
-
+        this.getRegisterRetryDelay = S3Repository.GET_REGISTER_RETRY_DELAY.get(repositoryMetadata.settings());
     }
 
     RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) {
@@ -409,6 +411,10 @@ public StorageClass getStorageClass() {
         return storageClass;
     }
 
+    public TimeValue getGetRegisterRetryDelay() {
+        return getRegisterRetryDelay;
+    }
+
     public static StorageClass initStorageClass(String storageClass) {
         if ((storageClass == null) || storageClass.equals("")) {
             return StorageClass.Standard;

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

Lines changed: 10 additions & 0 deletions
@@ -202,6 +202,16 @@ class S3Repository extends MeteredBlobStoreRepository {
         Setting.Property.Dynamic
     );
 
+    /**
+     * Time to wait before trying again if getRegister fails.
+     */
+    static final Setting<TimeValue> GET_REGISTER_RETRY_DELAY = Setting.timeSetting(
+        "get_register_retry_delay",
+        new TimeValue(5, TimeUnit.SECONDS),
+        new TimeValue(0, TimeUnit.MILLISECONDS),
+        Setting.Property.Dynamic
+    );
+
     private final S3Service service;
 
     private final String bucket;

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 75 additions & 1 deletion
@@ -14,6 +14,7 @@
 import com.amazonaws.SdkClientException;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.util.Base16;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
@@ -24,6 +25,7 @@
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.OperationPurpose;
+import org.elasticsearch.common.blobstore.OptionalBytesReference;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
@@ -183,7 +185,10 @@ protected BlobContainer createBlobContainer(
         final RepositoryMetadata repositoryMetadata = new RepositoryMetadata(
             "repository",
             S3Repository.TYPE,
-            Settings.builder().put(S3Repository.CLIENT_NAME.getKey(), clientName).build()
+            Settings.builder()
+                .put(S3Repository.CLIENT_NAME.getKey(), clientName)
+                .put(S3Repository.GET_REGISTER_RETRY_DELAY.getKey(), TimeValue.ZERO)
+                .build()
         );
 
         final S3BlobStore s3BlobStore = new S3BlobStore(
@@ -771,6 +776,75 @@ public void handle(HttpExchange exchange) throws IOException {
         assertThat(getRetryHistogramMeasurements(), empty());
     }
 
+    public void testGetRegisterRetries() {
+        final var maxRetries = between(0, 3);
+        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
+
+        interface FailingHandlerFactory {
+            void addHandler(String blobName, Integer... responseCodes);
+        }
+
+        final var requestCounter = new AtomicInteger();
+        final FailingHandlerFactory countingFailingHandlerFactory = (blobName, responseCodes) -> httpServer.createContext(
+            downloadStorageEndpoint(blobContainer, blobName),
+            exchange -> {
+                requestCounter.incrementAndGet();
+                try (exchange) {
+                    exchange.sendResponseHeaders(randomFrom(responseCodes), -1);
+                }
+            }
+        );
+
+        countingFailingHandlerFactory.addHandler("test_register_no_internal_retries", HttpStatus.SC_UNPROCESSABLE_ENTITY);
+        countingFailingHandlerFactory.addHandler(
+            "test_register_internal_retries",
+            HttpStatus.SC_INTERNAL_SERVER_ERROR,
+            HttpStatus.SC_SERVICE_UNAVAILABLE
+        );
+        countingFailingHandlerFactory.addHandler("test_register_not_found", HttpStatus.SC_NOT_FOUND);
+
+        {
+            final var exceptionWithInternalRetries = safeAwaitFailure(
+                OptionalBytesReference.class,
+                l -> blobContainer.getRegister(randomRetryingPurpose(), "test_register_internal_retries", l)
+            );
+            assertThat(exceptionWithInternalRetries, instanceOf(AmazonS3Exception.class));
+            assertEquals((maxRetries + 1) * (maxRetries + 1), requestCounter.get());
+            assertEquals(maxRetries, exceptionWithInternalRetries.getSuppressed().length);
+        }
+
+        {
+            requestCounter.set(0);
+            final var exceptionWithoutInternalRetries = safeAwaitFailure(
+                OptionalBytesReference.class,
+                l -> blobContainer.getRegister(randomRetryingPurpose(), "test_register_no_internal_retries", l)
+            );
+            assertThat(exceptionWithoutInternalRetries, instanceOf(AmazonS3Exception.class));
+            assertEquals(maxRetries + 1, requestCounter.get());
+            assertEquals(maxRetries, exceptionWithoutInternalRetries.getSuppressed().length);
+        }
+
+        {
+            requestCounter.set(0);
+            final var repoAnalysisException = safeAwaitFailure(
+                OptionalBytesReference.class,
+                l -> blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, "test_register_no_internal_retries", l)
+            );
+            assertThat(repoAnalysisException, instanceOf(AmazonS3Exception.class));
+            assertEquals(1, requestCounter.get());
+            assertEquals(0, repoAnalysisException.getSuppressed().length);
+        }
+
+        {
+            requestCounter.set(0);
+            final OptionalBytesReference expectEmpty = safeAwait(
+                l -> blobContainer.getRegister(randomPurpose(), "test_register_not_found", l)
+            );
+            assertEquals(OptionalBytesReference.EMPTY, expectEmpty);
+            assertEquals(1, requestCounter.get());
+        }
+    }
+
     @Override
     protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
         // some attempts make meaningful progress and do not count towards the max retry limit