| 
47 | 47 | import org.elasticsearch.telemetry.InstrumentType;  | 
48 | 48 | import org.elasticsearch.telemetry.Measurement;  | 
49 | 49 | import org.elasticsearch.telemetry.RecordingMeterRegistry;  | 
 | 50 | +import org.elasticsearch.test.ESTestCase;  | 
50 | 51 | import org.elasticsearch.watcher.ResourceWatcherService;  | 
51 | 52 | import org.hamcrest.Matcher;  | 
52 | 53 | import org.junit.After;  | 
@@ -149,7 +150,8 @@ protected BlobContainer createBlobContainer(  | 
149 | 150 |         final @Nullable Integer maxRetries,  | 
150 | 151 |         final @Nullable TimeValue readTimeout,  | 
151 | 152 |         final @Nullable Boolean disableChunkedEncoding,  | 
152 |  | -        final @Nullable ByteSizeValue bufferSize  | 
 | 153 | +        final @Nullable ByteSizeValue bufferSize,  | 
 | 154 | +        final @Nullable Integer maxBulkDeletes  | 
153 | 155 |     ) {  | 
154 | 156 |         final Settings.Builder clientSettings = Settings.builder();  | 
155 | 157 |         final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);  | 
@@ -180,11 +182,11 @@ protected BlobContainer createBlobContainer(  | 
180 | 182 |         clientSettings.setSecureSettings(secureSettings);  | 
181 | 183 |         service.refreshAndClearCache(S3ClientSettings.load(clientSettings.build()));  | 
182 | 184 | 
 
  | 
183 |  | -        final RepositoryMetadata repositoryMetadata = new RepositoryMetadata(  | 
184 |  | -            "repository",  | 
185 |  | -            S3Repository.TYPE,  | 
186 |  | -            Settings.builder().put(S3Repository.CLIENT_NAME.getKey(), clientName).build()  | 
187 |  | -        );  | 
 | 185 | +        final var repositorySettings = Settings.builder().put(S3Repository.CLIENT_NAME.getKey(), clientName);  | 
 | 186 | +        if (maxBulkDeletes != null) {  | 
 | 187 | +            repositorySettings.put(S3Repository.DELETION_BATCH_SIZE_SETTING.getKey(), maxBulkDeletes);  | 
 | 188 | +        }  | 
 | 189 | +        final RepositoryMetadata repositoryMetadata = new RepositoryMetadata("repository", S3Repository.TYPE, repositorySettings.build());  | 
188 | 190 | 
 
  | 
189 | 191 |         final S3BlobStore s3BlobStore = new S3BlobStore(  | 
190 | 192 |             service,  | 
@@ -239,7 +241,7 @@ public void testWriteBlobWithRetries() throws Exception {  | 
239 | 241 |         final int maxRetries = randomInt(5);  | 
240 | 242 |         final CountDown countDown = new CountDown(maxRetries + 1);  | 
241 | 243 | 
 
  | 
242 |  | -        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null);  | 
 | 244 | +        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null);  | 
243 | 245 | 
 
  | 
244 | 246 |         final byte[] bytes = randomBlobContent();  | 
245 | 247 |         httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> {  | 
@@ -289,7 +291,7 @@ public void testWriteBlobWithRetries() throws Exception {  | 
289 | 291 |     public void testWriteBlobWithReadTimeouts() {  | 
290 | 292 |         final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));  | 
291 | 293 |         final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));  | 
292 |  | -        final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null);  | 
 | 294 | +        final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null);  | 
293 | 295 | 
 
  | 
294 | 296 |         // HTTP server does not send a response  | 
295 | 297 |         httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_timeout"), exchange -> {  | 
@@ -323,7 +325,7 @@ public void testWriteLargeBlob() throws Exception {  | 
323 | 325 |         final boolean useTimeout = rarely();  | 
324 | 326 |         final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;  | 
325 | 327 |         final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB);  | 
326 |  | -        final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize);  | 
 | 328 | +        final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null);  | 
327 | 329 | 
 
  | 
328 | 330 |         final int parts = randomIntBetween(1, 5);  | 
329 | 331 |         final long lastPartSize = randomLongBetween(10, 512);  | 
@@ -424,7 +426,7 @@ public void testWriteLargeBlobStreaming() throws Exception {  | 
424 | 426 |         final boolean useTimeout = rarely();  | 
425 | 427 |         final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;  | 
426 | 428 |         final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB);  | 
427 |  | -        final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize);  | 
 | 429 | +        final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null);  | 
428 | 430 | 
 
  | 
