diff --git a/docs/changelog/137603.yaml b/docs/changelog/137603.yaml new file mode 100644 index 0000000000000..8934c3fbecc27 --- /dev/null +++ b/docs/changelog/137603.yaml @@ -0,0 +1,5 @@ +pr: 137603 +summary: Add daily task to manage .ml-state indices +area: Machine Learning +type: enhancement +issues: [] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index cf63a75acf5e4..d3f213b4e0c64 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -80,6 +80,9 @@ public final class MlIndexAndAlias { private static final Predicate IS_ANOMALIES_SHARED_INDEX = Pattern.compile( AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT + "-\\d{6}" ).asMatchPredicate(); + private static final Predicate IS_ANOMALIES_STATE_INDEX = Pattern.compile( + Pattern.quote(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX) + "-\\d{6}" + ).asMatchPredicate(); public static final String ROLLOVER_ALIAS_SUFFIX = ".rollover_alias"; static final Comparator INDEX_NAME_COMPARATOR = (index1, index2) -> { @@ -495,6 +498,16 @@ public static boolean isAnomaliesSharedIndex(String indexName) { return IS_ANOMALIES_SHARED_INDEX.test(indexName); } + /** + * Checks if an index name is an ML anomalies state index: the literal state index prefix, a hyphen and exactly six digits (e.g., ".ml-state-000001"). + * + * @param indexName The name of the index to check. + * @return {@code true} if the whole name matches the state index pattern, {@code false} otherwise. + */ + public static boolean isAnomaliesStateIndex(String indexName) { + return IS_ANOMALIES_STATE_INDEX.test(indexName); + } + /** * Returns the latest index. Latest is the index with the highest * 6 digit suffix. 
@@ -630,6 +643,47 @@ public static void updateAliases(IndicesAliasesRequestBuilder request, ActionLis request.execute(listener.delegateFailure((l, response) -> l.onResponse(Boolean.TRUE))); } + /** + * Adds alias actions to a request builder to move the ML state write alias from an old index to a new one after a rollover. + * The write alias is removed from every index in {@code allStateIndices}, not only from {@code oldIndex}, and then added to {@code newIndex}. + * + * @param aliasRequestBuilder The request builder to add actions to. + * @param oldIndex The index that was rolled over; if it is missing from the cluster state no actions are added. + * @param newIndex The new index to which the alias will be moved. + * @param clusterState The current cluster state, used only to check that the old index still exists. + * @param allStateIndices The names of all current .ml-state indices; the write alias is removed from each of them. + * @return The same {@link IndicesAliasesRequestBuilder} that was passed in, with any alias actions added. + */ + public static IndicesAliasesRequestBuilder addStateIndexRolloverAliasActions( + IndicesAliasesRequestBuilder aliasRequestBuilder, + String oldIndex, + String newIndex, + ClusterState clusterState, + String[] allStateIndices + ) { + var meta = clusterState.metadata().getProject().index(oldIndex); + if (meta == null) { + // This should not happen in practice as we are iterating over existing indices, but we defend against it. + return aliasRequestBuilder; + } + + // Remove the write alias from ALL state indices to handle any inconsistencies where it might exist on more than one. + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.remove().indices(allStateIndices).alias(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + ); + // Then add the alias to the new index as a hidden alias with the new index as its write index. + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.add() + .index(newIndex) + .alias(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + .isHidden(true) + .writeIndex(true) + ); + + return aliasRequestBuilder; + + } + /** * Adds alias actions to a request builder to move ML job aliases from an old index to a new one after a rollover. 
* This includes moving the write alias and re-creating the filtered read aliases on the new index. @@ -640,7 +694,7 @@ public static void updateAliases(IndicesAliasesRequestBuilder request, ActionLis * @param clusterState The current cluster state, used to inspect existing aliases on the old index. * @return The modified {@link IndicesAliasesRequestBuilder}. */ - public static IndicesAliasesRequestBuilder addIndexAliasesRequests( + public static IndicesAliasesRequestBuilder addResultsIndexRolloverAliasActions( IndicesAliasesRequestBuilder aliasRequestBuilder, String oldIndex, String newIndex, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java index 64d25f1c957e6..87d1af0e652a2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java @@ -523,7 +523,7 @@ public void testBuildIndexAliasesRequest() { ); var newIndex = anomaliesIndex + "-000001"; - var request = MlIndexAndAlias.addIndexAliasesRequests(aliasRequestBuilder, anomaliesIndex, newIndex, csBuilder.build()); + var request = MlIndexAndAlias.addResultsIndexRolloverAliasActions(aliasRequestBuilder, anomaliesIndex, newIndex, csBuilder.build()); var actions = request.request().getAliasActions(); assertThat(actions, hasSize(6)); diff --git a/x-pack/plugin/core/template-resources/src/main/resources/ml/anomalydetection/state_index_template.json b/x-pack/plugin/core/template-resources/src/main/resources/ml/anomalydetection/state_index_template.json index 6f4d39fdb939a..19c9e4172b58e 100644 --- a/x-pack/plugin/core/template-resources/src/main/resources/ml/anomalydetection/state_index_template.json +++ b/x-pack/plugin/core/template-resources/src/main/resources/ml/anomalydetection/state_index_template.json @@ -9,9 
+9,7 @@ "index" : { "auto_expand_replicas" : "0-1", "hidden": true - }, - "index.lifecycle.name": "${xpack.ml.index.lifecycle.name}", - "index.lifecycle.rollover_alias": "${xpack.ml.index.lifecycle.rollover_alias}" + } }, "mappings" : { "_meta": { diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverResultsIndicesIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java similarity index 66% rename from x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverResultsIndicesIT.java rename to x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java index 5e894d401e403..416f1c20ae3c0 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverResultsIndicesIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java @@ -8,8 +8,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.ByteSizeValue; @@ -17,7 +21,6 @@ import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.test.ESIntegTestCase; import 
org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -33,12 +36,14 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false) -public class MlDailyMaintenanceServiceRolloverResultsIndicesIT extends BaseMlIntegTestCase { +public class MlDailyMaintenanceServiceRolloverIndicesIT extends BaseMlIntegTestCase { private MlDailyMaintenanceService maintenanceService; @@ -64,45 +69,80 @@ public void createComponents() throws Exception { ); } + /** + * In production the only way to create a model snapshot is to open a job, and + * opening a job ensures that the state index exists. This suite does not open jobs + * but instead inserts snapshot and state documents directly to the results and + * state indices. This means it needs to create the state index explicitly. This + * method should not be copied to test suites that run jobs in the way they are + * run in production. 
+ */ + @Before + public void addMlState() { + PlainActionFuture future = new PlainActionFuture<>(); + createStateIndexAndAliasIfNecessary( + client(), + ClusterState.EMPTY_STATE, + TestIndexNameExpressionResolver.newInstance(), + TEST_REQUEST_TIMEOUT, + future + ); + future.actionGet(); + } + private void initClusterAndJob() { internalCluster().ensureAtLeastNumDataNodes(1); ensureStableCluster(1); } - public void testTriggerRollResultsIndicesIfNecessaryTask_givenNoIndices() throws Exception { + public void testTriggerIndicesIfNecessaryTask_givenNoIndices() throws Exception { // The null case, nothing to do. - // set the rollover max size to 0B so we can roll the index unconditionally + // Delete the .ml-state-000001 index created by addMlState() so that no state index exists for this test + DeleteIndexRequest request = new DeleteIndexRequest(".ml-state-000001"); + AcknowledgedResponse deleteResponse = client().admin().indices().delete(request).actionGet(); + assertThat(deleteResponse.isAcknowledged(), is(true)); + + // set the rollover max size to 0B so we can roll the indices unconditionally // It's not the conditions or even the rollover itself we are testing but the state of the indices and aliases afterwards. 
maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); - { - GetIndexResponse getIndexResponse = client().admin() - .indices() - .prepareGetIndex(TEST_REQUEST_TIMEOUT) - .setIndices(".ml-anomalies*") - .get(); - logger.warn("get_index_response: {}", getIndexResponse.toString()); - assertThat(getIndexResponse.getIndices().length, is(0)); - var aliases = getIndexResponse.getAliases(); - assertThat(aliases.size(), is(0)); - } - blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + Map>> params = Map.of(".ml-anomalies*", (listener) -> { + maintenanceService.triggerRollResultsIndicesIfNecessaryTask(listener); + }, ".ml-state*", (listener) -> { maintenanceService.triggerRollStateIndicesIfNecessaryTask(listener); }); + + for (Map.Entry>> param : params.entrySet()) { + String indexPattern = param.getKey(); + Consumer> function = param.getValue(); + { + GetIndexResponse getIndexResponse = client().admin() + .indices() + .prepareGetIndex(TEST_REQUEST_TIMEOUT) + .setIndices(indexPattern) + .get(); + logger.warn("get_index_response: {}", getIndexResponse.toString()); + assertThat(getIndexResponse.getIndices().length, is(0)); + var aliases = getIndexResponse.getAliases(); + assertThat(aliases.size(), is(0)); + } - { - GetIndexResponse getIndexResponse = client().admin() - .indices() - .prepareGetIndex(TEST_REQUEST_TIMEOUT) - .setIndices(".ml-anomalies*") - .get(); - logger.warn("get_index_response: {}", getIndexResponse.toString()); - assertThat(getIndexResponse.getIndices().length, is(0)); - var aliases = getIndexResponse.getAliases(); - assertThat(aliases.size(), is(0)); + blockingCall(function); + + { + GetIndexResponse getIndexResponse = client().admin() + .indices() + .prepareGetIndex(TEST_REQUEST_TIMEOUT) + .setIndices(indexPattern) + .get(); + logger.warn("get_index_response: {}", getIndexResponse.toString()); + assertThat(getIndexResponse.getIndices().length, is(0)); + var aliases = getIndexResponse.getAliases(); + assertThat(aliases.size(), 
is(0)); + } } } - public void testTriggerRollResultsIndicesIfNecessaryTask_givenMinusOnRolloverMaxSize() throws Exception { + public void testTriggerRollResultsIndicesIfNecessaryTask_givenMinusOneRolloverMaxSize() throws Exception { // The null case, nothing to do. // set the rollover max size to -1B so the indices should not be rolled over @@ -225,6 +265,127 @@ public void testTriggerRollResultsIndicesIfNecessaryTask() throws Exception { runTestScenario(jobs_with_custom_index, "custom-fred"); } + public void testTriggerRollStateIndicesIfNecessaryTask() throws Exception { + // 1. Ensure that rollover tasks will always execute + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 2. Check the state index exists and has the expected write alias + assertIndicesAndAliases( + "Before rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(".ml-state-write")) + ); + + // 3. Trigger a single maintenance run + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + // 4. Verify state index was rolled over correctly + assertIndicesAndAliases( + "After rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(".ml-state-write")) + ); + + // 5. Trigger another maintenance run + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + // 6. Verify state index was rolled over correctly + assertIndicesAndAliases( + "After rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(), ".ml-state-000003", List.of(".ml-state-write")) + ); + } + + public void testTriggerRollStateIndicesIfNecessaryTask_givenMinusOneRolloverMaxSize() throws Exception { + // The null case, nothing to do. 
+ + // set the rollover max size to -1B so the indices should not be rolled over + maintenanceService.setRolloverMaxSize(ByteSizeValue.MINUS_ONE); + { + GetIndexResponse getIndexResponse = client().admin() + .indices() + .prepareGetIndex(TEST_REQUEST_TIMEOUT) + .setIndices(".ml-state*") + .get(); + logger.warn("get_index_response: {}", getIndexResponse.toString()); + assertIndicesAndAliases( + "Before rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(".ml-state-write")) + ); + } + + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + { + GetIndexResponse getIndexResponse = client().admin() + .indices() + .prepareGetIndex(TEST_REQUEST_TIMEOUT) + .setIndices(".ml-state*") + .get(); + assertIndicesAndAliases( + "After rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(".ml-state-write")) + ); + } + } + + public void testTriggerRollStateIndicesIfNecessaryTask_givenMissingWriteAlias() throws Exception { + // 1. Ensure that rollover tasks will always attempt to execute + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 2. Remove the write alias to create an inconsistent state + client().admin() + .indices() + .prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .removeAlias(".ml-state-000001", AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + .get(); + + assertIndicesAndAliases( + "Before rollover (state, missing alias)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of()) + ); + + // 3. Trigger a maintenance run and expect it to gracefully handle the missing write alias + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + // 4. 
Verify the index rolled over correctly and the write alias was added + assertIndicesAndAliases( + "After rollover (state, missing alias)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(".ml-state-write")) + ); + } + + public void testTriggerRollStateIndicesIfNecessaryTask_givenWriteAliasOnWrongIndex() throws Exception { + // 1. Ensure that rollover tasks will always attempt to execute + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 2. Create a second, newer state index + createIndex(".ml-state-000002"); + + // 3. Verify the initial state (write alias is on the older index) + assertIndicesAndAliases( + "Before rollover (state, alias on wrong index)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(".ml-state-write"), ".ml-state-000002", List.of()) + ); + + // 4. The service finds .ml-state-000002 as the latest, but the rollover alias points to ...000001 + // Trigger a maintenance run and expect it to gracefully repair the wrongly seated write alias + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + // 5. 
Verify the index rolled over correctly and the write alias was moved to the latest index + assertIndicesAndAliases( + "After rollover (state, alias on wrong index)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(), ".ml-state-000003", List.of(".ml-state-write")) + ); + } + private void runTestScenarioWithNoRolloverOccurring(Job.Builder[] jobs, String indexNamePart) throws Exception { String firstJobId = jobs[0].getId(); String secondJobId = jobs[1].getId(); @@ -335,7 +496,8 @@ private void assertIndicesAndAliases(String context, String indexWildcard, Map { assertThat("Context: " + context, indices.size(), is(expectedAliases.size())); if (expectedAliasList.isEmpty()) { - assertThat("Context: " + context, aliases.size(), is(0)); + List actualAliasMetadata = aliases.get(indexName); + assertThat("Context: " + context, actualAliasMetadata, is(nullValue())); } else { List actualAliasMetadata = aliases.get(indexName); List actualAliasList = actualAliasMetadata.stream().map(AliasMetadata::alias).toList(); @@ -376,12 +538,4 @@ private PutJobAction.Response putJob(Job.Builder job) { PutJobAction.Request request = new PutJobAction.Request(job); return client().execute(PutJobAction.INSTANCE, request).actionGet(); } - - private void deleteJob(String jobId) { - try { - client().execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId)).actionGet(); - } catch (Exception e) { - // noop - } - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java index 202cc020471f6..027afe839a664 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java @@ -154,7 +154,7 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio 
).andThen((l, success) -> { rollover(rolloverAlias, newIndexName, l); }).andThen((l, newIndexNameResponse) -> { - MlIndexAndAlias.addIndexAliasesRequests(aliasRequestBuilder, index, newIndexNameResponse, clusterState); + MlIndexAndAlias.addResultsIndexRolloverAliasActions(aliasRequestBuilder, index, newIndexNameResponse, clusterState); // Delete the new alias created for the rollover action aliasRequestBuilder.removeAlias(newIndexNameResponse, rolloverAlias); MlIndexAndAlias.updateAliases(aliasRequestBuilder, l); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 348cdf0e11a2b..792cb4d77dd5a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -253,41 +253,42 @@ private void triggerTasks() { } private void triggerAnomalyDetectionMaintenance() { - // Step 5: Log any error that could have happened - ActionListener finalListener = ActionListener.wrap(response -> { - if (response.isAcknowledged() == false) { - logger.warn("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask failed"); - } else { - logger.info("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask succeeded"); - } - }, e -> logger.warn("An error occurred during [ML] maintenance tasks execution ", e)); + // The maintenance tasks are chained, where each subsequent task is executed regardless of whether the previous one + // succeeded or failed. - // Step 4: Roll over results indices if necessary - ActionListener rollResultsIndicesIfNecessaryListener = ActionListener.wrap(unused -> { - triggerRollResultsIndicesIfNecessaryTask(finalListener); - }, e -> { - // Note: Steps 1-4 are independent, so continue upon errors. 
- triggerRollResultsIndicesIfNecessaryTask(finalListener); - }); + // Final step: Log completion + ActionListener finalListener = ActionListener.wrap( + response -> logger.info("Completed [ML] maintenance tasks"), + e -> logger.warn("An error occurred during [ML] maintenance tasks execution", e) + ); + + // Step 5: Roll over state indices + Runnable rollStateIndices = () -> triggerRollStateIndicesIfNecessaryTask(finalListener); + + // Step 4: Roll over results indices + Runnable rollResultsIndices = () -> triggerRollResultsIndicesIfNecessaryTask( + continueOnFailureListener("roll-state-indices", rollStateIndices) + ); // Step 3: Delete expired data - ActionListener deleteJobsListener = ActionListener.wrap(unused -> { - triggerDeleteExpiredDataTask(rollResultsIndicesIfNecessaryListener); - }, e -> { - // Note: Steps 1-4 are independent, so continue upon errors. - triggerDeleteExpiredDataTask(rollResultsIndicesIfNecessaryListener); - }); + Runnable deleteExpiredData = () -> triggerDeleteExpiredDataTask( + continueOnFailureListener("roll-results-indices", rollResultsIndices) + ); - // Step 2: Reset jobs that are in resetting state without task - ActionListener resetJobsListener = ActionListener.wrap(unused -> { - triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener); - }, e -> { - // Note: Steps 1-4 are independent, so continue upon errors. 
- triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener); - }); + // Step 2: Reset jobs that are in resetting state without a task + Runnable resetJobs = () -> triggerResetJobsInStateResetWithoutResetTask( + continueOnFailureListener("delete-expired-data", deleteExpiredData) + ); + + // Step 1: Delete jobs that are in deleting state without a task + triggerDeleteJobsInStateDeletingWithoutDeletionTask(continueOnFailureListener("reset-jobs", resetJobs)); + } - // Step 1: Delete jobs that are in deleting state without task - triggerDeleteJobsInStateDeletingWithoutDeletionTask(resetJobsListener); + private ActionListener continueOnFailureListener(String nextTaskName, Runnable next) { + return ActionListener.wrap(response -> next.run(), e -> { + logger.warn(() -> "A maintenance task failed, but maintenance will continue. Triggering next task [" + nextTaskName + "].", e); + next.run(); + }); } private void triggerDataFrameAnalyticsMaintenance() { @@ -321,7 +322,7 @@ private void rollover(Client client, String rolloverAlias, @Nullable String newI ); } - private void rollAndUpdateAliases(ClusterState clusterState, String index, ActionListener listener) { + private void rollAndUpdateAliases(ClusterState clusterState, String index, String[] allIndices, ActionListener listener) { OriginSettingClient originSettingClient = new OriginSettingClient(client, ML_ORIGIN); Tuple newIndexNameAndRolloverAlias = MlIndexAndAlias.createRolloverAliasAndNewIndexName(index); @@ -351,7 +352,17 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio // 3 Update aliases ActionListener rolloverListener = ActionListener.wrap(newIndexNameResponse -> { - MlIndexAndAlias.addIndexAliasesRequests(aliasRequestBuilder, index, newIndexNameResponse, clusterState); + if (MlIndexAndAlias.isAnomaliesStateIndex(index)) { + MlIndexAndAlias.addStateIndexRolloverAliasActions( + aliasRequestBuilder, + index, + newIndexNameResponse, + clusterState, + allIndices + ); + } else { 
+ MlIndexAndAlias.addResultsIndexRolloverAliasActions(aliasRequestBuilder, index, newIndexNameResponse, clusterState); + } // On success, the rollover alias may have been moved to the new index, so we attempt to remove it from there. // Note that the rollover request is considered "successful" even if it didn't occur due to a condition not being met // (no exception will be thrown). In which case the attempt to remove the alias here will fail with an @@ -374,21 +385,16 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio MlIndexAndAlias.createAliasForRollover(originSettingClient, index, rolloverAlias, getIndicesAliasesListener); } - private String[] findIndicesNeedingRollover(ClusterState clusterState) { - // list all indices starting .ml-anomalies- - // this includes the shared index and all custom results indices - String[] indices = expressionResolver.concreteIndexNames( - clusterState, - IndicesOptions.lenientExpandOpenHidden(), - AnomalyDetectorsIndex.jobResultsIndexPattern() - ); - logger.trace("triggerRollResultsIndicesIfNecessaryTask: indices found: {}", Arrays.toString(indices)); + private String[] findIndicesMatchingPattern(ClusterState clusterState, String indexPattern) { + // list all indices matching the given index pattern + String[] indices = expressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpenHidden(), indexPattern); + logger.trace("findIndicesMatchingPattern: indices found: {} matching pattern [{}]", Arrays.toString(indices), indexPattern); return indices; } - private void rolloverIndexSafely(ClusterState clusterState, String index, List failures) { + private void rolloverIndexSafely(ClusterState clusterState, String index, String[] allIndices, List failures) { PlainActionFuture updated = new PlainActionFuture<>(); - rollAndUpdateAliases(clusterState, index, updated); + rollAndUpdateAliases(clusterState, index, allIndices, updated); try { updated.actionGet(); } catch (Exception ex) { @@ 
-413,13 +419,16 @@ private void handleRolloverResults(String[] indices, List failures, A finalListener.onResponse(AcknowledgedResponse.FALSE); } - // public for testing - public void triggerRollResultsIndicesIfNecessaryTask(ActionListener finalListener) { - logger.info("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask"); + private void triggerRollIndicesIfNecessaryTask( + String taskName, + String indexPattern, + ActionListener finalListener + ) { + logger.info("[ML] maintenance task: [{}] for index pattern [{}]", taskName, indexPattern); ClusterState clusterState = clusterService.state(); - String[] indices = findIndicesNeedingRollover(clusterState); + String[] indices = findIndicesMatchingPattern(clusterState, indexPattern); if (rolloverMaxSize == ByteSizeValue.MINUS_ONE || indices.length == 0) { // Early bath finalListener.onResponse(AcknowledgedResponse.TRUE); @@ -430,11 +439,21 @@ public void triggerRollResultsIndicesIfNecessaryTask(ActionListener MlIndexAndAlias.latestIndexMatchingBaseName(index, expressionResolver, clusterState).equals(index)) - .forEach(index -> rolloverIndexSafely(clusterState, index, failures)); + .forEach(latestIndex -> rolloverIndexSafely(clusterState, latestIndex, indices, failures)); handleRolloverResults(indices, failures, finalListener); } + // Rolls over the ML results indices where necessary; public for testing + public void triggerRollResultsIndicesIfNecessaryTask(ActionListener finalListener) { + triggerRollIndicesIfNecessaryTask("roll-results-indices", AnomalyDetectorsIndex.jobResultsIndexPattern(), finalListener); + } + + // Rolls over the ML state indices where necessary; public for testing + public void triggerRollStateIndicesIfNecessaryTask(ActionListener finalListener) { + triggerRollIndicesIfNecessaryTask("roll-state-indices", AnomalyDetectorsIndex.jobStateIndexPattern(), finalListener); + } + private void triggerDeleteExpiredDataTask(ActionListener finalListener) { ActionListener deleteExpiredDataActionListener = finalListener.delegateFailureAndWrap( (l, deleteExpiredDataResponse) -> { diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java index 02fcc2b4465f3..57a1dcb9bd0b0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java @@ -60,9 +60,6 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry { private IndexTemplateConfig stateTemplate() { Map variables = new HashMap<>(); variables.put(VERSION_ID_PATTERN, String.valueOf(ML_INDEX_TEMPLATE_VERSION)); - // In serverless a different version of "state_index_template.json" is shipped that won't substitute the ILM policy variable - variables.put(INDEX_LIFECYCLE_NAME, ML_SIZE_BASED_ILM_POLICY_NAME); - variables.put(INDEX_LIFECYCLE_ROLLOVER_ALIAS, AnomalyDetectorsIndex.jobStateIndexWriteAlias()); return new IndexTemplateConfig( AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java index acda7e981489d..5da433f09a1e6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java @@ -103,8 +103,6 @@ public void testStateTemplate() { .findFirst() .orElseThrow(() -> new AssertionError("expected the ml state index template to be put")); ComposableIndexTemplate indexTemplate = req.indexTemplate(); - assertThat(indexTemplate.template().settings().get("index.lifecycle.name"), equalTo("ml-size-based-ilm-policy")); - assertThat(indexTemplate.template().settings().get("index.lifecycle.rollover_alias"), equalTo(".ml-state-write")); } public void testStatsTemplate() {