6 changes: 6 additions & 0 deletions docs/changelog/125408.yaml
@@ -0,0 +1,6 @@
pr: 125408
summary: Prevent ML data retention logic from failing when deleting documents in read-only
indices
area: Machine Learning
type: bug
issues: []
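
The changelog entry captures the whole idea of the patch: before issuing any delete-by-query during ML data retention, drop the indices that cannot accept writes, so a read-only block no longer aborts the run. A minimal, self-contained sketch of that guard pattern follows; WritableIndexFilter and its predicate are hypothetical stand-ins for illustration, not the WritableIndexExpander class this PR actually adds.

    import java.util.List;
    import java.util.function.Predicate;

    // Hypothetical stand-in for the PR's WritableIndexExpander: keep only the
    // indices that accept writes, so a later delete-by-query cannot trip over
    // an index.blocks.write / read-only block.
    final class WritableIndexFilter {
        private final Predicate<String> isWritable;

        WritableIndexFilter(Predicate<String> isWritable) {
            this.isWritable = isWritable;
        }

        String[] writableOnly(List<String> candidates) {
            return candidates.stream().filter(isWritable).toArray(String[]::new);
        }
    }

    // Usage: pretend "ml-stats-000001" carries a write block.
    // WritableIndexFilter filter = new WritableIndexFilter(name -> name.equals("ml-stats-000001") == false);
    // String[] safe = filter.writableOnly(List.of("ml-stats-000001", "ml-stats-000002"));
    // safe == {"ml-stats-000002"}; an empty array means the caller skips the delete entirely.
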
@@ -10,6 +10,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.tasks.TaskId;
@@ -63,91 +64,118 @@ public void createComponents() {
}

public void testRemoveUnusedStats() throws Exception {
String modelId = "model-with-stats";
putDFA(modelId);

// Existing analytics and models
indexStatDocument(new DataCounts("analytics-with-stats", 1, 1, 1), DataCounts.documentId("analytics-with-stats"));
indexStatDocument(new InferenceStats(1, 1, 1, 1, modelId, "test", Instant.now()), InferenceStats.docId(modelId, "test"));
indexStatDocument(
new InferenceStats(1, 1, 1, 1, TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test", Instant.now()),
InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test")
);

// Unused analytics/model stats
indexStatDocument(new DataCounts("missing-analytics-with-stats", 1, 1, 1), DataCounts.documentId("missing-analytics-with-stats"));
indexStatDocument(
new InferenceStats(1, 1, 1, 1, "missing-model", "test", Instant.now()),
InferenceStats.docId("missing-model", "test")
);

refreshStatsIndex();
runUnusedStatsRemover();

final String index = MlStatsIndex.TEMPLATE_NAME + "-000001";

// Validate expected docs
assertDocExists(index, InferenceStats.docId(modelId, "test"));
assertDocExists(index, DataCounts.documentId("analytics-with-stats"));
assertDocExists(index, InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test"));

// Validate removed docs
assertDocDoesNotExist(index, InferenceStats.docId("missing-model", "test"));
assertDocDoesNotExist(index, DataCounts.documentId("missing-analytics-with-stats"));
}

public void testRemovingUnusedStatsFromReadOnlyIndexShouldFailSilently() throws Exception {
String modelId = "model-with-stats";
putDFA(modelId);

indexStatDocument(
new InferenceStats(1, 1, 1, 1, "model-with-stats", "test", Instant.now()),
InferenceStats.docId("model-with-stats", "test")
new InferenceStats(1, 1, 1, 1, "missing-model", "test", Instant.now()),
InferenceStats.docId("missing-model", "test")
);
makeIndexReadOnly();
refreshStatsIndex();

runUnusedStatsRemover();
refreshStatsIndex();

final String index = MlStatsIndex.TEMPLATE_NAME + "-000001";
assertDocExists(index, InferenceStats.docId("missing-model", "test")); // should still exist
}

private void putDFA(String modelId) {
prepareIndex("foo").setId("some-empty-doc").setSource("{}", XContentType.JSON).get();

PutDataFrameAnalyticsAction.Request analyticsRequest = new PutDataFrameAnalyticsAction.Request(
new DataFrameAnalyticsConfig.Builder().setId("analytics-with-stats")
.setModelMemoryLimit(ByteSizeValue.ofGb(1))
.setSource(new DataFrameAnalyticsSource(new String[] { "foo" }, null, null, null))
.setDest(new DataFrameAnalyticsDest("bar", null))
.setAnalysis(new Regression("prediction"))
.build()
);

client.execute(PutDataFrameAnalyticsAction.INSTANCE, analyticsRequest).actionGet();

TrainedModelDefinition.Builder definition = new TrainedModelDefinition.Builder().setPreProcessors(Collections.emptyList())
.setTrainedModel(
Tree.builder().setFeatureNames(Arrays.asList("foo", "bar")).setRoot(TreeNode.builder(0).setLeafValue(42)).build()
);

TrainedModelConfig modelConfig = TrainedModelConfig.builder()
.setModelId(modelId)
.setInferenceConfig(RegressionConfig.EMPTY_PARAMS)
.setInput(new TrainedModelInput(Arrays.asList("foo", "bar")))
.setParsedDefinition(definition)
.validate(true)
.build();

client.execute(PutTrainedModelAction.INSTANCE, new PutTrainedModelAction.Request(modelConfig, false)).actionGet();
}

private void indexStatDocument(ToXContentObject object, String docId) throws Exception {
IndexRequest doc = new IndexRequest(MlStatsIndex.writeAlias()).id(docId);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
object.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")));
doc.source(builder);
client.index(doc).actionGet();
}
}

private void refreshStatsIndex() {
client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
}

private void runUnusedStatsRemover() {
PlainActionFuture<Boolean> deletionListener = new PlainActionFuture<>();
new UnusedStatsRemover(client, new TaskId("test", 0L)).remove(10000.0f, deletionListener, () -> false);
deletionListener.actionGet();
}

private void makeIndexReadOnly() {
client().admin()
.indices()
.prepareUpdateSettings(MlStatsIndex.indexPattern())
.setSettings(Settings.builder().put("index.blocks.write", true))
.get();
}

private void assertDocExists(String index, String docId) {
assertTrue(client().prepareGet(index, docId).get().isExists());
}

private void assertDocDoesNotExist(String index, String docId) {
assertFalse(client().prepareGet(index, docId).get().isExists());
}
}
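
makeIndexReadOnly() above simulates the failure scenario by putting index.blocks.write on the stats index, and the read-only test then expects the remover to leave the document alone rather than fail. The WritableIndexExpander implementation itself is not part of this diff; a plausible reading of getWritableIndices, assuming it resolves expressions through IndexNameExpressionResolver and filters on cluster-state write blocks, is sketched below with simplified stand-in types.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch only: the real WritableIndexExpander is not shown in
    // this PR, so the interfaces and logic here are assumptions. The idea is to
    // resolve index expressions to concrete indices, then drop any index whose
    // metadata carries a write block.
    final class WritableIndexLookupSketch {
        interface IndexResolver {        // stand-in for IndexNameExpressionResolver
            List<String> resolve(List<String> expressions);
        }

        interface BlockChecker {         // stand-in for a ClusterState block lookup
            boolean hasWriteBlock(String concreteIndex);
        }

        private final IndexResolver resolver;
        private final BlockChecker blocks;

        WritableIndexLookupSketch(IndexResolver resolver, BlockChecker blocks) {
            this.resolver = resolver;
            this.blocks = blocks;
        }

        List<String> getWritableIndices(List<String> expressions) {
            List<String> writable = new ArrayList<>();
            for (String index : resolver.resolve(expressions)) {
                if (blocks.hasWriteBlock(index) == false) {
                    writable.add(index);
                }
            }
            return writable;
        }
    }
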
@@ -364,6 +364,7 @@
import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
import org.elasticsearch.xpack.ml.job.snapshot.upgrader.SnapshotUpgradeTaskExecutor;
import org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutor;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
@@ -922,6 +923,9 @@ public Collection<?> createComponents(PluginServices services) {
IndexNameExpressionResolver indexNameExpressionResolver = services.indexNameExpressionResolver();
TelemetryProvider telemetryProvider = services.telemetryProvider();

// Initialize WritableIndexExpander
WritableIndexExpander.initialize(clusterService, indexNameExpressionResolver);

if (enabled == false) {
// Holders for @link(MachineLearningFeatureSetUsage) which needs access to job manager and ML extension,
// both empty if ML is disabled
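
createComponents wires the expander up exactly once; JobDataDeleter later reaches it through WritableIndexExpander.getInstance() (see removeReadOnlyIndices below). The initialize/getInstance pair implies a process-wide singleton whose lookup fails if the plugin never initialized it. A minimal sketch of that shape, with simplified stand-in field types rather than the real Elasticsearch classes, might be:

    // Hypothetical shape of the initialize()/getInstance() pair used above;
    // the field types are simplified stand-ins, assumed for illustration.
    final class SingletonHolderSketch {
        private static volatile SingletonHolderSketch instance;

        private final Object clusterService;              // stand-in for ClusterService
        private final Object indexNameExpressionResolver; // stand-in for IndexNameExpressionResolver

        private SingletonHolderSketch(Object clusterService, Object resolver) {
            this.clusterService = clusterService;
            this.indexNameExpressionResolver = resolver;
        }

        static void initialize(Object clusterService, Object resolver) {
            instance = new SingletonHolderSketch(clusterService, resolver);
        }

        static SingletonHolderSketch getInstance() {
            if (instance == null) {
                throw new IllegalStateException("initialize() must run before getInstance()");
            }
            return instance;
        }
    }
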
@@ -240,6 +240,7 @@ private List<MlDataRemover> createDataRemovers(
TaskId parentTaskId,
AnomalyDetectionAuditor anomalyDetectionAuditor
) {

return Arrays.asList(
new ExpiredResultsRemover(
originClient,
@@ -252,8 +253,8 @@
new ExpiredModelSnapshotsRemover(
originClient,
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
parentTaskId,
threadPool,
jobResultsProvider,
anomalyDetectionAuditor
),
@@ -277,8 +278,8 @@ private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTask
new ExpiredModelSnapshotsRemover(
client,
new VolatileCursorIterator<>(jobs),
parentTaskId,
threadPool,
jobResultsProvider,
anomalyDetectionAuditor
),
@@ -65,6 +65,7 @@
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.security.user.InternalUsers;
import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;

import java.util.ArrayList;
@@ -131,7 +132,10 @@ public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListe
indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
}

String[] indicesToQuery = removeReadOnlyIndices(new ArrayList<>(indices), listener, "model snapshots", null);
if (indicesToQuery.length == 0) return;

DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(QueryBuilders.idsQuery().addIds(idsToDelete.toArray(new String[0])));

@@ -179,7 +183,16 @@ public void deleteAnnotations(
boolQuery.filter(QueryBuilders.termsQuery(Annotation.EVENT.getPreferredName(), eventsToDelete));
}
QueryBuilder query = QueryBuilders.constantScoreQuery(boolQuery);

String[] indicesToQuery = removeReadOnlyIndices(
List.of(AnnotationIndex.READ_ALIAS_NAME),
listener,
"annotations",
() -> listener.onResponse(true)
);
if (indicesToQuery.length == 0) return;

DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setAbortOnVersionConflict(false)
.setRefresh(true)
@@ -197,6 +210,28 @@
);
}

private <T> String[] removeReadOnlyIndices(
List<String> indicesToQuery,
ActionListener<T> listener,
String entityType,
Runnable onEmpty
) {
try {
indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(indicesToQuery);
} catch (Exception e) {
logger.error("Failed to get writable indices for [" + jobId + "]", e);
listener.onFailure(e);
return new String[0];
}
if (indicesToQuery.isEmpty()) {
logger.info("No writable {} indices found for [{}] job. No {} to remove.", entityType, jobId, entityType);
if (onEmpty != null) {
onEmpty.run();
}
}
return indicesToQuery.toArray(String[]::new);
}

/**
* Asynchronously delete all result types (Buckets, Records, Influencers) from {@code cutOffTime}.
* Forecasts are <em>not</em> deleted, as they will not be automatically regenerated after
@@ -218,7 +253,14 @@ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> li
)
)
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
String[] indicesToQuery = removeReadOnlyIndices(
List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)),
listener,
"results",
() -> listener.onResponse(true)
);
if (indicesToQuery.length == 0) return;
DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setAbortOnVersionConflict(false)
.setRefresh(true)
@@ -263,9 +305,14 @@ public void deleteInterimResults() {
* @param listener Response listener
*/
public void deleteDatafeedTimingStats(ActionListener<BulkByScrollResponse> listener) {
String[] indicesToQuery = removeReadOnlyIndices(
List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)),
listener,
"datafeed timing stats",
null
);
if (indicesToQuery.length == 0) return;
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(QueryBuilders.idsQuery().addIds(DatafeedTimingStats.documentId(jobId)));

@@ -455,7 +502,9 @@ private void deleteResultsByQuery(
ActionListener<BroadcastResponse> refreshListener = ActionListener.wrap(refreshResponse -> {
logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indices));
ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
String[] indicesToQuery = removeReadOnlyIndices(List.of(indices), listener, "results", null);
if (indicesToQuery.length == 0) return;
DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))
.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
.setAbortOnVersionConflict(false)
@@ -523,7 +572,16 @@ private static IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesRespons
private void deleteQuantiles(@SuppressWarnings("HiddenField") String jobId, ActionListener<Boolean> finishedHandler) {
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId));

String[] indicesToQuery = removeReadOnlyIndices(
List.of(AnomalyDetectorsIndex.jobStateIndexPattern()),
finishedHandler,
"quantiles",
() -> finishedHandler.onResponse(true)
);
if (indicesToQuery.length == 0) return;

DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
.setAbortOnVersionConflict(false)
.setRefresh(true);
@@ -553,7 +611,14 @@ private void deleteCategorizerState(
) {
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum));
String[] indicesToQuery = removeReadOnlyIndices(
List.of(AnomalyDetectorsIndex.jobStateIndexPattern()),
finishedHandler,
"categorizer state",
() -> finishedHandler.onResponse(true)
);
if (indicesToQuery.length == 0) return;
DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
.setAbortOnVersionConflict(false)
.setRefresh(true);