Skip to content

Commit b77b4e2

Browse files
feat(elasticsearch): allow bulk delete (#8424)
1 parent 1e507bc commit b77b4e2

File tree

3 files changed

+15
-4
lines changed

3 files changed

+15
-4
lines changed

metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESBulkProcessor.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public static ESBulkProcessor.ESBulkProcessorBuilder builder(RestHighLevelClient
4747
@NonNull
4848
private Boolean async = false;
4949
@Builder.Default
50+
@NonNull
51+
private Boolean batchDelete = false;
52+
@Builder.Default
5053
private Integer bulkRequestsLimit = 500;
5154
@Builder.Default
5255
private Integer bulkFlushPeriod = 1;
@@ -62,12 +65,13 @@ public static ESBulkProcessor.ESBulkProcessorBuilder builder(RestHighLevelClient
6265
@Getter(AccessLevel.NONE)
6366
private final BulkProcessor bulkProcessor;
6467

65-
private ESBulkProcessor(@NonNull RestHighLevelClient searchClient, @NonNull Boolean async, Integer bulkRequestsLimit,
66-
Integer bulkFlushPeriod, Integer numRetries, Long retryInterval,
68+
private ESBulkProcessor(@NonNull RestHighLevelClient searchClient, @NonNull Boolean async, @NonNull Boolean batchDelete,
69+
Integer bulkRequestsLimit, Integer bulkFlushPeriod, Integer numRetries, Long retryInterval,
6770
TimeValue defaultTimeout, WriteRequest.RefreshPolicy writeRequestRefreshPolicy,
6871
BulkProcessor ignored) {
6972
this.searchClient = searchClient;
7073
this.async = async;
74+
this.batchDelete = batchDelete;
7175
this.bulkRequestsLimit = bulkRequestsLimit;
7276
this.bulkFlushPeriod = bulkFlushPeriod;
7377
this.numRetries = numRetries;
@@ -103,8 +107,10 @@ public Optional<BulkByScrollResponse> deleteByQuery(QueryBuilder queryBuilder, b
103107
deleteByQueryRequest.indices(indices);
104108

105109
try {
106-
// flush pending writes
107-
bulkProcessor.flush();
110+
if (!batchDelete) {
111+
// flush pending writes
112+
bulkProcessor.flush();
113+
}
108114
// perform delete after local flush
109115
final BulkByScrollResponse deleteResponse = searchClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
110116
MetricUtils.counter(this.getClass(), ES_WRITES_METRIC).inc(deleteResponse.getTotal());

metadata-service/configuration/src/main/resources/application.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ elasticsearch:
168168
numRetries: ${ES_BULK_NUM_RETRIES:3}
169169
retryInterval: ${ES_BULK_RETRY_INTERVAL:1}
170170
refreshPolicy: ${ES_BULK_REFRESH_POLICY:NONE}
171+
enableBatchDelete: ${ES_BULK_ENABLE_BATCH_DELETE:false}
171172
index:
172173
prefix: ${INDEX_PREFIX:}
173174
numShards: ${ELASTICSEARCH_NUM_SHARDS_PER_INDEX:1}

metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchBulkProcessorFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ public class ElasticSearchBulkProcessorFactory {
4141
@Value("#{new Boolean('${elasticsearch.bulkProcessor.async}')}")
4242
private boolean async;
4343

44+
@Value("#{new Boolean('${elasticsearch.bulkProcessor.enableBatchDelete}')}")
45+
private boolean enableBatchDelete;
46+
4447
@Value("${elasticsearch.bulkProcessor.refreshPolicy}")
4548
private String refreshPolicy;
4649

@@ -53,6 +56,7 @@ protected ESBulkProcessor getInstance() {
5356
.bulkRequestsLimit(bulkRequestsLimit)
5457
.retryInterval(retryInterval)
5558
.numRetries(numRetries)
59+
.batchDelete(enableBatchDelete)
5660
.writeRequestRefreshPolicy(WriteRequest.RefreshPolicy.valueOf(refreshPolicy))
5761
.build();
5862
}

0 commit comments

Comments
 (0)