Skip to content

Commit 8f2c38e

Browse files
committed
[ML] Add daily task to manage .ml-state indices
Add a daily maintenance task to roll over .ml-state indices if the index size exceeds a configurable default size (default 50GB). This replaces the previous method of using ILM to manage the state indices, as that was not a workable solution for serverless. This builds on the work done in PR elastic#136065 which provides similar functionality for results indices. WIP
1 parent a01ab1e commit 8f2c38e

File tree

8 files changed

+317
-96
lines changed

8 files changed

+317
-96
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ public final class MlIndexAndAlias {
8080
private static final Predicate<String> IS_ANOMALIES_SHARED_INDEX = Pattern.compile(
8181
AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT + "-\\d{6}"
8282
).asMatchPredicate();
83+
private static final Predicate<String> IS_ANOMALIES_STATE_INDEX = Pattern.compile(
84+
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-\\d{6}"
85+
).asMatchPredicate();
8386
public static final String ROLLOVER_ALIAS_SUFFIX = ".rollover_alias";
8487

8588
static final Comparator<String> INDEX_NAME_COMPARATOR = (index1, index2) -> {
@@ -495,6 +498,16 @@ public static boolean isAnomaliesSharedIndex(String indexName) {
495498
return IS_ANOMALIES_SHARED_INDEX.test(indexName);
496499
}
497500

501+
/**
502+
* Checks if an index name matches the pattern for the ML anomalies state indices (e.g., ".ml-state-000001").
503+
*
504+
* @param indexName The name of the index to check.
505+
* @return {@code true} if the index is an anomalies state index, {@code false} otherwise.
506+
*/
507+
public static boolean isAnomaliesStateIndex(String indexName) {
508+
return IS_ANOMALIES_STATE_INDEX.test(indexName);
509+
}
510+
498511
/**
499512
* Returns the latest index. Latest is the index with the highest
500513
* 6 digit suffix.
@@ -630,6 +643,47 @@ public static void updateAliases(IndicesAliasesRequestBuilder request, ActionLis
630643
request.execute(listener.delegateFailure((l, response) -> l.onResponse(Boolean.TRUE)));
631644
}
632645

646+
/**
647+
* Adds alias actions to a request builder to move the ML state write alias from an old index to a new one after a rollover.
648+
* This method is robust and will move the correct alias regardless of the current alias state on the old index.
649+
*
650+
* @param aliasRequestBuilder The request builder to add actions to.
651+
* @param oldIndex The index from which the alias is being moved.
652+
* @param newIndex The new index to which the alias will be moved.
653+
* @param clusterState The current cluster state, used to inspect existing aliases on the old index.
654+
* @param allStateIndices A list of all current .ml-state indices
655+
* @return The modified {@link IndicesAliasesRequestBuilder}.
656+
*/
657+
public static IndicesAliasesRequestBuilder addStateIndexRolloverAliasActions(
658+
IndicesAliasesRequestBuilder aliasRequestBuilder,
659+
String oldIndex,
660+
String newIndex,
661+
ClusterState clusterState,
662+
String[] allStateIndices
663+
) {
664+
var meta = clusterState.metadata().getProject().index(oldIndex);
665+
if (meta == null) {
666+
// This should not happen in practice as we are iterating over existing indices, but we defend against it.
667+
return aliasRequestBuilder;
668+
}
669+
670+
// Remove the write alias from ALL state indices to handle any inconsistencies where it might exist on more than one.
671+
aliasRequestBuilder.addAliasAction(
672+
IndicesAliasesRequest.AliasActions.remove().indices(allStateIndices).alias(AnomalyDetectorsIndex.jobStateIndexWriteAlias())
673+
);
674+
675+
aliasRequestBuilder.addAliasAction(
676+
IndicesAliasesRequest.AliasActions.add()
677+
.index(newIndex)
678+
.alias(AnomalyDetectorsIndex.jobStateIndexWriteAlias())
679+
.isHidden(true)
680+
.writeIndex(true)
681+
);
682+
683+
return aliasRequestBuilder;
684+
685+
}
686+
633687
/**
634688
* Adds alias actions to a request builder to move ML job aliases from an old index to a new one after a rollover.
635689
* This includes moving the write alias and re-creating the filtered read aliases on the new index.
@@ -640,7 +694,7 @@ public static void updateAliases(IndicesAliasesRequestBuilder request, ActionLis
640694
* @param clusterState The current cluster state, used to inspect existing aliases on the old index.
641695
* @return The modified {@link IndicesAliasesRequestBuilder}.
642696
*/
643-
public static IndicesAliasesRequestBuilder addIndexAliasesRequests(
697+
public static IndicesAliasesRequestBuilder addResultsIndexRolloverAliasActions(
644698
IndicesAliasesRequestBuilder aliasRequestBuilder,
645699
String oldIndex,
646700
String newIndex,

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ public void testBuildIndexAliasesRequest() {
523523
);
524524

525525
var newIndex = anomaliesIndex + "-000001";
526-
var request = MlIndexAndAlias.addIndexAliasesRequests(aliasRequestBuilder, anomaliesIndex, newIndex, csBuilder.build());
526+
var request = MlIndexAndAlias.addResultsIndexRolloverAliasActions(aliasRequestBuilder, anomaliesIndex, newIndex, csBuilder.build());
527527
var actions = request.request().getAliasActions();
528528
assertThat(actions, hasSize(6));
529529

x-pack/plugin/core/template-resources/src/main/resources/ml/anomalydetection/state_index_template.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99
"index" : {
1010
"auto_expand_replicas" : "0-1",
1111
"hidden": true
12-
},
13-
"index.lifecycle.name": "${xpack.ml.index.lifecycle.name}",
14-
"index.lifecycle.rollover_alias": "${xpack.ml.index.lifecycle.rollover_alias}"
12+
}
1513
},
1614
"mappings" : {
1715
"_meta": {
Lines changed: 190 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,19 @@
88

99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
11+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
1112
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
13+
import org.elasticsearch.action.support.PlainActionFuture;
14+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1215
import org.elasticsearch.cluster.ClusterName;
16+
import org.elasticsearch.cluster.ClusterState;
1317
import org.elasticsearch.cluster.metadata.AliasMetadata;
1418
import org.elasticsearch.cluster.service.ClusterService;
1519
import org.elasticsearch.common.unit.ByteSizeValue;
1620
import org.elasticsearch.index.IndexVersion;
1721
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
1822
import org.elasticsearch.test.ESIntegTestCase;
1923
import org.elasticsearch.threadpool.ThreadPool;
20-
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
2124
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
2225
import org.elasticsearch.xpack.core.ml.job.config.Job;
2326
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -33,12 +36,14 @@
3336
import java.util.concurrent.atomic.AtomicReference;
3437
import java.util.function.Consumer;
3538

39+
import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary;
3640
import static org.hamcrest.Matchers.containsInAnyOrder;
3741
import static org.hamcrest.Matchers.is;
42+
import static org.hamcrest.Matchers.nullValue;
3843
import static org.mockito.Mockito.mock;
3944

4045
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
41-
public class MlDailyMaintenanceServiceRolloverResultsIndicesIT extends BaseMlIntegTestCase {
46+
public class MlDailyMaintenanceServiceRolloverIndicesIT extends BaseMlIntegTestCase {
4247

4348
private MlDailyMaintenanceService maintenanceService;
4449

@@ -64,45 +69,80 @@ public void createComponents() throws Exception {
6469
);
6570
}
6671

72+
/**
73+
* In production the only way to create a model snapshot is to open a job, and
74+
* opening a job ensures that the state index exists. This suite does not open jobs
75+
* but instead inserts snapshot and state documents directly to the results and
76+
* state indices. This means it needs to create the state index explicitly. This
77+
* method should not be copied to test suites that run jobs in the way they are
78+
* run in production.
79+
*/
80+
@Before
81+
public void addMlState() {
82+
PlainActionFuture<Boolean> future = new PlainActionFuture<>();
83+
createStateIndexAndAliasIfNecessary(
84+
client(),
85+
ClusterState.EMPTY_STATE,
86+
TestIndexNameExpressionResolver.newInstance(),
87+
TEST_REQUEST_TIMEOUT,
88+
future
89+
);
90+
future.actionGet();
91+
}
92+
6793
private void initClusterAndJob() {
6894
internalCluster().ensureAtLeastNumDataNodes(1);
6995
ensureStableCluster(1);
7096
}
7197

72-
public void testTriggerRollResultsIndicesIfNecessaryTask_givenNoIndices() throws Exception {
98+
public void testTriggerIndicesIfNecessaryTask_givenNoIndices() throws Exception {
7399
// The null case, nothing to do.
74100

75-
// set the rollover max size to 0B so we can roll the index unconditionally
101+
// Delete the .ml-state-000001 index for this particular test
102+
PlainActionFuture<AcknowledgedResponse> future = new PlainActionFuture<>();
103+
DeleteIndexRequest request = new DeleteIndexRequest(".ml-state-000001");
104+
client().admin().indices().delete(request).actionGet();
105+
106+
// set the rollover max size to 0B so we can roll the indices unconditionally
76107
// It's not the conditions or even the rollover itself we are testing but the state of the indices and aliases afterwards.
77108
maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO);
78-
{
79-
GetIndexResponse getIndexResponse = client().admin()
80-
.indices()
81-
.prepareGetIndex(TEST_REQUEST_TIMEOUT)
82-
.setIndices(".ml-anomalies*")
83-
.get();
84-
logger.warn("get_index_response: {}", getIndexResponse.toString());
85-
assertThat(getIndexResponse.getIndices().length, is(0));
86-
var aliases = getIndexResponse.getAliases();
87-
assertThat(aliases.size(), is(0));
88-
}
89109

90-
blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask);
110+
Map<String, Consumer<ActionListener<AcknowledgedResponse>>> params = Map.of(".ml-anomalies*", (listener) -> {
111+
maintenanceService.triggerRollResultsIndicesIfNecessaryTask(listener);
112+
}, ".ml-state*", (listener) -> { maintenanceService.triggerRollStateIndicesIfNecessaryTask(listener); });
113+
114+
for (Map.Entry<String, Consumer<ActionListener<AcknowledgedResponse>>> param : params.entrySet()) {
115+
String indexPattern = param.getKey();
116+
Consumer<ActionListener<AcknowledgedResponse>> function = param.getValue();
117+
{
118+
GetIndexResponse getIndexResponse = client().admin()
119+
.indices()
120+
.prepareGetIndex(TEST_REQUEST_TIMEOUT)
121+
.setIndices(indexPattern)
122+
.get();
123+
logger.warn("get_index_response: {}", getIndexResponse.toString());
124+
assertThat(getIndexResponse.getIndices().length, is(0));
125+
var aliases = getIndexResponse.getAliases();
126+
assertThat(aliases.size(), is(0));
127+
}
91128

92-
{
93-
GetIndexResponse getIndexResponse = client().admin()
94-
.indices()
95-
.prepareGetIndex(TEST_REQUEST_TIMEOUT)
96-
.setIndices(".ml-anomalies*")
97-
.get();
98-
logger.warn("get_index_response: {}", getIndexResponse.toString());
99-
assertThat(getIndexResponse.getIndices().length, is(0));
100-
var aliases = getIndexResponse.getAliases();
101-
assertThat(aliases.size(), is(0));
129+
blockingCall(function);
130+
131+
{
132+
GetIndexResponse getIndexResponse = client().admin()
133+
.indices()
134+
.prepareGetIndex(TEST_REQUEST_TIMEOUT)
135+
.setIndices(indexPattern)
136+
.get();
137+
logger.warn("get_index_response: {}", getIndexResponse.toString());
138+
assertThat(getIndexResponse.getIndices().length, is(0));
139+
var aliases = getIndexResponse.getAliases();
140+
assertThat(aliases.size(), is(0));
141+
}
102142
}
103143
}
104144

105-
public void testTriggerRollResultsIndicesIfNecessaryTask_givenMinusOnRolloverMaxSize() throws Exception {
145+
public void testTriggerRollResultsIndicesIfNecessaryTask_givenMinusOneRolloverMaxSize() throws Exception {
106146
// The null case, nothing to do.
107147

108148
// set the rollover max size to -1B so the indices should not be rolled over
@@ -225,6 +265,127 @@ public void testTriggerRollResultsIndicesIfNecessaryTask() throws Exception {
225265
runTestScenario(jobs_with_custom_index, "custom-fred");
226266
}
227267

268+
public void testTriggerRollStateIndicesIfNecessaryTask() throws Exception {
269+
// 1. Ensure that rollover tasks will always execute
270+
maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO);
271+
272+
// 2. Check the state index exists and has the expected write alias
273+
assertIndicesAndAliases(
274+
"Before rollover (state)",
275+
AnomalyDetectorsIndex.jobStateIndexPattern(),
276+
Map.of(".ml-state-000001", List.of(".ml-state-write"))
277+
);
278+
279+
// 3. Trigger a single maintenance run
280+
blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask);
281+
282+
// 4. Verify state index was rolled over correctly
283+
assertIndicesAndAliases(
284+
"After rollover (state)",
285+
AnomalyDetectorsIndex.jobStateIndexPattern(),
286+
Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(".ml-state-write"))
287+
);
288+
289+
// 5. Trigger another maintenance run
290+
blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask);
291+
292+
// 6. Verify state index was rolled over correctly
293+
assertIndicesAndAliases(
294+
"After rollover (state)",
295+
AnomalyDetectorsIndex.jobStateIndexPattern(),
296+
Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(), ".ml-state-000003", List.of(".ml-state-write"))
297+
);
298+
}
299+
300+
public void testTriggerRollStateIndicesIfNecessaryTask_givenMinusOneRolloverMaxSize() throws Exception {
301+
// The null case, nothing to do.
302+
303+
// set the rollover max size to -1B so the indices should not be rolled over
304+
maintenanceService.setRolloverMaxSize(ByteSizeValue.MINUS_ONE);
305+
{
306+
GetIndexResponse getIndexResponse = client().admin()
307+
.indices()
308+
.prepareGetIndex(TEST_REQUEST_TIMEOUT)
309+
.setIndices(".ml-state*")
310+
.get();
311+
logger.warn("get_index_response: {}", getIndexResponse.toString());
312+
assertIndicesAndAliases(
313+
"Before rollover (state)",
314+
AnomalyDetectorsIndex.jobStateIndexPattern(),
315+
Map.of(".ml-state-000001", List.of(".ml-state-write"))
316+
);
317+
}
318+
319+
blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask);
320+
321+
{
322+
GetIndexResponse getIndexResponse = client().admin()
323+
.indices()
324+
.prepareGetIndex(TEST_REQUEST_TIMEOUT)
325+
.setIndices(".ml-state*")
326+
.get();
327+
assertIndicesAndAliases(
328+
"After rollover (state)",
329+
AnomalyDetectorsIndex.jobStateIndexPattern(),
330+
Map.of(".ml-state-000001", List.of(".ml-state-write"))
331+
);
332+
}
333+
}
334+
335+
public void testTriggerRollStateIndicesIfNecessaryTask_givenMissingWriteAlias() throws Exception {
336+
// 1. Ensure that rollover tasks will always attempt to execute
337+
maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO);
338+
339+
// 2. Remove the write alias to create an inconsistent state
340+
client().admin()
341+
.indices()
342+
.prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
343+
.removeAlias(".ml-state-000001", AnomalyDetectorsIndex.jobStateIndexWriteAlias())
344+
.get();
345+
346+
assertIndicesAndAliases(
347+
"Before rollover (state, missing alias)",
348+
AnomalyDetectorsIndex.jobStateIndexPattern(),
349+
Map.of(".ml-state-000001", List.of())
350+
);
351+
352+
// 3. Trigger a maintenance run and expect it to gracefully handle the missing write alias
353+
blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask);
354+
355+
// 4. Verify the index rolled over correctly and the write alias was added
356+
assertIndicesAndAliases(
357+
"After rollover (state, missing alias)",
358+
AnomalyDetectorsIndex.jobStateIndexPattern(),
359+
Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(".ml-state-write"))
360+
);
361+
}
362+
363+
public void testTriggerRollStateIndicesIfNecessaryTask_givenWriteAliasOnWrongIndex() throws Exception {
364+
// 1. Ensure that rollover tasks will always attempt to execute
365+
maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO);
366+
367+
// 2. Create a second, newer state index
368+
createIndex(".ml-state-000002");
369+
370+
// 3. Verify the initial state (write alias is on the older index)
371+
assertIndicesAndAliases(
372+
"Before rollover (state, alias on wrong index)",
373+
AnomalyDetectorsIndex.jobStateIndexPattern(),
374+
Map.of(".ml-state-000001", List.of(".ml-state-write"), ".ml-state-000002", List.of())
375+
);
376+
377+
// 4. The service finds .ml-state-000002 as the latest, but the rollover alias points to ...000001
378+
// Trigger a maintenance run and expect it to gracefully repair the wrongly seated write alias
379+
blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask);
380+
381+
// 5. Verify the index rolled over correctly and the write alias was moved to the latest index
382+
assertIndicesAndAliases(
383+
"After rollover (state, alias on wrong index)",
384+
AnomalyDetectorsIndex.jobStateIndexPattern(),
385+
Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(), ".ml-state-000003", List.of(".ml-state-write"))
386+
);
387+
}
388+
228389
private void runTestScenarioWithNoRolloverOccurring(Job.Builder[] jobs, String indexNamePart) throws Exception {
229390
String firstJobId = jobs[0].getId();
230391
String secondJobId = jobs[1].getId();
@@ -335,7 +496,8 @@ private void assertIndicesAndAliases(String context, String indexWildcard, Map<S
335496
expectedAliases.forEach((indexName, expectedAliasList) -> {
336497
assertThat("Context: " + context, indices.size(), is(expectedAliases.size()));
337498
if (expectedAliasList.isEmpty()) {
338-
assertThat("Context: " + context, aliases.size(), is(0));
499+
List<AliasMetadata> actualAliasMetadata = aliases.get(indexName);
500+
assertThat("Context: " + context, actualAliasMetadata, is(nullValue()));
339501
} else {
340502
List<AliasMetadata> actualAliasMetadata = aliases.get(indexName);
341503
List<String> actualAliasList = actualAliasMetadata.stream().map(AliasMetadata::alias).toList();
@@ -376,12 +538,4 @@ private PutJobAction.Response putJob(Job.Builder job) {
376538
PutJobAction.Request request = new PutJobAction.Request(job);
377539
return client().execute(PutJobAction.INSTANCE, request).actionGet();
378540
}
379-
380-
private void deleteJob(String jobId) {
381-
try {
382-
client().execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId)).actionGet();
383-
} catch (Exception e) {
384-
// noop
385-
}
386-
}
387541
}

0 commit comments

Comments
 (0)