Skip to content

Commit 311ccfc

Browse files
committed
Limit number of suppressed S3 deletion errors (elastic#123630)
1 parent 4cf7824 commit 311ccfc

File tree

5 files changed

+87
-37
lines changed

5 files changed

+87
-37
lines changed

modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ protected BlobContainer createBlobContainer(
110110
final @Nullable Integer maxRetries,
111111
final @Nullable TimeValue readTimeout,
112112
final @Nullable Boolean disableChunkedEncoding,
113-
final @Nullable ByteSizeValue bufferSize
113+
final @Nullable ByteSizeValue bufferSize,
114+
final @Nullable Integer maxBulkDeletes
114115
) {
115116
final Settings.Builder clientSettings = Settings.builder();
116117
final String client = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
@@ -170,7 +171,7 @@ public void testReadLargeBlobWithRetries() throws Exception {
170171
final int maxRetries = randomIntBetween(2, 10);
171172
final AtomicInteger countDown = new AtomicInteger(maxRetries);
172173

173-
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
174+
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
174175

175176
// SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
176177
final byte[] bytes = randomBytes(1 << 22);
@@ -199,7 +200,7 @@ public void testWriteBlobWithRetries() throws Exception {
199200
final int maxRetries = randomIntBetween(2, 10);
200201
final CountDown countDown = new CountDown(maxRetries);
201202

202-
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
203+
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
203204
final byte[] bytes = randomBlobContent();
204205
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
205206
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
@@ -241,7 +242,7 @@ public void testWriteBlobWithRetries() throws Exception {
241242
public void testWriteBlobWithReadTimeouts() {
242243
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
243244
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
244-
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null);
245+
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null);
245246

246247
// HTTP server does not send a response
247248
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
@@ -294,7 +295,7 @@ public void testWriteLargeBlob() throws IOException {
294295
logger.debug("starting with resumable upload id [{}]", sessionUploadId.get());
295296

296297
final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
297-
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null);
298+
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null);
298299

299300
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
300301
final BytesReference requestBody = Streams.readFully(exchange.getRequestBody());
@@ -434,7 +435,7 @@ public String next() {
434435
return Integer.toString(totalDeletesSent++);
435436
}
436437
};
437-
final BlobContainer blobContainer = createBlobContainer(1, null, null, null);
438+
final BlobContainer blobContainer = createBlobContainer(1, null, null, null, null);
438439
httpServer.createContext("/batch/storage/v1", safeHandler(exchange -> {
439440
assert pendingDeletes.get() <= MAX_DELETES_PER_BATCH;
440441

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import java.util.concurrent.ConcurrentHashMap;
4949
import java.util.concurrent.Executor;
5050
import java.util.concurrent.TimeUnit;
51-
import java.util.concurrent.atomic.AtomicReference;
5251
import java.util.concurrent.atomic.LongAdder;
5352
import java.util.stream.Collectors;
5453

@@ -65,6 +64,8 @@ class S3BlobStore implements BlobStore {
6564
*/
6665
static final int MAX_BULK_DELETES = 1000;
6766

67+
static final int MAX_DELETE_EXCEPTIONS = 10;
68+
6869
private static final Logger logger = LogManager.getLogger(S3BlobStore.class);
6970

7071
private final S3Service service;
@@ -332,6 +333,18 @@ public BlobContainer blobContainer(BlobPath path) {
332333
return new S3BlobContainer(path, this);
333334
}
334335

336+
private static class DeletionExceptions {
337+
Exception exception = null;
338+
private int count = 0;
339+
340+
void useOrMaybeSuppress(Exception e) {
341+
if (count < MAX_DELETE_EXCEPTIONS) {
342+
exception = ExceptionsHelper.useOrSuppress(exception, e);
343+
count++;
344+
}
345+
}
346+
}
347+
335348
void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
336349
if (blobNames.hasNext() == false) {
337350
return;
@@ -340,19 +353,19 @@ void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IO
340353
final List<String> partition = new ArrayList<>();
341354
try (AmazonS3Reference clientReference = clientReference()) {
342355
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
343-
final AtomicReference<Exception> aex = new AtomicReference<>();
356+
final var deletionExceptions = new DeletionExceptions();
344357
blobNames.forEachRemaining(key -> {
345358
partition.add(key);
346359
if (partition.size() == bulkDeletionBatchSize) {
347-
deletePartition(purpose, clientReference, partition, aex);
360+
deletePartition(purpose, clientReference, partition, deletionExceptions);
348361
partition.clear();
349362
}
350363
});
351364
if (partition.isEmpty() == false) {
352-
deletePartition(purpose, clientReference, partition, aex);
365+
deletePartition(purpose, clientReference, partition, deletionExceptions);
353366
}
354-
if (aex.get() != null) {
355-
throw aex.get();
367+
if (deletionExceptions.exception != null) {
368+
throw deletionExceptions.exception;
356369
}
357370
} catch (Exception e) {
358371
throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
@@ -363,7 +376,7 @@ private void deletePartition(
363376
OperationPurpose purpose,
364377
AmazonS3Reference clientReference,
365378
List<String> partition,
366-
AtomicReference<Exception> aex
379+
DeletionExceptions deletionExceptions
367380
) {
368381
try {
369382
SocketAccess.doPrivilegedVoid(() -> clientReference.client().deleteObjects(bulkDelete(purpose, this, partition)));
@@ -377,11 +390,11 @@ private void deletePartition(
377390
),
378391
e
379392
);
380-
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
393+
deletionExceptions.useOrMaybeSuppress(e);
381394
} catch (AmazonClientException e) {
382395
// The AWS client threw any unexpected exception and did not execute the request at all so we do not
383396
// remove any keys from the outstanding deletes set.
384-
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
397+
deletionExceptions.useOrMaybeSuppress(e);
385398
}
386399
}
387400

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.telemetry.InstrumentType;
4848
import org.elasticsearch.telemetry.Measurement;
4949
import org.elasticsearch.telemetry.RecordingMeterRegistry;
50+
import org.elasticsearch.test.ESTestCase;
5051
import org.elasticsearch.watcher.ResourceWatcherService;
5152
import org.hamcrest.Matcher;
5253
import org.junit.After;
@@ -149,7 +150,8 @@ protected BlobContainer createBlobContainer(
149150
final @Nullable Integer maxRetries,
150151
final @Nullable TimeValue readTimeout,
151152
final @Nullable Boolean disableChunkedEncoding,
152-
final @Nullable ByteSizeValue bufferSize
153+
final @Nullable ByteSizeValue bufferSize,
154+
final @Nullable Integer maxBulkDeletes
153155
) {
154156
final Settings.Builder clientSettings = Settings.builder();
155157
final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
@@ -180,11 +182,11 @@ protected BlobContainer createBlobContainer(
180182
clientSettings.setSecureSettings(secureSettings);
181183
service.refreshAndClearCache(S3ClientSettings.load(clientSettings.build()));
182184

183-
final RepositoryMetadata repositoryMetadata = new RepositoryMetadata(
184-
"repository",
185-
S3Repository.TYPE,
186-
Settings.builder().put(S3Repository.CLIENT_NAME.getKey(), clientName).build()
187-
);
185+
final var repositorySettings = Settings.builder().put(S3Repository.CLIENT_NAME.getKey(), clientName);
186+
if (maxBulkDeletes != null) {
187+
repositorySettings.put(S3Repository.DELETION_BATCH_SIZE_SETTING.getKey(), maxBulkDeletes);
188+
}
189+
final RepositoryMetadata repositoryMetadata = new RepositoryMetadata("repository", S3Repository.TYPE, repositorySettings.build());
188190

189191
final S3BlobStore s3BlobStore = new S3BlobStore(
190192
service,
@@ -239,7 +241,7 @@ public void testWriteBlobWithRetries() throws Exception {
239241
final int maxRetries = randomInt(5);
240242
final CountDown countDown = new CountDown(maxRetries + 1);
241243

242-
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null);
244+
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null);
243245

244246
final byte[] bytes = randomBlobContent();
245247
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> {
@@ -289,7 +291,7 @@ public void testWriteBlobWithRetries() throws Exception {
289291
public void testWriteBlobWithReadTimeouts() {
290292
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
291293
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
292-
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null);
294+
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null);
293295

294296
// HTTP server does not send a response
295297
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_timeout"), exchange -> {
@@ -323,7 +325,7 @@ public void testWriteLargeBlob() throws Exception {
323325
final boolean useTimeout = rarely();
324326
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
325327
final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB);
326-
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize);
328+
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null);
327329

328330
final int parts = randomIntBetween(1, 5);
329331
final long lastPartSize = randomLongBetween(10, 512);
@@ -424,7 +426,7 @@ public void testWriteLargeBlobStreaming() throws Exception {
424426
final boolean useTimeout = rarely();
425427
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
426428
final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB);
427-
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize);
429+
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null);
428430

429431
final int parts = randomIntBetween(1, 5);
430432
final long lastPartSize = randomLongBetween(10, 512);
@@ -538,7 +540,7 @@ public void testReadRetriesAfterMeaningfulProgress() throws Exception {
538540
0,
539541
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
540542
);
541-
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes));
543+
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null);
542544
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);
543545

544546
final byte[] bytes = randomBlobContent();
@@ -611,7 +613,7 @@ public void testReadDoesNotRetryForRepositoryAnalysis() {
611613
0,
612614
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
613615
);
614-
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes));
616+
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null);
615617

