Skip to content

Commit 8769513

Browse files
davidkyleelasticsearchmachine
andauthored
[8.x] [ML] Automatically rollover legacy ml indices (elastic#120405) (elastic#120699)
* [ML] Automatically rollover legacy ml indices (elastic#120405) Rollover ml indices created in 7.x and create new indices that version 9 can read and write to. This is required for ml to continue to run after during upgrade and reindex of 7.x indices * fix for backport * annotations index can be ignored * [CI] Auto commit changes from spotless --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent a491383 commit 8769513

File tree

12 files changed

+560
-21
lines changed

12 files changed

+560
-21
lines changed

docs/changelog/120405.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120405
2+
summary: Automatically rollover legacy ml indices
3+
area: Machine Learning
4+
type: upgrade
5+
issues: []

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class AnnotationIndex {
4949

5050
// Exposed for testing, but always use the aliases in non-test code.
5151
public static final String LATEST_INDEX_NAME = ".ml-annotations-000001";
52+
public static final String INDEX_PATTERN = ".ml-annotations-*";
5253
// Due to historical bugs this index may not have the correct mappings
5354
// in some production clusters. Therefore new annotations should be
5455
// written to the latest index. If we ever switch to another new annotations

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public final class MlIndexAndAlias {
6565

6666
private static final Logger logger = LogManager.getLogger(MlIndexAndAlias.class);
6767

68-
// Visible for testing
6968
static final Comparator<String> INDEX_NAME_COMPARATOR = new Comparator<>() {
7069

7170
private final Predicate<String> HAS_SIX_DIGIT_SUFFIX = Pattern.compile("\\d{6}").asMatchPredicate();
@@ -169,7 +168,7 @@ public static void createIndexAndAliasIfNecessary(
169168
} else {
170169
if (indexPointedByCurrentWriteAlias.isEmpty()) {
171170
assert concreteIndexNames.length > 0;
172-
String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get();
171+
String latestConcreteIndexName = latestIndex(concreteIndexNames);
173172
updateWriteAlias(client, alias, null, latestConcreteIndexName, loggingListener);
174173
return;
175174
}
@@ -274,18 +273,22 @@ private static void createFirstConcreteIndex(
274273
);
275274
}
276275

277-
private static void updateWriteAlias(
276+
public static void updateWriteAlias(
278277
Client client,
279278
String alias,
280279
@Nullable String currentIndex,
281280
String newIndex,
282281
ActionListener<Boolean> listener
283282
) {
284-
logger.info("About to move write alias [{}] from index [{}] to index [{}]", alias, currentIndex, newIndex);
283+
if (currentIndex != null) {
284+
logger.info("About to move write alias [{}] from index [{}] to index [{}]", alias, currentIndex, newIndex);
285+
} else {
286+
logger.info("About to create write alias [{}] for index [{}]", alias, newIndex);
287+
}
285288
IndicesAliasesRequestBuilder requestBuilder = client.admin()
286289
.indices()
287290
.prepareAliases()
288-
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias).isHidden(true));
291+
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias).isHidden(true).writeIndex(true));
289292
if (currentIndex != null) {
290293
requestBuilder.removeAlias(currentIndex, alias);
291294
}
@@ -375,4 +378,16 @@ public static void installIndexTemplateIfRequired(
375378
public static boolean hasIndexTemplate(ClusterState state, String templateName) {
376379
return state.getMetadata().templatesV2().containsKey(templateName);
377380
}
381+
382+
/**
383+
* Returns the latest index. Latest is the index with the highest
384+
* 6 digit suffix.
385+
* @param concreteIndices List of index names
386+
* @return The latest index by index name version suffix
387+
*/
388+
public static String latestIndex(String[] concreteIndices) {
389+
return concreteIndices.length == 1
390+
? concreteIndices[0]
391+
: Arrays.stream(concreteIndices).max(MlIndexAndAlias.INDEX_NAME_COMPARATOR).get();
392+
}
378393
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPo
297297
assertThat(
298298
indicesAliasesRequest.getAliasActions(),
299299
contains(
300-
AliasActions.add().alias(TEST_INDEX_ALIAS).index(FIRST_CONCRETE_INDEX).isHidden(true),
300+
AliasActions.add().alias(TEST_INDEX_ALIAS).index(FIRST_CONCRETE_INDEX).isHidden(true).writeIndex(true),
301301
AliasActions.remove().alias(TEST_INDEX_ALIAS).index(LEGACY_INDEX_WITHOUT_SUFFIX)
302302
)
303303
);
@@ -317,7 +317,7 @@ private void assertMlStateWriteAliasAddedToMostRecentMlStateIndex(List<String> e
317317
IndicesAliasesRequest indicesAliasesRequest = aliasesRequestCaptor.getValue();
318318
assertThat(
319319
indicesAliasesRequest.getAliasActions(),
320-
contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName).isHidden(true))
320+
contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName).isHidden(true).writeIndex(true))
321321
);
322322
}
323323

