Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
9ff0b94
[ML] 8Manage rollover of AD results indices
edsavage Oct 6, 2025
78d0d8e
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 6, 2025
0ec99b6
Spotless Apply
edsavage Oct 6, 2025
8405ff3
Spotless Apply
edsavage Oct 6, 2025
d10d09d
Bit of a tidy up
edsavage Oct 6, 2025
dfb1939
Slight refactor
edsavage Oct 6, 2025
ffc2842
Another tidy
edsavage Oct 6, 2025
9863169
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 6, 2025
8f17540
Remove unused accessor
edsavage Oct 7, 2025
d60a811
Update docs/changelog/136065.yaml
edsavage Oct 7, 2025
2729a86
Merge branch 'main' into manage_ad_results_indices
edsavage Oct 7, 2025
c5f58e1
[CI] Auto commit changes from spotless
Oct 7, 2025
7b6caf0
Address some test failures
edsavage Oct 8, 2025
52ab642
Merge remote-tracking branch 'origin/manage_ad_results_indices' into …
edsavage Oct 8, 2025
66d7268
Typos
edsavage Oct 8, 2025
f9296fd
[CI] Auto commit changes from spotless
Oct 8, 2025
07ddbaa
Make the max results index size for rollover user configurable.
edsavage Oct 8, 2025
e7eb106
Merge remote-tracking branch 'origin/manage_ad_results_indices' into …
edsavage Oct 8, 2025
cb85a49
Fix bad merge
edsavage Oct 8, 2025
123c8cb
[CI] Auto commit changes from spotless
Oct 8, 2025
1a5d0da
Merge branch 'main' into manage_ad_results_indices
edsavage Oct 8, 2025
c54a354
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 8, 2025
80b1b71
Test fixes
edsavage Oct 8, 2025
84ad33d
Merge remote-tracking branch 'origin/manage_ad_results_indices' into …
edsavage Oct 8, 2025
2a66d40
Remove assertion for condition that is no longer entirely true
edsavage Oct 8, 2025
83e7e2b
A few more test fixes
edsavage Oct 9, 2025
e9c0c2c
Fixed typo in docs
edsavage Oct 9, 2025
0b6b79a
Tweaks to yamlRestCompatTests
edsavage Oct 10, 2025
97106a1
A few more test fixes
edsavage Oct 10, 2025
468f712
Another test fix through rewrite rules
edsavage Oct 13, 2025
96cecaf
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 13, 2025
63f97c5
Another REST compatibility test transformation
edsavage Oct 13, 2025
8050133
Merge branch 'main' of github.com:elasticsearch/elasticsearch into ma…
edsavage Oct 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/136065.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136065
summary: Manage ad results indices
area: Machine Learning
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ $$$xpack.ml.max_open_jobs$$$
`xpack.ml.nightly_maintenance_requests_per_second`
: ([Dynamic](docs-content://deploy-manage/stack-settings.md#dynamic-cluster-setting)) The rate at which the nightly maintenance task deletes expired model snapshots and results. The setting is a proxy to the [`requests_per_second`](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query) parameter used in the delete by query requests and controls throttling. When the {{operator-feature}} is enabled, this setting can be updated only by operator users. Valid values must be greater than `0.0` or equal to `-1.0`, where `-1.0` means a default value is used. Defaults to `-1.0`

`xpack.ml.nightly_maintenance_rollover_max_size`
: ([Dynamic](docs-content://deploy-manage/stack-settings.md#dynamic-cluster-setting)) The maximum size the anomaly detection results indices can reach before being rolled over by the nightly maintenance task. When the {{operator-feature}} is enabled, this setting can be updated only by operator users. Valid values must be greater than `0B` or equal to `-1B`. Defaults to `50GB`.

`xpack.ml.node_concurrent_job_allocations`
: ([Dynamic](docs-content://deploy-manage/stack-settings.md#dynamic-cluster-setting)) The maximum number of jobs that can concurrently be in the `opening` state on each node. Typically, jobs spend a small amount of time in this state before they move to `open` state. Jobs that must restore large models when they are opening spend more time in the `opening` state. When the {{operator-feature}} is enabled, this setting can be updated only by operator users. Defaults to `2`.

Expand Down
6 changes: 6 additions & 0 deletions x-pack/plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ tasks.named("precommit").configure {
}

tasks.named("yamlRestCompatTestTransform").configure({ task ->
task.replaceIsTrue("\\.ml-anomalies-shared.mappings._meta.version", "\\.ml-anomalies-shared-000001.mappings._meta.version")
task.replaceKeyInMatch("\\.ml-anomalies-shared.mappings.new_field.mapping.new_field.type", "\\.ml-anomalies-shared-000001.mappings.new_field.mapping.new_field.type")
task.replaceValueTextByKeyValue("index", ".ml-anomalies-shared", ".ml-anomalies-shared-000001")
task.replaceValueTextByKeyValue("index", ".ml-anomalies-custom-all-test-1,.ml-anomalies-custom-all-test-2", ".ml-anomalies-custom-all-test-1-000001,.ml-anomalies-custom-all-test-2-000001")
task.replaceValueTextByKeyValue("index", ".ml-anomalies-custom-all-test-1", ".ml-anomalies-custom-all-test-1-000001")
task.replaceValueTextByKeyValue("index", ".ml-anomalies-custom-all-test-2", ".ml-anomalies-custom-all-test-2-000001")
task.skipTest("esql/60_usage/Basic ESQL usage output (telemetry)", "The telemetry output changed. We dropped a column. That's safe.")
task.skipTest("inference/inference_crud/Test get all", "Assertions on number of inference models break due to default configs")
task.skipTest("esql/60_usage/Basic ESQL usage output (telemetry) snapshot version", "The number of functions is constantly increasing")
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/core/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
requires org.apache.httpcomponents.client5.httpclient5;
requires org.apache.httpcomponents.core5.httpcore5;
requires org.slf4j;
requires org.elasticsearch.logging;

exports org.elasticsearch.index.engine.frozen;
exports org.elasticsearch.license;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
*/
package org.elasticsearch.xpack.core.ml.job.config;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -30,6 +33,7 @@
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

Expand Down Expand Up @@ -805,6 +809,8 @@ public static class Builder implements Writeable {
private boolean allowLazyOpen;
private Blocked blocked = Blocked.none();
private DatafeedConfig.Builder datafeedConfig;
private SetOnce<ClusterState> clusterState = new SetOnce<>();
private SetOnce<IndexNameExpressionResolver> indexNameExpressionResolver = new SetOnce<>();

public Builder() {}

Expand Down Expand Up @@ -879,6 +885,14 @@ public String getId() {
return id;
}

private void setClusterState(ClusterState state) {
this.clusterState.set(state);
}

private void setIndexNameExpressionResolver(IndexNameExpressionResolver indexNameExpressionResolver) {
this.indexNameExpressionResolver.set(indexNameExpressionResolver);
}

public void setJobVersion(MlConfigVersion jobVersion) {
this.jobVersion = jobVersion;
}
Expand Down Expand Up @@ -1305,6 +1319,16 @@ public void validateDetectorsAreUnique() {
}
}

public Job build(
@SuppressWarnings("HiddenField") Date createTime,
ClusterState state,
IndexNameExpressionResolver indexNameExpressionResolver
) {
setClusterState(state);
setIndexNameExpressionResolver(indexNameExpressionResolver);
return build(createTime);
}

/**
* Builds a job with the given {@code createTime} and the current version.
* This should be used when a new job is created as opposed to {@link #build()}.
Expand Down Expand Up @@ -1345,10 +1369,23 @@ public Job build() {
if (Strings.isNullOrEmpty(resultsIndexName)) {
resultsIndexName = AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
} else if (resultsIndexName.equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT) == false) {
// User-defined names are prepended with "custom"
// User-defined names are prepended with "custom" and end with a 6 digit suffix
// Conditional guards against multiple prepending due to updates instead of first creation
resultsIndexName = resultsIndexName.startsWith("custom-") ? resultsIndexName : "custom-" + resultsIndexName;
}

resultsIndexName = MlIndexAndAlias.has6DigitSuffix(resultsIndexName) ? resultsIndexName : resultsIndexName + "-000001";

if (indexNameExpressionResolver.get() != null && clusterState.get() != null) {
String tmpResultsIndexName = MlIndexAndAlias.latestIndexMatchingBaseName(
AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + resultsIndexName,
indexNameExpressionResolver.get(),
clusterState.get()
);

resultsIndexName = tmpResultsIndexName.substring(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length());
}

if (datafeedConfig != null) {
if (datafeedConfig.getId() == null) {
datafeedConfig.setId(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public final class AnomalyDetectorsIndexFields {
// ".write" rather than simply "write" to avoid the danger of clashing
// with the read alias of a job whose name begins with "write-"
public static final String RESULTS_INDEX_WRITE_PREFIX = RESULTS_INDEX_PREFIX + ".write-";
public static final String RESULTS_INDEX_DEFAULT = "shared";
public static final String RESULTS_INDEX_DEFAULT = "shared-000001";

private AnomalyDetectorsIndexFields() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
Expand All @@ -33,9 +34,13 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;

import java.io.IOException;
Expand Down Expand Up @@ -456,4 +461,129 @@ public static String latestIndex(String[] concreteIndices) {
public static boolean indexIsReadWriteCompatibleInV9(IndexVersion version) {
return version.onOrAfter(IndexVersions.V_8_0_0);
}

/**
* Strip any suffix from the index name and find any other indices
* that match the base name. Then return the latest index from the
* matching ones.
*
* @param index The index to check
* @param expressionResolver The expression resolver
* @param latestState The latest cluster state
* @return The latest index that matches the base name of the given index
*/
public static String latestIndexMatchingBaseName(
String index,
IndexNameExpressionResolver expressionResolver,
ClusterState latestState
) {
String baseIndexName = MlIndexAndAlias.has6DigitSuffix(index)
? index.substring(0, index.length() - FIRST_INDEX_SIX_DIGIT_SUFFIX.length())
: index;

String[] matching = expressionResolver.concreteIndexNames(
latestState,
IndicesOptions.lenientExpandOpenHidden(),
baseIndexName + "*"
);

// We used to assert here if no matching indices could be found. However, when called _before_ a job is created it may be the case
// that no .ml-anomalies-shared* indices yet exist
if (matching.length == 0) {
return index;
}

// Exclude indices that start with the same base name but are a different index
// e.g. .ml-anomalies-foobar should not be included when the index name is
// .ml-anomalies-foo
String[] filtered = Arrays.stream(matching).filter(i -> {
return i.equals(index) || (has6DigitSuffix(i) && i.length() == baseIndexName.length() + FIRST_INDEX_SIX_DIGIT_SUFFIX.length());
}).toArray(String[]::new);

return MlIndexAndAlias.latestIndex(filtered);
}

public static void rollover(Client client, RolloverRequest rolloverRequest, ActionListener<String> listener) {
client.admin()
.indices()
.rolloverIndex(rolloverRequest, ActionListener.wrap(response -> listener.onResponse(response.getNewIndex()), e -> {
if (e instanceof ResourceAlreadyExistsException alreadyExistsException) {
// The destination index already exists possibly because it has been rolled over already.
listener.onResponse(alreadyExistsException.getIndex().getName());
} else {
listener.onFailure(e);
}
}));
}

public static void createAliasForRollover(
org.elasticsearch.logging.Logger logger,
Client client,
String indexName,
String aliasName,
ActionListener<IndicesAliasesResponse> listener
) {
logger.warn("creating rollover [{}] alias for [{}]", aliasName, indexName);
client.admin()
.indices()
.prepareAliases(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS)
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(indexName).alias(aliasName).isHidden(true))
.execute(listener);
}

public static void updateAliases(IndicesAliasesRequestBuilder request, ActionListener<Boolean> listener) {
request.execute(listener.delegateFailure((l, response) -> l.onResponse(Boolean.TRUE)));
}

public static IndicesAliasesRequestBuilder addIndexAliasesRequests(
IndicesAliasesRequestBuilder aliasRequestBuilder,
String oldIndex,
String newIndex,
ClusterState clusterState
) {
// Multiple jobs can share the same index each job
// has a read and write alias that needs updating
// after the rollover
var meta = clusterState.metadata().getProject().index(oldIndex);
assert meta != null;
if (meta == null) {
return aliasRequestBuilder;
}

for (var alias : meta.getAliases().values()) {
if (isAnomaliesWriteAlias(alias.alias())) {
aliasRequestBuilder.addAliasAction(
IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias.alias()).isHidden(true).writeIndex(true)
);
aliasRequestBuilder.addAliasAction(IndicesAliasesRequest.AliasActions.remove().index(oldIndex).alias(alias.alias()));
} else if (isAnomaliesReadAlias(alias.alias())) {
String jobId = AnomalyDetectorsIndex.jobIdFromAlias(alias.alias());
aliasRequestBuilder.addAliasAction(
IndicesAliasesRequest.AliasActions.add()
.index(newIndex)
.alias(alias.alias())
.isHidden(true)
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
);
}
}

return aliasRequestBuilder;
}

public static boolean isAnomaliesWriteAlias(String aliasName) {
return aliasName.startsWith(AnomalyDetectorsIndexFields.RESULTS_INDEX_WRITE_PREFIX);
}

public static boolean isAnomaliesReadAlias(String aliasName) {
if (aliasName.startsWith(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX) == false) {
return false;
}

// See {@link AnomalyDetectorsIndex#jobResultsAliasedName}
String jobIdPart = aliasName.substring(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length());
// If this is a write alias it will start with a `.` character
// which is not a valid job id.
return MlStrings.isValidId(jobIdPart);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ public void testBuilder_setsIndexName() {
Job.Builder builder = buildJobBuilder("foo");
builder.setResultsIndexName("carol");
Job job = builder.build();
assertEquals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-carol", job.getInitialResultsIndexName());
assertEquals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-carol-000001", job.getInitialResultsIndexName());
}

public void testBuilder_withInvalidIndexNameThrows() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
import org.junit.After;
Expand Down Expand Up @@ -357,6 +358,18 @@ public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButLeg
assertThat(createRequest.aliases(), equalTo(Collections.singleton(new Alias(TEST_INDEX_ALIAS).isHidden(true))));
}

public void testIsAnomaliesWriteAlias() {
assertTrue(MlIndexAndAlias.isAnomaliesWriteAlias(AnomalyDetectorsIndex.resultsWriteAlias("foo")));
assertFalse(MlIndexAndAlias.isAnomaliesWriteAlias(AnomalyDetectorsIndex.jobResultsAliasedName("foo")));
assertFalse(MlIndexAndAlias.isAnomaliesWriteAlias("some-index"));
}

public void testIsAnomaliesAlias() {
assertTrue(MlIndexAndAlias.isAnomaliesReadAlias(AnomalyDetectorsIndex.jobResultsAliasedName("foo")));
assertFalse(MlIndexAndAlias.isAnomaliesReadAlias(AnomalyDetectorsIndex.resultsWriteAlias("foo")));
assertFalse(MlIndexAndAlias.isAnomaliesReadAlias("some-index"));
}

public void testIndexNameComparator() {
Comparator<String> comparator = MlIndexAndAlias.INDEX_NAME_COMPARATOR;
assertThat(Stream.of("test-000001").max(comparator).get(), equalTo("test-000001"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class BulkFailureRetryIT extends MlNativeAutodetectIntegTestCase {
private final long now = System.currentTimeMillis();
private static final long DAY = Duration.ofDays(1).toMillis();
private final String jobId = "bulk-failure-retry-job";
private final String resultsIndex = ".ml-anomalies-custom-bulk-failure-retry-job";
private final String resultsIndex = ".ml-anomalies-custom-bulk-failure-retry-job-000001";

@Before
public void putPastDataIntoIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexVersion;
Expand Down Expand Up @@ -54,6 +55,7 @@ public void testTriggerDeleteJobsInStateDeletingWithoutDeletionTask() throws Int
client(),
mock(ClusterService.class),
mock(MlAssignmentNotifier.class),
mock(IndexNameExpressionResolver.class),
true,
true,
true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void testCreateJobsWithIndexNameOption() throws Exception {
"results_index_name" : "%s"}""";

String jobId1 = "create-jobs-with-index-name-option-job-1";
String indexName = "non-default-index";
String indexName = "non-default-index-000001";
putJob(jobId1, Strings.format(jobTemplate, indexName));

String jobId2 = "create-jobs-with-index-name-option-job-2";
Expand Down Expand Up @@ -406,7 +406,7 @@ public void testCreateJobInCustomSharedIndexUpdatesMapping() throws Exception {
// Check the index mapping contains the first by_field_name
Request getResultsMappingRequest = new Request(
"GET",
AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-shared-index/_mapping"
AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-shared-index-000001/_mapping"
);
getResultsMappingRequest.addParameter("pretty", null);
String resultsMappingAfterJob1 = EntityUtils.toString(client().performRequest(getResultsMappingRequest).getEntity());
Expand Down
Loading
Loading