Skip to content

Commit b08d6af

Browse files
committed
Make batches execute concurrently
1 parent 6115885 commit b08d6af

File tree

1 file changed

+19
-17
lines changed

1 file changed

+19
-17
lines changed

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.netty.buffer.ByteBuf;
1313
import io.netty.buffer.ByteBufAllocator;
1414
import io.netty.util.ReferenceCountUtil;
15+
import reactor.core.Exceptions;
1516
import reactor.core.publisher.Flux;
1617
import reactor.core.publisher.Mono;
1718
import reactor.core.scheduler.Schedulers;
@@ -291,23 +292,24 @@ private void deleteListOfBlobs(AzureBlobServiceClient azureBlobServiceClient, It
291292
batchResponses.add(batchAsyncClient.submitBatch(currentBatch));
292293
}
293294

294-
// The selective exception handling is easier to do this way than with what is thrown by Mono#whenDelayError
295-
IOException exceptionToThrow = null;
296-
for (Mono<Void> batchResponse : batchResponses) {
297-
try {
298-
batchResponse.block();
299-
} catch (Exception e) {
300-
if (isIgnorableBatchDeleteException(e)) {
301-
continue;
302-
}
303-
if (exceptionToThrow == null) {
304-
exceptionToThrow = new IOException("Error deleting batch");
305-
}
306-
exceptionToThrow.addSuppressed(e);
295+
try {
296+
Mono.whenDelayError(
297+
batchResponses.stream()
298+
.map(m -> m.onErrorResume(AzureBlobStore::isIgnorableBatchDeleteException, t -> Mono.empty()))
299+
.toList()
300+
).block();
301+
} catch (Exception e) {
302+
final IOException exception = new IOException("Error deleting batch", e);
303+
if (Exceptions.isMultiple(e)) {
304+
Exceptions.unwrapMultipleExcludingTracebacks(e)
305+
.stream()
306+
// This is horrific, but there's no way to filter this noise reliably, see BlockingSingleSubscriber#blockingGet
307+
.filter(throwable -> throwable.getMessage().contains("#block terminated with an error") == false)
308+
.forEach(exception::addSuppressed);
309+
} else {
310+
exception.addSuppressed(e);
307311
}
308-
}
309-
if (exceptionToThrow != null) {
310-
throw exceptionToThrow;
312+
throw exception;
311313
}
312314
}
313315

@@ -317,7 +319,7 @@ private void deleteListOfBlobs(AzureBlobServiceClient azureBlobServiceClient, It
317319
* @param exception An exception throw by batch delete
318320
* @return true if it is safe to ignore, false otherwise
319321
*/
320-
private boolean isIgnorableBatchDeleteException(Exception exception) {
322+
private static boolean isIgnorableBatchDeleteException(Throwable exception) {
321323
if (exception instanceof BlobBatchStorageException bbse) {
322324
final Iterable<BlobStorageException> batchExceptions = bbse.getBatchExceptions();
323325
for (BlobStorageException bse : batchExceptions) {

0 commit comments

Comments
 (0)