@@ -363,6 +363,11 @@ public void testIndexNameComparator() {
363363
assertThat(Stream.of(".a-000002", ".b-000001").max(comparator).get(), equalTo(".a-000002"));
364364
}
365365

366+
public void testLatestIndex() {
367+
var names = new String[] { "index-000001", "index-000002", "index-000003" };
368+
assertThat(MlIndexAndAlias.latestIndex(names), equalTo("index-000003"));
369+
}
370+
366371
private void createIndexAndAliasIfNecessary(ClusterState clusterState) {
367372
MlIndexAndAlias.createIndexAndAliasIfNecessary(
368373
client,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1222,7 +1222,25 @@ public Collection<?> createComponents(PluginServices services) {
12221222

12231223
MlAutoUpdateService mlAutoUpdateService = new MlAutoUpdateService(
12241224
threadPool,
1225-
List.of(new DatafeedConfigAutoUpdater(datafeedConfigProvider, indexNameExpressionResolver))
1225+
List.of(
1226+
new DatafeedConfigAutoUpdater(datafeedConfigProvider, indexNameExpressionResolver),
1227+
new MlIndexRollover(
1228+
List.of(
1229+
new MlIndexRollover.IndexPatternAndAlias(
1230+
AnomalyDetectorsIndex.jobStateIndexPattern(),
1231+
AnomalyDetectorsIndex.jobStateIndexWriteAlias()
1232+
),
1233+
new MlIndexRollover.IndexPatternAndAlias(MlStatsIndex.indexPattern(), MlStatsIndex.writeAlias())
1234+
// .ml-annotations-000001 added in v8.0, no need to update
1235+
// TODO notifications = https://github.com/elastic/elasticsearch/pull/120064
1236+
// TODO anomaly results
1237+
// TODO .ml-inference-XXXXXX - requires alias
1238+
// TODO .ml-inference-native-XXXXXX - requires alias (index added in 8.0)
1239+
),
1240+
indexNameExpressionResolver,
1241+
client
1242+
)
1243+
)
12261244
);
12271245
clusterService.addListener(mlAutoUpdateService);
12281246
// this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it
@@ -2028,6 +2046,9 @@ public void indicesMigrationComplete(
20282046
new AssociatedIndexDescriptor(MlStatsIndex.indexPattern(), "ML stats index"),
20292047
new AssociatedIndexDescriptor(".ml-notifications*", "ML notifications indices"),
20302048
new AssociatedIndexDescriptor(".ml-annotations*", "ML annotations indices")
2049+
// TODO should the inference indices be included here?
2050+
// new AssociatedIndexDescriptor(".ml-inference-*", "ML Data Frame Analytics")
2051+
// new AssociatedIndexDescriptor(".ml-inference-native*", "ML indices for trained models")
20312052
);
20322053

20332054
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAutoUpdateService.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public interface UpdateAction {
3030

3131
String getName();
3232

33-
void runUpdate();
33+
void runUpdate(ClusterState latestState);
3434
}
3535

3636
private final List<UpdateAction> updateActions;
@@ -47,27 +47,34 @@ public MlAutoUpdateService(ThreadPool threadPool, List<UpdateAction> updateActio
4747

4848
@Override
4949
public void clusterChanged(ClusterChangedEvent event) {
50-
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
50+
if (event.localNodeMaster() == false) {
5151
return;
5252
}
53-
if (event.localNodeMaster() == false) {
53+
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
5454
return;
5555
}
5656

57-
TransportVersion minTransportVersion = event.state().getMinTransportVersion();
57+
if (completedUpdates.size() == updateActions.size()) {
58+
return; // all work complete
59+
}
60+
61+
final var latestState = event.state();
62+
TransportVersion minTransportVersion = latestState.getMinTransportVersion();
5863
final List<UpdateAction> toRun = updateActions.stream()
5964
.filter(action -> action.isMinTransportVersionSupported(minTransportVersion))
6065
.filter(action -> completedUpdates.contains(action.getName()) == false)
61-
.filter(action -> action.isAbleToRun(event.state()))
66+
.filter(action -> action.isAbleToRun(latestState))
6267
.filter(action -> currentlyUpdating.add(action.getName()))
6368
.toList();
64-
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> toRun.forEach(this::runUpdate));
69+
// TODO run updates serially
70+
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
71+
.execute(() -> toRun.forEach((action) -> this.runUpdate(action, latestState)));
6572
}
6673

67-
private void runUpdate(UpdateAction action) {
74+
private void runUpdate(UpdateAction action, ClusterState latestState) {
6875
try {
6976
logger.debug(() -> "[" + action.getName() + "] starting executing update action");
70-
action.runUpdate();
77+
action.runUpdate(latestState);
7178
this.completedUpdates.add(action.getName());
7279
logger.debug(() -> "[" + action.getName() + "] succeeded executing update action");
7380
} catch (Exception ex) {
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ml;
9+
10+
import org.elasticsearch.ElasticsearchException;
11+
import org.elasticsearch.ElasticsearchStatusException;
12+
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.TransportVersions;
14+
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
16+
import org.elasticsearch.action.support.IndicesOptions;
17+
import org.elasticsearch.action.support.PlainActionFuture;
18+
import org.elasticsearch.action.support.SubscribableListener;
19+
import org.elasticsearch.client.internal.Client;
20+
import org.elasticsearch.client.internal.OriginSettingClient;
21+
import org.elasticsearch.cluster.ClusterState;
22+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
23+
import org.elasticsearch.cluster.routing.IndexRoutingTable;
24+
import org.elasticsearch.index.IndexVersions;
25+
import org.elasticsearch.logging.LogManager;
26+
import org.elasticsearch.logging.Logger;
27+
import org.elasticsearch.rest.RestStatus;
28+
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
29+
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
33+
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
34+
35+
/**
36+
* If any of the indices listed in {@code indicesToRollover} are legacy indices
37+
* then call rollover to produce a new index with the current version. If the
38+
* index does not have an alias the alias is created first.
39+
* If none of the {@code indicesToRollover} exist or they are all non-legacy
40+
* indices then nothing will be updated.
41+
*/
42+
public class MlIndexRollover implements MlAutoUpdateService.UpdateAction {
43+
44+
private static final Logger logger = LogManager.getLogger(MlIndexRollover.class);
45+
46+
public record IndexPatternAndAlias(String indexPattern, String alias) {}
47+
48+
private final IndexNameExpressionResolver expressionResolver;
49+
private final OriginSettingClient client;
50+
private final List<IndexPatternAndAlias> indicesToRollover;
51+
52+
public MlIndexRollover(List<IndexPatternAndAlias> indicesToRollover, IndexNameExpressionResolver expressionResolver, Client client) {
53+
this.expressionResolver = expressionResolver;
54+
this.client = new OriginSettingClient(client, ML_ORIGIN);
55+
this.indicesToRollover = indicesToRollover;
56+
}
57+
58+
@Override
59+
public boolean isMinTransportVersionSupported(TransportVersion minTransportVersion) {
60+
// Wait for all nodes to be upgraded to ensure that the
61+
// newly created index will be of the latest version.
62+
return minTransportVersion.onOrAfter(TransportVersions.ML_ROLLOVER_LEGACY_INDICES);
63+
}
64+
65+
@Override
66+
public boolean isAbleToRun(ClusterState latestState) {
67+
for (var indexPatternAndAlias : indicesToRollover) {
68+
String[] indices = expressionResolver.concreteIndexNames(
69+
latestState,
70+
IndicesOptions.lenientExpandOpenHidden(),
71+
indexPatternAndAlias.indexPattern
72+
);
73+
if (indices.length == 0) {
74+
// The index does not exist but the MlAutoUpdateService will
75+
// need to run this action and mark it as done.
76+
// Ignore the missing index and continue the loop
77+
continue;
78+
}
79+
80+
String latestIndex = MlIndexAndAlias.latestIndex(indices);
81+
IndexRoutingTable routingTable = latestState.getRoutingTable().index(latestIndex);
82+
if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
83+
return false;
84+
}
85+
}
86+
87+
return true;
88+
}
89+
90+
@Override
91+
public String getName() {
92+
return "ml_legacy_index_rollover";
93+
}
94+
95+
@Override
96+
public void runUpdate(ClusterState latestState) {
97+
List<Exception> failures = new ArrayList<>();
98+
99+
for (var indexPatternAndAlias : indicesToRollover) {
100+
PlainActionFuture<Boolean> rolloverIndices = new PlainActionFuture<>();
101+
rolloverLegacyIndices(latestState, indexPatternAndAlias.indexPattern(), indexPatternAndAlias.alias(), rolloverIndices);
102+
try {
103+
rolloverIndices.actionGet();
104+
} catch (Exception ex) {
105+
logger.warn(() -> "failed rolling over legacy index [" + indexPatternAndAlias.indexPattern() + "]", ex);
106+
if (ex instanceof ElasticsearchException elasticsearchException) {
107+
failures.add(
108+
new ElasticsearchStatusException("Failed rollover", elasticsearchException.status(), elasticsearchException)
109+
);
110+
} else {
111+
failures.add(new ElasticsearchStatusException("Failed rollover", RestStatus.REQUEST_TIMEOUT, ex));
112+
}
113+
114+
break;
115+
}
116+
}
117+
118+
if (failures.isEmpty()) {
119+
logger.info("ML legacy indies rolled over");
120+
return;
121+
}
122+
123+
ElasticsearchException exception = new ElasticsearchException("some error");
124+
failures.forEach(exception::addSuppressed);
125+
throw exception;
126+
}
127+
128+
private void rolloverLegacyIndices(ClusterState clusterState, String indexPattern, String alias, ActionListener<Boolean> listener) {
129+
var concreteIndices = expressionResolver.concreteIndexNames(clusterState, IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED, indexPattern);
130+
131+
if (concreteIndices.length == 0) {
132+
// no matching indices
133+
listener.onResponse(Boolean.FALSE);
134+
return;
135+
}
136+
137+
String latestIndex = MlIndexAndAlias.latestIndex(concreteIndices);
138+
// Indices created before 8.0 are read only in 9
139+
boolean isCompatibleIndexVersion = clusterState.metadata().index(latestIndex).getCreationVersion().onOrAfter(IndexVersions.V_8_0_0);
140+
boolean hasAlias = clusterState.getMetadata().hasAlias(alias);
141+
142+
if (isCompatibleIndexVersion && hasAlias) {
143+
// v8 index with alias, no action required
144+
listener.onResponse(Boolean.FALSE);
145+
return;
146+
}
147+
148+
SubscribableListener.<Boolean>newForked(l -> {
149+
if (hasAlias == false) {
150+
MlIndexAndAlias.updateWriteAlias(client, alias, null, latestIndex, l);
151+
} else {
152+
l.onResponse(Boolean.TRUE);
153+
}
154+
}).<Boolean>andThen((l, success) -> {
155+
if (isCompatibleIndexVersion == false) {
156+
logger.info("rolling over legacy index [{}] with alias [{}]", latestIndex, alias);
157+
rollover(alias, l);
158+
} else {
159+
l.onResponse(Boolean.TRUE);
160+
}
161+
}).addListener(listener);
162+
}
163+
164+
private void rollover(String alias, ActionListener<Boolean> listener) {
165+
client.admin().indices().rolloverIndex(new RolloverRequest(alias, null), listener.delegateFailure((l, response) -> {
166+
l.onResponse(Boolean.TRUE);
167+
}));
168+
}
169+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdater.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public String getName() {
7777
}
7878

7979
@Override
80-
public void runUpdate() {
80+
public void runUpdate(ClusterState latestState) {
8181
PlainActionFuture<List<DatafeedConfig.Builder>> getdatafeeds = new PlainActionFuture<>();
8282
provider.expandDatafeedConfigs("_all", true, null, getdatafeeds);
8383
List<DatafeedConfig.Builder> datafeedConfigBuilders = getdatafeeds.actionGet();

0 commit comments

Comments
 (0)