Merged

Changes from all commits (30 commits)
c40723a  unit test passes (valeriy42, Mar 21, 2025)
f32c3be  clean up (valeriy42, Mar 21, 2025)
ca253b8  Update docs/changelog/125408.yaml (valeriy42, Mar 21, 2025)
0419085  [ML] Enhance UnusedStatsRemover by integrating IndexNameExpressionRes… (valeriy42, Apr 7, 2025)
f3058f3  Merge branch 'enhancement/1532-unused-stats-remover' of https://githu… (valeriy42, Apr 7, 2025)
2ddc240  Merge branch 'main' of https://github.com/elastic/elasticsearch into … (valeriy42, Apr 8, 2025)
1b1f97f  [ML] Introduce WritableIndexExpander to manage writable indices for j… (valeriy42, Apr 8, 2025)
bec1653  Merge branch 'main' into enhancement/1532-unused-stats-remover (davidkyle, Apr 11, 2025)
a4405ef  [ML] Refactor job data removers to utilize WritableIndexExpander for … (valeriy42, Apr 14, 2025)
c500317  Merge branch 'main' of https://github.com/elastic/elasticsearch into … (valeriy42, Apr 14, 2025)
546b635  Merge branch 'enhancement/1532-unused-stats-remover' of https://githu… (valeriy42, Apr 14, 2025)
c98414d  [CI] Auto commit changes from spotless (Apr 14, 2025)
482b243  Merge branch 'main' of https://github.com/elastic/elasticsearch into … (valeriy42, May 19, 2025)
2a10a63  fix typo (valeriy42, May 21, 2025)
64ed211  add unit test for ExpiredAnnotationsRemover (valeriy42, May 21, 2025)
8a28cb0  add unit test for ExpiredModelSnapshotsRemover (valeriy42, May 22, 2025)
5db2496  add unit test for ExpiredResultsRemoverTests (valeriy42, May 22, 2025)
1ad69b3  Merge branch 'main' of https://github.com/elastic/elasticsearch into … (valeriy42, May 22, 2025)
576318e  [CI] Auto commit changes from spotless (May 22, 2025)
0064d5e  fix integration test (valeriy42, May 22, 2025)
26bdb36  Merge remote-tracking branch 'origin/enhancement/1532-unused-stats-re… (valeriy42, May 22, 2025)
bed8f1e  update docs (valeriy42, May 22, 2025)
3d8a61b  fix logger usage failure (valeriy42, May 22, 2025)
400b69d  Merge branch 'main' into enhancement/1532-unused-stats-remover (valeriy42, May 22, 2025)
4447a52  making WritableIndexExpander a singleton (valeriy42, May 23, 2025)
18808b4  prevent JobDataDeleter from attempting to delete from read-only indices (valeriy42, May 23, 2025)
0317323  Merge branch 'main' into enhancement/1532-unused-stats-remover (valeriy42, May 23, 2025)
532bb6a  fit test failure (valeriy42, May 23, 2025)
264b581  Merge branch 'main' into enhancement/1532-unused-stats-remover (valeriy42, May 23, 2025)
7b98158  fit test failure (valeriy42, May 23, 2025)
6 changes: 6 additions & 0 deletions docs/changelog/125408.yaml
@@ -0,0 +1,6 @@
+pr: 125408
+summary: Prevent ML data retention logic from failing when deleting documents in read-only
+  indices
+area: Machine Learning
+type: bug
+issues: []
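
In outline, every delete-by-query in the ML retention and deletion paths is now guarded by a writable-index filter. A condensed, method-level sketch of the pattern, using the names that appear in the JobDataDeleter diff below (the method name and surrounding scaffolding here are illustrative, not the PR's exact code):

    // Illustrative sketch of the guard this PR adds (simplified from JobDataDeleter below).
    private void deleteFromWritableIndicesOnly(List<String> candidates, ActionListener<Boolean> listener) {
        List<String> writable;
        try {
            // WritableIndexExpander is introduced by this PR; it resolves the given
            // aliases/patterns and drops indices that cannot be written to.
            writable = WritableIndexExpander.getInstance().getWritableIndices(candidates);
        } catch (Exception e) {
            listener.onFailure(e);
            return;
        }
        if (writable.isEmpty()) {
            // All targets are read-only: report success instead of failing the retention run.
            listener.onResponse(true);
            return;
        }
        DeleteByQueryRequest request = new DeleteByQueryRequest(writable.toArray(String[]::new)).setRefresh(true);
        // ... execute the request as the concrete call sites below do ...
    }
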
UnusedStatsRemoverIT.java
@@ -10,6 +10,7 @@
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.internal.OriginSettingClient;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.tasks.TaskId;
@@ -63,91 +64,118 @@ public void createComponents() {
     }
 
     public void testRemoveUnusedStats() throws Exception {
+        String modelId = "model-with-stats";
+        putDFA(modelId);
+
-        prepareIndex("foo").setId("some-empty-doc").setSource("{}", XContentType.JSON).get();
-
-        PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request(
-            new DataFrameAnalyticsConfig.Builder().setId("analytics-with-stats")
-                .setModelMemoryLimit(ByteSizeValue.ofGb(1))
-                .setSource(new DataFrameAnalyticsSource(new String[] { "foo" }, null, null, null))
-                .setDest(new DataFrameAnalyticsDest("bar", null))
-                .setAnalysis(new Regression("prediction"))
-                .build()
-        );
-        client.execute(PutDataFrameAnalyticsAction.INSTANCE, request).actionGet();
-
-        client.execute(
-            PutTrainedModelAction.INSTANCE,
-            new PutTrainedModelAction.Request(
-                TrainedModelConfig.builder()
-                    .setModelId("model-with-stats")
-                    .setInferenceConfig(RegressionConfig.EMPTY_PARAMS)
-                    .setInput(new TrainedModelInput(Arrays.asList("foo", "bar")))
-                    .setParsedDefinition(
-                        new TrainedModelDefinition.Builder().setPreProcessors(Collections.emptyList())
-                            .setTrainedModel(
-                                Tree.builder()
-                                    .setFeatureNames(Arrays.asList("foo", "bar"))
-                                    .setRoot(TreeNode.builder(0).setLeafValue(42))
-                                    .build()
-                            )
-                    )
-                    .validate(true)
-                    .build(),
-                false
-            )
-        ).actionGet();
-
+        // Existing analytics and models
         indexStatDocument(new DataCounts("analytics-with-stats", 1, 1, 1), DataCounts.documentId("analytics-with-stats"));
-        indexStatDocument(new DataCounts("missing-analytics-with-stats", 1, 1, 1), DataCounts.documentId("missing-analytics-with-stats"));
+        indexStatDocument(new InferenceStats(1, 1, 1, 1, modelId, "test", Instant.now()), InferenceStats.docId(modelId, "test"));
         indexStatDocument(
             new InferenceStats(1, 1, 1, 1, TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test", Instant.now()),
             InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test")
         );
+
+        // Unused analytics/model stats
+        indexStatDocument(new DataCounts("missing-analytics-with-stats", 1, 1, 1), DataCounts.documentId("missing-analytics-with-stats"));
         indexStatDocument(
             new InferenceStats(1, 1, 1, 1, "missing-model", "test", Instant.now()),
             InferenceStats.docId("missing-model", "test")
         );
 
+        refreshStatsIndex();
+        runUnusedStatsRemover();
+
+        final String index = MlStatsIndex.TEMPLATE_NAME + "-000001";
+
+        // Validate expected docs
+        assertDocExists(index, InferenceStats.docId(modelId, "test"));
+        assertDocExists(index, DataCounts.documentId("analytics-with-stats"));
+        assertDocExists(index, InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test"));
+
+        // Validate removed docs
+        assertDocDoesNotExist(index, InferenceStats.docId("missing-model", "test"));
+        assertDocDoesNotExist(index, DataCounts.documentId("missing-analytics-with-stats"));
+    }
+
+    public void testRemovingUnusedStatsFromReadOnlyIndexShouldFailSilently() throws Exception {
+        String modelId = "model-with-stats";
+        putDFA(modelId);
 
         indexStatDocument(
-            new InferenceStats(1, 1, 1, 1, "model-with-stats", "test", Instant.now()),
-            InferenceStats.docId("model-with-stats", "test")
+            new InferenceStats(1, 1, 1, 1, "missing-model", "test", Instant.now()),
+            InferenceStats.docId("missing-model", "test")
         );
-        client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
+        makeIndexReadOnly();
+        refreshStatsIndex();
 
-        PlainActionFuture<Boolean> deletionListener = new PlainActionFuture<>();
-        UnusedStatsRemover statsRemover = new UnusedStatsRemover(client, new TaskId("test", 0L));
-        statsRemover.remove(10000.0f, deletionListener, () -> false);
-        deletionListener.actionGet();
+        runUnusedStatsRemover();
+        refreshStatsIndex();
 
-        client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
+        final String index = MlStatsIndex.TEMPLATE_NAME + "-000001";
+        assertDocExists(index, InferenceStats.docId("missing-model", "test")); // should still exist
+    }
 
-        final String initialStateIndex = MlStatsIndex.TEMPLATE_NAME + "-000001";
+    private void putDFA(String modelId) {
+        prepareIndex("foo").setId("some-empty-doc").setSource("{}", XContentType.JSON).get();
 
-        // Make sure that stats that should exist still exist
-        assertTrue(client().prepareGet(initialStateIndex, InferenceStats.docId("model-with-stats", "test")).get().isExists());
-        assertTrue(
-            client().prepareGet(
-                initialStateIndex,
-                InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test")
-            ).get().isExists()
+        PutDataFrameAnalyticsAction.Request analyticsRequest = new PutDataFrameAnalyticsAction.Request(
+            new DataFrameAnalyticsConfig.Builder().setId("analytics-with-stats")
+                .setModelMemoryLimit(ByteSizeValue.ofGb(1))
+                .setSource(new DataFrameAnalyticsSource(new String[] { "foo" }, null, null, null))
+                .setDest(new DataFrameAnalyticsDest("bar", null))
+                .setAnalysis(new Regression("prediction"))
+                .build()
         );
-        assertTrue(client().prepareGet(initialStateIndex, DataCounts.documentId("analytics-with-stats")).get().isExists());
-
-        // make sure that unused stats were deleted
-        assertFalse(client().prepareGet(initialStateIndex, DataCounts.documentId("missing-analytics-with-stats")).get().isExists());
-        assertFalse(client().prepareGet(initialStateIndex, InferenceStats.docId("missing-model", "test")).get().isExists());
+        client.execute(PutDataFrameAnalyticsAction.INSTANCE, analyticsRequest).actionGet();
+
+        TrainedModelDefinition.Builder definition = new TrainedModelDefinition.Builder().setPreProcessors(Collections.emptyList())
+            .setTrainedModel(
+                Tree.builder().setFeatureNames(Arrays.asList("foo", "bar")).setRoot(TreeNode.builder(0).setLeafValue(42)).build()
+            );
+
+        TrainedModelConfig modelConfig = TrainedModelConfig.builder()
+            .setModelId(modelId)
+            .setInferenceConfig(RegressionConfig.EMPTY_PARAMS)
+            .setInput(new TrainedModelInput(Arrays.asList("foo", "bar")))
+            .setParsedDefinition(definition)
+            .validate(true)
+            .build();
+
+        client.execute(PutTrainedModelAction.INSTANCE, new PutTrainedModelAction.Request(modelConfig, false)).actionGet();
     }
 
     private void indexStatDocument(ToXContentObject object, String docId) throws Exception {
-        ToXContent.Params params = new ToXContent.MapParams(
-            Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, Boolean.toString(true))
-        );
-        IndexRequest doc = new IndexRequest(MlStatsIndex.writeAlias());
-        doc.id(docId);
+        IndexRequest doc = new IndexRequest(MlStatsIndex.writeAlias()).id(docId);
         try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
-            object.toXContent(builder, params);
+            object.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")));
             doc.source(builder);
             client.index(doc).actionGet();
         }
     }
+
+    private void refreshStatsIndex() {
+        client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
+    }
+
+    private void runUnusedStatsRemover() {
+        PlainActionFuture<Boolean> deletionListener = new PlainActionFuture<>();
+        new UnusedStatsRemover(client, new TaskId("test", 0L)).remove(10000.0f, deletionListener, () -> false);
+        deletionListener.actionGet();
+    }
+
+    private void makeIndexReadOnly() {
+        client().admin()
+            .indices()
+            .prepareUpdateSettings(MlStatsIndex.indexPattern())
+            .setSettings(Settings.builder().put("index.blocks.write", true))
+            .get();
+    }
+
+    private void assertDocExists(String index, String docId) {
+        assertTrue(client().prepareGet(index, docId).get().isExists());
+    }
+
+    private void assertDocDoesNotExist(String index, String docId) {
+        assertFalse(client().prepareGet(index, docId).get().isExists());
+    }
 }
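
One note on the read-only test: makeIndexReadOnly() sets index.blocks.write on the stats index pattern, and no corresponding cleanup is visible in this view. If later tests in the suite need to write to the stats index again, a cleanup helper along the same lines would clear the block (hypothetical helper, not part of this PR):

    // Hypothetical helper (not in this PR): clears the write block set by makeIndexReadOnly(),
    // so subsequent tests can write to the ML stats index again.
    private void makeIndexWritable() {
        client().admin()
            .indices()
            .prepareUpdateSettings(MlStatsIndex.indexPattern())
            .setSettings(Settings.builder().put("index.blocks.write", false))
            .get();
    }
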
MachineLearning.java
@@ -364,6 +364,7 @@
 import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory;
 import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
 import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
+import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
 import org.elasticsearch.xpack.ml.job.snapshot.upgrader.SnapshotUpgradeTaskExecutor;
 import org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutor;
 import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
@@ -921,6 +922,9 @@ public Collection<?> createComponents(PluginServices services) {
         IndexNameExpressionResolver indexNameExpressionResolver = services.indexNameExpressionResolver();
         TelemetryProvider telemetryProvider = services.telemetryProvider();
 
+        // Initialize WritableIndexExpander
+        WritableIndexExpander.initialize(clusterService, indexNameExpressionResolver);
+
         if (enabled == false) {
             // Holders for @link(MachineLearningFeatureSetUsage) which needs access to job manager and ML extension,
             // both empty if ML is disabled
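
WritableIndexExpander itself (org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander) is not shown in this view. A minimal sketch of a shape consistent with its call sites (initialize(clusterService, indexNameExpressionResolver) above; getInstance().getWritableIndices(...) in JobDataDeleter below) and with the index.blocks.write setting the integration test toggles; the PR's actual implementation may differ:

    import org.elasticsearch.action.support.IndicesOptions;
    import org.elasticsearch.cluster.ClusterState;
    import org.elasticsearch.cluster.block.ClusterBlockLevel;
    import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
    import org.elasticsearch.cluster.service.ClusterService;

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // Hypothetical sketch, inferred from call sites in this PR; not the actual class.
    public class WritableIndexExpander {

        private static volatile WritableIndexExpander instance;

        private final ClusterService clusterService;
        private final IndexNameExpressionResolver resolver;

        private WritableIndexExpander(ClusterService clusterService, IndexNameExpressionResolver resolver) {
            this.clusterService = clusterService;
            this.resolver = resolver;
        }

        public static void initialize(ClusterService clusterService, IndexNameExpressionResolver resolver) {
            instance = new WritableIndexExpander(clusterService, resolver);
        }

        public static WritableIndexExpander getInstance() {
            if (instance == null) {
                throw new IllegalStateException("WritableIndexExpander has not been initialized");
            }
            return instance;
        }

        public List<String> getWritableIndices(List<String> indicesOrAliases) {
            ClusterState state = clusterService.state();
            // Resolve aliases and patterns to concrete index names, then drop any index
            // whose cluster-state blocks forbid writes (e.g. index.blocks.write).
            return Arrays.stream(
                resolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpenHidden(), indicesOrAliases.toArray(String[]::new))
            )
                .filter(index -> state.blocks().indexBlocked(ClusterBlockLevel.WRITE, index) == false)
                .collect(Collectors.toList());
        }
    }

Making the expander a singleton (commit 4447a52) lets JobDataDeleter and the retention removers reach it without plumbing ClusterService through every constructor.
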
TransportDeleteExpiredDataAction.java
@@ -240,6 +240,7 @@ private List<MlDataRemover> createDataRemovers(
         TaskId parentTaskId,
         AnomalyDetectionAuditor anomalyDetectionAuditor
     ) {
+
         return Arrays.asList(
             new ExpiredResultsRemover(
                 originClient,
@@ -252,8 +253,8 @@
             new ExpiredModelSnapshotsRemover(
                 originClient,
                 new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
-                threadPool,
                 parentTaskId,
+                threadPool,
                 jobResultsProvider,
                 anomalyDetectionAuditor
             ),
@@ -277,8 +278,8 @@ private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTask
             new ExpiredModelSnapshotsRemover(
                 client,
                 new VolatileCursorIterator<>(jobs),
-                threadPool,
                 parentTaskId,
+                threadPool,
                 jobResultsProvider,
                 anomalyDetectionAuditor
             ),
JobDataDeleter.java
@@ -67,6 +67,7 @@
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.security.user.InternalUsers;
 import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
 import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
 
 import java.util.ArrayList;
@@ -133,7 +134,10 @@ public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListe
             indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
         }
 
-        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indices.toArray(new String[0])).setRefresh(true)
+        String[] indicesToQuery = removeReadOnlyIndices(new ArrayList<>(indices), listener, "model snapshots", null);
+        if (indicesToQuery.length == 0) return;
+
+        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true)
             .setIndicesOptions(IndicesOptions.lenientExpandOpen())
             .setQuery(QueryBuilders.idsQuery().addIds(idsToDelete.toArray(new String[0])));
 
@@ -181,7 +185,16 @@ public void deleteAnnotations(
             boolQuery.filter(QueryBuilders.termsQuery(Annotation.EVENT.getPreferredName(), eventsToDelete));
         }
         QueryBuilder query = QueryBuilders.constantScoreQuery(boolQuery);
-        DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME).setQuery(query)
+
+        String[] indicesToQuery = removeReadOnlyIndices(
+            List.of(AnnotationIndex.READ_ALIAS_NAME),
+            listener,
+            "annotations",
+            () -> listener.onResponse(true)
+        );
+        if (indicesToQuery.length == 0) return;
+
+        DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
             .setIndicesOptions(IndicesOptions.lenientExpandOpen())
             .setAbortOnVersionConflict(false)
             .setRefresh(true)
@@ -199,6 +212,28 @@ public void deleteAnnotations(
         );
     }
 
+    private <T> String[] removeReadOnlyIndices(
+        List<String> indicesToQuery,
+        ActionListener<T> listener,
+        String entityType,
+        Runnable onEmpty
+    ) {
+        try {
+            indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(indicesToQuery);
+        } catch (Exception e) {
+            logger.error("Failed to get writable indices for [" + jobId + "]", e);
+            listener.onFailure(e);
+            return new String[0];
+        }
+        if (indicesToQuery.isEmpty()) {
+            logger.info("No writable {} indices found for [{}] job. No {} to remove.", entityType, jobId, entityType);
+            if (onEmpty != null) {
+                onEmpty.run();
+            }
+        }
+        return indicesToQuery.toArray(String[]::new);
+    }
+
     /**
      * Asynchronously delete all result types (Buckets, Records, Influencers) from {@code cutOffTime}.
      * Forecasts are <em>not</em> deleted, as they will not be automatically regenerated after
@@ -220,7 +255,14 @@ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> li
                 )
             )
             .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
-        DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).setQuery(query)
+        String[] indicesToQuery = removeReadOnlyIndices(
+            List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)),
+            listener,
+            "results",
+            () -> listener.onResponse(true)
+        );
+        if (indicesToQuery.length == 0) return;
+        DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
            .setIndicesOptions(IndicesOptions.lenientExpandOpen())
            .setAbortOnVersionConflict(false)
            .setRefresh(true)
@@ -265,9 +307,14 @@ public void deleteInterimResults() {
      * @param listener Response listener
      */
     public void deleteDatafeedTimingStats(ActionListener<BulkByScrollResponse> listener) {
-        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).setRefresh(
-            true
-        )
+        String[] indicesToQuery = removeReadOnlyIndices(
+            List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)),
+            listener,
+            "datafeed timing stats",
+            null
+        );
+        if (indicesToQuery.length == 0) return;
+        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true)
             .setIndicesOptions(IndicesOptions.lenientExpandOpen())
             .setQuery(QueryBuilders.idsQuery().addIds(DatafeedTimingStats.documentId(jobId)));
 
@@ -457,7 +504,9 @@ private void deleteResultsByQuery(
         ActionListener<BroadcastResponse> refreshListener = ActionListener.wrap(refreshResponse -> {
             logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indices));
             ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
-            DeleteByQueryRequest request = new DeleteByQueryRequest(indices).setQuery(query)
+            String[] indicesToQuery = removeReadOnlyIndices(List.of(indices), listener, "results", null);
+            if (indicesToQuery.length == 0) return;
+            DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
                 .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))
                 .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
                 .setAbortOnVersionConflict(false)
@@ -530,7 +579,16 @@ private static IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesRespons
     private void deleteQuantiles(@SuppressWarnings("HiddenField") String jobId, ActionListener<Boolean> finishedHandler) {
         // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
         IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId));
-        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()).setQuery(query)
+
+        String[] indicesToQuery = removeReadOnlyIndices(
+            List.of(AnomalyDetectorsIndex.jobStateIndexPattern()),
+            finishedHandler,
+            "quantiles",
+            () -> finishedHandler.onResponse(true)
+        );
+        if (indicesToQuery.length == 0) return;
+
+        DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
             .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
             .setAbortOnVersionConflict(false)
             .setRefresh(true);
@@ -560,7 +618,14 @@ private void deleteCategorizerState(
     ) {
         // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
         IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum));
-        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()).setQuery(query)
+        String[] indicesToQuery = removeReadOnlyIndices(
+            List.of(AnomalyDetectorsIndex.jobStateIndexPattern()),
+            finishedHandler,
+            "categorizer state",
+            () -> finishedHandler.onResponse(true)
+        );
+        if (indicesToQuery.length == 0) return;
+        DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
             .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
             .setAbortOnVersionConflict(false)
             .setRefresh(true);