Skip to content

Commit f6a2b5c

Browse files
authored
Add bulk delete method to BlobStore interface and implementations (#98948)
1 parent 01686a8 commit f6a2b5c

File tree

17 files changed

+196
-89
lines changed

17 files changed

+196
-89
lines changed

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public DeleteResult delete() throws IOException {
123123

124124
@Override
125125
public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
126-
blobStore.deleteBlobs(new Iterator<>() {
126+
blobStore.deleteBlobsIgnoringIfNotExists(new Iterator<>() {
127127
@Override
128128
public boolean hasNext() {
129129
return blobNames.hasNext();

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,8 @@ private static void filterDeleteExceptionsAndRethrow(Exception e, IOException ex
263263
throw exception;
264264
}
265265

266-
void deleteBlobs(Iterator<String> blobs) throws IOException {
266+
@Override
267+
public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobs) throws IOException {
267268
if (blobs.hasNext() == false) {
268269
return;
269270
}

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,8 @@ public String next() {
524524
*
525525
* @param blobNames names of the blobs to delete
526526
*/
527-
void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
527+
@Override
528+
public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
528529
if (blobNames.hasNext() == false) {
529530
return;
530531
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 1 addition & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,11 @@
1313
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
1414
import com.amazonaws.services.s3.model.AmazonS3Exception;
1515
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
16-
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
1716
import com.amazonaws.services.s3.model.GetObjectRequest;
1817
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
1918
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
2019
import com.amazonaws.services.s3.model.ListNextBatchOfObjectsRequest;
2120
import com.amazonaws.services.s3.model.ListObjectsRequest;
22-
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
2321
import com.amazonaws.services.s3.model.MultipartUpload;
2422
import com.amazonaws.services.s3.model.ObjectListing;
2523
import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -32,7 +30,6 @@
3230
import org.apache.logging.log4j.LogManager;
3331
import org.apache.logging.log4j.Logger;
3432
import org.apache.lucene.util.SetOnce;
35-
import org.elasticsearch.ExceptionsHelper;
3633
import org.elasticsearch.action.ActionListener;
3734
import org.elasticsearch.action.ActionRunnable;
3835
import org.elasticsearch.action.support.RefCountingListener;
@@ -70,12 +67,10 @@
7067
import java.util.Map;
7168
import java.util.concurrent.atomic.AtomicBoolean;
7269
import java.util.concurrent.atomic.AtomicLong;
73-
import java.util.concurrent.atomic.AtomicReference;
7470
import java.util.function.Function;
7571
import java.util.stream.Collectors;
7672

7773
import static org.elasticsearch.common.blobstore.support.BlobContainerUtils.getRegisterUsingConsistentRead;
78-
import static org.elasticsearch.core.Strings.format;
7974
import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
8075
import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
8176
import static org.elasticsearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
@@ -84,12 +79,6 @@ class S3BlobContainer extends AbstractBlobContainer {
8479

8580
private static final Logger logger = LogManager.getLogger(S3BlobContainer.class);
8681

87-
/**
88-
* Maximum number of deletes in a {@link DeleteObjectsRequest}.
89-
* @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
90-
*/
91-
private static final int MAX_BULK_DELETES = 1000;
92-
9382
private final S3BlobStore blobStore;
9483
private final String keyPath;
9584

@@ -357,55 +346,7 @@ public String next() {
357346
outstanding = blobNames;
358347
}
359348

360-
final List<String> partition = new ArrayList<>();
361-
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
362-
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
363-
final AtomicReference<Exception> aex = new AtomicReference<>();
364-
SocketAccess.doPrivilegedVoid(() -> {
365-
outstanding.forEachRemaining(key -> {
366-
partition.add(key);
367-
if (partition.size() == MAX_BULK_DELETES) {
368-
deletePartition(clientReference, partition, aex);
369-
partition.clear();
370-
}
371-
});
372-
if (partition.isEmpty() == false) {
373-
deletePartition(clientReference, partition, aex);
374-
}
375-
});
376-
if (aex.get() != null) {
377-
throw aex.get();
378-
}
379-
} catch (Exception e) {
380-
throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
381-
}
382-
}
383-
384-
private void deletePartition(AmazonS3Reference clientReference, List<String> partition, AtomicReference<Exception> aex) {
385-
try {
386-
clientReference.client().deleteObjects(bulkDelete(blobStore, partition));
387-
} catch (MultiObjectDeleteException e) {
388-
// We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
389-
// first remove all keys that were sent in the request and then add back those that ran into an exception.
390-
logger.warn(
391-
() -> format(
392-
"Failed to delete some blobs %s",
393-
e.getErrors().stream().map(err -> "[" + err.getKey() + "][" + err.getCode() + "][" + err.getMessage() + "]").toList()
394-
),
395-
e
396-
);
397-
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
398-
} catch (AmazonClientException e) {
399-
// The AWS client threw any unexpected exception and did not execute the request at all so we do not
400-
// remove any keys from the outstanding deletes set.
401-
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
402-
}
403-
}
404-
405-
private static DeleteObjectsRequest bulkDelete(S3BlobStore blobStore, List<String> blobs) {
406-
return new DeleteObjectsRequest(blobStore.bucket()).withKeys(blobs.toArray(Strings.EMPTY_ARRAY))
407-
.withQuiet(true)
408-
.withRequestMetricCollector(blobStore.deleteMetricCollector);
349+
blobStore.deleteBlobsIgnoringIfNotExists(outstanding);
409350
}
410351

411352
@Override

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,21 @@
88

99
package org.elasticsearch.repositories.s3;
1010

11+
import com.amazonaws.AmazonClientException;
1112
import com.amazonaws.Request;
1213
import com.amazonaws.Response;
1314
import com.amazonaws.metrics.RequestMetricCollector;
1415
import com.amazonaws.services.s3.model.CannedAccessControlList;
16+
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
17+
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
1518
import com.amazonaws.services.s3.model.StorageClass;
1619
import com.amazonaws.util.AWSRequestMetrics;
1720

1821
import org.apache.logging.log4j.LogManager;
1922
import org.apache.logging.log4j.Logger;
23+
import org.elasticsearch.ExceptionsHelper;
2024
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
25+
import org.elasticsearch.common.Strings;
2126
import org.elasticsearch.common.blobstore.BlobContainer;
2227
import org.elasticsearch.common.blobstore.BlobPath;
2328
import org.elasticsearch.common.blobstore.BlobStore;
@@ -28,13 +33,25 @@
2833
import org.elasticsearch.threadpool.ThreadPool;
2934

3035
import java.io.IOException;
36+
import java.util.ArrayList;
3137
import java.util.HashMap;
38+
import java.util.Iterator;
39+
import java.util.List;
3240
import java.util.Locale;
3341
import java.util.Map;
3442
import java.util.concurrent.atomic.AtomicLong;
43+
import java.util.concurrent.atomic.AtomicReference;
44+
45+
import static org.elasticsearch.core.Strings.format;
3546

3647
class S3BlobStore implements BlobStore {
3748

49+
/**
50+
* Maximum number of deletes in a {@link DeleteObjectsRequest}.
51+
* @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
52+
*/
53+
private static final int MAX_BULK_DELETES = 1000;
54+
3855
private static final Logger logger = LogManager.getLogger(S3BlobStore.class);
3956

4057
private final S3Service service;
@@ -189,6 +206,59 @@ public BlobContainer blobContainer(BlobPath path) {
189206
return new S3BlobContainer(path, this);
190207
}
191208

209+
@Override
210+
public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
211+
final List<String> partition = new ArrayList<>();
212+
try (AmazonS3Reference clientReference = clientReference()) {
213+
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
214+
final AtomicReference<Exception> aex = new AtomicReference<>();
215+
SocketAccess.doPrivilegedVoid(() -> {
216+
blobNames.forEachRemaining(key -> {
217+
partition.add(key);
218+
if (partition.size() == MAX_BULK_DELETES) {
219+
deletePartition(clientReference, partition, aex);
220+
partition.clear();
221+
}
222+
});
223+
if (partition.isEmpty() == false) {
224+
deletePartition(clientReference, partition, aex);
225+
}
226+
});
227+
if (aex.get() != null) {
228+
throw aex.get();
229+
}
230+
} catch (Exception e) {
231+
throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
232+
}
233+
}
234+
235+
private void deletePartition(AmazonS3Reference clientReference, List<String> partition, AtomicReference<Exception> aex) {
236+
try {
237+
clientReference.client().deleteObjects(bulkDelete(this, partition));
238+
} catch (MultiObjectDeleteException e) {
239+
// We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
240+
// first remove all keys that were sent in the request and then add back those that ran into an exception.
241+
logger.warn(
242+
() -> format(
243+
"Failed to delete some blobs %s",
244+
e.getErrors().stream().map(err -> "[" + err.getKey() + "][" + err.getCode() + "][" + err.getMessage() + "]").toList()
245+
),
246+
e
247+
);
248+
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
249+
} catch (AmazonClientException e) {
250+
// The AWS client threw any unexpected exception and did not execute the request at all so we do not
251+
// remove any keys from the outstanding deletes set.
252+
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
253+
}
254+
}
255+
256+
private static DeleteObjectsRequest bulkDelete(S3BlobStore blobStore, List<String> blobs) {
257+
return new DeleteObjectsRequest(blobStore.bucket()).withKeys(blobs.toArray(Strings.EMPTY_ARRAY))
258+
.withQuiet(true)
259+
.withRequestMetricCollector(blobStore.deleteMetricCollector);
260+
}
261+
192262
@Override
193263
public void close() throws IOException {
194264
this.service.close();

modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobStore.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import org.elasticsearch.common.unit.ByteSizeValue;
2222
import org.elasticsearch.core.CheckedFunction;
2323

24+
import java.io.IOException;
2425
import java.net.MalformedURLException;
2526
import java.net.URL;
27+
import java.util.Iterator;
2628
import java.util.List;
2729

2830
/**
@@ -105,6 +107,11 @@ public BlobContainer blobContainer(BlobPath blobPath) {
105107
}
106108
}
107109

110+
@Override
111+
public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
112+
throw new UnsupportedOperationException("Bulk deletes are not supported in URL repositories");
113+
}
114+
108115
@Override
109116
public void close() {
110117
// nothing to do here...

plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.common.blobstore.BlobStore;
1818

1919
import java.io.IOException;
20+
import java.util.Iterator;
2021

2122
final class HdfsBlobStore implements BlobStore {
2223

@@ -69,6 +70,11 @@ public BlobContainer blobContainer(BlobPath path) {
6970
return new HdfsBlobContainer(path, this, buildHdfsPath(path), bufferSize, securityContext, replicationFactor);
7071
}
7172

73+
@Override
74+
public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
75+
throw new UnsupportedOperationException("Bulk deletes are not supported in Hdfs repositories");
76+
}
77+
7278
private Path buildHdfsPath(BlobPath blobPath) {
7379
final Path path = translateToHdfsPath(blobPath);
7480
if (readOnly == false) {

plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreRepositoryTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ public void testSnapshotAndRestore() throws Exception {
4444
testSnapshotAndRestore(false);
4545
}
4646

47+
@Override
48+
public void testBlobStoreBulkDeletion() throws Exception {
49+
// HDFS does not implement bulk deletion from different BlobContainers
50+
}
51+
4752
@Override
4853
protected Collection<Class<? extends Plugin>> nodePlugins() {
4954
return Collections.singletonList(HdfsPlugin.class);

server/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
package org.elasticsearch.common.blobstore;
99

1010
import java.io.Closeable;
11+
import java.io.IOException;
1112
import java.util.Collections;
13+
import java.util.Iterator;
1214
import java.util.Map;
1315

1416
/**
@@ -21,6 +23,12 @@ public interface BlobStore extends Closeable {
2123
*/
2224
BlobContainer blobContainer(BlobPath path);
2325

26+
/**
27+
* Delete all the provided blobs from the blob store. Each blob could belong to a different {@code BlobContainer}
28+
* @param blobNames the blobs to be deleted
29+
*/
30+
void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException;
31+
2432
/**
2533
* Returns statistics on the count of operations that have been performed on this blob store
2634
*/

server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -181,32 +181,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
181181

182182
@Override
183183
public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
184-
IOException ioe = null;
185-
long suppressedExceptions = 0;
186-
while (blobNames.hasNext()) {
187-
try {
188-
Path resolve = path.resolve(blobNames.next());
189-
IOUtils.rm(resolve);
190-
} catch (IOException e) {
191-
// IOUtils.rm puts the original exception as a string in the IOException message. Ignore no such file exception.
192-
if (e.getMessage().contains("NoSuchFileException") == false) {
193-
// track up to 10 delete exceptions and try to continue deleting on exceptions
194-
if (ioe == null) {
195-
ioe = e;
196-
} else if (ioe.getSuppressed().length < 10) {
197-
ioe.addSuppressed(e);
198-
} else {
199-
++suppressedExceptions;
200-
}
201-
}
202-
}
203-
}
204-
if (ioe != null) {
205-
if (suppressedExceptions > 0) {
206-
ioe.addSuppressed(new IOException("Failed to delete files, suppressed [" + suppressedExceptions + "] failures"));
207-
}
208-
throw ioe;
209-
}
184+
blobStore.deleteBlobsIgnoringIfNotExists(Iterators.map(blobNames, blobName -> path.resolve(blobName).toString()));
210185
}
211186

212187
@Override

0 commit comments

Comments (0)