
Commit c822a57

[ML] Prevent retention classes from failing when deleting documents in read-only indices (elastic#125408)
Retention classes such as UnusedStatsRemover delete orphaned documents that no longer have an associated job. When the target indices are made read-only, these classes start failing, because a read-only index does not accept deletes. This PR ensures that non-writable indices are excluded from the delete-by-query requests.
1 parent: 55956d4 · commit: c822a57

19 files changed (+546, −109 lines)
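
The core idea of the fix is to filter the target indices down to writable ones before building each delete-by-query request. The sketch below illustrates that filtering step only; it is not the PR's WritableIndexExpander (whose source is not part of the hunks shown here), and the class and method names are hypothetical. It assumes the caller already has concrete index names and checks the standard index.blocks.write setting in cluster state.

// Hypothetical helper, for illustration only; the PR's WritableIndexExpander also
// resolves aliases and patterns via IndexNameExpressionResolver and may differ in detail.
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;

import java.util.ArrayList;
import java.util.List;

final class WritableIndexFilter {

    /** Returns only the concrete indices that do not carry an index.blocks.write block. */
    static List<String> writableOnly(ClusterState state, List<String> concreteIndices) {
        List<String> writable = new ArrayList<>();
        for (String name : concreteIndices) {
            IndexMetadata metadata = state.metadata().index(name);
            if (metadata == null) {
                continue; // index no longer exists, so there is nothing to delete from it
            }
            boolean writeBlocked = IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(metadata.getSettings());
            if (writeBlocked == false) {
                writable.add(name);
            }
        }
        return writable;
    }
}

With a helper like this, an empty result becomes the short-circuit signal: the JobDataDeleter changes below skip the DeleteByQueryRequest entirely and, where a listener expects an answer, complete it with true so retention proceeds silently.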

docs/changelog/125408.yaml

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+pr: 125408
+summary: Prevent ML data retention logic from failing when deleting documents in read-only
+  indices
+area: Machine Learning
+type: bug
+issues: []

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java

Lines changed: 90 additions & 62 deletions
@@ -10,6 +10,7 @@
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.internal.OriginSettingClient;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.tasks.TaskId;
@@ -63,91 +64,118 @@ public void createComponents() {
     }
 
     public void testRemoveUnusedStats() throws Exception {
+        String modelId = "model-with-stats";
+        putDFA(modelId);
 
-        prepareIndex("foo").setId("some-empty-doc").setSource("{}", XContentType.JSON).get();
-
-        PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request(
-            new DataFrameAnalyticsConfig.Builder().setId("analytics-with-stats")
-                .setModelMemoryLimit(ByteSizeValue.ofGb(1))
-                .setSource(new DataFrameAnalyticsSource(new String[] { "foo" }, null, null, null))
-                .setDest(new DataFrameAnalyticsDest("bar", null))
-                .setAnalysis(new Regression("prediction"))
-                .build()
-        );
-        client.execute(PutDataFrameAnalyticsAction.INSTANCE, request).actionGet();
-
-        client.execute(
-            PutTrainedModelAction.INSTANCE,
-            new PutTrainedModelAction.Request(
-                TrainedModelConfig.builder()
-                    .setModelId("model-with-stats")
-                    .setInferenceConfig(RegressionConfig.EMPTY_PARAMS)
-                    .setInput(new TrainedModelInput(Arrays.asList("foo", "bar")))
-                    .setParsedDefinition(
-                        new TrainedModelDefinition.Builder().setPreProcessors(Collections.emptyList())
-                            .setTrainedModel(
-                                Tree.builder()
-                                    .setFeatureNames(Arrays.asList("foo", "bar"))
-                                    .setRoot(TreeNode.builder(0).setLeafValue(42))
-                                    .build()
-                            )
-                    )
-                    .validate(true)
-                    .build(),
-                false
-            )
-        ).actionGet();
-
+        // Existing analytics and models
         indexStatDocument(new DataCounts("analytics-with-stats", 1, 1, 1), DataCounts.documentId("analytics-with-stats"));
-        indexStatDocument(new DataCounts("missing-analytics-with-stats", 1, 1, 1), DataCounts.documentId("missing-analytics-with-stats"));
+        indexStatDocument(new InferenceStats(1, 1, 1, 1, modelId, "test", Instant.now()), InferenceStats.docId(modelId, "test"));
         indexStatDocument(
             new InferenceStats(1, 1, 1, 1, TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test", Instant.now()),
             InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test")
         );
+
+        // Unused analytics/model stats
+        indexStatDocument(new DataCounts("missing-analytics-with-stats", 1, 1, 1), DataCounts.documentId("missing-analytics-with-stats"));
         indexStatDocument(
             new InferenceStats(1, 1, 1, 1, "missing-model", "test", Instant.now()),
             InferenceStats.docId("missing-model", "test")
         );
+
+        refreshStatsIndex();
+        runUnusedStatsRemover();
+
+        final String index = MlStatsIndex.TEMPLATE_NAME + "-000001";
+
+        // Validate expected docs
+        assertDocExists(index, InferenceStats.docId(modelId, "test"));
+        assertDocExists(index, DataCounts.documentId("analytics-with-stats"));
+        assertDocExists(index, InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test"));
+
+        // Validate removed docs
+        assertDocDoesNotExist(index, InferenceStats.docId("missing-model", "test"));
+        assertDocDoesNotExist(index, DataCounts.documentId("missing-analytics-with-stats"));
+    }
+
+    public void testRemovingUnusedStatsFromReadOnlyIndexShouldFailSilently() throws Exception {
+        String modelId = "model-with-stats";
+        putDFA(modelId);
+
         indexStatDocument(
-            new InferenceStats(1, 1, 1, 1, "model-with-stats", "test", Instant.now()),
-            InferenceStats.docId("model-with-stats", "test")
+            new InferenceStats(1, 1, 1, 1, "missing-model", "test", Instant.now()),
+            InferenceStats.docId("missing-model", "test")
         );
-        client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
+        makeIndexReadOnly();
+        refreshStatsIndex();
 
-        PlainActionFuture<Boolean> deletionListener = new PlainActionFuture<>();
-        UnusedStatsRemover statsRemover = new UnusedStatsRemover(client, new TaskId("test", 0L));
-        statsRemover.remove(10000.0f, deletionListener, () -> false);
-        deletionListener.actionGet();
+        runUnusedStatsRemover();
+        refreshStatsIndex();
 
-        client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
+        final String index = MlStatsIndex.TEMPLATE_NAME + "-000001";
+        assertDocExists(index, InferenceStats.docId("missing-model", "test")); // should still exist
+    }
 
-        final String initialStateIndex = MlStatsIndex.TEMPLATE_NAME + "-000001";
+    private void putDFA(String modelId) {
+        prepareIndex("foo").setId("some-empty-doc").setSource("{}", XContentType.JSON).get();
 
-        // Make sure that stats that should exist still exist
-        assertTrue(client().prepareGet(initialStateIndex, InferenceStats.docId("model-with-stats", "test")).get().isExists());
-        assertTrue(
-            client().prepareGet(
-                initialStateIndex,
-                InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test")
-            ).get().isExists()
+        PutDataFrameAnalyticsAction.Request analyticsRequest = new PutDataFrameAnalyticsAction.Request(
+            new DataFrameAnalyticsConfig.Builder().setId("analytics-with-stats")
+                .setModelMemoryLimit(ByteSizeValue.ofGb(1))
+                .setSource(new DataFrameAnalyticsSource(new String[] { "foo" }, null, null, null))
+                .setDest(new DataFrameAnalyticsDest("bar", null))
+                .setAnalysis(new Regression("prediction"))
+                .build()
         );
-        assertTrue(client().prepareGet(initialStateIndex, DataCounts.documentId("analytics-with-stats")).get().isExists());
-
-        // make sure that unused stats were deleted
-        assertFalse(client().prepareGet(initialStateIndex, DataCounts.documentId("missing-analytics-with-stats")).get().isExists());
-        assertFalse(client().prepareGet(initialStateIndex, InferenceStats.docId("missing-model", "test")).get().isExists());
+        client.execute(PutDataFrameAnalyticsAction.INSTANCE, analyticsRequest).actionGet();
+
+        TrainedModelDefinition.Builder definition = new TrainedModelDefinition.Builder().setPreProcessors(Collections.emptyList())
+            .setTrainedModel(
+                Tree.builder().setFeatureNames(Arrays.asList("foo", "bar")).setRoot(TreeNode.builder(0).setLeafValue(42)).build()
+            );
+
+        TrainedModelConfig modelConfig = TrainedModelConfig.builder()
+            .setModelId(modelId)
+            .setInferenceConfig(RegressionConfig.EMPTY_PARAMS)
+            .setInput(new TrainedModelInput(Arrays.asList("foo", "bar")))
+            .setParsedDefinition(definition)
+            .validate(true)
+            .build();
+
+        client.execute(PutTrainedModelAction.INSTANCE, new PutTrainedModelAction.Request(modelConfig, false)).actionGet();
     }
 
     private void indexStatDocument(ToXContentObject object, String docId) throws Exception {
-        ToXContent.Params params = new ToXContent.MapParams(
-            Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, Boolean.toString(true))
-        );
-        IndexRequest doc = new IndexRequest(MlStatsIndex.writeAlias());
-        doc.id(docId);
+        IndexRequest doc = new IndexRequest(MlStatsIndex.writeAlias()).id(docId);
         try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
-            object.toXContent(builder, params);
+            object.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")));
             doc.source(builder);
             client.index(doc).actionGet();
        }
    }
+
+    private void refreshStatsIndex() {
+        client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
+    }
+
+    private void runUnusedStatsRemover() {
+        PlainActionFuture<Boolean> deletionListener = new PlainActionFuture<>();
+        new UnusedStatsRemover(client, new TaskId("test", 0L)).remove(10000.0f, deletionListener, () -> false);
+        deletionListener.actionGet();
+    }
+
+    private void makeIndexReadOnly() {
+        client().admin()
+            .indices()
+            .prepareUpdateSettings(MlStatsIndex.indexPattern())
+            .setSettings(Settings.builder().put("index.blocks.write", true))
+            .get();
+    }
+
+    private void assertDocExists(String index, String docId) {
+        assertTrue(client().prepareGet(index, docId).get().isExists());
+    }
+
+    private void assertDocDoesNotExist(String index, String docId) {
+        assertFalse(client().prepareGet(index, docId).get().isExists());
+    }
 }

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 4 additions & 0 deletions
@@ -364,6 +364,7 @@
 import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory;
 import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
 import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
+import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
 import org.elasticsearch.xpack.ml.job.snapshot.upgrader.SnapshotUpgradeTaskExecutor;
 import org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutor;
 import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
@@ -921,6 +922,9 @@ public Collection<?> createComponents(PluginServices services) {
         IndexNameExpressionResolver indexNameExpressionResolver = services.indexNameExpressionResolver();
         TelemetryProvider telemetryProvider = services.telemetryProvider();
 
+        // Initialize WritableIndexExpander
+        WritableIndexExpander.initialize(clusterService, indexNameExpressionResolver);
+
         if (enabled == false) {
             // Holders for @link(MachineLearningFeatureSetUsage) which needs access to job manager and ML extension,
             // both empty if ML is disabled

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

Lines changed: 3 additions & 2 deletions
@@ -240,6 +240,7 @@ private List<MlDataRemover> createDataRemovers(
         TaskId parentTaskId,
         AnomalyDetectionAuditor anomalyDetectionAuditor
     ) {
+
         return Arrays.asList(
             new ExpiredResultsRemover(
                 originClient,
@@ -252,8 +253,8 @@ private List<MlDataRemover> createDataRemovers(
             new ExpiredModelSnapshotsRemover(
                 originClient,
                 new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
-                threadPool,
                 parentTaskId,
+                threadPool,
                 jobResultsProvider,
                 anomalyDetectionAuditor
             ),
@@ -277,8 +278,8 @@ private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTask
             new ExpiredModelSnapshotsRemover(
                 client,
                 new VolatileCursorIterator<>(jobs),
-                threadPool,
                 parentTaskId,
+                threadPool,
                 jobResultsProvider,
                 anomalyDetectionAuditor
             ),

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

Lines changed: 74 additions & 9 deletions
@@ -67,6 +67,7 @@
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.security.user.InternalUsers;
 import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
 import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
 
 import java.util.ArrayList;
@@ -133,7 +134,10 @@ public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListe
             indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
         }
 
-        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indices.toArray(new String[0])).setRefresh(true)
+        String[] indicesToQuery = removeReadOnlyIndices(new ArrayList<>(indices), listener, "model snapshots", null);
+        if (indicesToQuery.length == 0) return;
+
+        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true)
             .setIndicesOptions(IndicesOptions.lenientExpandOpen())
             .setQuery(QueryBuilders.idsQuery().addIds(idsToDelete.toArray(new String[0])));
 
@@ -181,7 +185,16 @@ public void deleteAnnotations(
             boolQuery.filter(QueryBuilders.termsQuery(Annotation.EVENT.getPreferredName(), eventsToDelete));
         }
         QueryBuilder query = QueryBuilders.constantScoreQuery(boolQuery);
-        DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME).setQuery(query)
+
+        String[] indicesToQuery = removeReadOnlyIndices(
+            List.of(AnnotationIndex.READ_ALIAS_NAME),
+            listener,
+            "annotations",
+            () -> listener.onResponse(true)
+        );
+        if (indicesToQuery.length == 0) return;
+
+        DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
             .setIndicesOptions(IndicesOptions.lenientExpandOpen())
             .setAbortOnVersionConflict(false)
             .setRefresh(true)
@@ -199,6 +212,28 @@ public void deleteAnnotations(
         );
     }
 
+    private <T> String[] removeReadOnlyIndices(
+        List<String> indicesToQuery,
+        ActionListener<T> listener,
+        String entityType,
+        Runnable onEmpty
+    ) {
+        try {
+            indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(indicesToQuery);
+        } catch (Exception e) {
+            logger.error("Failed to get writable indices for [" + jobId + "]", e);
+            listener.onFailure(e);
+            return new String[0];
+        }
+        if (indicesToQuery.isEmpty()) {
+            logger.info("No writable {} indices found for [{}] job. No {} to remove.", entityType, jobId, entityType);
+            if (onEmpty != null) {
+                onEmpty.run();
+            }
+        }
+        return indicesToQuery.toArray(String[]::new);
+    }
+
     /**
      * Asynchronously delete all result types (Buckets, Records, Influencers) from {@code cutOffTime}.
      * Forecasts are <em>not</em> deleted, as they will not be automatically regenerated after
@@ -220,7 +255,14 @@ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> li
                 )
             )
             .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
-        DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).setQuery(query)
+        String[] indicesToQuery = removeReadOnlyIndices(
+            List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)),
+            listener,
+            "results",
+            () -> listener.onResponse(true)
+        );
+        if (indicesToQuery.length == 0) return;
+        DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
             .setIndicesOptions(IndicesOptions.lenientExpandOpen())
             .setAbortOnVersionConflict(false)
             .setRefresh(true)
@@ -265,9 +307,14 @@ public void deleteInterimResults() {
     * @param listener Response listener
     */
    public void deleteDatafeedTimingStats(ActionListener<BulkByScrollResponse> listener) {
-        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).setRefresh(
-            true
-        )
+        String[] indicesToQuery = removeReadOnlyIndices(
+            List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)),
+            listener,
+            "datafeed timing stats",
+            null
+        );
+        if (indicesToQuery.length == 0) return;
+        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true)
             .setIndicesOptions(IndicesOptions.lenientExpandOpen())
             .setQuery(QueryBuilders.idsQuery().addIds(DatafeedTimingStats.documentId(jobId)));
 
@@ -457,7 +504,9 @@ private void deleteResultsByQuery(
         ActionListener<BroadcastResponse> refreshListener = ActionListener.wrap(refreshResponse -> {
             logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indices));
             ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
-            DeleteByQueryRequest request = new DeleteByQueryRequest(indices).setQuery(query)
+            String[] indicesToQuery = removeReadOnlyIndices(List.of(indices), listener, "results", null);
+            if (indicesToQuery.length == 0) return;
+            DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
                 .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))
                 .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
                 .setAbortOnVersionConflict(false)
@@ -530,7 +579,16 @@ private static IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesRespons
     private void deleteQuantiles(@SuppressWarnings("HiddenField") String jobId, ActionListener<Boolean> finishedHandler) {
         // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
         IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId));
-        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()).setQuery(query)
+
+        String[] indicesToQuery = removeReadOnlyIndices(
+            List.of(AnomalyDetectorsIndex.jobStateIndexPattern()),
+            finishedHandler,
+            "quantiles",
+            () -> finishedHandler.onResponse(true)
+        );
+        if (indicesToQuery.length == 0) return;
+
+        DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
             .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
             .setAbortOnVersionConflict(false)
             .setRefresh(true);
@@ -560,7 +618,14 @@ private void deleteCategorizerState(
     ) {
         // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
         IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum));
-        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()).setQuery(query)
+        String[] indicesToQuery = removeReadOnlyIndices(
+            List.of(AnomalyDetectorsIndex.jobStateIndexPattern()),
+            finishedHandler,
+            "categorizer state",
+            () -> finishedHandler.onResponse(true)
+        );
+        if (indicesToQuery.length == 0) return;
+        DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
             .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
             .setAbortOnVersionConflict(false)
             .setRefresh(true);
