Skip to content

Commit 636f2b8

Browse files
committed
Delete word lists when an index is deleted
1 parent 1c80ba0 commit 636f2b8

File tree

4 files changed

+71
-46
lines changed

4 files changed

+71
-46
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.action.support.DestructiveOperations;
1717
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1818
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
19+
import org.elasticsearch.client.internal.Client;
1920
import org.elasticsearch.cluster.ClusterState;
2021
import org.elasticsearch.cluster.block.ClusterBlockException;
2122
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -24,6 +25,7 @@
2425
import org.elasticsearch.common.inject.Inject;
2526
import org.elasticsearch.common.util.concurrent.EsExecutors;
2627
import org.elasticsearch.index.Index;
28+
import org.elasticsearch.index.analysis.WordListsIndexService;
2729
import org.elasticsearch.tasks.Task;
2830
import org.elasticsearch.threadpool.ThreadPool;
2931
import org.elasticsearch.transport.TransportService;
@@ -42,6 +44,7 @@ public class TransportDeleteIndexAction extends AcknowledgedTransportMasterNodeA
4244

4345
private final MetadataDeleteIndexService deleteIndexService;
4446
private final DestructiveOperations destructiveOperations;
47+
private final WordListsIndexService wordListsIndexService;
4548

4649
@Inject
4750
public TransportDeleteIndexAction(
@@ -51,7 +54,8 @@ public TransportDeleteIndexAction(
5154
MetadataDeleteIndexService deleteIndexService,
5255
ActionFilters actionFilters,
5356
IndexNameExpressionResolver indexNameExpressionResolver,
54-
DestructiveOperations destructiveOperations
57+
DestructiveOperations destructiveOperations,
58+
Client client
5559
) {
5660
super(
5761
TYPE.name(),
@@ -65,6 +69,7 @@ public TransportDeleteIndexAction(
6569
);
6670
this.deleteIndexService = deleteIndexService;
6771
this.destructiveOperations = destructiveOperations;
72+
this.wordListsIndexService = new WordListsIndexService(client);
6873
}
6974

7075
@Override
@@ -91,10 +96,20 @@ protected void masterOperation(
9196
return;
9297
}
9398

94-
DeleteIndexClusterStateUpdateRequest deleteRequest = new DeleteIndexClusterStateUpdateRequest(listener.delegateResponse((l, e) -> {
95-
logger.debug(() -> "failed to delete indices [" + concreteIndices + "]", e);
96-
listener.onFailure(e);
97-
})).ackTimeout(request.ackTimeout()).masterNodeTimeout(request.masterNodeTimeout()).indices(concreteIndices.toArray(new Index[0]));
99+
DeleteIndexClusterStateUpdateRequest deleteRequest = new DeleteIndexClusterStateUpdateRequest(new ActionListener<>() {
100+
@Override
101+
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
102+
concreteIndices.forEach(i -> wordListsIndexService.deleteIndexWordLists(i.getName(), this.delegateFailure((l, b) -> {})));
103+
listener.onResponse(acknowledgedResponse);
104+
}
105+
106+
@Override
107+
public void onFailure(Exception e) {
108+
// TODO: Detect when failure is due to word list deletion and update logged error message appropriately
109+
logger.debug(() -> "failed to delete indices [" + concreteIndices + "]", e);
110+
listener.onFailure(e);
111+
}
112+
}).ackTimeout(request.ackTimeout()).masterNodeTimeout(request.masterNodeTimeout()).indices(concreteIndices.toArray(new Index[0]));
98113

99114
deleteIndexService.deleteIndices(deleteRequest);
100115
}

server/src/main/java/org/elasticsearch/index/analysis/WordListsIndexService.java

Lines changed: 47 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.elasticsearch.index.analysis;
1010

11+
import org.elasticsearch.ElasticsearchStatusException;
1112
import org.elasticsearch.ExceptionsHelper;
1213
import org.elasticsearch.Version;
1314
import org.elasticsearch.action.ActionListener;
@@ -16,6 +17,7 @@
1617
import org.elasticsearch.action.ActionType;
1718
import org.elasticsearch.action.DelegatingActionListener;
1819
import org.elasticsearch.action.DocWriteRequest;
20+
import org.elasticsearch.action.bulk.BulkItemResponse;
1921
import org.elasticsearch.action.index.IndexRequest;
2022
import org.elasticsearch.action.index.TransportIndexAction;
2123
import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -31,12 +33,19 @@
3133
import org.elasticsearch.common.util.concurrent.ThreadContext;
3234
import org.elasticsearch.index.IndexNotFoundException;
3335
import org.elasticsearch.index.query.QueryBuilders;
36+
import org.elasticsearch.index.query.TermQueryBuilder;
37+
import org.elasticsearch.index.reindex.BulkByScrollResponse;
38+
import org.elasticsearch.index.reindex.DeleteByQueryAction;
39+
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
3440
import org.elasticsearch.indices.SystemIndexDescriptor;
41+
import org.elasticsearch.logging.LogManager;
42+
import org.elasticsearch.logging.Logger;
3543
import org.elasticsearch.rest.RestStatus;
3644
import org.elasticsearch.xcontent.XContentBuilder;
3745

