Commit 80cbf72

Limit number of suppressed S3 deletion errors
1 parent: b7e3a1e

File tree: 2 files changed, 68 additions & 15 deletions

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

Lines changed: 18 additions & 7 deletions
@@ -69,6 +69,8 @@ class S3BlobStore implements BlobStore {
      */
     static final int MAX_BULK_DELETES = 1000;

+    static final int MAX_DELETE_EXCEPTIONS = 10;
+
     private static final Logger logger = LogManager.getLogger(S3BlobStore.class);

     private final S3Service service;
@@ -340,6 +342,8 @@ public BlobContainer blobContainer(BlobPath path) {
         return new S3BlobContainer(path, this);
     }

+    record DeletionExceptions(Exception exception, int count) {}
+
     void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
         if (blobNames.hasNext() == false) {
             return;
@@ -348,7 +352,7 @@ void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IO
         final List<String> partition = new ArrayList<>();
         try {
             // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
-            final AtomicReference<Exception> aex = new AtomicReference<>();
+            final AtomicReference<DeletionExceptions> aex = new AtomicReference<>(new DeletionExceptions(null, 0));
             blobNames.forEachRemaining(key -> {
                 partition.add(key);
                 if (partition.size() == bulkDeletionBatchSize) {
@@ -359,8 +363,8 @@ void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IO
             if (partition.isEmpty() == false) {
                 deletePartition(purpose, partition, aex);
             }
-            if (aex.get() != null) {
-                throw aex.get();
+            if (aex.get().exception != null) {
+                throw aex.get().exception;
             }
         } catch (Exception e) {
             throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
@@ -374,7 +378,7 @@ void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IO
      * @param partition The list of blobs to delete
      * @param aex A holder for any exception(s) thrown during the deletion
      */
-    private void deletePartition(OperationPurpose purpose, List<String> partition, AtomicReference<Exception> aex) {
+    private void deletePartition(OperationPurpose purpose, List<String> partition, AtomicReference<DeletionExceptions> aex) {
         final Iterator<TimeValue> retries = retryThrottledDeleteBackoffPolicy.iterator();
         int retryCounter = 0;
         while (true) {
@@ -395,7 +399,7 @@ private void deletePartition(OperationPurpose purpose, List<String> partition, A
                     ),
                     e
                 );
-                aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
+                useOrMaybeSuppress(aex, e);
                 return;
             } catch (AmazonClientException e) {
                 if (shouldRetryDelete(purpose) && RetryUtils.isThrottlingException(e)) {
@@ -404,19 +408,26 @@ private void deletePartition(OperationPurpose purpose, List<String> partition, A
                         retryCounter++;
                     } else {
                         s3RepositoriesMetrics.retryDeletesHistogram().record(retryCounter);
-                        aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
+                        useOrMaybeSuppress(aex, e);
                         return;
                     }
                 } else {
                     // The AWS client threw any unexpected exception and did not execute the request at all so we do not
                     // remove any keys from the outstanding deletes set.
-                    aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
+                    useOrMaybeSuppress(aex, e);
                     return;
                 }
             }
         }
     }

+    private void useOrMaybeSuppress(AtomicReference<DeletionExceptions> aex, Exception e) {
+        var deletionExceptions = aex.get();
+        if (deletionExceptions.count < MAX_DELETE_EXCEPTIONS) {
+            aex.set(new DeletionExceptions(ExceptionsHelper.useOrSuppress(deletionExceptions.exception, e), deletionExceptions.count + 1));
+        }
+    }
+
     /**
      * If there are remaining retries, pause for the configured interval then return true
      *

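In short, the production change stops unbounded exception accumulation: the bare AtomicReference<Exception> becomes a DeletionExceptions(exception, count) record, and useOrMaybeSuppress only attaches a new failure (via ExceptionsHelper.useOrSuppress, which keeps the first exception and adds later ones as suppressed) while fewer than MAX_DELETE_EXCEPTIONS (10) failures have been recorded. Below is a minimal standalone sketch of the same pattern; the class and constant names are hypothetical and the ExceptionsHelper call is inlined as plain addSuppressed logic:

import java.util.concurrent.atomic.AtomicReference;

// Standalone sketch (hypothetical names) of the commit's capping pattern:
// keep the first exception, attach later failures as suppressed, and stop
// recording once a fixed cap is reached so the exception chain stays bounded.
public class CappedExceptionHolder {

    static final int MAX_SUPPRESSED_EXCEPTIONS = 10; // mirrors MAX_DELETE_EXCEPTIONS

    private record Holder(Exception exception, int count) {}

    private final AtomicReference<Holder> ref = new AtomicReference<>(new Holder(null, 0));

    // Analogous to useOrMaybeSuppress(aex, e) in the diff above. Like the
    // original, the plain get/set pair assumes a single writer at a time.
    void add(Exception e) {
        Holder holder = ref.get();
        if (holder.count() >= MAX_SUPPRESSED_EXCEPTIONS) {
            return; // cap reached: drop e instead of growing the suppressed list
        }
        final Exception merged;
        if (holder.exception() == null) {
            merged = e; // first failure becomes the primary exception
        } else {
            holder.exception().addSuppressed(e); // later failures ride along as suppressed
            merged = holder.exception();
        }
        ref.set(new Holder(merged, holder.count() + 1));
    }

    Exception get() {
        return ref.get().exception();
    }

    public static void main(String[] args) {
        CappedExceptionHolder holder = new CappedExceptionHolder();
        for (int i = 0; i < 25; i++) {
            holder.add(new RuntimeException("bulk delete batch " + i + " failed"));
        }
        // One primary plus nine suppressed exceptions were kept; the other 15 were dropped.
        System.out.println(holder.get().getSuppressed().length); // prints 9
    }
}

As in the diff, the get/set pair on the AtomicReference is not a compare-and-swap; that appears safe in S3BlobStore because deletePartition is only ever called sequentially from deleteBlobs, so there is a single writer. This is also what lets the test below assert a hard upper bound on the suppressed count.
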
modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 50 additions & 8 deletions
@@ -54,6 +54,7 @@
 import org.elasticsearch.telemetry.InstrumentType;
 import org.elasticsearch.telemetry.Measurement;
 import org.elasticsearch.telemetry.RecordingMeterRegistry;
+import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.watcher.ResourceWatcherService;
 import org.hamcrest.Matcher;
 import org.junit.After;
@@ -162,6 +163,16 @@ protected BlobContainer createBlobContainer(
         final @Nullable TimeValue readTimeout,
         final @Nullable Boolean disableChunkedEncoding,
         final @Nullable ByteSizeValue bufferSize
+    ) {
+        return createBlobContainer(maxRetries, readTimeout, disableChunkedEncoding, bufferSize, null);
+    }
+
+    protected BlobContainer createBlobContainer(
+        final @Nullable Integer maxRetries,
+        final @Nullable TimeValue readTimeout,
+        final @Nullable Boolean disableChunkedEncoding,
+        final @Nullable ByteSizeValue bufferSize,
+        final @Nullable Integer maxBulkDeletes
     ) {
         final Settings.Builder clientSettings = Settings.builder();
         final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
@@ -192,14 +203,13 @@ protected BlobContainer createBlobContainer(
         clientSettings.setSecureSettings(secureSettings);
         service.refreshAndClearCache(S3ClientSettings.load(clientSettings.build()));

-        final RepositoryMetadata repositoryMetadata = new RepositoryMetadata(
-            "repository",
-            S3Repository.TYPE,
-            Settings.builder()
-                .put(S3Repository.CLIENT_NAME.getKey(), clientName)
-                .put(S3Repository.GET_REGISTER_RETRY_DELAY.getKey(), TimeValue.ZERO)
-                .build()
-        );
+        final var repositorySettings = Settings.builder()
+            .put(S3Repository.CLIENT_NAME.getKey(), clientName)
+            .put(S3Repository.GET_REGISTER_RETRY_DELAY.getKey(), TimeValue.ZERO);
+        if (maxBulkDeletes != null) {
+            repositorySettings.put(S3Repository.DELETION_BATCH_SIZE_SETTING.getKey(), maxBulkDeletes);
+        }
+        final RepositoryMetadata repositoryMetadata = new RepositoryMetadata("repository", S3Repository.TYPE, repositorySettings.build());

         final S3BlobStore s3BlobStore = new S3BlobStore(
             service,
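
(Note: the new five-parameter overload above threads maxBulkDeletes into S3Repository.DELETION_BATCH_SIZE_SETTING, letting a test shrink the bulk-delete batch size so that even a modest blob list fans out into many delete requests; passing null keeps the previous defaults.)
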
@@ -1073,6 +1083,38 @@ interface FailingHandlerFactory {
         }
     }

+    public void testSuppressedDeletionErrorsAreCapped() {
+        final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
+        int maxBulkDeleteSize = randomIntBetween(1, 10);
+        final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize);
+        httpServer.createContext("/", exchange -> {
+            if (exchange.getRequestMethod().equals("POST") && exchange.getRequestURI().toString().startsWith("/bucket/?delete")) {
+                exchange.sendResponseHeaders(
+                    randomFrom(
+                        HttpStatus.SC_INTERNAL_SERVER_ERROR,
+                        HttpStatus.SC_BAD_GATEWAY,
+                        HttpStatus.SC_SERVICE_UNAVAILABLE,
+                        HttpStatus.SC_GATEWAY_TIMEOUT,
+                        HttpStatus.SC_NOT_FOUND,
+                        HttpStatus.SC_UNAUTHORIZED
+                    ),
+                    -1
+                );
+                exchange.close();
+            } else {
+                fail("expected only deletions");
+            }
+        });
+        try {
+            var maxNoOfDeletions = 2 * S3BlobStore.MAX_DELETE_EXCEPTIONS;
+            var blobs = randomList(1, maxNoOfDeletions * maxBulkDeleteSize, ESTestCase::randomIdentifier);
+            blobContainer.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobs.iterator());
+            fail("deletion should not succeed");
+        } catch (IOException e) {
+            assertThat(e.getCause().getSuppressed().length, lessThan(S3BlobStore.MAX_DELETE_EXCEPTIONS));
+        }
+    }
+
     @Override
     protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
         // some attempts make meaningful progress and do not count towards the max retry limit

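The test above forces every bulk-delete request to fail with a randomly chosen error status across up to 2 × MAX_DELETE_EXCEPTIONS batches, then asserts that the suppressed list on the thrown IOException's cause stays below the cap. The likely motivation (an inference, not stated in the commit): each suppressed exception retains its own stack trace, so an uncapped list could pin one exception object per failed batch in memory for the whole deletion, which matters when thousands of batches fail.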