Skip to content

Commit 113f0c1

Browse files
authored
Limit number of suppressed S3 deletion errors (#123630)
We've seen this being an issue on 7.x, although it can happen on all versions (I'm pretty sure this PR doesn't cleanly back-port to 7.x though). Closes #123354
1 parent a320809 commit 113f0c1

File tree

6 files changed

+104
-49
lines changed

6 files changed

+104
-49
lines changed

docs/changelog/123630.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 123630
2+
summary: Limit number of suppressed S3 deletion errors
3+
area: Snapshot/Restore
4+
type: bug
5+
issues:
6+
- 123354

modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ protected BlobContainer createBlobContainer(
115115
final @Nullable Integer maxRetries,
116116
final @Nullable TimeValue readTimeout,
117117
final @Nullable Boolean disableChunkedEncoding,
118-
final @Nullable ByteSizeValue bufferSize
118+
final @Nullable ByteSizeValue bufferSize,
119+
final @Nullable Integer maxBulkDeletes
119120
) {
120121
final Settings.Builder clientSettings = Settings.builder();
121122
final String client = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
@@ -176,7 +177,7 @@ public void testReadLargeBlobWithRetries() throws Exception {
176177
final int maxRetries = randomIntBetween(2, 10);
177178
final AtomicInteger countDown = new AtomicInteger(maxRetries);
178179

179-
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
180+
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
180181

181182
// SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
182183
final byte[] bytes = randomBytes(1 << 22);
@@ -205,7 +206,7 @@ public void testWriteBlobWithRetries() throws Exception {
205206
final int maxRetries = randomIntBetween(2, 10);
206207
final CountDown countDown = new CountDown(maxRetries);
207208

208-
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
209+
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
209210
final byte[] bytes = randomBlobContent();
210211
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
211212
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
@@ -247,7 +248,7 @@ public void testWriteBlobWithRetries() throws Exception {
247248
public void testWriteBlobWithReadTimeouts() {
248249
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
249250
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
250-
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null);
251+
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null);
251252

252253
// HTTP server does not send a response
253254
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
@@ -300,7 +301,7 @@ public void testWriteLargeBlob() throws IOException {
300301
logger.debug("starting with resumable upload id [{}]", sessionUploadId.get());
301302

302303
final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
303-
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null);
304+
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null);
304305

305306
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
306307
final BytesReference requestBody = Streams.readFully(exchange.getRequestBody());
@@ -440,7 +441,7 @@ public String next() {
440441
return Integer.toString(totalDeletesSent++);
441442
}
442443
};
443-
final BlobContainer blobContainer = createBlobContainer(1, null, null, null);
444+
final BlobContainer blobContainer = createBlobContainer(1, null, null, null, null);
444445
httpServer.createContext("/batch/storage/v1", safeHandler(exchange -> {
445446
assert pendingDeletes.get() <= MAX_DELETES_PER_BATCH;
446447

@@ -476,7 +477,7 @@ public void testCompareAndExchangeWhenThrottled() throws IOException {
476477
httpServer.createContext("/", new ResponseInjectingHttpHandler(requestHandlers, new GoogleCloudStorageHttpHandler("bucket")));
477478

478479
final int maxRetries = randomIntBetween(1, 3);
479-
final BlobContainer container = createBlobContainer(maxRetries, null, null, null);
480+
final BlobContainer container = createBlobContainer(maxRetries, null, null, null, null);
480481
final byte[] data = randomBytes(randomIntBetween(1, BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
481482
final String key = randomIdentifier();
482483

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import java.util.concurrent.ConcurrentHashMap;
5353
import java.util.concurrent.Executor;
5454
import java.util.concurrent.TimeUnit;
55-
import java.util.concurrent.atomic.AtomicReference;
5655
import java.util.concurrent.atomic.LongAdder;
5756
import java.util.stream.Collectors;
5857

@@ -69,6 +68,8 @@ class S3BlobStore implements BlobStore {
6968
*/
7069
static final int MAX_BULK_DELETES = 1000;
7170

71+
static final int MAX_DELETE_EXCEPTIONS = 10;
72+
7273
private static final Logger logger = LogManager.getLogger(S3BlobStore.class);
7374

7475
private final S3Service service;
@@ -340,6 +341,18 @@ public BlobContainer blobContainer(BlobPath path) {
340341
return new S3BlobContainer(path, this);
341342
}
342343

344+
private static class DeletionExceptions {
345+
Exception exception = null;
346+
private int count = 0;
347+
348+
void useOrMaybeSuppress(Exception e) {
349+
if (count < MAX_DELETE_EXCEPTIONS) {
350+
exception = ExceptionsHelper.useOrSuppress(exception, e);
351+
count++;
352+
}
353+
}
354+
}
355+
343356
void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
344357
if (blobNames.hasNext() == false) {
345358
return;
@@ -348,19 +361,19 @@ void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IO
348361
final List<String> partition = new ArrayList<>();
349362
try {
350363
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
351-
final AtomicReference<Exception> aex = new AtomicReference<>();
364+
final var deletionExceptions = new DeletionExceptions();
352365
blobNames.forEachRemaining(key -> {
353366
partition.add(key);
354367
if (partition.size() == bulkDeletionBatchSize) {
355-
deletePartition(purpose, partition, aex);
368+
deletePartition(purpose, partition, deletionExceptions);
356369
partition.clear();
357370
}
358371
});
359372
if (partition.isEmpty() == false) {
360-
deletePartition(purpose, partition, aex);
373+
deletePartition(purpose, partition, deletionExceptions);
361374
}
362-
if (aex.get() != null) {
363-
throw aex.get();
375+
if (deletionExceptions.exception != null) {
376+
throw deletionExceptions.exception;
364377
}
365378
} catch (Exception e) {
366379
throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
@@ -372,9 +385,9 @@ void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IO
372385
*
373386
* @param purpose The {@link OperationPurpose} of the deletion
374387
* @param partition The list of blobs to delete
375-
* @param aex A holder for any exception(s) thrown during the deletion
388+
* @param deletionExceptions A holder for any exception(s) thrown during the deletion
376389
*/
377-
private void deletePartition(OperationPurpose purpose, List<String> partition, AtomicReference<Exception> aex) {
390+
private void deletePartition(OperationPurpose purpose, List<String> partition, DeletionExceptions deletionExceptions) {
378391
final Iterator<TimeValue> retries = retryThrottledDeleteBackoffPolicy.iterator();
379392
int retryCounter = 0;
380393
while (true) {
@@ -395,7 +408,7 @@ private void deletePartition(OperationPurpose purpose, List<String> partition, A
395408
),
396409
e
397410
);
398-
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
411+
deletionExceptions.useOrMaybeSuppress(e);
399412
return;
400413
} catch (AmazonClientException e) {
401414
if (shouldRetryDelete(purpose) && RetryUtils.isThrottlingException(e)) {
@@ -404,13 +417,13 @@ private void deletePartition(OperationPurpose purpose, List<String> partition, A
404417
retryCounter++;
405418
} else {
406419
s3RepositoriesMetrics.retryDeletesHistogram().record(retryCounter);
407-
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
420+
deletionExceptions.useOrMaybeSuppress(e);
408421
return;
409422
}
410423
} else {
411424
// The AWS client threw any unexpected exception and did not execute the request at all so we do not
412425
// remove any keys from the outstanding deletes set.
413-
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
426+
deletionExceptions.useOrMaybeSuppress(e);
414427
return;
415428
}
416429
}

0 commit comments

Comments
 (0)