Skip to content

Commit 7599d4c

Browse files
authored
Use Azure blob batch API to delete blobs in batches (#114566)
Closes ES-9777
1 parent cc6e741 commit 7599d4c

File tree

14 files changed

+374
-88
lines changed

14 files changed

+374
-88
lines changed

docs/changelog/114566.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 114566
2+
summary: Use Azure blob batch API to delete blobs in batches
3+
area: Distributed
4+
type: enhancement
5+
issues: []

docs/reference/snapshot-restore/repository-azure.asciidoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,15 @@ include::repository-shared-settings.asciidoc[]
259259
`primary_only` or `secondary_only`. Defaults to `primary_only`. Note that if you set it
260260
to `secondary_only`, it will force `readonly` to true.
261261

262+
`delete_objects_max_size`::
263+
264+
(integer) Sets the maximum batch size, between 1 and 256, used for `BlobBatch` requests. Defaults to 256, which is the maximum
265+
number supported by the https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks[Azure blob batch API].
266+
267+
`max_concurrent_batch_deletes`::
268+
269+
(integer) Sets the maximum number of concurrent batch delete requests that will be submitted for any individual bulk delete with `BlobBatch`. Note that the effective number of concurrent deletes is further limited by the Azure client connection and event loop thread limits. Defaults to 10, minimum is 1, maximum is 100.
270+
262271
[[repository-azure-validation]]
263272
==== Repository validation rules
264273

gradle/verification-metadata.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@
144144
<sha256 value="31915426834400cac854f48441c168d55aa6fc054527f28f1d242a7067affd14" origin="Generated by Gradle"/>
145145
</artifact>
146146
</component>
147+
<component group="com.azure" name="azure-storage-blob-batch" version="12.23.1">
148+
<artifact name="azure-storage-blob-batch-12.23.1.jar">
149+
<sha256 value="8c11749c783222873f63f22575aa5ae7ee8f285388183b82d1a18db21f4d2eba" origin="Generated by Gradle"/>
150+
</artifact>
151+
</component>
147152
<component group="com.azure" name="azure-storage-common" version="12.26.1">
148153
<artifact name="azure-storage-common-12.26.1.jar">
149154
<sha256 value="b0297ac1a9017ccd8a1e5cf41fb8d00ff0adbdd06849f6c5aafb3208708264dd" origin="Generated by Gradle"/>

modules/repository-azure/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ dependencies {
3030
api "com.azure:azure-identity:1.13.2"
3131
api "com.azure:azure-json:1.2.0"
3232
api "com.azure:azure-storage-blob:12.27.1"
33+
api "com.azure:azure-storage-blob-batch:12.23.1"
3334
api "com.azure:azure-storage-common:12.26.1"
3435
api "com.azure:azure-storage-internal-avro:12.12.1"
3536
api "com.azure:azure-xml:1.1.0"

modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryMetricsTests.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,18 @@
99

1010
package org.elasticsearch.repositories.azure;
1111

12+
import com.sun.net.httpserver.Headers;
1213
import com.sun.net.httpserver.HttpExchange;
1314
import com.sun.net.httpserver.HttpHandler;
1415

1516
import org.elasticsearch.cluster.node.DiscoveryNode;
17+
import org.elasticsearch.common.Randomness;
1618
import org.elasticsearch.common.blobstore.BlobContainer;
1719
import org.elasticsearch.common.blobstore.BlobPath;
1820
import org.elasticsearch.common.blobstore.OperationPurpose;
21+
import org.elasticsearch.common.bytes.BytesArray;
1922
import org.elasticsearch.common.bytes.BytesReference;
23+
import org.elasticsearch.common.settings.Settings;
2024
import org.elasticsearch.core.SuppressForbidden;
2125
import org.elasticsearch.plugins.PluginsService;
2226
import org.elasticsearch.repositories.RepositoriesMetrics;
@@ -31,6 +35,7 @@
3135
import java.io.IOException;
3236
import java.nio.ByteBuffer;
3337
import java.nio.charset.StandardCharsets;
38+
import java.util.ArrayList;
3439
import java.util.List;
3540
import java.util.Map;
3641
import java.util.Queue;
@@ -43,6 +48,7 @@
4348
import java.util.stream.IntStream;
4449

4550
import static org.elasticsearch.repositories.azure.AbstractAzureServerTestCase.randomBlobContent;
51+
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
4652
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4753
import static org.hamcrest.Matchers.hasSize;
4854
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -225,6 +231,91 @@ public void testRequestTimeIsAccurate() throws IOException {
225231
assertThat(recordedRequestTime, lessThanOrEqualTo(elapsedTimeMillis));
226232
}
227233

234+
public void testBatchDeleteFailure() throws IOException {
235+
final int deleteBatchSize = randomIntBetween(1, 30);
236+
final String repositoryName = randomRepositoryName();
237+
final String repository = createRepository(
238+
repositoryName,
239+
Settings.builder()
240+
.put(repositorySettings(repositoryName))
241+
.put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), deleteBatchSize)
242+
.build(),
243+
true
244+
);
245+
final String dataNodeName = internalCluster().getNodeNameThat(DiscoveryNode::canContainData);
246+
final BlobContainer container = getBlobContainer(dataNodeName, repository);
247+
248+
final List<String> blobsToDelete = new ArrayList<>();
249+
final int numberOfBatches = randomIntBetween(3, 20);
250+
final int numberOfBlobs = numberOfBatches * deleteBatchSize;
251+
final int failedBatches = randomIntBetween(1, numberOfBatches);
252+
for (int i = 0; i < numberOfBlobs; i++) {
253+
byte[] bytes = randomBytes(randomInt(100));
254+
String blobName = "index-" + randomAlphaOfLength(10);
255+
container.writeBlob(randomPurpose(), blobName, new BytesArray(bytes), false);
256+
blobsToDelete.add(blobName);
257+
}
258+
Randomness.shuffle(blobsToDelete);
259+
clearMetrics(dataNodeName);
260+
261+
// Handler will fail one or more of the batch requests
262+
final RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);
263+
264+
// Exhaust the retries
265+
IntStream.range(0, (numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1)))
266+
.forEach(i -> requestHandlers.offer(failNRequestRequestHandler));
267+
268+
logger.info("--> Failing {} of {} batches", failedBatches, numberOfBatches);
269+
270+
final IOException exception = assertThrows(
271+
IOException.class,
272+
() -> container.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobsToDelete.iterator())
273+
);
274+
assertEquals(Math.min(failedBatches, 10), exception.getSuppressed().length);
275+
assertEquals(
276+
(numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1L)),
277+
getLongCounterTotal(dataNodeName, RepositoriesMetrics.METRIC_REQUESTS_TOTAL)
278+
);
279+
assertEquals((failedBatches * (MAX_RETRIES + 1L)), getLongCounterTotal(dataNodeName, RepositoriesMetrics.METRIC_EXCEPTIONS_TOTAL));
280+
assertEquals(failedBatches * deleteBatchSize, container.listBlobs(randomPurpose()).size());
281+
}
282+
283+
private long getLongCounterTotal(String dataNodeName, String metricKey) {
284+
return getTelemetryPlugin(dataNodeName).getLongCounterMeasurement(metricKey)
285+
.stream()
286+
.mapToLong(Measurement::getLong)
287+
.reduce(0L, Long::sum);
288+
}
289+
290+
/**
291+
* Creates a {@link RequestHandler} that will persistently fail the first <code>numberToFail</code> distinct requests
292+
* it sees. Any other requests are passed through to the delegate.
293+
*
294+
* @param numberToFail The number of requests to fail
295+
* @return the handler
296+
*/
297+
private static RequestHandler createFailNRequestsHandler(int numberToFail) {
298+
final List<String> requestsToFail = new ArrayList<>(numberToFail);
299+
return (exchange, delegate) -> {
300+
final Headers requestHeaders = exchange.getRequestHeaders();
301+
final String requestId = requestHeaders.get("X-ms-client-request-id").get(0);
302+
boolean failRequest = false;
303+
synchronized (requestsToFail) {
304+
if (requestsToFail.contains(requestId)) {
305+
failRequest = true;
306+
} else if (requestsToFail.size() < numberToFail) {
307+
requestsToFail.add(requestId);
308+
failRequest = true;
309+
}
310+
}
311+
if (failRequest) {
312+
exchange.sendResponseHeaders(500, -1);
313+
} else {
314+
delegate.handle(exchange);
315+
}
316+
};
317+
}
318+
228319
private void clearMetrics(String discoveryNode) {
229320
internalCluster().getInstance(PluginsService.class, discoveryNode)
230321
.filterPlugins(TestTelemetryPlugin.class)

modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,9 @@ protected Settings repositorySettings(String repoName) {
8989
.put(super.repositorySettings(repoName))
9090
.put(AzureRepository.Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB))
9191
.put(AzureRepository.Repository.CONTAINER_SETTING.getKey(), "container")
92-
.put(AzureStorageSettings.ACCOUNT_SETTING.getKey(), "test");
92+
.put(AzureStorageSettings.ACCOUNT_SETTING.getKey(), "test")
93+
.put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), randomIntBetween(5, 256))
94+
.put(AzureRepository.Repository.MAX_CONCURRENT_BATCH_DELETES_SETTING.getKey(), randomIntBetween(1, 10));
9395
if (randomBoolean()) {
9496
settingsBuilder.put(AzureRepository.Repository.BASE_PATH_SETTING.getKey(), randomFrom("test", "test/1"));
9597
}
@@ -249,6 +251,8 @@ protected void maybeTrack(String request, Headers headers) {
249251
trackRequest("PutBlockList");
250252
} else if (Regex.simpleMatch("PUT /*/*", request)) {
251253
trackRequest("PutBlob");
254+
} else if (Regex.simpleMatch("POST /*/*?*comp=batch*", request)) {
255+
trackRequest("BlobBatch");
252256
}
253257
}
254258