3846
import java.io.IOException;
3947
import java.io.UncheckedIOException;
48+
import java.util.List;
4049
import java.util.concurrent.Executor;
4150
import java.util.function.BiConsumer;
4251
import java.util.function.Supplier;
@@ -45,6 +54,8 @@
4554
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
4655

4756
public class WordListsIndexService {
57+
private static final Logger logger = LogManager.getLogger(WordListsIndexService.class);
58+
4859
public static final String WORD_LISTS_FEATURE_NAME = "word_lists";
4960
public static final String WORD_LISTS_ORIGIN = "word_lists";
5061

@@ -175,38 +186,8 @@ public void onFailure(Exception e) {
175186

176187
super.onFailure(e);
177188
}
178-
});
179-
180-
181-
// client.prepareSearch(WORD_LISTS_ALIAS_NAME)
182-
// .setQuery(QueryBuilders.termQuery(WORD_LIST_ID_FIELD, wordListId))
183-
// .setSize(1)
184-
// .setPreference(Preference.LOCAL.type())
185-
// .setTrackTotalHits(true)
186-
// .execute(new DelegatingActionListener<>(listener) {
187-
// @Override
188-
// public void onResponse(SearchResponse searchResponse) {
189-
// final long wordListCount = searchResponse.getHits().getTotalHits().value;
190-
// if (wordListCount > 1) {
191-
// listener.onFailure(new IllegalStateException(wordListCount + " word lists have ID [" + wordListId + "]"));
192-
// } else if (wordListCount == 1) {
193-
// listener.onResponse((String) searchResponse.getHits().getHits()[0].getSourceAsMap().get(WORD_LIST_VALUE_FIELD));
194-
// } else {
195-
// listener.onResponse(null);
196-
// }
197-
// }
198-
//
199-
// @Override
200-
// public void onFailure(Exception e) {
201-
// Throwable cause = ExceptionsHelper.unwrapCause(e);
202-
// if (cause instanceof IndexNotFoundException) {
203-
// delegate.onResponse(null);
204-
// return;
205-
// }
206-
//
207-
// super.onFailure(e);
208-
// }
209-
// });
189+
}
190+
);
210191
}
211192

212193
public void putWordList(String index, String wordListName, String wordListValue, ActionListener<PutWordListResult> listener) {
@@ -227,14 +208,41 @@ public void putWordList(String index, String wordListName, String wordListValue,
227208
l.onResponse(result);
228209
})
229210
);
211+
}
230212

231-
// client.index(indexRequest, listener.delegateFailure((l, indexResponse) -> {
232-
// PutWordListResult result = indexResponse.status() == RestStatus.CREATED
233-
// ? PutWordListResult.CREATED
234-
// : PutWordListResult.UPDATED;
235-
//
236-
// l.onResponse(result);
237-
// }));
213+
public void deleteIndexWordLists(String index, ActionListener<Boolean> listener) {
214+
TermQueryBuilder queryBuilder = QueryBuilders.termQuery(WORD_LIST_INDEX_FIELD, index);
215+
DeleteByQueryRequest request = new DeleteByQueryRequest().indices(WORD_LISTS_ALIAS_NAME).setQuery(queryBuilder);
216+
executeAsyncWithOrigin(client, WORD_LISTS_ORIGIN, DeleteByQueryAction.INSTANCE, request, new ActionListener<>() {
217+
@Override
218+
public void onResponse(BulkByScrollResponse response) {
219+
List<BulkItemResponse.Failure> failures = response.getBulkFailures();
220+
if (failures.isEmpty() == false) {
221+
logger.warn("Failed to delete word lists for index [" + index + "]");
222+
failures.forEach(failure -> logger.warn(failure.getMessage(), failure.getCause()));
223+
listener.onFailure(
224+
new ElasticsearchStatusException(
225+
"Failed to delete word lists for index [" + index + "]. See log for details.",
226+
RestStatus.INTERNAL_SERVER_ERROR
227+
)
228+
);
229+
}
230+
231+
boolean deletedWordLists = response.getDeleted() > 0;
232+
listener.onResponse(deletedWordLists);
233+
}
234+
235+
@Override
236+
public void onFailure(Exception e) {
237+
Throwable cause = ExceptionsHelper.unwrapCause(e);
238+
if (cause instanceof IndexNotFoundException) {
239+
listener.onResponse(false);
240+
return;
241+
}
242+
243+
listener.onFailure(e);
244+
}
245+
});
238246
}
239247

240248
private static String generateWordListId(String index, String wordListName) {

server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,8 @@ public IndexMetadata verifyIndexMetadata(IndexMetadata indexMetadata, IndexVersi
322322
deleteIndexService,
323323
actionFilters,
324324
indexNameExpressionResolver,
325-
destructiveOperations
325+
destructiveOperations,
326+
client
326327
);
327328
transportUpdateSettingsAction = new TransportUpdateSettingsAction(
328329
transportService,

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2509,7 +2509,8 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() {
25092509
new MetadataDeleteIndexService(settings, clusterService, allocationService),
25102510
actionFilters,
25112511
indexNameExpressionResolver,
2512-
new DestructiveOperations(settings, clusterSettings)
2512+
new DestructiveOperations(settings, clusterSettings),
2513+
client
25132514
)
25142515
);
25152516
actions.put(

0 commit comments

Comments
 (0)