Skip to content

Commit 928040e

Browse files
authored
[ML] Automatically rollover legacy ml indices (#120405)
Rollover ml indices created in 7.x and create new indices that version 9 can read and write to. This is required for ml to continue to run after during upgrade and reindex of 7.x indices
1 parent a1fd7bc commit 928040e

File tree

11 files changed

+535
-20
lines changed

11 files changed

+535
-20
lines changed

docs/changelog/120405.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120405
2+
summary: Automatically rollover legacy ml indices
3+
area: Machine Learning
4+
type: upgrade
5+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ static TransportVersion def(int id) {
161161
public static final TransportVersion ESQL_SKIP_ES_INDEX_SERIALIZATION = def(8_827_00_0);
162162
public static final TransportVersion ADD_INDEX_BLOCK_TWO_PHASE = def(8_828_00_0);
163163
public static final TransportVersion RESOLVE_CLUSTER_NO_INDEX_EXPRESSION = def(8_829_00_0);
164+
public static final TransportVersion ML_ROLLOVER_LEGACY_INDICES = def(8_830_00_0);
164165

165166
/*
166167
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class AnnotationIndex {
4949

5050
// Exposed for testing, but always use the aliases in non-test code.
5151
public static final String LATEST_INDEX_NAME = ".ml-annotations-000001";
52+
public static final String INDEX_PATTERN = ".ml-annotations-*";
5253
// Due to historical bugs this index may not have the correct mappings
5354
// in some production clusters. Therefore new annotations should be
5455
// written to the latest index. If we ever switch to another new annotations

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ public final class MlIndexAndAlias {
6666

6767
private static final Logger logger = LogManager.getLogger(MlIndexAndAlias.class);
6868

69-
// Visible for testing
7069
static final Comparator<String> INDEX_NAME_COMPARATOR = new Comparator<>() {
7170

7271
private final Predicate<String> HAS_SIX_DIGIT_SUFFIX = Pattern.compile("\\d{6}").asMatchPredicate();
@@ -172,7 +171,7 @@ public static void createIndexAndAliasIfNecessary(
172171
} else {
173172
if (indexPointedByCurrentWriteAlias.isEmpty()) {
174173
assert concreteIndexNames.length > 0;
175-
String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get();
174+
String latestConcreteIndexName = latestIndex(concreteIndexNames);
176175
updateWriteAlias(client, alias, null, latestConcreteIndexName, loggingListener);
177176
return;
178177
}
@@ -279,18 +278,22 @@ private static void createFirstConcreteIndex(
279278
);
280279
}
281280

282-
private static void updateWriteAlias(
281+
public static void updateWriteAlias(
283282
Client client,
284283
String alias,
285284
@Nullable String currentIndex,
286285
String newIndex,
287286
ActionListener<Boolean> listener
288287
) {
289-
logger.info("About to move write alias [{}] from index [{}] to index [{}]", alias, currentIndex, newIndex);
288+
if (currentIndex != null) {
289+
logger.info("About to move write alias [{}] from index [{}] to index [{}]", alias, currentIndex, newIndex);
290+
} else {
291+
logger.info("About to create write alias [{}] for index [{}]", alias, newIndex);
292+
}
290293
IndicesAliasesRequestBuilder requestBuilder = client.admin()
291294
.indices()
292295
.prepareAliases()
293-
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias).isHidden(true));
296+
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias).isHidden(true).writeIndex(true));
294297
if (currentIndex != null) {
295298
requestBuilder.removeAlias(currentIndex, alias);
296299
}
@@ -380,4 +383,16 @@ public static void installIndexTemplateIfRequired(
380383
public static boolean hasIndexTemplate(ClusterState state, String templateName) {
381384
return state.getMetadata().templatesV2().containsKey(templateName);
382385
}
386+
387+
/**
388+
* Returns the latest index. Latest is the index with the highest
389+
* 6 digit suffix.
390+
* @param concreteIndices List of index names
391+
* @return The latest index by index name version suffix
392+
*/
393+
public static String latestIndex(String[] concreteIndices) {
394+
return concreteIndices.length == 1
395+
? concreteIndices[0]
396+
: Arrays.stream(concreteIndices).max(MlIndexAndAlias.INDEX_NAME_COMPARATOR).get();
397+
}
383398
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPo
298298
assertThat(
299299
indicesAliasesRequest.getAliasActions(),
300300
contains(
301-
AliasActions.add().alias(TEST_INDEX_ALIAS).index(FIRST_CONCRETE_INDEX).isHidden(true),
301+
AliasActions.add().alias(TEST_INDEX_ALIAS).index(FIRST_CONCRETE_INDEX).isHidden(true).writeIndex(true),
302302
AliasActions.remove().alias(TEST_INDEX_ALIAS).index(LEGACY_INDEX_WITHOUT_SUFFIX)
303303
)
304304
);
@@ -318,7 +318,7 @@ private void assertMlStateWriteAliasAddedToMostRecentMlStateIndex(List<String> e
318318
IndicesAliasesRequest indicesAliasesRequest = aliasesRequestCaptor.getValue();
319319
assertThat(
320320
indicesAliasesRequest.getAliasActions(),
321-
contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName).isHidden(true))
321+
contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName).isHidden(true).writeIndex(true))
322322
);
323323
}
324324

