Skip to content

Commit 1b1f97f

Browse files
committed
[ML] Introduce WritableIndexExpander to manage writable indices for job removers
1 parent 2ddc240 commit 1b1f97f

File tree

8 files changed: 113 additions and 67 deletions.

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.elasticsearch.action.index.IndexRequest;
1111
import org.elasticsearch.action.support.PlainActionFuture;
1212
import org.elasticsearch.client.internal.OriginSettingClient;
13-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1413
import org.elasticsearch.common.settings.Settings;
1514
import org.elasticsearch.common.unit.ByteSizeValue;
1615
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
@@ -39,6 +38,7 @@
3938
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
4039
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
4140
import org.elasticsearch.xpack.ml.job.retention.UnusedStatsRemover;
41+
import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
4242
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
4343
import org.junit.Before;
4444

@@ -49,11 +49,12 @@
4949
public class UnusedStatsRemoverIT extends BaseMlIntegTestCase {
5050

5151
private OriginSettingClient client;
52-
private final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();
52+
private WritableIndexExpander writableIndexExpander;
5353

5454
@Before
5555
public void createComponents() {
5656
client = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
57+
writableIndexExpander = new WritableIndexExpander(clusterService(), TestIndexNameExpressionResolver.newInstance());
5758
PlainActionFuture<Boolean> future = new PlainActionFuture<>();
5859
MlStatsIndex.createStatsIndexAndAliasIfNecessary(
5960
client(),
@@ -161,11 +162,7 @@ private void refreshStatsIndex() {
161162

162163
private void runUnusedStatsRemover() {
163164
PlainActionFuture<Boolean> deletionListener = new PlainActionFuture<>();
164-
new UnusedStatsRemover(client, new TaskId("test", 0L), clusterService(), indexNameExpressionResolver).remove(
165-
10000.0f,
166-
deletionListener,
167-
() -> false
168-
);
165+
new UnusedStatsRemover(client, new TaskId("test", 0L), writableIndexExpander).remove(10000.0f, deletionListener, () -> false);
169166
deletionListener.actionGet();
170167
}
171168

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
4242
import org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover;
4343
import org.elasticsearch.xpack.ml.job.retention.UnusedStatsRemover;
44+
import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
4445
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
4546
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
4647
import org.elasticsearch.xpack.ml.utils.persistence.WrappedBatchedJobsIterator;
@@ -70,6 +71,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<
7071
private final JobConfigProvider jobConfigProvider;
7172
private final JobResultsProvider jobResultsProvider;
7273
private final AnomalyDetectionAuditor auditor;
74+
private final WritableIndexExpander writableIndexExpander;
7375

7476
@Inject
7577
public TransportDeleteExpiredDataAction(
@@ -121,6 +123,7 @@ public TransportDeleteExpiredDataAction(
121123
this.jobConfigProvider = jobConfigProvider;
122124
this.jobResultsProvider = jobResultsProvider;
123125
this.auditor = auditor;
126+
this.writableIndexExpander = new WritableIndexExpander(clusterService, indexNameExpressionResolver);
124127
}
125128

126129
@Override
@@ -246,13 +249,15 @@ private List<MlDataRemover> createDataRemovers(
246249
TaskId parentTaskId,
247250
AnomalyDetectionAuditor anomalyDetectionAuditor
248251
) {
252+
249253
return Arrays.asList(
250254
new ExpiredResultsRemover(
251255
originClient,
252256
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
253257
parentTaskId,
254258
anomalyDetectionAuditor,
255-
threadPool
259+
threadPool,
260+
writableIndexExpander
256261
),
257262
new ExpiredForecastsRemover(originClient, threadPool, parentTaskId),
258263
new ExpiredModelSnapshotsRemover(
@@ -263,9 +268,9 @@ private List<MlDataRemover> createDataRemovers(
263268
jobResultsProvider,
264269
anomalyDetectionAuditor
265270
),
266-
new UnusedStateRemover(originClient, parentTaskId),
271+
new UnusedStateRemover(originClient, parentTaskId, writableIndexExpander),
267272
new EmptyStateIndexRemover(originClient, parentTaskId),
268-
new UnusedStatsRemover(originClient, parentTaskId, clusterService, indexNameExpressionResolver),
273+
new UnusedStatsRemover(originClient, parentTaskId, writableIndexExpander),
269274
new ExpiredAnnotationsRemover(
270275
originClient,
271276
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
@@ -278,7 +283,14 @@ private List<MlDataRemover> createDataRemovers(
278283

279284
private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTaskId, AnomalyDetectionAuditor anomalyDetectionAuditor) {
280285
return Arrays.asList(
281-
new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, anomalyDetectionAuditor, threadPool),
286+
new ExpiredResultsRemover(
287+
client,
288+
new VolatileCursorIterator<>(jobs),
289+
parentTaskId,
290+
anomalyDetectionAuditor,
291+
threadPool,
292+
writableIndexExpander
293+
),
282294
new ExpiredForecastsRemover(client, threadPool, parentTaskId),
283295
new ExpiredModelSnapshotsRemover(
284296
client,
@@ -288,9 +300,9 @@ private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTask
288300
jobResultsProvider,
289301
anomalyDetectionAuditor
290302
),
291-
new UnusedStateRemover(client, parentTaskId),
303+
new UnusedStateRemover(client, parentTaskId, writableIndexExpander),
292304
new EmptyStateIndexRemover(client, parentTaskId),
293-
new UnusedStatsRemover(client, parentTaskId, clusterService, indexNameExpressionResolver),
305+
new UnusedStatsRemover(client, parentTaskId, writableIndexExpander),
294306
new ExpiredAnnotationsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, anomalyDetectionAuditor, threadPool)
295307
);
296308
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
3131
this.client = client;
3232
this.jobIterator = jobIterator;
3333
this.parentTaskId = parentTaskId;
34+
3435
}
3536

3637
protected TaskId getParentTaskId() {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,17 +70,20 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
7070

7171
private final AnomalyDetectionAuditor auditor;
7272
private final ThreadPool threadPool;
73+
private final WritableIndexExpander writableIndexExpander;
7374

7475
public ExpiredResultsRemover(
7576
OriginSettingClient client,
7677
Iterator<Job> jobIterator,
7778
TaskId parentTaskId,
7879
AnomalyDetectionAuditor auditor,
79-
ThreadPool threadPool
80+
ThreadPool threadPool,
81+
WritableIndexExpander writableIndexExpander
8082
) {
8183
super(client, jobIterator, parentTaskId);
8284
this.auditor = Objects.requireNonNull(auditor);
8385
this.threadPool = Objects.requireNonNull(threadPool);
86+
this.writableIndexExpander = Objects.requireNonNull(writableIndexExpander);
8487
}
8588

8689
@Override
@@ -136,7 +139,7 @@ public void onFailure(Exception e) {
136139
});
137140
}
138141

139-
private static DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, long cutoffEpochMs) {
142+
private DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, long cutoffEpochMs) {
140143
QueryBuilder excludeFilter = QueryBuilders.termsQuery(
141144
Result.RESULT_TYPE.getPreferredName(),
142145
ModelSizeStats.RESULT_TYPE_VALUE,
@@ -148,7 +151,15 @@ private static DeleteByQueryRequest createDBQRequest(Job job, float requestsPerS
148151
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis"))
149152
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
150153
.mustNot(excludeFilter);
151-
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())).setSlices(
154+
155+
var indicesToQuery = writableIndexExpander.getWritableIndices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
156+
157+
if (indicesToQuery.isEmpty()) {
158+
LOGGER.warn("No writable indices found for job [{}]", job.getId());
159+
return new DeleteByQueryRequest();
160+
}
161+
162+
DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setSlices(
152163
AbstractBulkByScrollRequest.AUTO_SLICES
153164
)
154165
.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,12 @@ public class UnusedStateRemover implements MlDataRemover {
5151

5252
private final OriginSettingClient client;
5353
private final TaskId parentTaskId;
54+
private final WritableIndexExpander writableIndexExpander;
5455

55-
public UnusedStateRemover(OriginSettingClient client, TaskId parentTaskId) {
56+
public UnusedStateRemover(OriginSettingClient client, TaskId parentTaskId, WritableIndexExpander writableIndexExpander) {
5657
this.client = Objects.requireNonNull(client);
5758
this.parentTaskId = Objects.requireNonNull(parentTaskId);
59+
this.writableIndexExpander = Objects.requireNonNull(writableIndexExpander);
5860
}
5961

6062
@Override
@@ -137,8 +139,18 @@ private Set<String> getDataFrameAnalyticsJobIds() {
137139

138140
private void executeDeleteUnusedStateDocs(List<String> unusedDocIds, float requestsPerSec, ActionListener<Boolean> listener) {
139141
LOGGER.info("Found [{}] unused state documents; attempting to delete", unusedDocIds.size());
140-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
141-
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
142+
143+
var indicesToQuery = writableIndexExpander.getWritableIndices(AnomalyDetectorsIndex.jobStateIndexPattern());
144+
145+
if (indicesToQuery.isEmpty()) {
146+
LOGGER.info("No writable indices found for unused state documents");
147+
listener.onResponse(true);
148+
return;
149+
}
150+
151+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setIndicesOptions(
152+
IndicesOptions.lenientExpandOpen()
153+
)
142154
.setAbortOnVersionConflict(false)
143155
.setRequestsPerSecond(requestsPerSec)
144156
.setTimeout(DEFAULT_MAX_DURATION)
@@ -149,7 +161,7 @@ private void executeDeleteUnusedStateDocs(List<String> unusedDocIds, float reque
149161
deleteByQueryRequest.setParentTask(parentTaskId);
150162

151163
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(response -> {
152-
if (response.getBulkFailures().size() > 0 || response.getSearchFailures().size() > 0) {
164+
if (response.getBulkFailures().isEmpty() == false || response.getSearchFailures().isEmpty() == false) {
153165
LOGGER.error(
154166
"Some unused state documents could not be deleted due to failures: {}",
155167
Strings.collectionToCommaDelimitedString(response.getBulkFailures())

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java

Lines changed: 8 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,12 @@
99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
1111
import org.elasticsearch.action.ActionListener;
12-
import org.elasticsearch.action.bulk.BulkItemResponse;
1312
import org.elasticsearch.action.support.IndicesOptions;
1413
import org.elasticsearch.client.internal.OriginSettingClient;
15-
import org.elasticsearch.cluster.metadata.IndexMetadata;
16-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
17-
import org.elasticsearch.cluster.service.ClusterService;
1814
import org.elasticsearch.common.Strings;
1915
import org.elasticsearch.index.query.BoolQueryBuilder;
2016
import org.elasticsearch.index.query.QueryBuilder;
2117
import org.elasticsearch.index.query.QueryBuilders;
22-
import org.elasticsearch.index.reindex.BulkByScrollResponse;
2318
import org.elasticsearch.index.reindex.DeleteByQueryAction;
2419
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
2520
import org.elasticsearch.tasks.TaskId;
@@ -32,7 +27,6 @@
3227
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
3328
import org.elasticsearch.xpack.ml.utils.persistence.DocIdBatchedDocumentIterator;
3429

35-
import java.util.ArrayList;
3630
import java.util.Deque;
3731
import java.util.HashSet;
3832
import java.util.Objects;
@@ -50,19 +44,12 @@ public class UnusedStatsRemover implements MlDataRemover {
5044

5145
private final OriginSettingClient client;
5246
private final TaskId parentTaskId;
53-
private final ClusterService clusterService;
54-
private final IndexNameExpressionResolver indexNameExpressionResolver;
55-
56-
public UnusedStatsRemover(
57-
OriginSettingClient client,
58-
TaskId parentTaskId,
59-
ClusterService clusterService,
60-
IndexNameExpressionResolver indexNameExpressionResolver
61-
) {
47+
private final WritableIndexExpander writableIndexExpander;
48+
49+
public UnusedStatsRemover(OriginSettingClient client, TaskId parentTaskId, WritableIndexExpander writableIndexExpander) {
6250
this.client = Objects.requireNonNull(client);
6351
this.parentTaskId = Objects.requireNonNull(parentTaskId);
64-
this.clusterService = Objects.requireNonNull(clusterService);
65-
this.indexNameExpressionResolver = Objects.requireNonNull(indexNameExpressionResolver);
52+
this.writableIndexExpander = Objects.requireNonNull(writableIndexExpander);
6653
}
6754

6855
@Override
@@ -116,37 +103,11 @@ private Set<String> getTrainedModelIds() {
116103
return modelIds;
117104
}
118105

119-
private static boolean hasNonReadOnlyBulkFailures(BulkByScrollResponse response) {
120-
for (BulkItemResponse.Failure failure : response.getBulkFailures()) {
121-
if (failure.getMessage().contains(IndexMetadata.INDEX_WRITE_BLOCK.description())) {
122-
LOGGER.debug(
123-
"Ignoring failure to delete orphan stats docs from read-only index [{}]: {}",
124-
failure.getIndex(),
125-
failure.getMessage()
126-
);
127-
} else {
128-
return true;
129-
}
130-
}
131-
return false;
132-
}
133-
134106
private void executeDeleteUnusedStatsDocs(QueryBuilder dbq, float requestsPerSec, ActionListener<Boolean> listener) {
135-
var clusterState = clusterService.state();
136-
var concreteIndices = indexNameExpressionResolver.concreteIndexNames(
137-
clusterState,
138-
IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN,
139-
MlStatsIndex.indexPattern()
140-
);
141-
var indicesToQuery = new ArrayList<String>();
142-
for (String concreteIndex : concreteIndices) {
143-
var indexSettings = clusterState.metadata().getProject().index(concreteIndex).getSettings();
144-
if (IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings) == false) {
145-
indicesToQuery.add(concreteIndex);
146-
}
147-
}
107+
var indicesToQuery = writableIndexExpander.getWritableIndices(MlStatsIndex.indexPattern());
148108

149109
if (indicesToQuery.isEmpty()) {
110+
LOGGER.info("No writable indices found for unused stats documents");
150111
listener.onResponse(true);
151112
return;
152113
}
@@ -157,7 +118,7 @@ private void executeDeleteUnusedStatsDocs(QueryBuilder dbq, float requestsPerSec
157118
deleteByQueryRequest.setParentTask(parentTaskId);
158119

159120
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(response -> {
160-
if (hasNonReadOnlyBulkFailures(response) || response.getSearchFailures().isEmpty() == false) {
121+
if (response.getBulkFailures().isEmpty() == false || response.getSearchFailures().isEmpty() == false) {
161122
LOGGER.error(
162123
"Some unused stats documents could not be deleted due to failures: {}",
163124
Strings.collectionToCommaDelimitedString(response.getBulkFailures())
@@ -173,4 +134,5 @@ private void executeDeleteUnusedStatsDocs(QueryBuilder dbq, float requestsPerSec
173134
listener.onFailure(e);
174135
}));
175136
}
137+
176138
}
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/WritableIndexExpander.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ml.job.retention;
9+
10+
import org.elasticsearch.action.support.IndicesOptions;
11+
import org.elasticsearch.cluster.metadata.IndexMetadata;
12+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
13+
import org.elasticsearch.cluster.service.ClusterService;
14+
15+
import java.util.ArrayList;
16+
import java.util.Objects;
17+
18+
/**
 * Expands an index expression into the concrete indices that are not write-blocked.
 *
 * <p>The expression is resolved against the current cluster state, and every concrete
 * index whose {@code IndexMetadata.INDEX_BLOCKS_WRITE_SETTING} is {@code true} is dropped
 * from the result. The ML retention removers in this package use the result as the
 * target index list of their delete-by-query requests.
 */
public class WritableIndexExpander {
19+
20+
private final ClusterService clusterService;
21+
private final IndexNameExpressionResolver indexNameExpressionResolver;
22+
23+
/**
 * Creates an expander backed by the given cluster service and name resolver.
 *
 * @param clusterService source of the cluster state that is consulted on every call
 * @param indexNameExpressionResolver resolver used to turn an index expression into concrete index names
 * @throws NullPointerException if either argument is {@code null}
 */
public WritableIndexExpander(ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver) {
24+
this.clusterService = Objects.requireNonNull(clusterService);
25+
this.indexNameExpressionResolver = Objects.requireNonNull(indexNameExpressionResolver);
26+
}
27+
28+
/**
 * Resolves {@code indexPattern} and returns only the concrete indices without a write block.
 *
 * @param indexPattern index expression to resolve (an index name, alias or wildcard pattern,
 *                     as accepted by {@code IndexNameExpressionResolver#concreteIndexNames})
 * @return a newly allocated list of concrete index names whose write-block setting is
 *         {@code false}; empty when nothing matches or every match is write-blocked
 */
protected ArrayList<String> getWritableIndices(String indexPattern) {
29+
// Take the cluster state once so that name resolution and the settings lookup below
// both work against the same state object.
var clusterState = clusterService.state();
30+
var concreteIndices = indexNameExpressionResolver.concreteIndexNames(
31+
clusterState,
32+
// NOTE(review): by its name this option is lenient about missing indices and expands
// wildcards to open and hidden indices - confirm against IndicesOptions.
IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN,
33+
indexPattern
34+
);
35+
var indicesToQuery = new ArrayList<String>();
36+
for (String concreteIndex : concreteIndices) {
37+
var indexSettings = clusterState.metadata().getProject().index(concreteIndex).getSettings();
38+
// Keep the index only when its write-block setting is off.
if (IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings) == false) {
39+
indicesToQuery.add(concreteIndex);
40+
}
41+
}
42+
return indicesToQuery;
43+
}
44+
}

0 commit comments

Comments
 (0)