Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8f2c38e
[ML] Add daily task to manage .ml-state indices
edsavage Nov 5, 2025
4020b12
Update docs/changelog/137653.yaml
edsavage Nov 5, 2025
21436a7
Improve testing and perform better maintenance on aliases.
edsavage Nov 7, 2025
479a386
fix for failing test
edsavage Nov 10, 2025
db46939
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Nov 10, 2025
9853266
Tidy up some loose ends
edsavage Nov 10, 2025
df2859c
[CI] Auto commit changes from spotless
Nov 10, 2025
84f114e
Attend to code review comments
edsavage Nov 11, 2025
57bfe13
checkstyle fix
edsavage Nov 11, 2025
21fcf3b
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Nov 11, 2025
5ace996
Merge branch 'manage_ad_state_indices' of github.com:edsavage/elastic…
edsavage Nov 11, 2025
01ffee4
Add more tests exercising ILM behaviour
edsavage Nov 11, 2025
6fe4bad
[CI] Auto commit changes from spotless
Nov 11, 2025
3e884b4
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Nov 11, 2025
e8f8156
Simplification of logic in addResultsIndexRolloverAliasActions
edsavage Nov 12, 2025
7a45467
Fixed and tidied test case
edsavage Nov 12, 2025
b32e5ef
Merge branch 'main' into manage_ad_state_indices
edsavage Nov 12, 2025
3b22204
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Nov 12, 2025
632e5b5
Added test case for hasIlm
edsavage Nov 13, 2025
e03a57f
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/137653.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137653
summary: Add daily task to manage .ml-state indices
area: Machine Learning
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public final class AnomalyDetectorsIndex {

private static final String RESULTS_MAPPINGS_VERSION_VARIABLE = "xpack.ml.version";
private static final String RESOURCE_PATH = "/ml/anomalydetection/";
private static final String WRITE_ALIAS_PREFIX = ".write-";
public static final int RESULTS_INDEX_MAPPINGS_VERSION = 1;

private AnomalyDetectorsIndex() {}
Expand Down Expand Up @@ -61,7 +62,12 @@ public static String jobIdFromAlias(String jobResultsAliasedName) {
if (jobResultsAliasedName.length() < AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length()) {
return null;
}
return jobResultsAliasedName.substring(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length());

var jobId = jobResultsAliasedName.substring(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length());
if (jobId.startsWith(WRITE_ALIAS_PREFIX)) {
jobId = jobId.substring(WRITE_ALIAS_PREFIX.length());
}
return jobId;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.core.Nullable;
Expand All @@ -45,8 +46,12 @@
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
Expand Down Expand Up @@ -80,6 +85,9 @@ public final class MlIndexAndAlias {
// Matches only the shared anomaly results indices that carry a 6 digit rollover suffix.
// The literal part of the name is quoted so that any regex metacharacters it contains (index names
// of this family start with '.') are matched verbatim rather than interpreted as pattern syntax.
private static final Predicate<String> IS_ANOMALIES_SHARED_INDEX = Pattern.compile(
    Pattern.quote(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT) + "-\\d{6}"
).asMatchPredicate();
// Matches only the ML state indices that carry a 6 digit rollover suffix (e.g. ".ml-state-000001").
// The prefix is quoted so that any regex metacharacters it contains (such as the leading '.')
// are matched verbatim rather than interpreted as pattern syntax.
private static final Predicate<String> IS_ANOMALIES_STATE_INDEX = Pattern.compile(
    Pattern.quote(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX) + "-\\d{6}"
).asMatchPredicate();
public static final String ROLLOVER_ALIAS_SUFFIX = ".rollover_alias";

static final Comparator<String> INDEX_NAME_COMPARATOR = (index1, index2) -> {
Expand Down Expand Up @@ -468,6 +476,12 @@ private static boolean hasIndexTemplate(ClusterState state, String templateName,
return template != null && Long.valueOf(version).equals(template.version());
}

/**
* Ensures a given index name is valid for ML results by appending the 6-digit suffix if it is missing.
*
* @param indexName The index name to validate.
* @return The validated index name, with the suffix added if it was missing.
*/
public static String ensureValidResultsIndexName(String indexName) {
// The results index name is either the original one provided or the original with a suffix appended.
return has6DigitSuffix(indexName) ? indexName : indexName + FIRST_INDEX_SIX_DIGIT_SUFFIX;
Expand Down Expand Up @@ -495,6 +509,16 @@ public static boolean isAnomaliesSharedIndex(String indexName) {
return IS_ANOMALIES_SHARED_INDEX.test(indexName);
}

/**
* Checks if an index name matches the pattern for the ML anomalies state indices (e.g., ".ml-state-000001").
* The entire name must match the pattern: the state index prefix followed by a hyphen and exactly six digits.
*
* @param indexName The name of the index to check.
* @return {@code true} if the index is an anomalies state index, {@code false} otherwise.
*/
public static boolean isAnomaliesStateIndex(String indexName) {
return IS_ANOMALIES_STATE_INDEX.test(indexName);
}

/**
* Returns the latest index. Latest is the index with the highest
* 6 digit suffix.
Expand All @@ -507,13 +531,56 @@ public static String latestIndex(String[] concreteIndices) {
: Arrays.stream(concreteIndices).max(MlIndexAndAlias.INDEX_NAME_COMPARATOR).get();
}

/**
* Sorts the given list of indices in place, ordering them with {@code INDEX_NAME_COMPARATOR}
* (i.e. based on their 6 digit suffix).
*
* @param indices List of index names. The list itself is reordered, so it must be modifiable.
*/
public static void sortIndices(List<String> indices) {
indices.sort(INDEX_NAME_COMPARATOR);
}

/**
* True if the version is read <em>and</em> write compatible, not just read-only compatible.
* An index version qualifies when it is on or after {@code IndexVersions.V_8_0_0}.
*
* @param version The index version to check.
* @return {@code true} if {@code version} is on or after {@code IndexVersions.V_8_0_0}, {@code false} otherwise.
*/
public static boolean indexIsReadWriteCompatibleInV9(IndexVersion version) {
return version.onOrAfter(IndexVersions.V_8_0_0);
}

/**
 * Strips the trailing 6 digit suffix from an index name, if one is present.
 *
 * @param index The index name, with or without a 6 digit suffix.
 * @return The name with the suffix removed, or {@code index} unchanged when it carries no such suffix.
 */
public static String baseIndexName(String index) {
    if (MlIndexAndAlias.has6DigitSuffix(index) == false) {
        return index;
    }
    // Every suffix has the same length as the first-index suffix, so that constant gives the number of characters to drop.
    int baseLength = index.length() - FIRST_INDEX_SIX_DIGIT_SUFFIX.length();
    return index.substring(0, baseLength);
}

/**
 * Resolves the concrete indices whose names start with the given base index name.
 * The lookup uses the wildcard expression {@code baseIndexName + "*"} with lenient, open-and-hidden expanding options.
 *
 * @param baseIndexName The base part of an index name, without the 6 digit suffix.
 * @param expressionResolver The expression resolver used to expand the wildcard.
 * @param latestState The latest cluster state to resolve against.
 * @return An array of the matching concrete index names.
 */
public static String[] indicesMatchingBasename(
    String baseIndexName,
    IndexNameExpressionResolver expressionResolver,
    ClusterState latestState
) {
    final String wildcardExpression = baseIndexName + "*";
    return expressionResolver.concreteIndexNames(latestState, IndicesOptions.lenientExpandOpenHidden(), wildcardExpression);
}

/**
* Strip any suffix from the index name and find any other indices
* that match the base name. Then return the latest index from the
Expand All @@ -529,15 +596,10 @@ public static String latestIndexMatchingBaseName(
IndexNameExpressionResolver expressionResolver,
ClusterState latestState
) {
String baseIndexName = MlIndexAndAlias.has6DigitSuffix(index)
? index.substring(0, index.length() - FIRST_INDEX_SIX_DIGIT_SUFFIX.length())
: index;

String[] matching = expressionResolver.concreteIndexNames(
latestState,
IndicesOptions.lenientExpandOpenHidden(),
baseIndexName + "*"
);
String baseIndexName = baseIndexName(index);

var matching = indicesMatchingBasename(baseIndexName, expressionResolver, latestState);

// We used to assert here if no matching indices could be found. However, when called _before_ a job is created it may be the case
// that no .ml-anomalies-shared* indices yet exist
Expand Down Expand Up @@ -577,6 +639,15 @@ public static void rollover(Client client, RolloverRequest rolloverRequest, Acti
}));
}

/**
* Generates a temporary rollover alias and a potential new index name based on a source index name.
* This is a preparatory step for a rollover action. If the source index already has a 6-digit suffix,
* the new index name will be null, allowing the rollover API to auto-increment the suffix.
*
* @param index The name of the index that is a candidate for rollover.
* @return A {@link Tuple} where {@code v1} is the generated rollover alias and {@code v2} is the new index name
* (or {@code null} if rollover can auto-determine it).
*/
public static Tuple<String, String> createRolloverAliasAndNewIndexName(String index) {
String indexName = Objects.requireNonNull(index);

Expand All @@ -596,6 +667,12 @@ public static Tuple<String, String> createRolloverAliasAndNewIndexName(String in
return new Tuple<>(rolloverAlias, newIndexName);
}

/**
 * Builds an {@link IndicesAliasesRequestBuilder} whose two timeout arguments are both set to thirty seconds.
 *
 * @param client The client used to prepare the aliases request.
 * @return A new {@link IndicesAliasesRequestBuilder}.
 */
public static IndicesAliasesRequestBuilder createIndicesAliasesRequestBuilder(Client client) {
    var indicesClient = client.admin().indices();
    // The same thirty second value is used for both timeouts of the request.
    final TimeValue timeout = TimeValue.THIRTY_SECONDS;
    return indicesClient.prepareAliases(timeout, timeout);
}
Expand Down Expand Up @@ -630,6 +707,57 @@ public static void updateAliases(IndicesAliasesRequestBuilder request, ActionLis
request.execute(listener.delegateFailure((l, response) -> l.onResponse(Boolean.TRUE)));
}

/**
 * Queues the alias actions needed to point the ML state write alias at a new index after a rollover.
 * Two actions are added: one removing the write alias from every index in {@code allStateIndices}, and one
 * adding it to {@code newIndex} as a hidden write alias. If {@code oldIndex} is not present in the cluster
 * state the builder is returned untouched.
 *
 * @param aliasRequestBuilder The request builder to add actions to.
 * @param oldIndex The index that was rolled over; only its presence in the cluster state is checked.
 * @param newIndex The new index that will receive the write alias.
 * @param clusterState The current cluster state, used to verify that {@code oldIndex} exists.
 * @param allStateIndices A list of all current .ml-state indices from which the write alias is removed.
 * @return The same {@link IndicesAliasesRequestBuilder} that was passed in, with any actions added.
 */
public static IndicesAliasesRequestBuilder addStateIndexRolloverAliasActions(
    IndicesAliasesRequestBuilder aliasRequestBuilder,
    String oldIndex,
    String newIndex,
    ClusterState clusterState,
    List<String> allStateIndices
) {
    // Callers iterate over existing indices so the old index is expected to be present; bail out defensively if it is not.
    if (clusterState.metadata().getProject().index(oldIndex) == null) {
        return aliasRequestBuilder;
    }

    final String stateWriteAlias = AnomalyDetectorsIndex.jobStateIndexWriteAlias();
    final String[] indicesToClear = allStateIndices.toArray(new String[0]);

    // Clear the write alias from ALL state indices so that an inconsistent state, where more than one index holds it, is repaired.
    var removeFromAll = IndicesAliasesRequest.AliasActions.remove().indices(indicesToClear).alias(stateWriteAlias);
    aliasRequestBuilder.addAliasAction(removeFromAll);

    // Then make the new index the single holder of the hidden write alias.
    var addToNewIndex = IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(stateWriteAlias).isHidden(true).writeIndex(true);
    aliasRequestBuilder.addAliasAction(addToNewIndex);

    return aliasRequestBuilder;
}

/**
 * Finds the lowest-ordered index, according to {@code INDEX_NAME_COMPARATOR}, whose alias list contains the given alias.
 *
 * @param aliasesMap Map of index name to the aliases defined on that index.
 * @param targetAlias The alias to look for; list membership is decided by {@code List.contains}.
 * @return The earliest index holding the alias, or an empty {@link Optional} if no index holds it.
 */
private static Optional<String> findEarliestIndexWithAlias(Map<String, List<AliasMetadata>> aliasesMap, AliasMetadata targetAlias) {
    String earliest = null;
    for (Map.Entry<String, List<AliasMetadata>> indexAndAliases : aliasesMap.entrySet()) {
        if (indexAndAliases.getValue().contains(targetAlias) == false) {
            continue;
        }
        String candidate = indexAndAliases.getKey();
        // Only a strictly smaller candidate replaces the current one, so the first-seen index wins on ties.
        if (earliest == null || INDEX_NAME_COMPARATOR.compare(candidate, earliest) < 0) {
            earliest = candidate;
        }
    }
    return Optional.ofNullable(earliest);
}

/**
* Adds alias actions to a request builder to move ML job aliases from an old index to a new one after a rollover.
* This includes moving the write alias and re-creating the filtered read aliases on the new index.
Expand All @@ -640,36 +768,81 @@ public static void updateAliases(IndicesAliasesRequestBuilder request, ActionLis
* @param clusterState The current cluster state, used to inspect existing aliases on the old index.
* @return The modified {@link IndicesAliasesRequestBuilder}.
*/
public static IndicesAliasesRequestBuilder addIndexAliasesRequests(
public static IndicesAliasesRequestBuilder addResultsIndexRolloverAliasActions(
IndicesAliasesRequestBuilder aliasRequestBuilder,
String oldIndex,
String newIndex,
ClusterState clusterState
ClusterState clusterState,
List<String> currentJobResultsIndices
) {
// Multiple jobs can share the same index. Each job
// has a read and write alias that needs updating
// after the rollover.
var meta = clusterState.metadata().getProject().index(oldIndex);
assert meta != null;
if (meta == null) {
var aliasesMap = clusterState.metadata().getProject().findAllAliases(currentJobResultsIndices.toArray(new String[0]));
if (aliasesMap == null) {
return aliasRequestBuilder;
}

for (var alias : meta.getAliases().values()) {
// Compile a unique set of all aliases from all the indices.
// An alias could appear on multiple indices in an inconsistent state, but its properties (filter, etc.) should be the same.
var uniqueAliases = new HashSet<AliasMetadata>();
for (var indexAliases : aliasesMap.values()) {
uniqueAliases.addAll(indexAliases);
}

// Make sure to include the new index
List<String> allJobResultsIndices = new ArrayList<>(currentJobResultsIndices);
allJobResultsIndices.add(newIndex);
MlIndexAndAlias.sortIndices(allJobResultsIndices);

for (var alias : uniqueAliases) {
if (isAnomaliesWriteAlias(alias.alias())) {
// Remove the write alias from ALL job results indices to handle any inconsistencies where it might exist on more than one.
aliasRequestBuilder.addAliasAction(
IndicesAliasesRequest.AliasActions.remove()
.indices(currentJobResultsIndices.toArray(new String[0]))
.alias(alias.alias())
);
// Add the write alias to the latest results index
aliasRequestBuilder.addAliasAction(
IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias.alias()).isHidden(true).writeIndex(true)
);
aliasRequestBuilder.addAliasAction(IndicesAliasesRequest.AliasActions.remove().index(oldIndex).alias(alias.alias()));
String jobId = AnomalyDetectorsIndex.jobIdFromAlias(alias.alias());
String readAlias = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
// Always take the opportunity to add the read alias on the latest index
// as it may have been missing on the old index
aliasRequestBuilder.addAliasAction(
IndicesAliasesRequest.AliasActions.add()
.indices(newIndex)
.alias(readAlias)
.isHidden(true)
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
);
} else if (isAnomaliesReadAlias(alias.alias())) {
// Try to generate a sub list of indices to operate on where the first index in the list is the first one with the current
// read alias. This is useful in trying to "heal" missing read aliases, without adding them on every possible index.
int indexOfEarliestIndexWithAlias = findEarliestIndexWithAlias(aliasesMap, alias).map(allJobResultsIndices::indexOf)
// If the earliest index is not found in the list (which shouldn't happen), default to 0 to include all indices.
.filter(i -> i >= 0)
.orElse(0);

String jobId = AnomalyDetectorsIndex.jobIdFromAlias(alias.alias());
aliasRequestBuilder.addAliasAction(
IndicesAliasesRequest.AliasActions.add()
.index(newIndex)
.indices(
allJobResultsIndices.subList(indexOfEarliestIndexWithAlias, allJobResultsIndices.size()).toArray(new String[0])
)
.alias(alias.alias())
.isHidden(true)
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
);

// Always take the opportunity to add the write alias on the new index
// as it may have been missing on the old index
String writeAlias = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
aliasRequestBuilder.addAliasAction(
IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(writeAlias).isHidden(true).writeIndex(true)
);
}
}

Expand Down
Loading