6 changes: 6 additions & 0 deletions docs/changelog/123630.yaml
@@ -0,0 +1,6 @@
+ pr: 123630
+ summary: Limit number of suppressed S3 deletion errors
+ area: Snapshot/Restore
+ type: bug
+ issues:
+  - 123354
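
Context for the fix: deleteBlobs splits large deletions into batches, and every failed batch used to be chained onto a single accumulated exception via addSuppressed, so a widespread failure could attach an unbounded number of suppressed exceptions to the one eventually thrown (issue 123354). This PR caps that chain at ten. Below is a minimal standalone sketch of the capping pattern, not the PR's code verbatim; it inlines what Elasticsearch's ExceptionsHelper.useOrSuppress does, and the class and constant names merely mirror the real diff further down.

// Standalone sketch (illustrative). ExceptionsHelper.useOrSuppress(first, second)
// keeps `first` as the primary exception and attaches `second` via addSuppressed(),
// or returns `second` when `first` is null; that behavior is inlined here so the
// sketch needs no Elasticsearch dependencies.
class DeletionExceptionsSketch {
    static final int MAX_DELETE_EXCEPTIONS = 10; // the cap this PR introduces

    private Exception exception = null;
    private int count = 0;

    void useOrMaybeSuppress(Exception e) {
        if (count < MAX_DELETE_EXCEPTIONS) {
            if (exception == null) {
                exception = e;              // first failure becomes the primary exception
            } else {
                exception.addSuppressed(e); // later failures are attached, up to the cap
            }
            count++;
        }
        // Past the cap, the failure is no longer recorded on the chain,
        // keeping the eventual exception (and its heap footprint) bounded.
    }

    Exception get() {
        return exception;
    }
}
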
@@ -115,7 +115,8 @@ protected BlobContainer createBlobContainer(
final @Nullable Integer maxRetries,
final @Nullable TimeValue readTimeout,
final @Nullable Boolean disableChunkedEncoding,
- final @Nullable ByteSizeValue bufferSize
+ final @Nullable ByteSizeValue bufferSize,
+ final @Nullable Integer maxBulkDeletes
) {
final Settings.Builder clientSettings = Settings.builder();
final String client = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
@@ -176,7 +177,7 @@ public void testReadLargeBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(2, 10);
final AtomicInteger countDown = new AtomicInteger(maxRetries);

- final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
+ final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);

// SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
final byte[] bytes = randomBytes(1 << 22);
@@ -205,7 +206,7 @@ public void testWriteBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(2, 10);
final CountDown countDown = new CountDown(maxRetries);

- final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
+ final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
final byte[] bytes = randomBlobContent();
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
@@ -247,7 +248,7 @@ public void testWriteBlobWithRetries() throws Exception {
public void testWriteBlobWithReadTimeouts() {
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
- final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null);
+ final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null);

// HTTP server does not send a response
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
@@ -300,7 +301,7 @@ public void testWriteLargeBlob() throws IOException {
logger.debug("starting with resumable upload id [{}]", sessionUploadId.get());

final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
- final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null);
+ final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null);

httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
final BytesReference requestBody = Streams.readFully(exchange.getRequestBody());
@@ -440,7 +441,7 @@ public String next() {
return Integer.toString(totalDeletesSent++);
}
};
- final BlobContainer blobContainer = createBlobContainer(1, null, null, null);
+ final BlobContainer blobContainer = createBlobContainer(1, null, null, null, null);
httpServer.createContext("/batch/storage/v1", safeHandler(exchange -> {
assert pendingDeletes.get() <= MAX_DELETES_PER_BATCH;

@@ -476,7 +477,7 @@ public void testCompareAndExchangeWhenThrottled() throws IOException {
httpServer.createContext("/", new ResponseInjectingHttpHandler(requestHandlers, new GoogleCloudStorageHttpHandler("bucket")));

final int maxRetries = randomIntBetween(1, 3);
- final BlobContainer container = createBlobContainer(maxRetries, null, null, null);
+ final BlobContainer container = createBlobContainer(maxRetries, null, null, null, null);
final byte[] data = randomBytes(randomIntBetween(1, BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
final String key = randomIdentifier();

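
The hunks above are mechanical: createBlobContainer gains a trailing maxBulkDeletes parameter and every existing call site passes null to keep the default batch size. A new test exercising the knob would presumably be wired like this sketch (the value range is an assumption, not code from this PR):

// Hypothetical wiring for the new parameter (values illustrative):
final int maxBulkDeletes = randomIntBetween(5, 50); // small batches force many delete requests
final BlobContainer container = createBlobContainer(
    1,              // maxRetries
    null,           // readTimeout: default
    null,           // disableChunkedEncoding: default
    null,           // bufferSize: default
    maxBulkDeletes  // the new knob
);
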
@@ -52,7 +52,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;

@@ -69,6 +68,8 @@ class S3BlobStore implements BlobStore {
*/
static final int MAX_BULK_DELETES = 1000;

+ static final int MAX_DELETE_EXCEPTIONS = 10;

[review thread on MAX_DELETE_EXCEPTIONS]

Member (Author): I've picked this more or less randomly. Not sure what it should be; I'm not sure having more than this is useful.

Contributor: 👍 fine by me

private static final Logger logger = LogManager.getLogger(S3BlobStore.class);

private final S3Service service;
@@ -340,6 +341,18 @@ public BlobContainer blobContainer(BlobPath path) {
return new S3BlobContainer(path, this);
}

+ private static class DeletionExceptions {
+     Exception exception = null;
+     private int count = 0;
+
+     void useOrMaybeSuppress(Exception e) {
+         if (count < MAX_DELETE_EXCEPTIONS) {
+             exception = ExceptionsHelper.useOrSuppress(exception, e);
+             count++;
+         }
+     }
+ }

void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
if (blobNames.hasNext() == false) {
return;
@@ -348,19 +361,19 @@ void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
final List<String> partition = new ArrayList<>();
try {
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
- final AtomicReference<Exception> aex = new AtomicReference<>();
+ final var deletionExceptions = new DeletionExceptions();
blobNames.forEachRemaining(key -> {
partition.add(key);
if (partition.size() == bulkDeletionBatchSize) {
- deletePartition(purpose, partition, aex);
+ deletePartition(purpose, partition, deletionExceptions);
partition.clear();
}
});
if (partition.isEmpty() == false) {
- deletePartition(purpose, partition, aex);
+ deletePartition(purpose, partition, deletionExceptions);
}
- if (aex.get() != null) {
-     throw aex.get();
+ if (deletionExceptions.exception != null) {
+     throw deletionExceptions.exception;
}
} catch (Exception e) {
throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
@@ -372,9 +385,9 @@ void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
*
* @param purpose The {@link OperationPurpose} of the deletion
* @param partition The list of blobs to delete
- * @param aex A holder for any exception(s) thrown during the deletion
+ * @param deletionExceptions A holder for any exception(s) thrown during the deletion
*/
- private void deletePartition(OperationPurpose purpose, List<String> partition, AtomicReference<Exception> aex) {
+ private void deletePartition(OperationPurpose purpose, List<String> partition, DeletionExceptions deletionExceptions) {
final Iterator<TimeValue> retries = retryThrottledDeleteBackoffPolicy.iterator();
int retryCounter = 0;
while (true) {
@@ -395,7 +408,7 @@ private void deletePartition(OperationPurpose purpose, List<String> partition, AtomicReference<Exception> aex) {
),
e
);
- aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
+ deletionExceptions.useOrMaybeSuppress(e);
return;
} catch (AmazonClientException e) {
if (shouldRetryDelete(purpose) && RetryUtils.isThrottlingException(e)) {
@@ -404,13 +417,13 @@ private void deletePartition(OperationPurpose purpose, List<String> partition, AtomicReference<Exception> aex) {
retryCounter++;
} else {
s3RepositoriesMetrics.retryDeletesHistogram().record(retryCounter);
- aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
+ deletionExceptions.useOrMaybeSuppress(e);
return;
}
} else {
// The AWS client threw an unexpected exception and did not execute the request at all, so we do not
// remove any keys from the outstanding deletes set.
- aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
+ deletionExceptions.useOrMaybeSuppress(e);
return;
}
}
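
Net effect of the S3BlobStore changes: however many of the (up to 1,000-blob) delete partitions fail, the IOException thrown by deleteBlobs now wraps one primary exception carrying at most MAX_DELETE_EXCEPTIONS - 1 suppressed ones, because the holder counts the primary exception against the cap. A hedged sketch of that caller-side invariant follows; blobStore is an assumed same-package fixture, SNAPSHOT_DATA merely an example OperationPurpose, and Elasticsearch imports are omitted.

import java.io.IOException;
import java.util.Iterator;
import java.util.stream.IntStream;

// Hypothetical check of the new bound; not a test from this PR.
void assertSuppressionIsBounded(S3BlobStore blobStore) {
    Iterator<String> blobNames = IntStream.range(0, 100_000).mapToObj(i -> "blob-" + i).iterator();
    try {
        blobStore.deleteBlobs(OperationPurpose.SNAPSHOT_DATA, blobNames);
    } catch (IOException e) {
        Throwable primary = e.getCause(); // deleteBlobs wraps the accumulated exception
        // Before this PR: one suppressed exception per failed partition, unbounded.
        // After: one primary plus at most MAX_DELETE_EXCEPTIONS - 1 suppressed.
        assert primary == null || primary.getSuppressed().length < S3BlobStore.MAX_DELETE_EXCEPTIONS;
    }
}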