Merged
Changes from 57 commits
59 commits
9ff0b94
[ML] 8Manage rollover of AD results indices
edsavage Oct 6, 2025
78d0d8e
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 6, 2025
0ec99b6
Spotless Apply
edsavage Oct 6, 2025
8405ff3
Spotless Apply
edsavage Oct 6, 2025
d10d09d
Bit of a tidy up
edsavage Oct 6, 2025
dfb1939
Slight refactor
edsavage Oct 6, 2025
ffc2842
Another tidy
edsavage Oct 6, 2025
9863169
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 6, 2025
8f17540
Remove unused accessor
edsavage Oct 7, 2025
d60a811
Update docs/changelog/136065.yaml
edsavage Oct 7, 2025
2729a86
Merge branch 'main' into manage_ad_results_indices
edsavage Oct 7, 2025
c5f58e1
[CI] Auto commit changes from spotless
Oct 7, 2025
7b6caf0
Address some test failures
edsavage Oct 8, 2025
52ab642
Merge remote-tracking branch 'origin/manage_ad_results_indices' into …
edsavage Oct 8, 2025
66d7268
Typos
edsavage Oct 8, 2025
f9296fd
[CI] Auto commit changes from spotless
Oct 8, 2025
07ddbaa
Make the max results index size for rollover user configurable.
edsavage Oct 8, 2025
e7eb106
Merge remote-tracking branch 'origin/manage_ad_results_indices' into …
edsavage Oct 8, 2025
cb85a49
Fix bad merge
edsavage Oct 8, 2025
123c8cb
[CI] Auto commit changes from spotless
Oct 8, 2025
1a5d0da
Merge branch 'main' into manage_ad_results_indices
edsavage Oct 8, 2025
c54a354
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 8, 2025
80b1b71
Test fixes
edsavage Oct 8, 2025
84ad33d
Merge remote-tracking branch 'origin/manage_ad_results_indices' into …
edsavage Oct 8, 2025
2a66d40
Remove assertion for condition that is no longer entirely true
edsavage Oct 8, 2025
83e7e2b
A few more test fixes
edsavage Oct 9, 2025
e9c0c2c
Fixed typo in docs
edsavage Oct 9, 2025
0b6b79a
Tweaks to yamlRestCompatTests
edsavage Oct 10, 2025
97106a1
A few more test fixes
edsavage Oct 10, 2025
468f712
Another test fix through rewrite rules
edsavage Oct 13, 2025
96cecaf
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 13, 2025
63f97c5
Another REST compatibility test transformation
edsavage Oct 13, 2025
8050133
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 13, 2025
c320ccc
First draft of integration tests
edsavage Oct 16, 2025
8e22944
2nd draft of integration test
edsavage Oct 16, 2025
644c57f
spotless fixes
edsavage Oct 16, 2025
0532c8b
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 16, 2025
5465b24
[CI] Auto commit changes from spotless
Oct 16, 2025
42e63c9
spotless fixes
edsavage Oct 16, 2025
5e7a9da
Merge remote-tracking branch 'origin/manage_ad_results_indices' into …
edsavage Oct 16, 2025
16cb2d1
Fix broken test case
edsavage Oct 17, 2025
7abf680
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 17, 2025
b5513a2
Tidy up
edsavage Oct 17, 2025
29faf9e
Add more test scenarios
edsavage Oct 17, 2025
77ab9df
[CI] Auto commit changes from spotless
Oct 17, 2025
8bf9e6f
Merge branch 'main' into manage_ad_results_indices
edsavage Oct 19, 2025
f9f6881
Merge branch 'main' into manage_ad_results_indices
edsavage Oct 19, 2025
cd9b467
Refactor integration tests
edsavage Oct 20, 2025
fe2c12d
Merge remote-tracking branch 'origin/manage_ad_results_indices' into …
edsavage Oct 20, 2025
326eedd
Attend to code review comments
edsavage Oct 23, 2025
21ea400
Remove unneeded variable
edsavage Oct 23, 2025
f90e4b8
[CI] Auto commit changes from spotless
Oct 23, 2025
5d06353
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 23, 2025
988eeeb
Merge remote-tracking branch 'origin/manage_ad_results_indices' into …
edsavage Oct 23, 2025
000e1b3
Bugfix and typo
edsavage Oct 23, 2025
cc157c8
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 23, 2025
58540ae
More tests and fixes
edsavage Oct 24, 2025
74d92ae
* Clarified documentation regarding results_index_rollover_max_size
edsavage Oct 29, 2025
0c2fbe9
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 30, 2025
5 changes: 5 additions & 0 deletions docs/changelog/136065.yaml
@@ -0,0 +1,5 @@
pr: 136065
summary: Nightly maintenance for anomaly detection results indices to keep to manageable size.
area: Machine Learning
type: enhancement
issues: []
@@ -86,6 +86,9 @@ $$$xpack.ml.max_open_jobs$$$
`xpack.ml.nightly_maintenance_requests_per_second`
: ([Dynamic](docs-content://deploy-manage/stack-settings.md#dynamic-cluster-setting)) The rate at which the nightly maintenance task deletes expired model snapshots and results. The setting is a proxy to the [`requests_per_second`](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query) parameter used in the delete by query requests and controls throttling. When the {{operator-feature}} is enabled, this setting can be updated only by operator users. Valid values must be greater than `0.0` or equal to `-1.0`, where `-1.0` means a default value is used. Defaults to `-1.0`

`xpack.ml.results_index_rollover_max_size`
: ([Dynamic](docs-content://deploy-manage/stack-settings.md#dynamic-cluster-setting)) The maximum size the anomaly detection results indices can reach before being rolled over by the nightly maintenance task. When the {{operator-feature}} is enabled, this setting can be updated only by operator users. Valid values must be greater than or equal to `0B`. A value of `0B` means the indices will always be rolled over. Defaults to `50GB`.
Contributor


the indices will always be rolled over

What does this mean? And how can you turn off the rollover?

Contributor Author


What does this mean? And how can you turn off the rollover?

It means that the results indices will always be rolled over, regardless of their size. I can update the docs to be more specific about this, and/or we can choose a more suitable minimum value for `xpack.ml.results_index_rollover_max_size`.

Currently, no: rollover can't be turned off. I think it should be possible to turn it off, though, so I'll add that change.
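
The documented behaviour of `xpack.ml.results_index_rollover_max_size` can be read as a plain size comparison. The sketch below is illustrative only and is not code from this PR: `RolloverSizeCheck` and `shouldRollOver` are hypothetical names, and the inclusive `>=` comparison is an assumption, chosen so that a limit of `0B` matches every index as the docs text describes. It deliberately does not model a way to disable rollover, since the reply above says that is not supported yet.

```java
// Hypothetical illustration; not code from this PR.
final class RolloverSizeCheck {

    private RolloverSizeCheck() {}

    /**
     * Returns true when an index of the given size should be rolled over.
     * Assumption: the comparison is inclusive, so a limit of 0 bytes
     * matches every index, including an empty one.
     */
    static boolean shouldRollOver(long indexSizeBytes, long maxSizeBytes) {
        if (indexSizeBytes < 0 || maxSizeBytes < 0) {
            throw new IllegalArgumentException("sizes must be non-negative");
        }
        return indexSizeBytes >= maxSizeBytes;
    }

    public static void main(String[] args) {
        long gb = 1024L * 1024 * 1024;
        long defaultLimit = 50 * gb; // the documented default of 50GB
        System.out.println(shouldRollOver(10 * gb, defaultLimit)); // false
        System.out.println(shouldRollOver(60 * gb, defaultLimit)); // true
        System.out.println(shouldRollOver(0L, 0L));                // true: 0B always rolls over
    }
}
```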


`xpack.ml.node_concurrent_job_allocations`
: ([Dynamic](docs-content://deploy-manage/stack-settings.md#dynamic-cluster-setting)) The maximum number of jobs that can concurrently be in the `opening` state on each node. Typically, jobs spend a small amount of time in this state before they move to `open` state. Jobs that must restore large models when they are opening spend more time in the `opening` state. When the {{operator-feature}} is enabled, this setting can be updated only by operator users. Defaults to `2`.

6 changes: 6 additions & 0 deletions x-pack/plugin/build.gradle
@@ -85,6 +85,12 @@ tasks.named("precommit").configure {
}

tasks.named("yamlRestCompatTestTransform").configure({ task ->
task.replaceIsTrue("\\.ml-anomalies-shared.mappings._meta.version", "\\.ml-anomalies-shared-000001.mappings._meta.version")
task.replaceKeyInMatch("\\.ml-anomalies-shared.mappings.new_field.mapping.new_field.type", "\\.ml-anomalies-shared-000001.mappings.new_field.mapping.new_field.type")
task.replaceValueTextByKeyValue("index", ".ml-anomalies-shared", ".ml-anomalies-shared-000001")
task.replaceValueTextByKeyValue("index", ".ml-anomalies-custom-all-test-1,.ml-anomalies-custom-all-test-2", ".ml-anomalies-custom-all-test-1-000001,.ml-anomalies-custom-all-test-2-000001")
task.replaceValueTextByKeyValue("index", ".ml-anomalies-custom-all-test-1", ".ml-anomalies-custom-all-test-1-000001")
task.replaceValueTextByKeyValue("index", ".ml-anomalies-custom-all-test-2", ".ml-anomalies-custom-all-test-2-000001")
task.skipTest("esql/60_usage/Basic ESQL usage output (telemetry)", "The telemetry output changed. We dropped a column. That's safe.")
task.skipTest("inference/inference_crud/Test get all", "Assertions on number of inference models break due to default configs")
task.skipTest("esql/60_usage/Basic ESQL usage output (telemetry) snapshot version", "The number of functions is constantly increasing")
@@ -1349,6 +1349,7 @@ public Job build() {
// Conditional guards against multiple prepending due to updates instead of first creation
resultsIndexName = resultsIndexName.startsWith("custom-") ? resultsIndexName : "custom-" + resultsIndexName;
}
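
The guard in the hunk above makes the `custom-` prefixing idempotent, so that rebuilding a job during an update does not produce `custom-custom-...`. A minimal standalone sketch of the same idea follows; `CustomIndexPrefix` and `withCustomPrefix` are hypothetical names used only for illustration.

```java
// Hypothetical illustration of the idempotent prefixing shown in the diff.
final class CustomIndexPrefix {

    static final String PREFIX = "custom-";

    private CustomIndexPrefix() {}

    /** Prepends "custom-" unless the name already starts with it. */
    static String withCustomPrefix(String resultsIndexName) {
        return resultsIndexName.startsWith(PREFIX) ? resultsIndexName : PREFIX + resultsIndexName;
    }

    public static void main(String[] args) {
        String once = withCustomPrefix("my-results");
        String twice = withCustomPrefix(once);
        System.out.println(once);  // custom-my-results
        System.out.println(twice); // custom-my-results
    }
}
```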

if (datafeedConfig != null) {
if (datafeedConfig.getId() == null) {
datafeedConfig.setId(id);
@@ -20,6 +20,7 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
@@ -30,17 +31,23 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.regex.Pattern;
@@ -70,6 +77,10 @@ public final class MlIndexAndAlias {

private static final Logger logger = LogManager.getLogger(MlIndexAndAlias.class);
private static final Predicate<String> HAS_SIX_DIGIT_SUFFIX = Pattern.compile("\\d{6}").asMatchPredicate();
private static final Predicate<String> IS_ANOMALIES_SHARED_INDEX = Pattern.compile(
AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT + "-\\d{6}"
).asMatchPredicate();
public static final String ROLLOVER_ALIAS_SUFFIX = ".rollover_alias";

static final Comparator<String> INDEX_NAME_COMPARATOR = (index1, index2) -> {
String[] index1Parts = index1.split("-");
@@ -224,6 +235,21 @@ public static void createIndexAndAliasIfNecessary(
loggingListener.onResponse(false);
}

/**
* Creates a system index based on the provided descriptor if it does not already exist.
* <p>
* The check for existence is simple and will return the listener on the calling thread if successful.
* If the index needs to be created an async call will be made and this method will wait for the index to reach at least
* a yellow health status before notifying the listener, ensuring it is ready for use
* upon a successful response. A {@link ResourceAlreadyExistsException} during creation
* is handled gracefully and treated as a success.
*
* @param client The client to use for the create index request.
* @param clusterState The current cluster state, used for the initial existence check.
* @param descriptor The descriptor containing the index name, settings, and mappings.
* @param masterNodeTimeout The timeout for waiting on the master node.
* @param finalListener Async listener
*/
public static void createSystemIndexIfNecessary(
Client client,
ClusterState clusterState,
@@ -323,6 +349,16 @@ private static void createFirstConcreteIndex(
);
}

/**
* Creates or moves a write alias from one index to another.
*
* @param client The client to use for the add alias request.
* @param alias The alias to update.
* @param currentIndex The index the alias is currently pointing to.
* @param newIndex The new index the alias should point to.
* @param masterNodeTimeout The timeout for waiting on the master node.
* @param listener Async listener
*/
public static void updateWriteAlias(
Client client,
String alias,
@@ -357,7 +393,7 @@ public static void updateWriteAlias(
/**
* Installs the index template specified by {@code templateConfig} if it is not already
* installed in {@code clusterState}.
- *
+ * <p>
* The check for presence is simple and will return the listener on
* the calling thread if successful. If the template has to be installed
* an async call will be made.
@@ -427,17 +463,38 @@ public static void installIndexTemplateIfRequired(
executeAsyncWithOrigin(client, ML_ORIGIN, TransportPutComposableIndexTemplateAction.TYPE, templateRequest, innerListener);
}

- public static boolean hasIndexTemplate(ClusterState state, String templateName, long version) {
+ private static boolean hasIndexTemplate(ClusterState state, String templateName, long version) {
var template = state.getMetadata().getProject().templatesV2().get(templateName);
return template != null && Long.valueOf(version).equals(template.version());
}

public static String ensureValidResultsIndexName(String indexName) {
// The results index name is either the original one provided or the original with a suffix appended.
return has6DigitSuffix(indexName) ? indexName : indexName + FIRST_INDEX_SIX_DIGIT_SUFFIX;
}

/**
* Checks if an index name ends with a 6-digit suffix (e.g., "-000001").
*
* @param indexName The name of the index to check.
* @return {@code true} if the index name has a 6-digit suffix, {@code false} otherwise.
*/
public static boolean has6DigitSuffix(String indexName) {
String[] indexParts = indexName.split("-");
String suffix = indexParts[indexParts.length - 1];
return HAS_SIX_DIGIT_SUFFIX.test(suffix);
}
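
To make the behaviour of `has6DigitSuffix` and `ensureValidResultsIndexName` concrete, here is a standalone re-creation that can be run outside Elasticsearch. The class name `SixDigitSuffixDemo` is hypothetical, and the value of `FIRST_INDEX_SIX_DIGIT_SUFFIX` is an assumption taken from the `"-000001"` example in the Javadoc above. Because `asMatchPredicate()` requires the whole input to match, only a final dash-separated segment of exactly six digits counts.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Standalone re-creation for illustration; the real methods live in MlIndexAndAlias.
final class SixDigitSuffixDemo {

    // Assumed value, chosen to match the "-000001" example in the Javadoc.
    static final String FIRST_INDEX_SIX_DIGIT_SUFFIX = "-000001";

    // asMatchPredicate() matches the entire input, so this accepts exactly six digits.
    private static final Predicate<String> HAS_SIX_DIGIT_SUFFIX = Pattern.compile("\\d{6}").asMatchPredicate();

    private SixDigitSuffixDemo() {}

    static boolean has6DigitSuffix(String indexName) {
        String[] parts = indexName.split("-");
        return HAS_SIX_DIGIT_SUFFIX.test(parts[parts.length - 1]);
    }

    static String ensureValidResultsIndexName(String indexName) {
        return has6DigitSuffix(indexName) ? indexName : indexName + FIRST_INDEX_SIX_DIGIT_SUFFIX;
    }

    public static void main(String[] args) {
        System.out.println(has6DigitSuffix(".ml-anomalies-shared-000001"));  // true
        System.out.println(has6DigitSuffix(".ml-anomalies-shared"));         // false
        System.out.println(has6DigitSuffix(".ml-anomalies-shared-0000001")); // false: seven digits
        System.out.println(ensureValidResultsIndexName(".ml-anomalies-shared")); // .ml-anomalies-shared-000001
    }
}
```

Applying `ensureValidResultsIndexName` to an already suffixed name returns it unchanged, which is what lets callers pass either form.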

/**
* Checks if an index name matches the pattern for the default ML anomalies indices (e.g., ".ml-anomalies-shared-000001").
*
* @param indexName The name of the index to check.
* @return {@code true} if the index is a shared anomalies index, {@code false} otherwise.
*/
public static boolean isAnomaliesSharedIndex(String indexName) {
return IS_ANOMALIES_SHARED_INDEX.test(indexName);
}
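
One detail of `IS_ANOMALIES_SHARED_INDEX` worth checking in review is that the prefix is concatenated into the regular expression unquoted. The sketch below assumes `RESULTS_INDEX_PREFIX` is `".ml-anomalies-"` and `RESULTS_INDEX_DEFAULT` is `"shared"` (these values are not shown in the diff); under that assumption the leading `.` acts as a regex wildcard. `SharedIndexPatternDemo` and the `Pattern.quote` variant are hypothetical and shown only for comparison, not as a required change.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical comparison of an unquoted and a quoted prefix in the pattern.
final class SharedIndexPatternDemo {

    // Assumed constant values; the diff only references them by name.
    static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-";
    static final String RESULTS_INDEX_DEFAULT = "shared";

    // Built the same way as in the diff: plain string concatenation.
    static final Predicate<String> AS_IN_DIFF = Pattern.compile(
        RESULTS_INDEX_PREFIX + RESULTS_INDEX_DEFAULT + "-\\d{6}"
    ).asMatchPredicate();

    // Variant that treats the prefix as a literal string.
    static final Predicate<String> QUOTED = Pattern.compile(
        Pattern.quote(RESULTS_INDEX_PREFIX + RESULTS_INDEX_DEFAULT) + "-\\d{6}"
    ).asMatchPredicate();

    private SharedIndexPatternDemo() {}

    public static void main(String[] args) {
        System.out.println(AS_IN_DIFF.test(".ml-anomalies-shared-000001")); // true
        System.out.println(AS_IN_DIFF.test(".ml-anomalies-shared"));        // false: no suffix
        System.out.println(AS_IN_DIFF.test("xml-anomalies-shared-000001")); // true: '.' matches any character
        System.out.println(QUOTED.test("xml-anomalies-shared-000001"));     // false
    }
}
```

In practice index names that differ only in that first character are unlikely, so this may be harmless; the comparison is here to make the behaviour explicit.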

/**
* Returns the latest index. Latest is the index with the highest
* 6 digit suffix.
@@ -456,4 +513,194 @@ public static String latestIndex(String[] concreteIndices) {
public static boolean indexIsReadWriteCompatibleInV9(IndexVersion version) {
return version.onOrAfter(IndexVersions.V_8_0_0);
}

/**
* Strip any suffix from the index name and find any other indices
* that match the base name. Then return the latest index from the
* matching ones.
*
* @param index The index to check
* @param expressionResolver The expression resolver
* @param latestState The latest cluster state
* @return The latest index that matches the base name of the given index
*/
public static String latestIndexMatchingBaseName(
String index,
IndexNameExpressionResolver expressionResolver,
ClusterState latestState
) {
String baseIndexName = MlIndexAndAlias.has6DigitSuffix(index)
? index.substring(0, index.length() - FIRST_INDEX_SIX_DIGIT_SUFFIX.length())
: index;

String[] matching = expressionResolver.concreteIndexNames(
latestState,
IndicesOptions.lenientExpandOpenHidden(),
baseIndexName + "*"
);

// We used to assert here if no matching indices could be found. However, when called _before_ a job is created it may be the case
// that no .ml-anomalies-shared* indices yet exist
if (matching.length == 0) {
return index;
}

// Exclude indices that start with the same base name but are a different index
// e.g. .ml-anomalies-foobar should not be included when the index name is
// .ml-anomalies-foo
String[] filtered = Arrays.stream(matching).filter(i -> {
return i.equals(index) || (has6DigitSuffix(i) && i.length() == baseIndexName.length() + FIRST_INDEX_SIX_DIGIT_SUFFIX.length());
}).toArray(String[]::new);

return MlIndexAndAlias.latestIndex(filtered);
}
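
The filtering in `latestIndexMatchingBaseName` is easier to follow on plain strings. The sketch below replaces the `IndexNameExpressionResolver` lookup with a simple `startsWith` over a candidate array, and replaces `latestIndex` with a comparison of the numeric suffix; both substitutions are assumptions made to keep the example self-contained, as are the class name `LatestIndexDemo`, the `"-000001"` suffix value, and the fallback to the input name when the filter leaves nothing.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical string-only model of the base-name matching logic.
final class LatestIndexDemo {

    static final String FIRST_INDEX_SIX_DIGIT_SUFFIX = "-000001"; // assumed value

    private LatestIndexDemo() {}

    static boolean has6DigitSuffix(String name) {
        String[] parts = name.split("-");
        return parts[parts.length - 1].matches("\\d{6}");
    }

    // Numeric value of the suffix, or 0 for a legacy name without one.
    static int suffixNumber(String name) {
        return has6DigitSuffix(name) ? Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)) : 0;
    }

    static String latestIndexMatchingBaseName(String index, String[] allIndices) {
        String base = has6DigitSuffix(index)
            ? index.substring(0, index.length() - FIRST_INDEX_SIX_DIGIT_SUFFIX.length())
            : index;

        // Stand-in for resolving the wildcard expression base + "*".
        String[] matching = Arrays.stream(allIndices).filter(i -> i.startsWith(base)).toArray(String[]::new);
        if (matching.length == 0) {
            return index;
        }

        // Keep the index itself and names of the exact form base + "-NNNNNN".
        return Arrays.stream(matching)
            .filter(i -> i.equals(index)
                || (has6DigitSuffix(i) && i.length() == base.length() + FIRST_INDEX_SIX_DIGIT_SUFFIX.length()))
            .max(Comparator.comparingInt(LatestIndexDemo::suffixNumber))
            .orElse(index); // assumption: fall back to the input if nothing survives the filter
    }

    public static void main(String[] args) {
        String[] indices = {
            ".ml-anomalies-foo",
            ".ml-anomalies-foo-000001",
            ".ml-anomalies-foo-000002",
            ".ml-anomalies-foobar-000007"
        };
        System.out.println(latestIndexMatchingBaseName(".ml-anomalies-foo", indices)); // .ml-anomalies-foo-000002
    }
}
```

The length check is what keeps `.ml-anomalies-foobar-000007` out of the result even though it shares the `.ml-anomalies-foo` prefix.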

/**
* Executes a rollover request. It handles {@link ResourceAlreadyExistsException} gracefully by treating it as a success
* and returning the name of the existing index.
*
* @param client The client to use for the rollover request.
* @param rolloverRequest The rollover request to execute.
* @param listener A listener that will be notified with the name of the new (or pre-existing) index on success,
* or an exception on failure.
*/
public static void rollover(Client client, RolloverRequest rolloverRequest, ActionListener<String> listener) {
client.admin()
.indices()
.rolloverIndex(rolloverRequest, ActionListener.wrap(response -> listener.onResponse(response.getNewIndex()), e -> {
if (e instanceof ResourceAlreadyExistsException alreadyExistsException) {
// The destination index already exists possibly because it has been rolled over already.
listener.onResponse(alreadyExistsException.getIndex().getName());
} else {
listener.onFailure(e);
}
}));
}
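
The error handling in `rollover` follows a common pattern: wrap the caller's listener so that one specific failure is converted into a success. The sketch below shows that pattern without any Elasticsearch dependency. `Listener` stands in for `ActionListener`, and `AlreadyExistsException` for `ResourceAlreadyExistsException`; both are simplified, hypothetical types, as are `tolerateAlreadyExists` and `recording`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of the listener wrapping used in rollover().
final class RolloverListenerDemo {

    /** Minimal stand-in for an asynchronous result listener. */
    interface Listener<T> {
        void onResponse(T value);

        void onFailure(Exception e);
    }

    /** Stand-in for an "index already exists" error that carries the index name. */
    static final class AlreadyExistsException extends RuntimeException {
        final String indexName;

        AlreadyExistsException(String indexName) {
            super("index [" + indexName + "] already exists");
            this.indexName = indexName;
        }
    }

    private RolloverListenerDemo() {}

    /** Reports an already-exists failure as a success with the existing index name. */
    static Listener<String> tolerateAlreadyExists(Listener<String> delegate) {
        return new Listener<String>() {
            @Override
            public void onResponse(String newIndex) {
                delegate.onResponse(newIndex);
            }

            @Override
            public void onFailure(Exception e) {
                if (e instanceof AlreadyExistsException) {
                    delegate.onResponse(((AlreadyExistsException) e).indexName);
                } else {
                    delegate.onFailure(e);
                }
            }
        };
    }

    /** Test helper that records every callback as a string. */
    static Listener<String> recording(List<String> events) {
        return new Listener<String>() {
            @Override
            public void onResponse(String value) {
                events.add("response:" + value);
            }

            @Override
            public void onFailure(Exception e) {
                events.add("failure:" + e.getMessage());
            }
        };
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Listener<String> listener = tolerateAlreadyExists(recording(events));
        listener.onResponse("results-000002");
        listener.onFailure(new AlreadyExistsException("results-000002"));
        listener.onFailure(new IllegalStateException("cluster unavailable"));
        System.out.println(events);
        // [response:results-000002, response:results-000002, failure:cluster unavailable]
    }
}
```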

public static Tuple<String, String> createRolloverAliasAndNewIndexName(String index) {
String indexName = Objects.requireNonNull(index);

// Create an alias specifically for rolling over.
// The ml-anomalies index has aliases for each job, any
// of which could be used but that means one alias is
// treated differently.
// ROLLOVER_ALIAS_SUFFIX puts a `.` in the alias name to avoid any conflicts
// as AD job Ids cannot start with `.`
String rolloverAlias = indexName + ROLLOVER_ALIAS_SUFFIX;

// If the index does not end in a digit then rollover does not know
// what to name the new index so it must be specified in the request.
// Otherwise leave null and rollover will calculate the new name
String newIndexName = MlIndexAndAlias.has6DigitSuffix(index) ? null : indexName + MlIndexAndAlias.FIRST_INDEX_SIX_DIGIT_SUFFIX;

return new Tuple<>(rolloverAlias, newIndexName);
}
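
The two cases handled by `createRolloverAliasAndNewIndexName` can be seen by running a standalone copy of the logic. In this sketch `RolloverNamingDemo` and `AliasAndNewIndex` are hypothetical (the latter replaces the `Tuple` return type), `ROLLOVER_ALIAS_SUFFIX` is taken from the diff, and the `"-000001"` value of `FIRST_INDEX_SIX_DIGIT_SUFFIX` is an assumption.

```java
import java.util.Objects;

// Hypothetical standalone copy of the alias and new-index naming logic.
final class RolloverNamingDemo {

    static final String ROLLOVER_ALIAS_SUFFIX = ".rollover_alias"; // as declared in the diff
    static final String FIRST_INDEX_SIX_DIGIT_SUFFIX = "-000001";  // assumed value

    /** Simple pair used in place of Elasticsearch's Tuple. */
    static final class AliasAndNewIndex {
        final String rolloverAlias;
        final String newIndexName; // null means "let rollover compute the name"

        AliasAndNewIndex(String rolloverAlias, String newIndexName) {
            this.rolloverAlias = rolloverAlias;
            this.newIndexName = newIndexName;
        }
    }

    private RolloverNamingDemo() {}

    static boolean has6DigitSuffix(String indexName) {
        String[] parts = indexName.split("-");
        return parts[parts.length - 1].matches("\\d{6}");
    }

    static AliasAndNewIndex createRolloverAliasAndNewIndexName(String index) {
        String indexName = Objects.requireNonNull(index);
        String rolloverAlias = indexName + ROLLOVER_ALIAS_SUFFIX;
        // A legacy name without a numeric suffix needs an explicit target name.
        String newIndexName = has6DigitSuffix(indexName) ? null : indexName + FIRST_INDEX_SIX_DIGIT_SUFFIX;
        return new AliasAndNewIndex(rolloverAlias, newIndexName);
    }

    public static void main(String[] args) {
        AliasAndNewIndex legacy = createRolloverAliasAndNewIndexName(".ml-anomalies-shared");
        System.out.println(legacy.rolloverAlias); // .ml-anomalies-shared.rollover_alias
        System.out.println(legacy.newIndexName);  // .ml-anomalies-shared-000001

        AliasAndNewIndex suffixed = createRolloverAliasAndNewIndexName(".ml-anomalies-shared-000001");
        System.out.println(suffixed.rolloverAlias); // .ml-anomalies-shared-000001.rollover_alias
        System.out.println(suffixed.newIndexName);  // null
    }
}
```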

public static IndicesAliasesRequestBuilder createIndicesAliasesRequestBuilder(Client client) {
return client.admin().indices().prepareAliases(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS);
}

/**
* Creates a hidden alias for an index, typically used as a rollover target.
*
* @param client The client to use for the alias request.
* @param indexName The name of the index to which the alias will be added.
* @param aliasName The name of the alias to create.
* @param listener A listener that will be notified with the response.
*/
public static void createAliasForRollover(
Client client,
String indexName,
String aliasName,
ActionListener<IndicesAliasesResponse> listener
) {
logger.info("creating rollover [{}] alias for [{}]", aliasName, indexName);
createIndicesAliasesRequestBuilder(client).addAliasAction(
IndicesAliasesRequest.AliasActions.add().index(indexName).alias(aliasName).isHidden(true)
).execute(listener);
}

/**
* Executes a prepared {@link IndicesAliasesRequestBuilder} and notifies the listener of the result.
*
* @param request The prepared request builder containing alias actions.
* @param listener A listener that will be notified with {@code true} on success.
*/
public static void updateAliases(IndicesAliasesRequestBuilder request, ActionListener<Boolean> listener) {
request.execute(listener.delegateFailure((l, response) -> l.onResponse(Boolean.TRUE)));
}

/**
* Adds alias actions to a request builder to move ML job aliases from an old index to a new one after a rollover.
* This includes moving the write alias and re-creating the filtered read aliases on the new index.
*
* @param aliasRequestBuilder The request builder to add actions to.
* @param oldIndex The index from which aliases are being moved.
* @param newIndex The new index to which aliases will be moved.
* @param clusterState The current cluster state, used to inspect existing aliases on the old index.
* @return The modified {@link IndicesAliasesRequestBuilder}.
*/
public static IndicesAliasesRequestBuilder addIndexAliasesRequests(
IndicesAliasesRequestBuilder aliasRequestBuilder,
String oldIndex,
String newIndex,
ClusterState clusterState
) {
// Multiple jobs can share the same index. Each job
// has a read and a write alias that need updating
// after the rollover.
var meta = clusterState.metadata().getProject().index(oldIndex);
assert meta != null;
if (meta == null) {
return aliasRequestBuilder;
}

for (var alias : meta.getAliases().values()) {
if (isAnomaliesWriteAlias(alias.alias())) {
aliasRequestBuilder.addAliasAction(
IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias.alias()).isHidden(true).writeIndex(true)
);
aliasRequestBuilder.addAliasAction(IndicesAliasesRequest.AliasActions.remove().index(oldIndex).alias(alias.alias()));
} else if (isAnomaliesReadAlias(alias.alias())) {
String jobId = AnomalyDetectorsIndex.jobIdFromAlias(alias.alias());
aliasRequestBuilder.addAliasAction(
IndicesAliasesRequest.AliasActions.add()
.index(newIndex)
.alias(alias.alias())
.isHidden(true)
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
);
}
}

return aliasRequestBuilder;
}
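
As a reading aid for `addIndexAliasesRequests`, the sketch below produces a plain-text plan of the alias actions instead of building an `IndicesAliasesRequestBuilder`. Everything in it is hypothetical and simplified: the class `AliasMovePlanDemo`, the action strings, the assumed prefix values `".ml-anomalies-"` and `".ml-anomalies-.write-"`, the `job_id` field name, and the job ID regex standing in for `MlStrings.isValidId`. It mirrors one point visible in the diff: the write alias is added to the new index and removed from the old one, while the read alias is only added to the new index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical text-only model of the alias actions planned after a rollover.
final class AliasMovePlanDemo {

    // Assumed prefix values; the diff references them only by constant name.
    static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-";
    static final String RESULTS_INDEX_WRITE_PREFIX = RESULTS_INDEX_PREFIX + ".write-";

    // Assumed approximation of a valid job ID: lowercase alphanumerics, with
    // '_', '-' and '.' allowed in the middle.
    static final Pattern VALID_JOB_ID = Pattern.compile("[a-z0-9](?:[a-z0-9_\\-\\.]*[a-z0-9])?");

    private AliasMovePlanDemo() {}

    static List<String> planAliasMoves(List<String> aliasesOnOldIndex, String oldIndex, String newIndex) {
        List<String> actions = new ArrayList<>();
        for (String alias : aliasesOnOldIndex) {
            if (alias.startsWith(RESULTS_INDEX_WRITE_PREFIX)) {
                // Write alias: point it at the new index and drop it from the old one.
                actions.add("add write alias " + alias + " -> " + newIndex);
                actions.add("remove alias " + alias + " from " + oldIndex);
            } else if (alias.startsWith(RESULTS_INDEX_PREFIX)) {
                String jobId = alias.substring(RESULTS_INDEX_PREFIX.length());
                if (VALID_JOB_ID.matcher(jobId).matches()) {
                    // Read alias: add a filtered alias on the new index; the old one is kept.
                    actions.add("add read alias " + alias + " -> " + newIndex + " filter job_id=" + jobId);
                }
            }
        }
        return actions;
    }

    public static void main(String[] args) {
        List<String> aliases = Arrays.asList(".ml-anomalies-.write-job1", ".ml-anomalies-job1", "unrelated-alias");
        for (String action : planAliasMoves(aliases, ".ml-anomalies-shared-000001", ".ml-anomalies-shared-000002")) {
            System.out.println(action);
        }
    }
}
```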

/**
* Determines if an alias name is an ML anomalies write alias.
*
* @param aliasName The alias name to check.
* @return {@code true} if the name matches the write alias pattern, {@code false} otherwise.
*/
public static boolean isAnomaliesWriteAlias(String aliasName) {
return aliasName.startsWith(AnomalyDetectorsIndexFields.RESULTS_INDEX_WRITE_PREFIX);
}

/**
* Determines if an alias name is an ML anomalies read alias.
*
* @param aliasName The alias name to check.
* @return {@code true} if the name matches the read alias pattern, {@code false} otherwise.
*/
public static boolean isAnomaliesReadAlias(String aliasName) {
if (aliasName.startsWith(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX) == false) {
return false;
}

// See {@link AnomalyDetectorsIndex#jobResultsAliasedName}
String jobIdPart = aliasName.substring(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length());
// If this is a write alias it will start with a `.` character
// which is not a valid job id.
return MlStrings.isValidId(jobIdPart);
}
}