Retry `S3BlobContainer#getRegister` on all exceptions (PR #114813, merged)
5 changes: 5 additions & 0 deletions docs/changelog/114813.yaml
@@ -0,0 +1,5 @@
+pr: 114813
+summary: Retry `S3BlobContainer#getRegister` on all exceptions
+area: Snapshot/Restore
+type: enhancement
+issues: []
5 changes: 5 additions & 0 deletions docs/reference/snapshot-restore/repository-s3.asciidoc
@@ -343,6 +343,11 @@ include::repository-shared-settings.asciidoc[]
 will disable retries altogether. Note that if retries are enabled in the Azure client, each of these retries
 comprises that many client-level retries.

+`get_register_retry_delay`
+
+(<<time-units,time value>>) Sets the time to wait before trying again if an attempt to read a
+<<repository-s3-linearizable-registers,linearizable register>> fails. Defaults to `5s`.
+
 NOTE: The option of defining client settings in the repository settings as
 documented below is considered deprecated, and will be removed in a future
 version.
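To make the new setting concrete, here is a minimal Java sketch of building repository metadata that carries it, as one might do in a test. The repository name, bucket, and the 10s value are illustrative assumptions, not taken from the change; only the `get_register_retry_delay` key and its time-value type come from the PR.

import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;

class GetRegisterRetryDelaySketch {
    static RepositoryMetadata exampleMetadata() {
        return new RepositoryMetadata(
            "my-repo", // hypothetical repository name
            "s3",
            Settings.builder()
                .put("bucket", "my-bucket") // hypothetical bucket
                .put("get_register_retry_delay", TimeValue.timeValueSeconds(10))
                .build()
        );
    }
}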
56 changes: 40 additions & 16 deletions modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
@@ -40,6 +40,7 @@
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.cluster.service.MasterService;
+import org.elasticsearch.common.BackoffPolicy;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -910,21 +911,44 @@ public void compareAndExchangeRegister(
     @Override
     public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
         ActionListener.completeWith(listener, () -> {
-            final var getObjectRequest = new GetObjectRequest(blobStore.bucket(), buildKey(key));
-            S3BlobStore.configureRequestForMetrics(getObjectRequest, blobStore, Operation.GET_OBJECT, purpose);
-            try (
-                var clientReference = blobStore.clientReference();
-                var s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
-                var stream = s3Object.getObjectContent()
-            ) {
-                return OptionalBytesReference.of(getRegisterUsingConsistentRead(stream, keyPath, key));
-            } catch (AmazonS3Exception e) {
-                logger.trace(() -> Strings.format("[%s]: getRegister failed", key), e);
-                if (e.getStatusCode() == 404) {
-                    return OptionalBytesReference.EMPTY;
-                } else {
-                    throw e;
-                }
-            }
+            final var backoffPolicy = purpose == OperationPurpose.REPOSITORY_ANALYSIS
+                ? BackoffPolicy.noBackoff()
+                : BackoffPolicy.constantBackoff(blobStore.getGetRegisterRetryDelay(), blobStore.getMaxRetries());
+            final var retryDelayIterator = backoffPolicy.iterator();
+
+            Exception finalException = null;
+            while (true) {
+                final var getObjectRequest = new GetObjectRequest(blobStore.bucket(), buildKey(key));
+                S3BlobStore.configureRequestForMetrics(getObjectRequest, blobStore, Operation.GET_OBJECT, purpose);
+                try (
+                    var clientReference = blobStore.clientReference();
+                    var s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
+                    var stream = s3Object.getObjectContent()
+                ) {
+                    return OptionalBytesReference.of(getRegisterUsingConsistentRead(stream, keyPath, key));
+                } catch (Exception attemptException) {
+                    logger.trace(() -> Strings.format("[%s]: getRegister failed", key), attemptException);
+                    if (attemptException instanceof AmazonS3Exception amazonS3Exception && amazonS3Exception.getStatusCode() == 404) {
+                        return OptionalBytesReference.EMPTY;
+                    } else if (finalException == null) {
+                        finalException = attemptException;
+                    } else if (finalException != attemptException) {
+                        finalException.addSuppressed(attemptException);

Contributor:
How is it possible that finalException == attemptException? Does something we call throw a singleton exception? I'm not sure I follow this condition.

Contributor (Author):
Who knows what the SDK does or might do in future, but calling addSuppressed with the same exception throws an IllegalArgumentException, and we don't want that here.

+                    }
+                }
+                if (retryDelayIterator.hasNext()) {
+                    try {
+                        // noinspection BusyWait
+                        Thread.sleep(retryDelayIterator.next().millis());
+                        continue;
+                    } catch (InterruptedException interruptedException) {
+                        Thread.currentThread().interrupt();
+                        finalException.addSuppressed(interruptedException);

Contributor:
Should we break out of the loop if we're interrupted? I'm guessing something else in the loop will throw an InterruptedException if we continue, but would it be more polite to bail early?

Contributor (Author):
Yeah, we do break out here: we fall through to the throw finalException line. I added a comment.

Contributor:
Ah yes

+                        // fall through and throw the exception
+                    }
+                }
+
+                throw finalException;
+            }
         });
     }
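As a standalone illustration of the Throwable contract behind the first review exchange above, the following sketch (not part of the PR) shows that addSuppressed rejects the throwable itself with an IllegalArgumentException, which is exactly what the finalException != attemptException guard avoids.

public class SelfSuppressionSketch {
    public static void main(String[] args) {
        final Exception e = new Exception("boom");
        try {
            e.addSuppressed(e); // a throwable cannot suppress itself
        } catch (IllegalArgumentException expected) {
            // This is the failure mode the guard in getRegister protects against.
            System.out.println("addSuppressed(self) rejected: " + expected);
        }
    }
}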
7 changes: 7 additions & 0 deletions modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
@@ -95,6 +95,8 @@ class S3BlobStore implements BlobStore {
     private final int bulkDeletionBatchSize;
     private final BackoffPolicy retryThrottledDeleteBackoffPolicy;

+    private final TimeValue getRegisterRetryDelay;
+
     S3BlobStore(
         S3Service service,
         String bucket,
@@ -121,6 +123,7 @@
         this.s3RepositoriesMetrics = s3RepositoriesMetrics;
         this.bulkDeletionBatchSize = S3Repository.DELETION_BATCH_SIZE_SETTING.get(repositoryMetadata.settings());
         this.retryThrottledDeleteBackoffPolicy = retryThrottledDeleteBackoffPolicy;
+        this.getRegisterRetryDelay = S3Repository.GET_REGISTER_RETRY_DELAY.get(repositoryMetadata.settings());
     }

     RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) {
@@ -468,6 +471,10 @@ public StorageClass getStorageClass() {
         return storageClass;
     }

+    public TimeValue getGetRegisterRetryDelay() {
+        return getRegisterRetryDelay;
+    }
+
     public static StorageClass initStorageClass(String storageClass) {
         if ((storageClass == null) || storageClass.equals("")) {
             return StorageClass.Standard;
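The getter above supplies the delay for BackoffPolicy.constantBackoff in S3BlobContainer#getRegister. Below is a minimal sketch of the iterator contract the retry loop depends on, using illustrative values of a 5s delay and 3 retries: constantBackoff(delay, n) yields exactly n equal delays, so the loop performs one initial attempt plus up to n retries.

import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.core.TimeValue;

import java.util.Iterator;

class ConstantBackoffSketch {
    public static void main(String[] args) {
        final Iterator<TimeValue> delays = BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(5), 3).iterator();
        while (delays.hasNext()) {
            // Prints a 5s delay exactly three times; once hasNext() turns false
            // the caller gives up, just as the getRegister loop does.
            System.out.println(delays.next());
        }
    }
}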
10 changes: 10 additions & 0 deletions modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
@@ -223,6 +223,16 @@ class S3Repository extends MeteredBlobStoreRepository {
         0
     );

+    /**
+     * Time to wait before trying again if getRegister fails.
+     */
+    static final Setting<TimeValue> GET_REGISTER_RETRY_DELAY = Setting.timeSetting(
+        "get_register_retry_delay",
+        new TimeValue(5, TimeUnit.SECONDS),
+        new TimeValue(0, TimeUnit.MILLISECONDS),
+        Setting.Property.Dynamic
+    );
+
     private final S3Service service;

     private final String bucket;
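A sketch of how such a time setting resolves. The local copy below mirrors the declaration above purely for illustration (the real constant is package-private in S3Repository): reading it from empty settings yields the 5s default, and the zero lower bound is what lets the tests set the delay to zero.

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;

import java.util.concurrent.TimeUnit;

class TimeSettingSketch {
    // Illustrative copy of GET_REGISTER_RETRY_DELAY, not the real constant.
    static final Setting<TimeValue> DELAY = Setting.timeSetting(
        "get_register_retry_delay",
        new TimeValue(5, TimeUnit.SECONDS),
        new TimeValue(0, TimeUnit.MILLISECONDS),
        Setting.Property.Dynamic
    );

    public static void main(String[] args) {
        System.out.println(DELAY.get(Settings.EMPTY)); // falls back to the 5s default
        System.out.println(DELAY.get(Settings.builder().put("get_register_retry_delay", "0ms").build())); // explicit zero, as the tests use
    }
}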
75 changes: 74 additions & 1 deletion modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java
@@ -28,6 +28,7 @@
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.OperationPurpose;
+import org.elasticsearch.common.blobstore.OptionalBytesReference;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
@@ -191,7 +192,10 @@ protected BlobContainer createBlobContainer(
         final RepositoryMetadata repositoryMetadata = new RepositoryMetadata(
             "repository",
             S3Repository.TYPE,
-            Settings.builder().put(S3Repository.CLIENT_NAME.getKey(), clientName).build()
+            Settings.builder()
+                .put(S3Repository.CLIENT_NAME.getKey(), clientName)
+                .put(S3Repository.GET_REGISTER_RETRY_DELAY.getKey(), TimeValue.ZERO)
+                .build()
         );

         final S3BlobStore s3BlobStore = new S3BlobStore(
@@ -945,6 +949,75 @@ private Set<OperationPurpose> operationPurposesThatRetryOnDelete() {
         return Set.of(OperationPurpose.SNAPSHOT_DATA, OperationPurpose.SNAPSHOT_METADATA);
     }

+    public void testGetRegisterRetries() {
+        final var maxRetries = between(0, 3);
+        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
+
+        interface FailingHandlerFactory {
+            void addHandler(String blobName, Integer... responseCodes);
+        }
+
+        final var requestCounter = new AtomicInteger();
+        final FailingHandlerFactory countingFailingHandlerFactory = (blobName, responseCodes) -> httpServer.createContext(
+            downloadStorageEndpoint(blobContainer, blobName),
+            exchange -> {
+                requestCounter.incrementAndGet();
+                try (exchange) {
+                    exchange.sendResponseHeaders(randomFrom(responseCodes), -1);
+                }
+            }
+        );
+
+        countingFailingHandlerFactory.addHandler("test_register_no_internal_retries", HttpStatus.SC_UNPROCESSABLE_ENTITY);
+        countingFailingHandlerFactory.addHandler(
+            "test_register_internal_retries",
+            HttpStatus.SC_INTERNAL_SERVER_ERROR,
+            HttpStatus.SC_SERVICE_UNAVAILABLE
+        );
+        countingFailingHandlerFactory.addHandler("test_register_not_found", HttpStatus.SC_NOT_FOUND);
+
+        {
+            final var exceptionWithInternalRetries = safeAwaitFailure(
+                OptionalBytesReference.class,
+                l -> blobContainer.getRegister(randomRetryingPurpose(), "test_register_internal_retries", l)
+            );
+            assertThat(exceptionWithInternalRetries, instanceOf(AmazonS3Exception.class));
+            assertEquals((maxRetries + 1) * (maxRetries + 1), requestCounter.get());
+            assertEquals(maxRetries, exceptionWithInternalRetries.getSuppressed().length);
+        }
+
+        {
+            requestCounter.set(0);
+            final var exceptionWithoutInternalRetries = safeAwaitFailure(
+                OptionalBytesReference.class,
+                l -> blobContainer.getRegister(randomRetryingPurpose(), "test_register_no_internal_retries", l)
+            );
+            assertThat(exceptionWithoutInternalRetries, instanceOf(AmazonS3Exception.class));
+            assertEquals(maxRetries + 1, requestCounter.get());
+            assertEquals(maxRetries, exceptionWithoutInternalRetries.getSuppressed().length);
+        }
+
+        {
+            requestCounter.set(0);
+            final var repoAnalysisException = safeAwaitFailure(
+                OptionalBytesReference.class,
+                l -> blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, "test_register_no_internal_retries", l)
+            );
+            assertThat(repoAnalysisException, instanceOf(AmazonS3Exception.class));
+            assertEquals(1, requestCounter.get());
+            assertEquals(0, repoAnalysisException.getSuppressed().length);
+        }
+
+        {
+            requestCounter.set(0);
+            final OptionalBytesReference expectEmpty = safeAwait(
+                l -> blobContainer.getRegister(randomPurpose(), "test_register_not_found", l)
+            );
+            assertEquals(OptionalBytesReference.EMPTY, expectEmpty);
+            assertEquals(1, requestCounter.get());
+        }
+    }
+
     @Override
     protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
         // some attempts make meaningful progress and do not count towards the max retry limit
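A note on the request-count assertions in the test above: the AWS SDK retries HTTP 500 and 503 responses internally, so each of the new loop's maxRetries + 1 attempts issues maxRetries + 1 HTTP requests, giving the (maxRetries + 1) * (maxRetries + 1) total (for example, 16 requests when maxRetries is 3). HTTP 422 is not retried inside the SDK, so only the loop's own maxRetries + 1 attempts reach the server, and the REPOSITORY_ANALYSIS purpose uses BackoffPolicy.noBackoff(), so exactly one request is made.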