Skip to content

Commit 219cf40

Browse files
committed
Use batch delete when deleting blob directory
1 parent 57286c4 commit 219cf40

File tree

1 file changed

+41
-54
lines changed

1 file changed

+41
-54
lines changed

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 41 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.azure.core.util.BinaryData;
2222
import com.azure.storage.blob.BlobAsyncClient;
2323
import com.azure.storage.blob.BlobClient;
24-
import com.azure.storage.blob.BlobContainerAsyncClient;
2524
import com.azure.storage.blob.BlobContainerClient;
2625
import com.azure.storage.blob.BlobServiceAsyncClient;
2726
import com.azure.storage.blob.BlobServiceClient;
@@ -239,30 +238,28 @@ public boolean blobExists(OperationPurpose purpose, String blob) throws IOExcept
239238
}
240239
}
241240

242-
// number of concurrent blob delete requests to use while bulk deleting
243-
private static final int CONCURRENT_DELETES = 100;
244-
245241
public DeleteResult deleteBlobDirectory(OperationPurpose purpose, String path) {
246242
final AtomicInteger blobsDeleted = new AtomicInteger(0);
247243
final AtomicLong bytesDeleted = new AtomicLong(0);
244+
final List<String> blobNames = new ArrayList<>();
248245

249246
SocketAccess.doPrivilegedVoidException(() -> {
250-
final BlobContainerAsyncClient blobContainerAsyncClient = asyncClient(purpose).getBlobContainerAsyncClient(container);
251-
final ListBlobsOptions options = new ListBlobsOptions().setPrefix(path)
252-
.setDetails(new BlobListDetails().setRetrieveMetadata(true));
247+
final AzureBlobServiceClient client = getAzureBlobServiceClientClient(purpose);
248+
final BlobContainerClient blobContainerClient = client.getSyncClient().getBlobContainerClient(container);
253249
try {
254-
blobContainerAsyncClient.listBlobs(options, null).flatMap(blobItem -> {
255-
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
256-
return Mono.empty();
257-
} else {
258-
final String blobName = blobItem.getName();
259-
BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(blobName);
260-
final Mono<Void> deleteTask = getDeleteTask(blobName, blobAsyncClient);
261-
bytesDeleted.addAndGet(blobItem.getProperties().getContentLength());
262-
blobsDeleted.incrementAndGet();
263-
return deleteTask;
250+
final ListBlobsOptions options = new ListBlobsOptions().setPrefix(path)
251+
.setDetails(new BlobListDetails().setRetrieveMetadata(true));
252+
for (BlobItem blobItem : blobContainerClient.listBlobs(options, null)) {
253+
if (blobItem.isPrefix()) {
254+
continue;
264255
}
265-
}, CONCURRENT_DELETES).then().block();
256+
blobNames.add(blobItem.getName());
257+
bytesDeleted.addAndGet(blobItem.getProperties().getContentLength());
258+
blobsDeleted.incrementAndGet();
259+
}
260+
if (blobNames.isEmpty() == false) {
261+
deleteListOfBlobs(client, blobNames.iterator());
262+
}
266263
} catch (Exception e) {
267264
filterDeleteExceptionsAndRethrow(e, new IOException("Deleting directory [" + path + "] failed"));
268265
}
@@ -291,45 +288,35 @@ public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<St
291288
if (blobNames.hasNext() == false) {
292289
return;
293290
}
294-
final AzureBlobServiceClient azureBlobServiceClient = getAzureBlobServiceClientClient(purpose);
295-
SocketAccess.doPrivilegedVoidException(() -> {
296-
// We need to use a container-scoped BlobBatchClient, so the restype=container parameter
297-
// is sent, and we can support all SAS token types
298-
// See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=shared-access-signatures#authorization
299-
BlobBatchClient batchAsyncClient = new BlobBatchClientBuilder(
300-
azureBlobServiceClient.getAsyncClient().getBlobContainerAsyncClient(container)
301-
).buildClient();
302-
while (blobNames.hasNext()) {
303-
final BlobBatch currentBatch = batchAsyncClient.getBlobBatch();
304-
int counter = 0;
305-
while (counter < MAX_ELEMENTS_PER_BATCH && blobNames.hasNext()) {
306-
currentBatch.deleteBlob(container, blobNames.next());
307-
counter++;
308-
}
309-
try {
310-
batchAsyncClient.submitBatch(currentBatch);
311-
} catch (BlobBatchStorageException bbse) {
312-
final Iterable<BlobStorageException> batchExceptions = bbse.getBatchExceptions();
313-
for (BlobStorageException bse : batchExceptions) {
314-
// If one of the requests failed with something other than a 404, throw the encompassing exception
315-
if (bse.getStatusCode() != RestStatus.NOT_FOUND.getStatus()) {
316-
throw new IOException("Failed to delete batch", bbse);
317-
}
291+
SocketAccess.doPrivilegedVoidException(() -> deleteListOfBlobs(getAzureBlobServiceClientClient(purpose), blobNames));
292+
}
293+
294+
private void deleteListOfBlobs(AzureBlobServiceClient azureBlobServiceClient, Iterator<String> blobNames) throws IOException {
295+
// We need to use a container-scoped BlobBatchClient, so the restype=container parameter
296+
// is sent, and we can support all SAS token types
297+
// See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=shared-access-signatures#authorization
298+
BlobBatchClient batchAsyncClient = new BlobBatchClientBuilder(
299+
azureBlobServiceClient.getAsyncClient().getBlobContainerAsyncClient(container)
300+
).buildClient();
301+
while (blobNames.hasNext()) {
302+
final BlobBatch currentBatch = batchAsyncClient.getBlobBatch();
303+
int counter = 0;
304+
while (counter < MAX_ELEMENTS_PER_BATCH && blobNames.hasNext()) {
305+
currentBatch.deleteBlob(container, blobNames.next());
306+
counter++;
307+
}
308+
try {
309+
batchAsyncClient.submitBatch(currentBatch);
310+
} catch (BlobBatchStorageException bbse) {
311+
final Iterable<BlobStorageException> batchExceptions = bbse.getBatchExceptions();
312+
for (BlobStorageException bse : batchExceptions) {
313+
// If one of the requests failed with something other than a 404, throw the encompassing exception
314+
if (bse.getStatusCode() != RestStatus.NOT_FOUND.getStatus()) {
315+
throw new IOException("Failed to delete batch", bbse);
318316
}
319317
}
320318
}
321-
});
322-
}
323-
324-
private static Mono<Void> getDeleteTask(String blobName, BlobAsyncClient blobAsyncClient) {
325-
return blobAsyncClient.delete()
326-
// Ignore not found blobs, as it's possible that due to network errors a request
327-
// for an already deleted blob is retried, causing an error.
328-
.onErrorResume(
329-
e -> e instanceof BlobStorageException blobStorageException && blobStorageException.getStatusCode() == 404,
330-
throwable -> Mono.empty()
331-
)
332-
.onErrorMap(throwable -> new IOException("Error deleting blob " + blobName, throwable));
319+
}
333320
}
334321

335322
public InputStream getInputStream(OperationPurpose purpose, String blob, long position, final @Nullable Long length) {

0 commit comments

Comments
 (0)