Skip to content

Commit e95cb35

Browse files
authored
[ML] Use latest results index for new Anomaly Detection jobs (elastic#122597) (elastic#122672)
After upgrading from v7 new anomaly detection jobs should use the latest results index if one has been created.
1 parent 052e5f6 commit e95cb35

File tree

4 files changed

+330
-8
lines changed

4 files changed

+330
-8
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@
125125
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
126126
import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator;
127127
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
128+
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
128129
import org.elasticsearch.xpack.core.security.support.Exceptions;
129130
import org.elasticsearch.xpack.ml.job.categorization.GrokPatternCreator;
130131
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery;
@@ -305,11 +306,15 @@ public void createJobResultIndex(Job job, ClusterState state, final ActionListen
305306
String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId());
306307
String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(job.getId());
307308
String tempIndexName = job.getInitialResultsIndexName();
309+
// Find all indices starting with this name and pick the latest one
310+
String[] concreteIndices = resolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(), tempIndexName + "*");
311+
if (concreteIndices.length > 0) {
312+
tempIndexName = MlIndexAndAlias.latestIndex(concreteIndices);
313+
}
308314

309315
// Our read/write aliases should point to the concrete index
310316
// If the initial index is NOT an alias, either it is already a concrete index, or it does not exist yet
311317
if (state.getMetadata().hasAlias(tempIndexName)) {
312-
String[] concreteIndices = resolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(), tempIndexName);
313318

314319
// SHOULD NOT be closed as in typical call flow checkForLeftOverDocuments already verified this
315320
// if it is closed, we bailout and return an error
@@ -323,8 +328,8 @@ public void createJobResultIndex(Job job, ClusterState state, final ActionListen
323328
);
324329
return;
325330
}
326-
tempIndexName = concreteIndices[0];
327331
}
332+
328333
final String indexName = tempIndexName;
329334