429 | 431 |         final int parts = randomIntBetween(1, 5);  | 
430 | 432 |         final long lastPartSize = randomLongBetween(10, 512);  | 
@@ -538,7 +540,7 @@ public void testReadRetriesAfterMeaningfulProgress() throws Exception {  | 
538 | 540 |             0,  | 
539 | 541 |             randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))  | 
540 | 542 |         );  | 
541 |  | -        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes));  | 
 | 543 | +        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null);  | 
542 | 544 |         final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);  | 
543 | 545 | 
 
  | 
544 | 546 |         final byte[] bytes = randomBlobContent();  | 
@@ -611,7 +613,7 @@ public void testReadDoesNotRetryForRepositoryAnalysis() {  | 
611 | 613 |             0,  | 
612 | 614 |             randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))  | 
613 | 615 |         );  | 
614 |  | -        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes));  | 
 | 616 | +        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null);  | 
615 | 617 | 
 
  | 
616 | 618 |         final byte[] bytes = randomBlobContent();  | 
617 | 619 | 
 
  | 
@@ -649,7 +651,7 @@ public void testReadWithIndicesPurposeRetriesForever() throws IOException {  | 
649 | 651 |             0,  | 
650 | 652 |             randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))  | 
651 | 653 |         );  | 
652 |  | -        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes));  | 
 | 654 | +        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null);  | 
653 | 655 |         final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);  | 
654 | 656 | 
 
  | 
655 | 657 |         final byte[] bytes = randomBlobContent(512);  | 
@@ -742,7 +744,7 @@ public void handle(HttpExchange exchange) throws IOException {  | 
742 | 744 | 
 
  | 
743 | 745 |     public void testDoesNotRetryOnNotFound() {  | 
744 | 746 |         final int maxRetries = between(3, 5);  | 
745 |  | -        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null);  | 
 | 747 | +        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null);  | 
746 | 748 | 
 
  | 
747 | 749 |         final AtomicInteger numberOfReads = new AtomicInteger(0);  | 
748 | 750 |         @SuppressForbidden(reason = "use a http server")  | 
@@ -772,6 +774,38 @@ public void handle(HttpExchange exchange) throws IOException {  | 
772 | 774 |         assertThat(getRetryHistogramMeasurements(), empty());  | 
773 | 775 |     }  | 
774 | 776 | 
 
  | 
 | 777 | +    public void testSuppressedDeletionErrorsAreCapped() {  | 
 | 778 | +        final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));  | 
 | 779 | +        int maxBulkDeleteSize = randomIntBetween(1, 10);  | 
 | 780 | +        final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize);  | 
 | 781 | +        httpServer.createContext("/", exchange -> {  | 
 | 782 | +            if (exchange.getRequestMethod().equals("POST") && exchange.getRequestURI().toString().startsWith("/bucket/?delete")) {  | 
 | 783 | +                exchange.sendResponseHeaders(  | 
 | 784 | +                    randomFrom(  | 
 | 785 | +                        HttpStatus.SC_INTERNAL_SERVER_ERROR,  | 
 | 786 | +                        HttpStatus.SC_BAD_GATEWAY,  | 
 | 787 | +                        HttpStatus.SC_SERVICE_UNAVAILABLE,  | 
 | 788 | +                        HttpStatus.SC_GATEWAY_TIMEOUT,  | 
 | 789 | +                        HttpStatus.SC_NOT_FOUND,  | 
 | 790 | +                        HttpStatus.SC_UNAUTHORIZED  | 
 | 791 | +                    ),  | 
 | 792 | +                    -1  | 
 | 793 | +                );  | 
 | 794 | +                exchange.close();  | 
 | 795 | +            } else {  | 
 | 796 | +                fail("expected only deletions");  | 
 | 797 | +            }  | 
 | 798 | +        });  | 
 | 799 | +        var maxNoOfDeletions = 2 * S3BlobStore.MAX_DELETE_EXCEPTIONS;  | 
 | 800 | +        var blobs = randomList(1, maxNoOfDeletions * maxBulkDeleteSize, ESTestCase::randomIdentifier);  | 
 | 801 | +        var exception = expectThrows(  | 
 | 802 | +            IOException.class,  | 
 | 803 | +            "deletion should not succeed",  | 
 | 804 | +            () -> blobContainer.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobs.iterator())  | 
 | 805 | +        );  | 
 | 806 | +        assertThat(exception.getCause().getSuppressed().length, lessThan(S3BlobStore.MAX_DELETE_EXCEPTIONS));  | 
 | 807 | +    }  | 
 | 808 | + | 
775 | 809 |     @Override  | 
776 | 810 |     protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {  | 
777 | 811 |         // some attempts make meaningful progress and do not count towards the max retry limit  | 
 | 
0 commit comments