616618
final byte[] bytes = randomBlobContent();
617619

@@ -649,7 +651,7 @@ public void testReadWithIndicesPurposeRetriesForever() throws IOException {
649651
0,
650652
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
651653
);
652-
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes));
654+
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null);
653655
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);
654656

655657
final byte[] bytes = randomBlobContent(512);
@@ -742,7 +744,7 @@ public void handle(HttpExchange exchange) throws IOException {
742744

743745
public void testDoesNotRetryOnNotFound() {
744746
final int maxRetries = between(3, 5);
745-
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null);
747+
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null);
746748

747749
final AtomicInteger numberOfReads = new AtomicInteger(0);
748750
@SuppressForbidden(reason = "use a http server")
@@ -772,6 +774,38 @@ public void handle(HttpExchange exchange) throws IOException {
772774
assertThat(getRetryHistogramMeasurements(), empty());
773775
}
774776

777+
public void testSuppressedDeletionErrorsAreCapped() {
778+
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
779+
int maxBulkDeleteSize = randomIntBetween(1, 10);
780+
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize);
781+
httpServer.createContext("/", exchange -> {
782+
if (exchange.getRequestMethod().equals("POST") && exchange.getRequestURI().toString().startsWith("/bucket/?delete")) {
783+
exchange.sendResponseHeaders(
784+
randomFrom(
785+
HttpStatus.SC_INTERNAL_SERVER_ERROR,
786+
HttpStatus.SC_BAD_GATEWAY,
787+
HttpStatus.SC_SERVICE_UNAVAILABLE,
788+
HttpStatus.SC_GATEWAY_TIMEOUT,
789+
HttpStatus.SC_NOT_FOUND,
790+
HttpStatus.SC_UNAUTHORIZED
791+
),
792+
-1
793+
);
794+
exchange.close();
795+
} else {
796+
fail("expected only deletions");
797+
}
798+
});
799+
var maxNoOfDeletions = 2 * S3BlobStore.MAX_DELETE_EXCEPTIONS;
800+
var blobs = randomList(1, maxNoOfDeletions * maxBulkDeleteSize, ESTestCase::randomIdentifier);
801+
var exception = expectThrows(
802+
IOException.class,
803+
"deletion should not succeed",
804+
() -> blobContainer.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobs.iterator())
805+
);
806+
assertThat(exception.getCause().getSuppressed().length, lessThan(S3BlobStore.MAX_DELETE_EXCEPTIONS));
807+
}
808+
775809
@Override
776810
protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
777811
// some attempts make meaningful progress and do not count towards the max retry limit

