Skip to content

Commit 9ff0b94

Browse files
committed
[ML] 8Manage rollover of AD results indices
Add a rollover check for the AD results indices to the nightly ML maintenance task. The concrete AD reults indices now have a six digit suffix. This is necessary to keep track of rollover behaviour and to determine which index is the "latest" in the series. WIP
1 parent 3f56dbc commit 9ff0b94

File tree

26 files changed

+606
-209
lines changed

26 files changed

+606
-209
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesRequestBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.client.internal.ElasticsearchClient;
1515
import org.elasticsearch.core.TimeValue;
1616
import org.elasticsearch.index.query.QueryBuilder;
17+
import org.elasticsearch.logging.LogManager;
1718

1819
import java.util.Map;
1920

@@ -142,6 +143,7 @@ public IndicesAliasesRequestBuilder addAlias(String index, String alias, boolean
142143
* @param alias The alias
143144
*/
144145
public IndicesAliasesRequestBuilder removeAlias(String index, String alias) {
146+
LogManager.getLogger(IndicesAliasesRequestBuilder.class).info("removing alias [{}] from index [{}]", alias, index);
145147
request.addAliasAction(AliasActions.remove().index(index).alias(alias));
146148
return this;
147149
}

server/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ protected void masterOperation(
247247
break;
248248
case REMOVE:
249249
for (String alias : concreteAliases(action, projectMetadata, index.getName())) {
250+
logger.warn("Adding alias [{}] for index [{}] to remove list", alias, index.getName());
250251
finalActions.add(new AliasAction.Remove(index.getName(), alias, action.mustExist()));
251252
numAliasesRemoved++;
252253
}

x-pack/plugin/core/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
requires org.apache.httpcomponents.client5.httpclient5;
2727
requires org.apache.httpcomponents.core5.httpcore5;
2828
requires org.slf4j;
29+
requires org.elasticsearch.logging;
2930

3031
exports org.elasticsearch.index.engine.frozen;
3132
exports org.elasticsearch.license;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@
66
*/
77
package org.elasticsearch.xpack.core.ml.job.config;
88

9+
import org.apache.lucene.util.SetOnce;
910
import org.elasticsearch.ResourceAlreadyExistsException;
1011
import org.elasticsearch.action.support.IndicesOptions;
12+
import org.elasticsearch.cluster.ClusterState;
1113
import org.elasticsearch.cluster.SimpleDiffable;
14+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1215
import org.elasticsearch.common.Strings;
1316
import org.elasticsearch.common.io.stream.StreamInput;
1417
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -18,18 +21,22 @@
1821
import org.elasticsearch.common.unit.ByteSizeValue;
1922
import org.elasticsearch.core.Nullable;
2023
import org.elasticsearch.core.TimeValue;
24+
import org.elasticsearch.logging.LogManager;
25+
import org.elasticsearch.logging.Logger;
2126
import org.elasticsearch.xcontent.ObjectParser;
2227
import org.elasticsearch.xcontent.ObjectParser.ValueType;
2328
import org.elasticsearch.xcontent.ParseField;
2429
import org.elasticsearch.xcontent.ToXContentObject;
2530
import org.elasticsearch.xcontent.XContentBuilder;
2631
import org.elasticsearch.xpack.core.common.time.TimeUtils;
2732
import org.elasticsearch.xpack.core.ml.MlConfigVersion;
33+
import org.elasticsearch.xpack.core.ml.utils.MlAnomaliesIndexUtils;
2834
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
2935
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3036
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
3137
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
3238
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
39+
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
3340
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
3441
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
3542

@@ -805,6 +812,8 @@ public static class Builder implements Writeable {
805812
private boolean allowLazyOpen;
806813
private Blocked blocked = Blocked.none();
807814
private DatafeedConfig.Builder datafeedConfig;
815+
private SetOnce<ClusterState> clusterState = new SetOnce<>();
816+
private SetOnce<IndexNameExpressionResolver> indexNameExpressionResolver = new SetOnce<>();
808817

809818
public Builder() {}
810819

@@ -879,6 +888,14 @@ public String getId() {
879888
return id;
880889
}
881890

891+
private void setClusterState(ClusterState state) {
892+
this.clusterState.set(state);
893+
}
894+
895+
private void setIndexNameExpressionResolver(IndexNameExpressionResolver indexNameExpressionResolver) {
896+
this.indexNameExpressionResolver.set(indexNameExpressionResolver);
897+
}
898+
882899
public void setJobVersion(MlConfigVersion jobVersion) {
883900
this.jobVersion = jobVersion;
884901
}
@@ -1305,6 +1322,18 @@ public void validateDetectorsAreUnique() {
13051322
}
13061323
}
13071324

1325+
public Job build(
1326+
@SuppressWarnings("HiddenField") Date createTime,
1327+
ClusterState state,
1328+
IndexNameExpressionResolver indexNameExpressionResolver
1329+
) {
1330+
// setCreateTime(createTime);
1331+
// setJobVersion(MlConfigVersion.CURRENT);
1332+
setClusterState(state);
1333+
setIndexNameExpressionResolver(indexNameExpressionResolver);
1334+
return build(createTime);
1335+
}
1336+
13081337
/**
13091338
* Builds a job with the given {@code createTime} and the current version.
13101339
* This should be used when a new job is created as opposed to {@link #build()}.
@@ -1313,6 +1342,7 @@ public void validateDetectorsAreUnique() {
13131342
* @return The job
13141343
*/
13151344
public Job build(@SuppressWarnings("HiddenField") Date createTime) {
1345+
LogManager.getLogger(Job.class).debug("[ML] building job withe create time: [{}]", createTime);
13161346
setCreateTime(createTime);
13171347
setJobVersion(MlConfigVersion.CURRENT);
13181348
return build();
@@ -1342,13 +1372,40 @@ public Job build() {
13421372
// Creation time is NOT required in user input, hence validated only on build
13431373
ExceptionsHelper.requireNonNull(createTime, CREATE_TIME.getPreferredName());
13441374

1375+
LogManager.getLogger(Job.class).warn("resultsIndexName: [{}]: ", resultsIndexName);
1376+
13451377
if (Strings.isNullOrEmpty(resultsIndexName)) {
13461378
resultsIndexName = AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
1379+
LogManager.getLogger(Job.class).warn("Using default resultsIndexName: [{}]: ", resultsIndexName);
1380+
13471381
} else if (resultsIndexName.equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT) == false) {
1348-
// User-defined names are prepended with "custom"
1382+
// User-defined names are prepended with "custom" and end with a 6 digit suffix
13491383
// Conditional guards against multiple prepending due to updates instead of first creation
13501384
resultsIndexName = resultsIndexName.startsWith("custom-") ? resultsIndexName : "custom-" + resultsIndexName;
13511385
}
1386+
1387+
LogManager.getLogger(Job.class).warn("Before: [{}]: ", resultsIndexName);
1388+
1389+
resultsIndexName = MlIndexAndAlias.indexNameHasSixDigitSuffix(resultsIndexName)
1390+
? resultsIndexName
1391+
: resultsIndexName + "-000001";
1392+
1393+
if (indexNameExpressionResolver.get() != null && clusterState.get() != null) {
1394+
LogManager.getLogger(Job.class).warn("Getting latest index matching base name: [{}]: ", resultsIndexName);
1395+
1396+
String tmpResultsIndexName = MlIndexAndAlias.latestIndexMatchingBaseName(
1397+
AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + resultsIndexName,
1398+
indexNameExpressionResolver.get(),
1399+
clusterState.get()
1400+
);
1401+
1402+
resultsIndexName = tmpResultsIndexName.substring(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length());
1403+
1404+
LogManager.getLogger(Job.class).warn("OBTAINED latest index matching base name: [{}]: ", resultsIndexName);
1405+
}
1406+
1407+
LogManager.getLogger(Job.class).warn("After: [{}]: ", resultsIndexName);
1408+
13521409
if (datafeedConfig != null) {
13531410
if (datafeedConfig.getId() == null) {
13541411
datafeedConfig.setId(id);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndexFields.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public final class AnomalyDetectorsIndexFields {
1414
// ".write" rather than simply "write" to avoid the danger of clashing
1515
// with the read alias of a job whose name begins with "write-"
1616
public static final String RESULTS_INDEX_WRITE_PREFIX = RESULTS_INDEX_PREFIX + ".write-";
17-
public static final String RESULTS_INDEX_DEFAULT = "shared";
17+
public static final String RESULTS_INDEX_DEFAULT = "shared-000001";
1818

1919
private AnomalyDetectorsIndexFields() {}
2020
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.ml.utils;
9+
10+
import org.elasticsearch.ResourceAlreadyExistsException;
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
13+
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
14+
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
15+
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
16+
import org.elasticsearch.client.internal.Client;
17+
import org.elasticsearch.cluster.ClusterState;
18+
import org.elasticsearch.core.TimeValue;
19+
import org.elasticsearch.index.query.QueryBuilders;
20+
import org.elasticsearch.logging.Logger;
21+
import org.elasticsearch.xpack.core.ml.job.config.Job;
22+
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
23+
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
24+
25+
public class MlAnomaliesIndexUtils {
26+
public static void rollover(Client client, RolloverRequest rolloverRequest, ActionListener<String> listener) {
27+
client.admin()
28+
.indices()
29+
.rolloverIndex(
30+
rolloverRequest,
31+
ActionListener.wrap(response -> listener.onResponse(response.getNewIndex()), e -> {
32+
if (e instanceof ResourceAlreadyExistsException alreadyExistsException) {
33+
// The destination index already exists possibly because it has been rolled over already.
34+
listener.onResponse(alreadyExistsException.getIndex().getName());
35+
} else {
36+
listener.onFailure(e);
37+
}
38+
})
39+
);
40+
}
41+
42+
public static void createAliasForRollover(
43+
Logger logger,
44+
Client client,
45+
String indexName,
46+
String aliasName,
47+
ActionListener<IndicesAliasesResponse> listener
48+
) {
49+
logger.warn("creating rollover [{}] alias for [{}]", aliasName, indexName);
50+
client.admin()
51+
.indices()
52+
.prepareAliases(
53+
TimeValue.THIRTY_SECONDS,
54+
TimeValue.THIRTY_SECONDS
55+
)
56+
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(indexName).alias(aliasName).isHidden(true))
57+
.execute(listener);
58+
}
59+
60+
public static void updateAliases(IndicesAliasesRequestBuilder request, ActionListener<Boolean> listener) {
61+
request.execute(listener.delegateFailure((l, response) -> l.onResponse(Boolean.TRUE)));
62+
}
63+
64+
public static IndicesAliasesRequestBuilder addIndexAliasesRequests(
65+
IndicesAliasesRequestBuilder aliasRequestBuilder,
66+
String oldIndex,
67+
String newIndex,
68+
ClusterState clusterState
69+
) {
70+
// Multiple jobs can share the same index each job
71+
// has a read and write alias that needs updating
72+
// after the rollover
73+
var meta = clusterState.metadata().getProject().index(oldIndex);
74+
assert meta != null;
75+
if (meta == null) {
76+
return aliasRequestBuilder;
77+
}
78+
79+
for (var alias : meta.getAliases().values()) {
80+
if (isAnomaliesWriteAlias(alias.alias())) {
81+
aliasRequestBuilder.addAliasAction(
82+
IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias.alias()).isHidden(true).writeIndex(true)
83+
);
84+
aliasRequestBuilder.addAliasAction(IndicesAliasesRequest.AliasActions.remove().index(oldIndex).alias(alias.alias()));
85+
} else if (isAnomaliesReadAlias(alias.alias())) {
86+
String jobId = AnomalyDetectorsIndex.jobIdFromAlias(alias.alias());
87+
aliasRequestBuilder.addAliasAction(
88+
IndicesAliasesRequest.AliasActions.add()
89+
.index(newIndex)
90+
.alias(alias.alias())
91+
.isHidden(true)
92+
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
93+
);
94+
}
95+
}
96+
97+
return aliasRequestBuilder;
98+
}
99+
100+
public static boolean isAnomaliesWriteAlias(String aliasName) {
101+
return aliasName.startsWith(AnomalyDetectorsIndexFields.RESULTS_INDEX_WRITE_PREFIX);
102+
}
103+
104+
static boolean isAnomaliesReadAlias(String aliasName) {
105+
if (aliasName.startsWith(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX) == false) {
106+
return false;
107+
}
108+
109+
// See {@link AnomalyDetectorsIndex#jobResultsAliasedName}
110+
String jobIdPart = aliasName.substring(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length());
111+
// If this is a write alias it will start with a `.` character
112+
// which is not a valid job id.
113+
return MlStrings.isValidId(jobIdPart);
114+
}
115+
116+
117+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public final class MlIndexAndAlias {
6969
public static final String FIRST_INDEX_SIX_DIGIT_SUFFIX = "-000001";
7070

7171
private static final Logger logger = LogManager.getLogger(MlIndexAndAlias.class);
72-
private static final Predicate<String> HAS_SIX_DIGIT_SUFFIX = Pattern.compile("\\d{6}").asMatchPredicate();
72+
private static final Predicate<String> HAS_SIX_DIGIT_SUFFIX = Pattern.compile("^.*\\d{6}$").asMatchPredicate();
7373

7474
static final Comparator<String> INDEX_NAME_COMPARATOR = (index1, index2) -> {
7575
String[] index1Parts = index1.split("-");
@@ -456,4 +456,54 @@ public static String latestIndex(String[] concreteIndices) {
456456
public static boolean indexIsReadWriteCompatibleInV9(IndexVersion version) {
457457
return version.onOrAfter(IndexVersions.V_8_0_0);
458458
}
459+
460+
/**
461+
* True if the index name ends with a 6 digit suffix, e.g. 000001
462+
*/
463+
public static boolean indexNameHasSixDigitSuffix(String indexName) {
464+
boolean ret = HAS_SIX_DIGIT_SUFFIX.test(indexName);
465+
logger.warn("indexNameHasSixDigitSuffix [{}] returning [{}]", indexName, ret);
466+
return ret;
467+
}
468+
469+
/**
470+
* Strip any suffix from the index name and find any other indices
471+
* that match the base name. Then return the latest index from the
472+
* matching ones.
473+
*
474+
* @param index The index to check
475+
* @param expressionResolver The expression resolver
476+
* @param latestState The latest cluster state
477+
* @return The latest index that matches the base name of the given index
478+
*/
479+
public static String latestIndexMatchingBaseName(
480+
String index,
481+
IndexNameExpressionResolver expressionResolver,
482+
ClusterState latestState
483+
) {
484+
String baseIndexName = MlIndexAndAlias.has6DigitSuffix(index)
485+
? index.substring(0, index.length() - FIRST_INDEX_SIX_DIGIT_SUFFIX.length())
486+
: index;
487+
488+
String[] matching = expressionResolver.concreteIndexNames(
489+
latestState,
490+
IndicesOptions.lenientExpandOpenHidden(),
491+
baseIndexName + "*"
492+
);
493+
494+
// This should never happen
495+
assert matching.length > 0 : "No indices matching [" + baseIndexName + "*]";
496+
if (matching.length == 0) {
497+
return index;
498+
}
499+
500+
// Exclude indices that start with the same base name but are a different index
501+
// e.g. .ml-anomalies-foobar should not be included when the index name is
502+
// .ml-anomalies-foo
503+
String[] filtered = Arrays.stream(matching).filter(i -> {
504+
return i.equals(index) || (has6DigitSuffix(i) && i.length() == baseIndexName.length() + FIRST_INDEX_SIX_DIGIT_SUFFIX.length());
505+
}).toArray(String[]::new);
506+
507+
return MlIndexAndAlias.latestIndex(filtered);
508+
}
459509
}

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/BulkFailureRetryIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class BulkFailureRetryIT extends MlNativeAutodetectIntegTestCase {
4444
private final long now = System.currentTimeMillis();
4545
private static final long DAY = Duration.ofDays(1).toMillis();
4646
private final String jobId = "bulk-failure-retry-job";
47-
private final String resultsIndex = ".ml-anomalies-custom-bulk-failure-retry-job";
47+
private final String resultsIndex = ".ml-anomalies-custom-bulk-failure-retry-job-000001";
4848

4949
@Before
5050
public void putPastDataIntoIndex() {

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.cluster.ClusterName;
11+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1112
import org.elasticsearch.cluster.service.ClusterService;
1213
import org.elasticsearch.core.TimeValue;
1314
import org.elasticsearch.index.IndexVersion;
@@ -54,6 +55,7 @@ public void testTriggerDeleteJobsInStateDeletingWithoutDeletionTask() throws Int
5455
client(),
5556
mock(ClusterService.class),
5657
mock(MlAssignmentNotifier.class),
58+
mock(IndexNameExpressionResolver.class),
5759
true,
5860
true,
5961
true

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public void testCreateJobsWithIndexNameOption() throws Exception {
212212
"results_index_name" : "%s"}""";
213213

214214
String jobId1 = "create-jobs-with-index-name-option-job-1";
215-
String indexName = "non-default-index";
215+
String indexName = "non-default-index-000001";
216216
putJob(jobId1, Strings.format(jobTemplate, indexName));
217217

218218
String jobId2 = "create-jobs-with-index-name-option-job-2";
@@ -406,7 +406,7 @@ public void testCreateJobInCustomSharedIndexUpdatesMapping() throws Exception {
406406
// Check the index mapping contains the first by_field_name
407407
Request getResultsMappingRequest = new Request(
408408
"GET",
409-
AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-shared-index/_mapping"
409+
AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-shared-index-000001/_mapping"
410410
);
411411
getResultsMappingRequest.addParameter("pretty", null);
412412
String resultsMappingAfterJob1 = EntityUtils.toString(client().performRequest(getResultsMappingRequest).getEntity());

0 commit comments

Comments
 (0)