Skip to content

Commit 87b08e6

Browse files
committed
Submit all batch deletes concurrently
1 parent d5aa10f commit 87b08e6

File tree

1 file changed

+17
-13
lines changed

1 file changed

+17
-13
lines changed

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import com.azure.storage.blob.BlobServiceAsyncClient;
2626
import com.azure.storage.blob.BlobServiceClient;
2727
import com.azure.storage.blob.batch.BlobBatch;
28-
import com.azure.storage.blob.batch.BlobBatchClient;
28+
import com.azure.storage.blob.batch.BlobBatchAsyncClient;
2929
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
3030
import com.azure.storage.blob.batch.BlobBatchStorageException;
3131
import com.azure.storage.blob.models.BlobErrorCode;
@@ -295,27 +295,31 @@ private void deleteListOfBlobs(AzureBlobServiceClient azureBlobServiceClient, It
295295
// We need to use a container-scoped BlobBatchClient, so the restype=container parameter
296296
// is sent, and we can support all SAS token types
297297
// See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=shared-access-signatures#authorization
298-
BlobBatchClient batchClient = new BlobBatchClientBuilder(
298+
final BlobBatchAsyncClient batchAsyncClient = new BlobBatchClientBuilder(
299299
azureBlobServiceClient.getAsyncClient().getBlobContainerAsyncClient(container)
300-
).buildClient();
300+
).buildAsyncClient();
301+
final List<Mono<Void>> batchResponses = new ArrayList<>();
301302
while (blobNames.hasNext()) {
302-
final BlobBatch currentBatch = batchClient.getBlobBatch();
303+
final BlobBatch currentBatch = batchAsyncClient.getBlobBatch();
303304
int counter = 0;
304305
while (counter < MAX_ELEMENTS_PER_BATCH && blobNames.hasNext()) {
305306
currentBatch.deleteBlob(container, blobNames.next());
306307
counter++;
307308
}
308-
try {
309-
batchClient.submitBatch(currentBatch);
310-
} catch (BlobBatchStorageException bbse) {
311-
final Iterable<BlobStorageException> batchExceptions = bbse.getBatchExceptions();
312-
for (BlobStorageException bse : batchExceptions) {
313-
// If one of the requests failed with something other than a 404, throw the encompassing exception
314-
if (bse.getStatusCode() != RestStatus.NOT_FOUND.getStatus()) {
315-
throw new IOException("Failed to delete batch", bbse);
316-
}
309+
batchResponses.add(batchAsyncClient.submitBatch(currentBatch));
310+
}
311+
try {
312+
Flux.merge(batchResponses).collectList().block();
313+
} catch (BlobBatchStorageException bbse) {
314+
final Iterable<BlobStorageException> batchExceptions = bbse.getBatchExceptions();
315+
for (BlobStorageException bse : batchExceptions) {
316+
// If one of the requests failed with something other than a BLOB_NOT_FOUND, throw the encompassing exception
317+
if (BlobErrorCode.BLOB_NOT_FOUND.equals(bse.getErrorCode()) == false) {
318+
throw new IOException("Failed to delete batch", bbse);
317319
}
318320
}
321+
} catch (Exception e) {
322+
throw new IOException("Unable to delete blobs");
319323
}
320324
}
321325

0 commit comments

Comments
 (0)