modules/repository-url/src/test/java/org/elasticsearch/common/blobstore/url/URLBlobContainerRetriesTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ protected BlobContainer createBlobContainer(
7676
Integer maxRetries,
7777
TimeValue readTimeout,
7878
Boolean disableChunkedEncoding,
79-
ByteSizeValue bufferSize
79+
ByteSizeValue bufferSize,
80+
Integer maxBulkDeletes
8081
) {
8182
Settings.Builder settingsBuilder = Settings.builder();
8283

test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ protected abstract BlobContainer createBlobContainer(
8484
@Nullable Integer maxRetries,
8585
@Nullable TimeValue readTimeout,
8686
@Nullable Boolean disableChunkedEncoding,
87-
@Nullable ByteSizeValue bufferSize
87+
@Nullable ByteSizeValue bufferSize,
88+
@Nullable Integer maxBulkDeletes
8889
);
8990

9091
protected org.hamcrest.Matcher<Object> readTimeoutExceptionMatcher() {
@@ -93,7 +94,7 @@ protected org.hamcrest.Matcher<Object> readTimeoutExceptionMatcher() {
9394
}
9495

9596
public void testReadNonexistentBlobThrowsNoSuchFileException() {
96-
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null);
97+
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null, null);
9798
final long position = randomLongBetween(0, MAX_RANGE_VAL);
9899
final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position)));
99100
final Exception exception = expectThrows(NoSuchFileException.class, () -> {
@@ -120,7 +121,7 @@ public void testReadBlobWithRetries() throws Exception {
120121

121122
final byte[] bytes = randomBlobContent();
122123
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3));
123-
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
124+
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);
124125
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), exchange -> {
125126
Streams.readFully(exchange.getRequestBody());
126127
if (countDown.countDown()) {
@@ -177,7 +178,7 @@ public void testReadRangeBlobWithRetries() throws Exception {
177178
final CountDown countDown = new CountDown(maxRetries + 1);
178179

179180
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(5, 10));
180-
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
181+
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);
181182
final byte[] bytes = randomBlobContent();
182183
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_range_blob_max_retries"), exchange -> {
183184
Streams.readFully(exchange.getRequestBody());
@@ -249,7 +250,7 @@ public void testReadRangeBlobWithRetries() throws Exception {
249250
public void testReadBlobWithReadTimeouts() {
250251
final int maxRetries = randomInt(5);
251252
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
252-
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
253+
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);
253254

254255
// HTTP server does not send a response
255256
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_unresponsive"), exchange -> {});
@@ -306,7 +307,7 @@ protected OperationPurpose randomFiniteRetryingPurpose() {
306307

307308
public void testReadBlobWithNoHttpResponse() {
308309
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
309-
final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null);
310+
final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null, null);
310311

311312
// HTTP server closes connection immediately
312313
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_no_response"), HttpExchange::close);
@@ -326,7 +327,7 @@ public void testReadBlobWithNoHttpResponse() {
326327

327328
public void testReadBlobWithPrematureConnectionClose() {
328329
final int maxRetries = randomInt(20);
329-
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
330+
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
330331

331332
final boolean alwaysFlushBody = randomBoolean();
332333

0 commit comments

Comments
 (0)