@@ -364,6 +364,11 @@ public void testIndexNameComparator() {
364364
assertThat(Stream.of(".a-000002", ".b-000001").max(comparator).get(), equalTo(".a-000002"));
365365
}
366366

367+
public void testLatestIndex() {
368+
var names = new String[] { "index-000001", "index-000002", "index-000003" };
369+
assertThat(MlIndexAndAlias.latestIndex(names), equalTo("index-000003"));
370+
}
371+
367372
private void createIndexAndAliasIfNecessary(ClusterState clusterState) {
368373
MlIndexAndAlias.createIndexAndAliasIfNecessary(
369374
client,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@
183183
import org.elasticsearch.xpack.core.ml.action.UpgradeJobModelSnapshotAction;
184184
import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
185185
import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
186+
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
186187
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
187188
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
188189
import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider;
@@ -1222,7 +1223,25 @@ public Collection<?> createComponents(PluginServices services) {
12221223

12231224
MlAutoUpdateService mlAutoUpdateService = new MlAutoUpdateService(
12241225
threadPool,
1225-
List.of(new DatafeedConfigAutoUpdater(datafeedConfigProvider, indexNameExpressionResolver))
1226+
List.of(
1227+
new DatafeedConfigAutoUpdater(datafeedConfigProvider, indexNameExpressionResolver),
1228+
new MlIndexRollover(
1229+
List.of(
1230+
new MlIndexRollover.IndexPatternAndAlias(
1231+
AnomalyDetectorsIndex.jobStateIndexPattern(),
1232+
AnomalyDetectorsIndex.jobStateIndexWriteAlias()
1233+
),
1234+
new MlIndexRollover.IndexPatternAndAlias(MlStatsIndex.indexPattern(), MlStatsIndex.writeAlias()),
1235+
new MlIndexRollover.IndexPatternAndAlias(AnnotationIndex.INDEX_PATTERN, AnnotationIndex.WRITE_ALIAS_NAME)
1236+
// TODO notifications = https://github.com/elastic/elasticsearch/pull/120064
1237+
// TODO anomaly results
1238+
// TODO .ml-inference-XXXXXX - requires alias
1239+
// TODO .ml-inference-native-XXXXXX - requires alias (index added in 8.0)
1240+
),
1241+
indexNameExpressionResolver,
1242+
client
1243+
)
1244+
)
12261245
);
12271246
clusterService.addListener(mlAutoUpdateService);
12281247
// this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it
@@ -2025,6 +2044,9 @@ public void indicesMigrationComplete(
20252044
new AssociatedIndexDescriptor(MlStatsIndex.indexPattern(), "ML stats index"),
20262045
new AssociatedIndexDescriptor(".ml-notifications*", "ML notifications indices"),
20272046
new AssociatedIndexDescriptor(".ml-annotations*", "ML annotations indices")
2047+
// TODO should the inference indices be included here?
2048+
// new AssociatedIndexDescriptor(".ml-inference-*", "ML Data Frame Analytics")
2049+
// new AssociatedIndexDescriptor(".ml-inference-native*", "ML indices for trained models")
20282050
);
20292051

20302052
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAutoUpdateService.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public interface UpdateAction {
3030

3131
String getName();
3232

33-
void runUpdate();
33+
void runUpdate(ClusterState latestState);
3434
}
3535

3636
private final List<UpdateAction> updateActions;
@@ -47,27 +47,34 @@ public MlAutoUpdateService(ThreadPool threadPool, List<UpdateAction> updateActio
4747

4848
@Override
4949
public void clusterChanged(ClusterChangedEvent event) {
50-
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
50+
if (event.localNodeMaster() == false) {
5151
return;
5252
}
53-
if (event.localNodeMaster() == false) {
53+
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
5454
return;
5555
}
5656

57-
TransportVersion minTransportVersion = event.state().getMinTransportVersion();
57+
if (completedUpdates.size() == updateActions.size()) {
58+
return; // all work complete
59+
}
60+
61+
final var latestState = event.state();
62+
TransportVersion minTransportVersion = latestState.getMinTransportVersion();
5863
final List<UpdateAction> toRun = updateActions.stream()
5964
.filter(action -> action.isMinTransportVersionSupported(minTransportVersion))
6065
.filter(action -> completedUpdates.contains(action.getName()) == false)
61-
.filter(action -> action.isAbleToRun(event.state()))
66+
.filter(action -> action.isAbleToRun(latestState))
6267
.filter(action -> currentlyUpdating.add(action.getName()))
6368
.toList();
64-
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> toRun.forEach(this::runUpdate));
69+
// TODO run updates serially
70+
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
71+
.execute(() -> toRun.forEach((action) -> this.runUpdate(action, latestState)));
6572
}
6673

67-
private void runUpdate(UpdateAction action) {
74+
private void runUpdate(UpdateAction action, ClusterState latestState) {
6875
try {
6976
logger.debug(() -> "[" + action.getName() + "] starting executing update action");
70-
action.runUpdate();
77+
action.runUpdate(latestState);
7178
this.completedUpdates.add(action.getName());
7279
logger.debug(() -> "[" + action.getName() + "] succeeded executing update action");
7380
} catch (Exception ex) {
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ml;
9+
10+
import org.elasticsearch.ElasticsearchException;
11+
import org.elasticsearch.ElasticsearchStatusException;
12+
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.TransportVersions;
14+
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
16+
import org.elasticsearch.action.support.IndicesOptions;
17+
import org.elasticsearch.action.support.PlainActionFuture;
18+
import org.elasticsearch.action.support.SubscribableListener;
19+
import org.elasticsearch.client.internal.Client;
20+
import org.elasticsearch.client.internal.OriginSettingClient;
21+
import org.elasticsearch.cluster.ClusterState;
22+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
23+
import org.elasticsearch.cluster.routing.IndexRoutingTable;
24+
import org.elasticsearch.index.IndexVersion;
25+
import org.elasticsearch.index.IndexVersions;
26+
import org.elasticsearch.logging.LogManager;
27+
import org.elasticsearch.logging.Logger;
28+
import org.elasticsearch.rest.RestStatus;
29+
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
30+
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
34+
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
35+
36+
/**
37+
* If any of the indices listed in {@code indicesToRollover} are legacy indices
38+
* then call rollover to produce a new index with the current version. If the
39+
* index does not have an alias the alias is created first.
40+
* If none of the {@code indicesToRollover} exist or they are all non-legacy
41+
* indices then nothing will be updated.
42+
*/
43+
public class MlIndexRollover implements MlAutoUpdateService.UpdateAction {
44+
45+
private static final Logger logger = LogManager.getLogger(MlIndexRollover.class);
46+
47+
public record IndexPatternAndAlias(String indexPattern, String alias) {}
48+
49+
private final IndexNameExpressionResolver expressionResolver;
50+
private final OriginSettingClient client;
51+
private final List<IndexPatternAndAlias> indicesToRollover;
52+
53+
public MlIndexRollover(List<IndexPatternAndAlias> indicesToRollover, IndexNameExpressionResolver expressionResolver, Client client) {
54+
this.expressionResolver = expressionResolver;
55+
this.client = new OriginSettingClient(client, ML_ORIGIN);
56+
this.indicesToRollover = indicesToRollover;
57+
}
58+
59+
@Override
60+
public boolean isMinTransportVersionSupported(TransportVersion minTransportVersion) {
61+
// Wait for all nodes to be upgraded to ensure that the
62+
// newly created index will be of the latest version.
63+
return minTransportVersion.onOrAfter(TransportVersions.ML_ROLLOVER_LEGACY_INDICES);
64+
}
65+
66+
@Override
67+
public boolean isAbleToRun(ClusterState latestState) {
68+
for (var indexPatternAndAlias : indicesToRollover) {
69+
String[] indices = expressionResolver.concreteIndexNames(
70+
latestState,
71+
IndicesOptions.lenientExpandOpenHidden(),
72+
indexPatternAndAlias.indexPattern
73+
);
74+
if (indices.length == 0) {
75+
// The index does not exist but the MlAutoUpdateService will
76+
// need to run this action and mark it as done.
77+
// Ignore the missing index and continue the loop
78+
continue;
79+
}
80+
81+
String latestIndex = MlIndexAndAlias.latestIndex(indices);
82+
IndexRoutingTable routingTable = latestState.getRoutingTable().index(latestIndex);
83+
if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
84+
return false;
85+
}
86+
}
87+
88+
return true;
89+
}
90+
91+
@Override
92+
public String getName() {
93+
return "ml_legacy_index_rollover";
94+
}
95+
96+
@Override
97+
public void runUpdate(ClusterState latestState) {
98+
List<Exception> failures = new ArrayList<>();
99+
100+
for (var indexPatternAndAlias : indicesToRollover) {
101+
PlainActionFuture<Boolean> rolloverIndices = new PlainActionFuture<>();
102+
rolloverLegacyIndices(latestState, indexPatternAndAlias.indexPattern(), indexPatternAndAlias.alias(), rolloverIndices);
103+
try {
104+
rolloverIndices.actionGet();
105+
} catch (Exception ex) {
106+
logger.warn(() -> "failed rolling over legacy index [" + indexPatternAndAlias.indexPattern() + "]", ex);
107+
if (ex instanceof ElasticsearchException elasticsearchException) {
108+
failures.add(
109+
new ElasticsearchStatusException("Failed rollover", elasticsearchException.status(), elasticsearchException)
110+
);
111+
} else {
112+
failures.add(new ElasticsearchStatusException("Failed rollover", RestStatus.REQUEST_TIMEOUT, ex));
113+
}
114+
115+
break;
116+
}
117+
}
118+
119+
if (failures.isEmpty()) {
120+
logger.info("ML legacy indies rolled over");
121+
return;
122+
}
123+
124+
ElasticsearchException exception = new ElasticsearchException("some error");
125+
failures.forEach(exception::addSuppressed);
126+
throw exception;
127+
}
128+
129+
private void rolloverLegacyIndices(ClusterState clusterState, String indexPattern, String alias, ActionListener<Boolean> listener) {
130+
var concreteIndices = expressionResolver.concreteIndexNames(clusterState, IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED, indexPattern);
131+
132+
if (concreteIndices.length == 0) {
133+
// no matching indices
134+
listener.onResponse(Boolean.FALSE);
135+
return;
136+
}
137+
138+
String latestIndex = MlIndexAndAlias.latestIndex(concreteIndices);
139+
boolean isCompatibleIndexVersion = isCompatibleIndexVersion(clusterState.metadata().index(latestIndex).getCreationVersion());
140+
boolean hasAlias = clusterState.getMetadata().hasAlias(alias);
141+
142+
if (isCompatibleIndexVersion && hasAlias) {
143+
// v8 index with alias, no action required
144+
listener.onResponse(Boolean.FALSE);
145+
return;
146+
}
147+
148+
SubscribableListener.<Boolean>newForked(l -> {
149+
if (hasAlias == false) {
150+
MlIndexAndAlias.updateWriteAlias(client, alias, null, latestIndex, l);
151+
} else {
152+
l.onResponse(Boolean.TRUE);
153+
}
154+
}).<Boolean>andThen((l, success) -> {
155+
if (isCompatibleIndexVersion == false) {
156+
logger.info("rolling over legacy index [{}] with alias [{}]", latestIndex, alias);
157+
rollover(alias, l);
158+
} else {
159+
l.onResponse(Boolean.TRUE);
160+
}
161+
}).addListener(listener);
162+
}
163+
164+
private void rollover(String alias, ActionListener<Boolean> listener) {
165+
client.admin().indices().rolloverIndex(new RolloverRequest(alias, null), listener.delegateFailure((l, response) -> {
166+
l.onResponse(Boolean.TRUE);
167+
}));
168+
}
169+
170+
/**
171+
* True if the version is read *and* write compatible not just read only compatible
172+
*/
173+
static boolean isCompatibleIndexVersion(IndexVersion version) {
174+
return version.onOrAfter(IndexVersions.MINIMUM_COMPATIBLE);
175+
}
176+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdater.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public String getName() {
7777
}
7878

7979
@Override
80-
public void runUpdate() {
80+
public void runUpdate(ClusterState latestState) {
8181
PlainActionFuture<List<DatafeedConfig.Builder>> getdatafeeds = new PlainActionFuture<>();
8282
provider.expandDatafeedConfigs("_all", true, null, getdatafeeds);
8383
List<DatafeedConfig.Builder> datafeedConfigBuilders = getdatafeeds.actionGet();

0 commit comments

Comments
 (0)