Skip to content

Commit af801a5

Browse files
edsavage authored and afoucret committed
[ML] Manage AD results indices (elastic#136065)
Add a rollover check for the AD results indices to the nightly ML maintenance task. The results indices will be rolled over when their size is greater than or equal to 50GB (this value can be adjusted in the cluster config). The concrete AD results indices now have a six-digit suffix. This is necessary to keep track of rollover behaviour and to determine which index is the "latest" in the series. We choose not to use ILM to manage the rollover as that is not available for Serverless.
1 parent dc2ec20 commit af801a5

File tree

32 files changed

+1361
-402
lines changed

32 files changed

+1361
-402
lines changed

docs/changelog/136065.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 136065
2+
summary: Nightly maintenance for anomaly detection results indices to keep them at a manageable size.
3+
area: Machine Learning
4+
type: enhancement
5+
issues: []

docs/reference/elasticsearch/configuration-reference/machine-learning-settings.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ $$$xpack.ml.max_open_jobs$$$
8686
`xpack.ml.nightly_maintenance_requests_per_second`
8787
: ([Dynamic](docs-content://deploy-manage/stack-settings.md#dynamic-cluster-setting)) The rate at which the nightly maintenance task deletes expired model snapshots and results. The setting is a proxy to the [`requests_per_second`](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query) parameter used in the delete by query requests and controls throttling. When the {{operator-feature}} is enabled, this setting can be updated only by operator users. Valid values must be greater than `0.0` or equal to `-1.0`, where `-1.0` means a default value is used. Defaults to `-1.0`
8888

89+
`xpack.ml.results_index_rollover_max_size`
90+
: ([Dynamic](docs-content://deploy-manage/stack-settings.md#dynamic-cluster-setting)) The maximum size the anomaly detection results indices can reach before being rolled over by the nightly maintenance task. When the {{operator-feature}} is enabled, this setting can be updated only by operator users. Valid values must be greater than or equal to `-1B`. A value of `-1B` means the indices will never be rolled over. A value of `0B` means the indices will always be rolled over, regardless of size. Defaults to `50GB`.
91+
8992
`xpack.ml.node_concurrent_job_allocations`
9093
: ([Dynamic](docs-content://deploy-manage/stack-settings.md#dynamic-cluster-setting)) The maximum number of jobs that can concurrently be in the `opening` state on each node. Typically, jobs spend a small amount of time in this state before they move to `open` state. Jobs that must restore large models when they are opening spend more time in the `opening` state. When the {{operator-feature}} is enabled, this setting can be updated only by operator users. Defaults to `2`.
9194

x-pack/plugin/build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ tasks.named("precommit").configure {
8585
}
8686

8787
tasks.named("yamlRestCompatTestTransform").configure({ task ->
88+
task.replaceIsTrue("\\.ml-anomalies-shared.mappings._meta.version", "\\.ml-anomalies-shared-000001.mappings._meta.version")
89+
task.replaceKeyInMatch("\\.ml-anomalies-shared.mappings.new_field.mapping.new_field.type", "\\.ml-anomalies-shared-000001.mappings.new_field.mapping.new_field.type")
90+
task.replaceValueTextByKeyValue("index", ".ml-anomalies-shared", ".ml-anomalies-shared-000001")
91+
task.replaceValueTextByKeyValue("index", ".ml-anomalies-custom-all-test-1,.ml-anomalies-custom-all-test-2", ".ml-anomalies-custom-all-test-1-000001,.ml-anomalies-custom-all-test-2-000001")
92+
task.replaceValueTextByKeyValue("index", ".ml-anomalies-custom-all-test-1", ".ml-anomalies-custom-all-test-1-000001")
93+
task.replaceValueTextByKeyValue("index", ".ml-anomalies-custom-all-test-2", ".ml-anomalies-custom-all-test-2-000001")
8894
task.skipTest("esql/60_usage/Basic ESQL usage output (telemetry)", "The telemetry output changed. We dropped a column. That's safe.")
8995
task.skipTest("inference/inference_crud/Test get all", "Assertions on number of inference models break due to default configs")
9096
task.skipTest("esql/60_usage/Basic ESQL usage output (telemetry) snapshot version", "The number of functions is constantly increasing")

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1349,6 +1349,7 @@ public Job build() {
13491349
// Conditional guards against multiple prepending due to updates instead of first creation
13501350
resultsIndexName = resultsIndexName.startsWith("custom-") ? resultsIndexName : "custom-" + resultsIndexName;
13511351
}
1352+
13521353
if (datafeedConfig != null) {
13531354
if (datafeedConfig.getId() == null) {
13541355
datafeedConfig.setId(id);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java

Lines changed: 249 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
2121
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
2222
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
23+
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
2324
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
2425
import org.elasticsearch.action.support.ActiveShardCount;
2526
import org.elasticsearch.action.support.IndicesOptions;
@@ -30,17 +31,23 @@
3031
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3132
import org.elasticsearch.core.Nullable;
3233
import org.elasticsearch.core.TimeValue;
34+
import org.elasticsearch.core.Tuple;
3335
import org.elasticsearch.index.Index;
3436
import org.elasticsearch.index.IndexVersion;
3537
import org.elasticsearch.index.IndexVersions;
38+
import org.elasticsearch.index.query.QueryBuilders;
3639
import org.elasticsearch.indices.SystemIndexDescriptor;
3740
import org.elasticsearch.xcontent.XContentParserConfiguration;
3841
import org.elasticsearch.xcontent.json.JsonXContent;
42+
import org.elasticsearch.xpack.core.ml.job.config.Job;
43+
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
44+
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
3945
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
4046

4147
import java.io.IOException;
4248
import java.util.Arrays;
4349
import java.util.Comparator;
50+
import java.util.Objects;
4451
import java.util.Optional;
4552
import java.util.function.Predicate;
4653
import java.util.regex.Pattern;
@@ -70,6 +77,10 @@ public final class MlIndexAndAlias {
7077

7178
private static final Logger logger = LogManager.getLogger(MlIndexAndAlias.class);
7279
private static final Predicate<String> HAS_SIX_DIGIT_SUFFIX = Pattern.compile("\\d{6}").asMatchPredicate();
80+
private static final Predicate<String> IS_ANOMALIES_SHARED_INDEX = Pattern.compile(
81+
AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT + "-\\d{6}"
82+
).asMatchPredicate();
83+
public static final String ROLLOVER_ALIAS_SUFFIX = ".rollover_alias";
7384

7485
static final Comparator<String> INDEX_NAME_COMPARATOR = (index1, index2) -> {
7586
String[] index1Parts = index1.split("-");
@@ -224,6 +235,21 @@ public static void createIndexAndAliasIfNecessary(
224235
loggingListener.onResponse(false);
225236
}
226237

238+
/**
239+
* Creates a system index based on the provided descriptor if it does not already exist.
240+
* <p>
241+
* The check for existence is simple and will return the listener on the calling thread if successful.
242+
* If the index needs to be created an async call will be made and this method will wait for the index to reach at least
243+
* a yellow health status before notifying the listener, ensuring it is ready for use
244+
* upon a successful response. A {@link ResourceAlreadyExistsException} during creation
245+
* is handled gracefully and treated as a success.
246+
*
247+
* @param client The client to use for the create index request.
248+
* @param clusterState The current cluster state, used for the initial existence check.
249+
* @param descriptor The descriptor containing the index name, settings, and mappings.
250+
* @param masterNodeTimeout The timeout for waiting on the master node.
251+
* @param finalListener Async listener
252+
*/
227253
public static void createSystemIndexIfNecessary(
228254
Client client,
229255
ClusterState clusterState,
@@ -323,6 +349,16 @@ private static void createFirstConcreteIndex(
323349
);
324350
}
325351

352+
/**
353+
* Creates or moves a write alias from one index to another.
354+
*
355+
* @param client The client to use for the add alias request.
356+
* @param alias The alias to update.
357+
* @param currentIndex The index the alias is currently pointing to.
358+
* @param newIndex The new index the alias should point to.
359+
* @param masterNodeTimeout The timeout for waiting on the master node.
360+
* @param listener Async listener
361+
*/
326362
public static void updateWriteAlias(
327363
Client client,
328364
String alias,
@@ -357,7 +393,7 @@ public static void updateWriteAlias(
357393
/**
358394
* Installs the index template specified by {@code templateConfig} if it is not in already
359395
* installed in {@code clusterState}.
360-
*
396+
* <p>
361397
* The check for presence is simple and will return the listener on
362398
* the calling thread if successful. If the template has to be installed
363399
* an async call will be made.
@@ -427,17 +463,38 @@ public static void installIndexTemplateIfRequired(
427463
executeAsyncWithOrigin(client, ML_ORIGIN, TransportPutComposableIndexTemplateAction.TYPE, templateRequest, innerListener);
428464
}
429465

430-
public static boolean hasIndexTemplate(ClusterState state, String templateName, long version) {
466+
private static boolean hasIndexTemplate(ClusterState state, String templateName, long version) {
431467
var template = state.getMetadata().getProject().templatesV2().get(templateName);
432468
return template != null && Long.valueOf(version).equals(template.version());
433469
}
434470

471+
public static String ensureValidResultsIndexName(String indexName) {
472+
// The results index name is either the original one provided or the original with a suffix appended.
473+
return has6DigitSuffix(indexName) ? indexName : indexName + FIRST_INDEX_SIX_DIGIT_SUFFIX;
474+
}
475+
476+
/**
477+
* Checks if an index name ends with a 6-digit suffix (e.g., "-000001").
478+
*
479+
* @param indexName The name of the index to check.
480+
* @return {@code true} if the index name has a 6-digit suffix, {@code false} otherwise.
481+
*/
435482
public static boolean has6DigitSuffix(String indexName) {
436483
String[] indexParts = indexName.split("-");
437484
String suffix = indexParts[indexParts.length - 1];
438485
return HAS_SIX_DIGIT_SUFFIX.test(suffix);
439486
}
440487

488+
/**
489+
* Checks if an index name matches the pattern for the default ML anomalies indices (e.g., ".ml-anomalies-shared-000001").
490+
*
491+
* @param indexName The name of the index to check.
492+
* @return {@code true} if the index is a shared anomalies index, {@code false} otherwise.
493+
*/
494+
public static boolean isAnomaliesSharedIndex(String indexName) {
495+
return IS_ANOMALIES_SHARED_INDEX.test(indexName);
496+
}
497+
441498
/**
442499
* Returns the latest index. Latest is the index with the highest
443500
* 6 digit suffix.
@@ -456,4 +513,194 @@ public static String latestIndex(String[] concreteIndices) {
456513
public static boolean indexIsReadWriteCompatibleInV9(IndexVersion version) {
457514
return version.onOrAfter(IndexVersions.V_8_0_0);
458515
}
516+
517+
/**
518+
* Strip any suffix from the index name and find any other indices
519+
* that match the base name. Then return the latest index from the
520+
* matching ones.
521+
*
522+
* @param index The index to check
523+
* @param expressionResolver The expression resolver
524+
* @param latestState The latest cluster state
525+
* @return The latest index that matches the base name of the given index
526+
*/
527+
public static String latestIndexMatchingBaseName(
528+
String index,
529+
IndexNameExpressionResolver expressionResolver,
530+
ClusterState latestState
531+
) {
532+
String baseIndexName = MlIndexAndAlias.has6DigitSuffix(index)
533+
? index.substring(0, index.length() - FIRST_INDEX_SIX_DIGIT_SUFFIX.length())
534+
: index;
535+
536+
String[] matching = expressionResolver.concreteIndexNames(
537+
latestState,
538+
IndicesOptions.lenientExpandOpenHidden(),
539+
baseIndexName + "*"
540+
);
541+
542+
// We used to assert here if no matching indices could be found. However, when called _before_ a job is created it may be the case
543+
// that no .ml-anomalies-shared* indices yet exist
544+
if (matching.length == 0) {
545+
return index;
546+
}
547+
548+
// Exclude indices that start with the same base name but are a different index
549+
// e.g. .ml-anomalies-foobar should not be included when the index name is
550+
// .ml-anomalies-foo
551+
String[] filtered = Arrays.stream(matching).filter(i -> {
552+
return i.equals(index) || (has6DigitSuffix(i) && i.length() == baseIndexName.length() + FIRST_INDEX_SIX_DIGIT_SUFFIX.length());
553+
}).toArray(String[]::new);
554+
555+
return MlIndexAndAlias.latestIndex(filtered);
556+
}
557+
558+
/**
559+
* Executes a rollover request. It handles {@link ResourceAlreadyExistsException} gracefully by treating it as a success
560+
* and returning the name of the existing index.
561+
*
562+
* @param client The client to use for the rollover request.
563+
* @param rolloverRequest The rollover request to execute.
564+
* @param listener A listener that will be notified with the name of the new (or pre-existing) index on success,
565+
* or an exception on failure.
566+
*/
567+
public static void rollover(Client client, RolloverRequest rolloverRequest, ActionListener<String> listener) {
568+
client.admin()
569+
.indices()
570+
.rolloverIndex(rolloverRequest, ActionListener.wrap(response -> listener.onResponse(response.getNewIndex()), e -> {
571+
if (e instanceof ResourceAlreadyExistsException alreadyExistsException) {
572+
// The destination index already exists possibly because it has been rolled over already.
573+
listener.onResponse(alreadyExistsException.getIndex().getName());
574+
} else {
575+
listener.onFailure(e);
576+
}
577+
}));
578+
}
579+
580+
public static Tuple<String, String> createRolloverAliasAndNewIndexName(String index) {
581+
String indexName = Objects.requireNonNull(index);
582+
583+
// Create an alias specifically for rolling over.
584+
// The ml-anomalies index has aliases for each job, any
585+
// of which could be used but that means one alias is
586+
// treated differently.
587+
// ROLLOVER_ALIAS_SUFFIX puts a `.` in the alias name to avoid any conflicts
588+
// as AD job Ids cannot start with `.`
589+
String rolloverAlias = indexName + ROLLOVER_ALIAS_SUFFIX;
590+
591+
// If the index does not end in a digit then rollover does not know
592+
// what to name the new index so it must be specified in the request.
593+
// Otherwise leave null and rollover will calculate the new name
594+
String newIndexName = MlIndexAndAlias.has6DigitSuffix(index) ? null : indexName + MlIndexAndAlias.FIRST_INDEX_SIX_DIGIT_SUFFIX;
595+
596+
return new Tuple<>(rolloverAlias, newIndexName);
597+
}
598+
599+
public static IndicesAliasesRequestBuilder createIndicesAliasesRequestBuilder(Client client) {
600+
return client.admin().indices().prepareAliases(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS);
601+
}
602+
603+
/**
604+
* Creates a hidden alias for an index, typically used as a rollover target.
605+
*
606+
* @param client The client to use for the alias request.
607+
* @param indexName The name of the index to which the alias will be added.
608+
* @param aliasName The name of the alias to create.
609+
* @param listener A listener that will be notified with the response.
610+
*/
611+
public static void createAliasForRollover(
612+
Client client,
613+
String indexName,
614+
String aliasName,
615+
ActionListener<IndicesAliasesResponse> listener
616+
) {
617+
logger.info("creating rollover [{}] alias for [{}]", aliasName, indexName);
618+
createIndicesAliasesRequestBuilder(client).addAliasAction(
619+
IndicesAliasesRequest.AliasActions.add().index(indexName).alias(aliasName).isHidden(true)
620+
).execute(listener);
621+
}
622+
623+
/**
624+
* Executes a prepared {@link IndicesAliasesRequestBuilder} and notifies the listener of the result.
625+
*
626+
* @param request The prepared request builder containing alias actions.
627+
* @param listener A listener that will be notified with {@code true} on success.
628+
*/
629+
public static void updateAliases(IndicesAliasesRequestBuilder request, ActionListener<Boolean> listener) {
630+
request.execute(listener.delegateFailure((l, response) -> l.onResponse(Boolean.TRUE)));
631+
}
632+
633+
/**
634+
* Adds alias actions to a request builder to move ML job aliases from an old index to a new one after a rollover.
635+
* This includes moving the write alias and re-creating the filtered read aliases on the new index.
636+
*
637+
* @param aliasRequestBuilder The request builder to add actions to.
638+
* @param oldIndex The index from which aliases are being moved.
639+
* @param newIndex The new index to which aliases will be moved.
640+
* @param clusterState The current cluster state, used to inspect existing aliases on the old index.
641+
* @return The modified {@link IndicesAliasesRequestBuilder}.
642+
*/
643+
public static IndicesAliasesRequestBuilder addIndexAliasesRequests(
644+
IndicesAliasesRequestBuilder aliasRequestBuilder,
645+
String oldIndex,
646+
String newIndex,
647+
ClusterState clusterState
648+
) {
649+
// Multiple jobs can share the same index each job
650+
// has a read and write alias that needs updating
651+
// after the rollover
652+
var meta = clusterState.metadata().getProject().index(oldIndex);
653+
assert meta != null;
654+
if (meta == null) {
655+
return aliasRequestBuilder;
656+
}
657+
658+
for (var alias : meta.getAliases().values()) {
659+
if (isAnomaliesWriteAlias(alias.alias())) {
660+
aliasRequestBuilder.addAliasAction(
661+
IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias.alias()).isHidden(true).writeIndex(true)
662+
);
663+
aliasRequestBuilder.addAliasAction(IndicesAliasesRequest.AliasActions.remove().index(oldIndex).alias(alias.alias()));
664+
} else if (isAnomaliesReadAlias(alias.alias())) {
665+
String jobId = AnomalyDetectorsIndex.jobIdFromAlias(alias.alias());
666+
aliasRequestBuilder.addAliasAction(
667+
IndicesAliasesRequest.AliasActions.add()
668+
.index(newIndex)
669+
.alias(alias.alias())
670+
.isHidden(true)
671+
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
672+
);
673+
}
674+
}
675+
676+
return aliasRequestBuilder;
677+
}
678+
679+
/**
680+
* Determines if an alias name is an ML anomalies write alias.
681+
*
682+
* @param aliasName The alias name to check.
683+
* @return {@code true} if the name matches the write alias pattern, {@code false} otherwise.
684+
*/
685+
public static boolean isAnomaliesWriteAlias(String aliasName) {
686+
return aliasName.startsWith(AnomalyDetectorsIndexFields.RESULTS_INDEX_WRITE_PREFIX);
687+
}
688+
689+
/**
690+
* Determines if an alias name is an ML anomalies read alias.
691+
*
692+
* @param aliasName The alias name to check.
693+
* @return {@code true} if the name matches the read alias pattern, {@code false} otherwise.
694+
*/
695+
public static boolean isAnomaliesReadAlias(String aliasName) {
696+
if (aliasName.startsWith(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX) == false) {
697+
return false;
698+
}
699+
700+
// See {@link AnomalyDetectorsIndex#jobResultsAliasedName}
701+
String jobIdPart = aliasName.substring(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length());
702+
// If this is a write alias it will start with a `.` character
703+
// which is not a valid job id.
704+
return MlStrings.isValidId(jobIdPart);
705+
}
459706
}

0 commit comments

Comments
 (0)