@@ -279,10 +283,22 @@ public void testLargeBlobCountDeletion() throws Exception {
279283
}
280284

281285
public void testDeleteBlobsIgnoringIfNotExists() throws Exception {
282-
try (BlobStore store = newBlobStore()) {
286+
// Test with a smaller batch size here
287+
final int deleteBatchSize = randomIntBetween(1, 30);
288+
final String repositoryName = randomRepositoryName();
289+
createRepository(
290+
repositoryName,
291+
Settings.builder()
292+
.put(repositorySettings(repositoryName))
293+
.put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), deleteBatchSize)
294+
.build(),
295+
true
296+
);
297+
try (BlobStore store = newBlobStore(repositoryName)) {
283298
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
284-
List<String> blobsToDelete = new ArrayList<>();
285-
for (int i = 0; i < 10; i++) {
299+
final int toDeleteCount = randomIntBetween(deleteBatchSize, 3 * deleteBatchSize);
300+
final List<String> blobsToDelete = new ArrayList<>();
301+
for (int i = 0; i < toDeleteCount; i++) {
286302
byte[] bytes = randomBytes(randomInt(100));
287303
String blobName = randomAlphaOfLength(10);
288304
container.writeBlob(randomPurpose(), blobName, new BytesArray(bytes), false);

modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.elasticsearch.common.unit.ByteSizeUnit;
3131
import org.elasticsearch.common.unit.ByteSizeValue;
3232
import org.elasticsearch.core.Booleans;
33+
import org.elasticsearch.logging.LogManager;
34+
import org.elasticsearch.logging.Logger;
3335
import org.elasticsearch.plugins.Plugin;
3436
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
3537
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@@ -46,6 +48,7 @@
4648
import static org.hamcrest.Matchers.not;
4749

4850
public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {
51+
private static final Logger logger = LogManager.getLogger(AzureStorageCleanupThirdPartyTests.class);
4952
private static final boolean USE_FIXTURE = Booleans.parseBoolean(System.getProperty("test.azure.fixture", "true"));
5053

5154
private static final String AZURE_ACCOUNT = System.getProperty("test.azure.account");
@@ -89,8 +92,10 @@ protected SecureSettings credentials() {
8992
MockSecureSettings secureSettings = new MockSecureSettings();
9093
secureSettings.setString("azure.client.default.account", System.getProperty("test.azure.account"));
9194
if (hasSasToken) {
95+
logger.info("--> Using SAS token authentication");
9296
secureSettings.setString("azure.client.default.sas_token", System.getProperty("test.azure.sas_token"));
9397
} else {
98+
logger.info("--> Using key authentication");
9499
secureSettings.setString("azure.client.default.key", System.getProperty("test.azure.key"));
95100
}
96101
return secureSettings;

modules/repository-azure/src/main/java/module-info.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,15 @@
1818
requires org.apache.logging.log4j;
1919
requires org.apache.logging.log4j.core;
2020

21-
requires com.azure.core;
2221
requires com.azure.http.netty;
23-
requires com.azure.storage.blob;
24-
requires com.azure.storage.common;
2522
requires com.azure.identity;
2623

2724
requires io.netty.buffer;
2825
requires io.netty.transport;
2926
requires io.netty.resolver;
3027
requires io.netty.common;
3128

32-
requires reactor.core;
3329
requires reactor.netty.core;
3430
requires reactor.netty.http;
31+
requires com.azure.storage.blob.batch;
3532
}

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public void writeMetadataBlob(
138138
}
139139

140140
@Override
141-
public DeleteResult delete(OperationPurpose purpose) {
141+
public DeleteResult delete(OperationPurpose purpose) throws IOException {
142142
return blobStore.deleteBlobDirectory(purpose, keyPath);
143143
}
144144

0 commit comments

Comments
 (0)