Commit 6115885

Improve handling of individual batch failures

1 parent e744cbd commit 6115885

5 files changed, +139 -54 lines changed

modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryMetricsTests.java

Lines changed: 68 additions & 10 deletions
@@ -9,6 +9,7 @@

 package org.elasticsearch.repositories.azure;

+import com.sun.net.httpserver.Headers;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;

@@ -19,6 +20,7 @@
 import org.elasticsearch.common.blobstore.OperationPurpose;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.repositories.RepositoriesMetrics;
@@ -230,12 +232,24 @@ public void testRequestTimeIsAccurate() throws IOException {
     }

     public void testBatchDeleteFailure() throws IOException {
-        final String repository = createRepository(randomRepositoryName());
+        final int deleteBatchSize = randomIntBetween(1, 30);
+        final String repositoryName = randomRepositoryName();
+        final String repository = createRepository(
+            repositoryName,
+            Settings.builder()
+                .put(repositorySettings(repositoryName))
+                .put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), deleteBatchSize)
+                .build(),
+            true
+        );
         final String dataNodeName = internalCluster().getNodeNameThat(DiscoveryNode::canContainData);
         final BlobContainer container = getBlobContainer(dataNodeName, repository);

         final List<String> blobsToDelete = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
+        final int numberOfBatches = randomIntBetween(3, 5);
+        final int numberOfBlobs = numberOfBatches * deleteBatchSize;
+        final int failedBatches = randomIntBetween(1, numberOfBatches);
+        for (int i = 0; i < numberOfBlobs; i++) {
             byte[] bytes = randomBytes(randomInt(100));
             String blobName = "index-" + randomAlphaOfLength(10);
             container.writeBlob(randomPurpose(), blobName, new BytesArray(bytes), false);
@@ -244,17 +258,61 @@ public void testBatchDeleteFailure() throws IOException {
         Randomness.shuffle(blobsToDelete);
         clearMetrics(dataNodeName);

+        // Handler will fail one or more of the batch requests
+        RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);
+
         // Exhaust the retries
-        IntStream.range(0, MAX_RETRIES + 1).forEach(i -> requestHandlers.offer(new FixedRequestHandler(RestStatus.INTERNAL_SERVER_ERROR)));
+        IntStream.range(0, (numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1)))
+            .forEach(i -> requestHandlers.offer(failNRequestRequestHandler));

-        final OperationPurpose deletePurpose = randomPurpose();
-        assertThrows(IOException.class, () -> container.deleteBlobsIgnoringIfNotExists(deletePurpose, blobsToDelete.iterator()));
+        logger.info("--> Failing {} of {} batches", failedBatches, numberOfBatches);

-        metricsAsserter(dataNodeName, deletePurpose, AzureBlobStore.Operation.BLOB_BATCH, repository).expectMetrics()
-            .withRequests(MAX_RETRIES + 1)
-            .withExceptions(MAX_RETRIES + 1)
-            .withThrottles(0)
-            .forResult(MetricsAsserter.Result.Exception);
+        IOException exception = assertThrows(
+            IOException.class,
+            () -> container.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobsToDelete.iterator())
+        );
+        assertEquals(exception.getSuppressed().length, failedBatches);
+        assertEquals(
+            (numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1L)),
+            getLongCounterTotal(dataNodeName, RepositoriesMetrics.METRIC_REQUESTS_TOTAL)
+        );
+        assertEquals((failedBatches * (MAX_RETRIES + 1L)), getLongCounterTotal(dataNodeName, RepositoriesMetrics.METRIC_EXCEPTIONS_TOTAL));
+    }
+
+    private long getLongCounterTotal(String dataNodeName, String metricKey) {
+        return getTelemetryPlugin(dataNodeName).getLongCounterMeasurement(metricKey)
+            .stream()
+            .mapToLong(Measurement::getLong)
+            .reduce(0L, Long::sum);
+    }
+
+    /**
+     * Creates a {@link RequestHandler} that will persistently fail the first <code>numberToFail</code> distinct requests
+     * it sees. Any other requests are passed through to the delegate.
+     *
+     * @param numberToFail The number of requests to fail
+     * @return the handler
+     */
+    private static RequestHandler createFailNRequestsHandler(int numberToFail) {
+        final List<String> requestsToFail = new ArrayList<>(numberToFail);
+        return (exchange, delegate) -> {
+            final Headers requestHeaders = exchange.getRequestHeaders();
+            final String requestId = requestHeaders.get("X-ms-client-request-id").get(0);
+            boolean failRequest = false;
+            synchronized (requestsToFail) {
+                if (requestsToFail.contains(requestId)) {
+                    failRequest = true;
+                } else if (requestsToFail.size() < numberToFail) {
+                    requestsToFail.add(requestId);
+                    failRequest = true;
+                }
+            }
+            if (failRequest) {
+                exchange.sendResponseHeaders(500, -1);
+            } else {
+                delegate.handle(exchange);
+            }
+        };
     }

     private void clearMetrics(String discoveryNode) {

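The arithmetic behind the new assertions: a batch that is never failed costs exactly one request, while each of the failedBatches batches that fail persistently is retried MAX_RETRIES more times, costing MAX_RETRIES + 1 requests, every one of which is also recorded as an exception. A minimal standalone sketch of that accounting (the concrete values below are illustrative, not taken from the test):

// Illustrative sketch only; MAX_RETRIES, numberOfBatches and failedBatches are example values.
public class BatchDeleteAccountingSketch {

    private static final int MAX_RETRIES = 3; // assumed retry limit for the example

    public static void main(String[] args) {
        int numberOfBatches = 4; // e.g. randomIntBetween(3, 5) in the test
        int failedBatches = 2;   // e.g. randomIntBetween(1, numberOfBatches)

        // One request per successful batch, MAX_RETRIES + 1 per persistently failing batch.
        long expectedRequests = (numberOfBatches - failedBatches) + failedBatches * (MAX_RETRIES + 1L);
        // Only the failing attempts are counted as exceptions.
        long expectedExceptions = failedBatches * (MAX_RETRIES + 1L);

        System.out.println("expected METRIC_REQUESTS_TOTAL:   " + expectedRequests);   // 2 + 2 * 4 = 10
        System.out.println("expected METRIC_EXCEPTIONS_TOTAL: " + expectedExceptions); // 2 * 4 = 8
    }
}
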
modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java

Lines changed: 17 additions & 4 deletions
@@ -89,7 +89,8 @@ protected Settings repositorySettings(String repoName) {
             .put(super.repositorySettings(repoName))
             .put(AzureRepository.Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB))
             .put(AzureRepository.Repository.CONTAINER_SETTING.getKey(), "container")
-            .put(AzureStorageSettings.ACCOUNT_SETTING.getKey(), "test");
+            .put(AzureStorageSettings.ACCOUNT_SETTING.getKey(), "test")
+            .put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), randomIntBetween(5, 256));
         if (randomBoolean()) {
             settingsBuilder.put(AzureRepository.Repository.BASE_PATH_SETTING.getKey(), randomFrom("test", "test/1"));
         }
@@ -281,10 +282,22 @@ public void testLargeBlobCountDeletion() throws Exception {
     }

     public void testDeleteBlobsIgnoringIfNotExists() throws Exception {
-        try (BlobStore store = newBlobStore()) {
+        // Test with a smaller batch size here
+        final int deleteBatchSize = randomIntBetween(1, 30);
+        final String repositoryName = randomRepositoryName();
+        createRepository(
+            repositoryName,
+            Settings.builder()
+                .put(repositorySettings(repositoryName))
+                .put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), deleteBatchSize)
+                .build(),
+            true
+        );
+        try (BlobStore store = newBlobStore(repositoryName)) {
             final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
-            List<String> blobsToDelete = new ArrayList<>();
-            for (int i = 0; i < 10; i++) {
+            final int toDeleteCount = randomIntBetween(deleteBatchSize, 3 * deleteBatchSize);
+            final List<String> blobsToDelete = new ArrayList<>();
+            for (int i = 0; i < toDeleteCount; i++) {
                 byte[] bytes = randomBytes(randomInt(100));
                 String blobName = randomAlphaOfLength(10);
                 container.writeBlob(randomPurpose(), blobName, new BytesArray(bytes), false);

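For context on why toDeleteCount is drawn from [deleteBatchSize, 3 * deleteBatchSize]: assuming blobs are chunked into groups of at most deleteBatchSize per batch request (consistent with how the metrics test above computes numberOfBlobs = numberOfBatches * deleteBatchSize), the delete then spans between one and three batch requests. A small illustrative sketch of that chunking arithmetic (values are made up):

// Illustrative only: how many batch requests a delete of a given size would need,
// assuming a simple "at most deleteBatchSize blobs per batch" chunking.
public class DeleteBatchCountSketch {

    static int batchesFor(int toDeleteCount, int deleteBatchSize) {
        // Ceiling division: a partially filled final batch still needs its own request.
        return (toDeleteCount + deleteBatchSize - 1) / deleteBatchSize;
    }

    public static void main(String[] args) {
        int deleteBatchSize = 20;                            // e.g. randomIntBetween(1, 30)
        System.out.println(batchesFor(20, deleteBatchSize)); // 1 batch
        System.out.println(batchesFor(45, deleteBatchSize)); // 3 batches
        System.out.println(batchesFor(60, deleteBatchSize)); // 3 batches
    }
}
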
modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

Lines changed: 1 addition & 1 deletion
@@ -138,7 +138,7 @@ public void writeMetadataBlob(
     }

     @Override
-    public DeleteResult delete(OperationPurpose purpose) {
+    public DeleteResult delete(OperationPurpose purpose) throws IOException {
         return blobStore.deleteBlobDirectory(purpose, keyPath);
     }

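The practical effect of this one-line change is that directory-deletion failures now reach callers as a checked IOException instead of only as unchecked wrappers. A self-contained sketch of the caller-side consequence (the types here are simplified stand-ins, not the real BlobContainer API):

import java.io.IOException;

// Stand-in types only; the real code uses BlobContainer, DeleteResult and OperationPurpose.
public class DeleteSignatureSketch {

    record DeleteResult(long blobsDeleted, long bytesDeleted) {}

    interface DirectoryDeleter {
        DeleteResult delete() throws IOException; // previously: DeleteResult delete();
    }

    static void cleanUp(DirectoryDeleter deleter) {
        try {
            DeleteResult result = deleter.delete();
            System.out.println("deleted " + result.blobsDeleted() + " blobs");
        } catch (IOException e) {
            // Callers must now handle (or declare) the batch-delete failure explicitly.
            System.err.println("directory delete failed: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        cleanUp(() -> new DeleteResult(3, 1024));
    }
}
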
modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 49 additions & 39 deletions
@@ -236,57 +236,41 @@ public boolean blobExists(OperationPurpose purpose, String blob) throws IOExcept
         }
     }

-    public DeleteResult deleteBlobDirectory(OperationPurpose purpose, String path) {
+    public DeleteResult deleteBlobDirectory(OperationPurpose purpose, String path) throws IOException {
         final AtomicInteger blobsDeleted = new AtomicInteger(0);
         final AtomicLong bytesDeleted = new AtomicLong(0);
         final List<String> blobNames = new ArrayList<>();

-        SocketAccess.doPrivilegedVoidException(() -> {
+        SocketAccess.doPrivilegedVoidExceptionExplicit(IOException.class, () -> {
             final AzureBlobServiceClient client = getAzureBlobServiceClientClient(purpose);
             final BlobContainerClient blobContainerClient = client.getSyncClient().getBlobContainerClient(container);
-            try {
-                final ListBlobsOptions options = new ListBlobsOptions().setPrefix(path)
-                    .setDetails(new BlobListDetails().setRetrieveMetadata(true));
-                for (BlobItem blobItem : blobContainerClient.listBlobs(options, null)) {
-                    if (blobItem.isPrefix()) {
-                        continue;
-                    }
-                    blobNames.add(blobItem.getName());
-                    bytesDeleted.addAndGet(blobItem.getProperties().getContentLength());
-                    blobsDeleted.incrementAndGet();
+            final ListBlobsOptions options = new ListBlobsOptions().setPrefix(path)
+                .setDetails(new BlobListDetails().setRetrieveMetadata(true));
+            for (BlobItem blobItem : blobContainerClient.listBlobs(options, null)) {
+                if (blobItem.isPrefix()) {
+                    continue;
                 }
-                if (blobNames.isEmpty() == false) {
-                    deleteListOfBlobs(client, blobNames.iterator());
-                }
-            } catch (Exception e) {
-                filterDeleteExceptionsAndRethrow(e, new IOException("Deleting directory [" + path + "] failed"));
+                blobNames.add(blobItem.getName());
+                bytesDeleted.addAndGet(blobItem.getProperties().getContentLength());
+                blobsDeleted.incrementAndGet();
+            }
+            if (blobNames.isEmpty() == false) {
+                deleteListOfBlobs(client, blobNames.iterator());
             }
         });

         return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
     }

-    private static void filterDeleteExceptionsAndRethrow(Exception e, IOException exception) throws IOException {
-        int suppressedCount = 0;
-        for (Throwable suppressed : e.getSuppressed()) {
-            // We're only interested about the blob deletion exceptions and not in the reactor internals exceptions
-            if (suppressed instanceof IOException) {
-                exception.addSuppressed(suppressed);
-                suppressedCount++;
-                if (suppressedCount > 10) {
-                    break;
-                }
-            }
-        }
-        throw exception;
-    }
-
     @Override
     public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
         if (blobNames.hasNext() == false) {
             return;
         }
-        SocketAccess.doPrivilegedVoidException(() -> deleteListOfBlobs(getAzureBlobServiceClientClient(purpose), blobNames));
+        SocketAccess.doPrivilegedVoidExceptionExplicit(
+            IOException.class,
+            () -> deleteListOfBlobs(getAzureBlobServiceClientClient(purpose), blobNames)
+        );
     }

     private void deleteListOfBlobs(AzureBlobServiceClient azureBlobServiceClient, Iterator<String> blobNames) throws IOException {
@@ -306,19 +290,45 @@ private void deleteListOfBlobs(AzureBlobServiceClient azureBlobServiceClient, It
             }
             batchResponses.add(batchAsyncClient.submitBatch(currentBatch));
         }
-        try {
-            Flux.merge(batchResponses).collectList().block();
-        } catch (BlobBatchStorageException bbse) {
+
+        // The selective exception handling is easier to do this way than with what is thrown by Mono#whenDelayError
+        IOException exceptionToThrow = null;
+        for (Mono<Void> batchResponse : batchResponses) {
+            try {
+                batchResponse.block();
+            } catch (Exception e) {
+                if (isIgnorableBatchDeleteException(e)) {
+                    continue;
+                }
+                if (exceptionToThrow == null) {
+                    exceptionToThrow = new IOException("Error deleting batch");
+                }
+                exceptionToThrow.addSuppressed(e);
+            }
+        }
+        if (exceptionToThrow != null) {
+            throw exceptionToThrow;
+        }
+    }
+
+    /**
+     * We can ignore {@link BlobBatchStorageException}s when they are just telling us some of the files were not found
+     *
+     * @param exception An exception throw by batch delete
+     * @return true if it is safe to ignore, false otherwise
+     */
+    private boolean isIgnorableBatchDeleteException(Exception exception) {
+        if (exception instanceof BlobBatchStorageException bbse) {
             final Iterable<BlobStorageException> batchExceptions = bbse.getBatchExceptions();
             for (BlobStorageException bse : batchExceptions) {
                 // If one of the requests failed with something other than a BLOB_NOT_FOUND, throw the encompassing exception
                 if (BlobErrorCode.BLOB_NOT_FOUND.equals(bse.getErrorCode()) == false) {
-                    throw new IOException("Failed to delete batch", bbse);
+                    return false;
                 }
             }
-        } catch (Exception e) {
-            throw new IOException("Unable to delete blobs", e);
+            return true;
        }
+        return false;
     }

     public InputStream getInputStream(OperationPurpose purpose, String blob, long position, final @Nullable Long length) {

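The per-batch loop above replaces a single Flux.merge(batchResponses).collectList().block() so that each batch outcome can be inspected on its own: failures where every error is BLOB_NOT_FOUND are ignored, and every other failure is attached as a suppressed exception on a single IOException, which is what the suppressed-count assertion in the metrics test checks. A self-contained sketch of that aggregation pattern, using plain Callables as stand-ins for the reactor Mono<Void> responses the real code blocks on:

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;

// Sketch of the aggregation pattern only; the shipped code iterates Mono<Void> batch
// responses and uses isIgnorableBatchDeleteException to inspect BlobBatchStorageException.
public class BatchErrorAggregationSketch {

    static void runBatches(List<Callable<Void>> batches) throws IOException {
        IOException exceptionToThrow = null;
        for (Callable<Void> batch : batches) {
            try {
                batch.call();
            } catch (Exception e) {
                if (isIgnorable(e)) {
                    continue; // e.g. every failure in the batch was "blob not found"
                }
                if (exceptionToThrow == null) {
                    exceptionToThrow = new IOException("Error deleting batch");
                }
                exceptionToThrow.addSuppressed(e); // one suppressed entry per failed batch
            }
        }
        if (exceptionToThrow != null) {
            throw exceptionToThrow;
        }
    }

    // Placeholder for the real BLOB_NOT_FOUND check.
    static boolean isIgnorable(Exception e) {
        return e instanceof java.util.NoSuchElementException;
    }

    public static void main(String[] args) {
        List<Callable<Void>> batches = List.of(
            () -> null,                                                       // succeeds
            () -> { throw new java.util.NoSuchElementException("missing"); }, // ignorable
            () -> { throw new IllegalStateException("server error"); }        // aggregated
        );
        try {
            runBatches(batches);
        } catch (IOException e) {
            System.out.println("suppressed failures: " + e.getSuppressed().length); // 1
        }
    }
}
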
modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java

Lines changed: 4 additions & 0 deletions
@@ -52,6 +52,10 @@ public static void doPrivilegedVoidException(StorageRunnable action) {
         }
     }

+    public static <E extends Exception> void doPrivilegedVoidExceptionExplicit(Class<E> exception, StorageRunnable action) throws E {
+        doPrivilegedVoidException(action);
+    }
+
     @FunctionalInterface
     public interface StorageRunnable {
         void executeCouldThrow() throws URISyntaxException, IOException;

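The new overload exists so callers can state which checked exception a privileged block may throw: the Class<E> argument only pins the type parameter at the call site, and the body simply delegates. A standalone sketch of the idea; the body of the real doPrivilegedVoidException is not shown in this diff, so the placeholder below (a sneaky rethrow, with the privileged wrapper omitted) is an assumption, not the module's implementation:

import java.io.IOException;

// Sketch of the signature trick only; the real SocketAccess helper also runs the action
// inside a privileged block, which is omitted here.
public class ExplicitThrowsSketch {

    @FunctionalInterface
    interface StorageRunnable {
        void executeCouldThrow() throws IOException;
    }

    // Classic "sneaky throw": rethrows a checked exception without declaring it.
    @SuppressWarnings("unchecked")
    private static <T extends Throwable> void sneakyThrow(Throwable t) throws T {
        throw (T) t;
    }

    // Placeholder for the existing helper; its real body is not part of this diff.
    static void doPrivilegedVoidException(StorageRunnable action) {
        try {
            action.executeCouldThrow();
        } catch (IOException e) {
            sneakyThrow(e);
        }
    }

    // The Class<E> parameter only pins E at the call site; declaring "throws E" makes the
    // compiler force callers (such as deleteBlobsIgnoringIfNotExists) to handle IOException.
    static <E extends Exception> void doPrivilegedVoidExceptionExplicit(Class<E> exception, StorageRunnable action) throws E {
        doPrivilegedVoidException(action);
    }

    public static void main(String[] args) {
        try {
            doPrivilegedVoidExceptionExplicit(IOException.class, () -> {
                throw new IOException("boom");
            });
        } catch (IOException e) {
            System.out.println("caught checked exception: " + e.getMessage());
        }
    }
}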