330335
ActionListener<Boolean> indexAndMappingsListener = ActionListener.wrap(success -> {

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlJobSnapshotUpgradeIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ private Response buildAndPutJob(String jobId, TimeValue bucketSpan) throws Excep
279279
return client().performRequest(request);
280280
}
281281

282-
private static List<String> generateData(
282+
static List<String> generateData(
283283
long timestamp,
284284
TimeValue bucketSpan,
285285
int bucketCount,
Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.upgrades;
9+
10+
import org.apache.http.util.EntityUtils;
11+
import org.elasticsearch.Version;
12+
import org.elasticsearch.client.Request;
13+
import org.elasticsearch.client.RequestOptions;
14+
import org.elasticsearch.client.Response;
15+
import org.elasticsearch.client.WarningsHandler;
16+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
17+
import org.elasticsearch.core.Strings;
18+
import org.elasticsearch.core.TimeValue;
19+
import org.elasticsearch.xpack.test.rest.XPackRestTestConstants;
20+
import org.junit.BeforeClass;
21+
22+
import java.io.IOException;
23+
import java.time.Instant;
24+
import java.time.temporal.ChronoUnit;
25+
import java.util.Collection;
26+
import java.util.Collections;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.stream.Collectors;
30+
import java.util.stream.Stream;
31+
32+
import static org.elasticsearch.upgrades.MlJobSnapshotUpgradeIT.generateData;
33+
import static org.hamcrest.Matchers.anyOf;
34+
import static org.hamcrest.Matchers.containsInAnyOrder;
35+
import static org.hamcrest.Matchers.containsString;
36+
import static org.hamcrest.Matchers.greaterThan;
37+
import static org.hamcrest.Matchers.hasEntry;
38+
import static org.hamcrest.Matchers.hasSize;
39+
import static org.hamcrest.Matchers.is;
40+
import static org.hamcrest.Matchers.nullValue;
41+
42+
public class MlRolloverLegacyIndicesIT extends AbstractUpgradeTestCase {
43+
44+
private static final String JOB_ID = "ml-rollover-legacy-job";
45+
private static final String CUSTOM_INDEX_JOB_ID = "ml-rollover-legacy-custom-job";
46+
private static final String CUSTOM_RESULTS_INDEX_NAME = "dedicated-results-index";
47+
private static final String UPGRADED_CLUSTER_JOB_ID = "ml-rollover-upgraded-job";
48+
private static final String UPGRADED_CUSTOM_INDEX_CLUSTER_JOB_ID = "ml-rollover-upgraded-custom-job";
49+
private static final int NUM_BUCKETS = 10;
50+
51+
@BeforeClass
52+
public static void maybeSkip() {
53+
assumeFalse("Skip ML tests on unsupported glibc versions", SKIP_ML_TESTS);
54+
}
55+
56+
@Override
57+
protected Collection<String> templatesToWaitFor() {
58+
// We shouldn't wait for ML templates during the upgrade - production won't
59+
if (CLUSTER_TYPE != ClusterType.OLD) {
60+
return super.templatesToWaitFor();
61+
}
62+
return Stream.concat(XPackRestTestConstants.ML_POST_V7120_TEMPLATES.stream(), super.templatesToWaitFor().stream())
63+
.collect(Collectors.toSet());
64+
}
65+
66+
/**
67+
* Test rolling over v7 legacy indices and that the results index aliases are
68+
* updated to point to the new indices. The test covers both the shared and
69+
* custom results indices.
70+
*/
71+
public void testRolloverLegacyIndices() throws Exception {
72+
73+
switch (CLUSTER_TYPE) {
74+
case OLD:
75+
createAndRunJob(JOB_ID, false);
76+
createAndRunJob(CUSTOM_INDEX_JOB_ID, true);
77+
break;
78+
case MIXED:
79+
break;
80+
case UPGRADED:
81+
assertLegacyIndicesRollover();
82+
assertAnomalyIndicesRollover();
83+
assertNotificationsIndexAliasCreated();
84+
createAndRunJob(UPGRADED_CLUSTER_JOB_ID, false);
85+
closeJob(UPGRADED_CLUSTER_JOB_ID);
86+
createAndRunJob(UPGRADED_CUSTOM_INDEX_CLUSTER_JOB_ID, true);
87+
closeJob(UPGRADED_CUSTOM_INDEX_CLUSTER_JOB_ID);
88+
assertResultsInNewIndex(false);
89+
assertResultsInNewIndex(true);
90+
break;
91+
default:
92+
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
93+
}
94+
}
95+
96+
private void createAndRunJob(String jobId, boolean useCustomIndex) throws IOException {
97+
var resultsIndex = useCustomIndex ? "\"results_index_name\": \"" + CUSTOM_RESULTS_INDEX_NAME + "\"," : "";
98+
99+
String jobConfig = Strings.format("""
100+
{
101+
%s
102+
"analysis_config" : {
103+
"bucket_span": "600s",
104+
"detectors" :[{"function":"metric","field_name":"value","partition_field_name":"series"}]
105+
},
106+
"data_description" : {
107+
}
108+
}"
109+
""", resultsIndex);
110+
111+
Request putJob = new Request("PUT", "_ml/anomaly_detectors/" + jobId);
112+
putJob.setJsonEntity(jobConfig);
113+
Response response = client().performRequest(putJob);
114+
assertEquals(200, response.getStatusLine().getStatusCode());
115+
116+
Request openJob = new Request("POST", "_ml/anomaly_detectors/" + jobId + "/_open");
117+
response = client().performRequest(openJob);
118+
assertEquals(200, response.getStatusLine().getStatusCode());
119+
120+
TimeValue bucketSpan = TimeValue.timeValueMinutes(10);
121+
long startTime = Instant.now().minus(24L, ChronoUnit.HOURS).toEpochMilli();
122+
123+
var dataCounts = entityAsMap(
124+
postData(
125+
jobId,
126+
String.join(
127+
"",
128+
generateData(
129+
startTime,
130+
bucketSpan,
131+
NUM_BUCKETS,
132+
Collections.singletonList("foo"),
133+
(bucketIndex, series) -> bucketIndex == 5 ? 100.0 : 10.0
134+
)
135+
)
136+
)
137+
);
138+
assertThat(dataCounts.toString(), (Integer) dataCounts.get("bucket_count"), greaterThan(0));
139+
flushJob(jobId);
140+
}
141+
142+
protected Response postData(String jobId, String data) throws IOException {
143+
// Post data is deprecated, so a deprecation warning is possible (depending on the old version)
144+
RequestOptions postDataOptions = RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warnings -> {
145+
if (warnings.isEmpty()) {
146+
// No warning is OK - it means we hit an old node where post data is not deprecated
147+
return false;
148+
} else if (warnings.size() > 1) {
149+
return true;
150+
}
151+
return warnings.get(0)
152+
.equals(
153+
"Posting data directly to anomaly detection jobs is deprecated, "
154+
+ "in a future major version it will be compulsory to use a datafeed"
155+
) == false;
156+
}).build();
157+
158+
Request postDataRequest = new Request("POST", "/_ml/anomaly_detectors/" + jobId + "/_data");
159+
// Post data is deprecated, so expect a deprecation warning
160+
postDataRequest.setOptions(postDataOptions);
161+
postDataRequest.setJsonEntity(data);
162+
return client().performRequest(postDataRequest);
163+
}
164+
165+
protected void flushJob(String jobId) throws IOException {
166+
client().performRequest(new Request("POST", "/_ml/anomaly_detectors/" + jobId + "/_flush"));
167+
}
168+
169+
private void closeJob(String jobId) throws IOException {
170+
Response closeResponse = client().performRequest(new Request("POST", "/_ml/anomaly_detectors/" + jobId + "/_close"));
171+
assertThat(entityAsMap(closeResponse), hasEntry("closed", true));
172+
}
173+
174+
@SuppressWarnings("unchecked")
175+
private void assertLegacyIndicesRollover() throws Exception {
176+
if (isOriginalClusterVersionAtLeast(Version.V_8_0_0)) {
177+
// not a legacy index
178+
return;
179+
}
180+
181+
assertBusy(() -> {
182+
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
183+
builder.setWarningsHandler(WarningsHandler.PERMISSIVE); // ignore warnings about accessing system index
184+
Request getIndices = new Request("GET", ".ml*");
185+
getIndices.setOptions(builder);
186+
Response getIndicesResponse = client().performRequest(getIndices);
187+
assertOK(getIndicesResponse);
188+
var asString = EntityUtils.toString(getIndicesResponse.getEntity());
189+
// legacy -000001 index is rolled over creating -000002
190+
assertThat(asString, containsString(".ml-state-000002"));
191+
192+
Request getAliases = new Request("GET", "_alias/.ml*");
193+
getAliases.setOptions(builder);
194+
Response getAliasesResponse = client().performRequest(getAliases);
195+
196+
// Check the write alias points to the new index
197+
Map<String, Object> aliasesMap = entityAsMap(getAliasesResponse);
198+
var stateAlias = (Map<String, Object>) aliasesMap.get(".ml-state-000002");
199+
assertNotNull(stateAlias);
200+
var isHidden = XContentMapValues.extractValue(stateAlias, "aliases", ".ml-state-write", "is_hidden");
201+
assertEquals(Boolean.TRUE, isHidden);
202+
});
203+
}
204+
205+
@SuppressWarnings("unchecked")
206+
private void assertAnomalyIndicesRollover() throws Exception {
207+
if (isOriginalClusterVersionAtLeast(Version.V_8_0_0)) {
208+
// not a legacy index
209+
return;
210+
}
211+
212+
assertBusy(() -> {
213+
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
214+
builder.setWarningsHandler(WarningsHandler.PERMISSIVE); // ignore warnings about accessing system index
215+
Request getIndices = new Request("GET", ".ml-anomalies*");
216+
getIndices.setOptions(builder);
217+
Response getIndicesResponse = client().performRequest(getIndices);
218+
assertOK(getIndicesResponse);
219+
var asString = EntityUtils.toString(getIndicesResponse.getEntity());
220+
assertThat(asString, containsString(".ml-anomalies-custom-" + CUSTOM_RESULTS_INDEX_NAME));
221+
assertThat(asString, containsString(".ml-anomalies-custom-" + CUSTOM_RESULTS_INDEX_NAME + "-000001"));
222+
assertThat(asString, containsString(".ml-anomalies-shared"));
223+
assertThat(asString, containsString(".ml-anomalies-shared-000001"));
224+
225+
Request getAliases = new Request("GET", "_alias/.ml*");
226+
getAliases.setOptions(builder);
227+
Response getAliasesResponse = client().performRequest(getAliases);
228+
229+
// Check the write alias points to the new index
230+
Map<String, Object> aliasesResponseMap = entityAsMap(getAliasesResponse);
231+
232+
String expectedReadAlias = ".ml-anomalies-" + CUSTOM_INDEX_JOB_ID;
233+
String expectedWriteAlias = ".ml-anomalies-.write-" + CUSTOM_INDEX_JOB_ID;
234+
235+
{
236+
var rolledCustomResultsIndex = (Map<String, Object>) aliasesResponseMap.get(
237+
".ml-anomalies-custom-" + CUSTOM_RESULTS_INDEX_NAME + "-000001"
238+
);
239+
assertNotNull(aliasesResponseMap.toString(), rolledCustomResultsIndex);
240+
241+
var aliases = (Map<String, Object>) rolledCustomResultsIndex.get("aliases");
242+
assertThat(aliasesResponseMap.toString(), aliases.entrySet(), hasSize(2));
243+
assertThat(aliasesResponseMap.toString(), aliases.keySet(), containsInAnyOrder(expectedReadAlias, expectedWriteAlias));
244+
245+
// Read alias
246+
var isHidden = XContentMapValues.extractValue(rolledCustomResultsIndex, "aliases", expectedReadAlias, "is_hidden");
247+
assertEquals(Boolean.TRUE, isHidden);
248+
var isWrite = XContentMapValues.extractValue(rolledCustomResultsIndex, "aliases", expectedReadAlias, "is_write_index");
249+
assertNull(isWrite); // not a write index
250+
var filter = XContentMapValues.extractValue(rolledCustomResultsIndex, "aliases", expectedReadAlias, "filter");
251+
assertNotNull(filter);
252+
253+
// Write alias
254+
isHidden = XContentMapValues.extractValue(rolledCustomResultsIndex, "aliases", expectedWriteAlias, "is_hidden");
255+
assertEquals(Boolean.TRUE, isHidden);
256+
isWrite = XContentMapValues.extractValue(rolledCustomResultsIndex, "aliases", expectedWriteAlias, "is_write_index");
257+
assertEquals(Boolean.TRUE, isWrite);
258+
filter = XContentMapValues.extractValue(rolledCustomResultsIndex, "aliases", expectedReadAlias, "filter");
259+
assertNotNull(filter);
260+
}
261+
262+
{
263+
var olcustomResultsIndex = (Map<String, Object>) aliasesResponseMap.get(
264+
".ml-anomalies-custom-" + CUSTOM_RESULTS_INDEX_NAME
265+
);
266+
assertNotNull(aliasesResponseMap.toString(), olcustomResultsIndex);
267+
var aliases = (Map<String, Object>) olcustomResultsIndex.get("aliases");
268+
assertThat(aliasesResponseMap.toString(), aliases.entrySet(), hasSize(1));
269+
assertThat(aliasesResponseMap.toString(), aliases.keySet(), containsInAnyOrder(expectedReadAlias));
270+
271+
// Read alias
272+
var isHidden = XContentMapValues.extractValue(olcustomResultsIndex, "aliases", expectedReadAlias, "is_hidden");
273+
assertEquals(Boolean.TRUE, isHidden);
274+
var isWrite = XContentMapValues.extractValue(olcustomResultsIndex, "aliases", expectedReadAlias, "is_write_index");
275+
assertNull(isWrite); // not a write index
276+
var filter = XContentMapValues.extractValue(olcustomResultsIndex, "aliases", expectedReadAlias, "filter");
277+
assertNotNull(filter);
278+
}
279+
});
280+
}
281+
282+
@SuppressWarnings("unchecked")
283+
public void assertResultsInNewIndex(boolean checkCustomIndex) throws Exception {
284+
if (isOriginalClusterVersionAtLeast(Version.V_8_0_0)) {
285+
// not a legacy index
286+
return;
287+
}
288+
289+
var searchUrl = checkCustomIndex
290+
? ".ml-anomalies-custom-" + CUSTOM_RESULTS_INDEX_NAME + "-000001/_search"
291+
: ".ml-anomalies-shared-000001/_search";
292+
293+
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
294+
builder.setWarningsHandler(WarningsHandler.PERMISSIVE); // ignore warnings about accessing hidden index
295+
Request getIndices = new Request("GET", searchUrl);
296+
getIndices.setOptions(builder);
297+
Response searchResponse = client().performRequest(getIndices);
298+
assertOK(searchResponse);
299+
300+
final Map<String, Object> responseMap = responseAsMap(searchResponse);
301+
Map<String, Object> hits = ((Map<String, Object>) responseMap.get("hits"));
302+
assertEquals(responseMap.toString(), NUM_BUCKETS, ((List<Object>) hits.get("hits")).size());
303+
}
304+
305+
@SuppressWarnings("unchecked")
306+
private void assertNotificationsIndexAliasCreated() throws Exception {
307+
assertBusy(() -> {
308+
Request getMappings = new Request("GET", "_alias/.ml-notifications-write");
309+
Response response = client().performRequest(getMappings);
310+
Map<String, Object> responseMap = entityAsMap(response);
311+
assertThat(responseMap.entrySet(), hasSize(1));
312+
var aliases = (Map<String, Object>) responseMap.get(".ml-notifications-000002");
313+
assertThat(aliases.entrySet(), hasSize(1));
314+
var allAliases = (Map<String, Object>) aliases.get("aliases");
315+
var writeAlias = (Map<String, Object>) allAliases.get(".ml-notifications-write");
316+
317+
assertThat(writeAlias, hasEntry("is_hidden", Boolean.TRUE));
318+
var isWriteIndex = (Boolean) writeAlias.get("is_write_index");
319+
assertThat(isWriteIndex, anyOf(is(Boolean.TRUE), nullValue()));
320+
});
321+
}
322+
}

x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,15 @@ public final class XPackRestTestConstants {
1717
public static final String[] TEMPLATE_NAMES_NO_ILM = new String[] { HISTORY_TEMPLATE_NAME_NO_ILM };
1818

1919
// ML constants:
20-
public static final String ML_META_INDEX_NAME = ".ml-meta";
21-
public static final String CONFIG_INDEX = ".ml-config";
2220
public static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-";
2321
public static final String STATE_INDEX_PREFIX = ".ml-state";
24-
public static final String RESULTS_INDEX_DEFAULT = "shared";
2522

2623
public static final List<String> ML_POST_V7120_TEMPLATES = List.of(STATE_INDEX_PREFIX, RESULTS_INDEX_PREFIX);
2724

2825
// Transform constants:
2926
public static final String TRANSFORM_TASK_NAME = "data_frame/transforms";
3027
public static final String TRANSFORM_INTERNAL_INDEX_PREFIX = ".transform-internal-";
31-
public static final String TRANSFORM_NOTIFICATIONS_INDEX_PREFIX = ".transform-notifications-";
3228
public static final String TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED = ".data-frame-internal-";
33-
public static final String TRANSFORM_NOTIFICATIONS_INDEX_PREFIX_DEPRECATED = ".data-frame-notifications-";
3429

3530
private XPackRestTestConstants() {}
3631
}

0 commit comments

Comments
 (0)