Skip to content

Commit 954db37

Browse files
authored
Add migration test for .async-search system index (#121517)
This change adds migration testing for the ".async-search" system index to the full cluster upgrade tests that perform updates from versions N-2 to N via N-1. The test creates a system index by using async_search on a cluster with version N-2, then calls the "_migrate" API in version N-1 and finally checks that on the upgraded cluster in N we are still able to retrieve async search results from previous versions and can still write to the system index. This is necessary to ensure we don't end up with a write-only async search system index when migrating to version 9.
1 parent 0cd3ff5 commit 954db37

File tree

2 files changed

+172
-2
lines changed

2 files changed

+172
-2
lines changed

qa/lucene-index-compatibility/src/javaRestTest/java/org/elasticsearch/lucene/AbstractIndexCompatibilityTestCase.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
import org.apache.http.entity.ContentType;
1313
import org.apache.http.entity.InputStreamEntity;
1414
import org.elasticsearch.client.Request;
15+
import org.elasticsearch.client.RequestOptions;
1516
import org.elasticsearch.client.ResponseException;
17+
import org.elasticsearch.client.WarningsHandler;
1618
import org.elasticsearch.cluster.block.ClusterBlock;
1719
import org.elasticsearch.cluster.metadata.IndexMetadata;
1820
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
@@ -27,6 +29,7 @@
2729
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
2830
import org.elasticsearch.test.cluster.util.Version;
2931
import org.elasticsearch.test.rest.ESRestTestCase;
32+
import org.elasticsearch.test.rest.ObjectPath;
3033
import org.elasticsearch.xcontent.XContentType;
3134
import org.hamcrest.Matcher;
3235
import org.junit.After;
@@ -161,8 +164,21 @@ protected static boolean isFullyUpgradedTo(Version version) throws Exception {
161164
}
162165

163166
protected static Version indexVersion(String indexName) throws Exception {
164-
var response = assertOK(client().performRequest(new Request("GET", "/" + indexName + "/_settings")));
165-
int id = Integer.parseInt(createFromResponse(response).evaluate(indexName + ".settings.index.version.created"));
167+
return indexVersion(indexName, false);
168+
}
169+
170+
protected static Version indexVersion(String indexName, boolean ignoreWarnings) throws Exception {
171+
Request request = new Request("GET", "/" + indexName + "/_settings");
172+
request.addParameter("flat_settings", "true");
173+
if (ignoreWarnings) {
174+
RequestOptions.Builder options = request.getOptions().toBuilder();
175+
options.setWarningsHandler(WarningsHandler.PERMISSIVE);
176+
request.setOptions(options);
177+
}
178+
var response = assertOK(client().performRequest(request));
179+
ObjectPath fromResponse = createFromResponse(response);
180+
Map<String, Object> settings = fromResponse.evaluateExact(indexName, "settings");
181+
int id = Integer.parseInt((String) settings.get("index.version.created"));
166182
return new Version((byte) ((id / 1000000) % 100), (byte) ((id / 10000) % 100), (byte) ((id / 100) % 100));
167183
}
168184

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.lucene;
11+
12+
import org.elasticsearch.client.Request;
13+
import org.elasticsearch.client.RequestOptions;
14+
import org.elasticsearch.client.Response;
15+
import org.elasticsearch.client.RestClient;
16+
import org.elasticsearch.client.WarningsHandler;
17+
import org.elasticsearch.cluster.metadata.IndexMetadata;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.test.cluster.util.Version;
20+
import org.elasticsearch.test.rest.ObjectPath;
21+
22+
import java.io.IOException;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
26+
import static org.hamcrest.Matchers.equalTo;
27+
28+
public class FullClusterRestartSystemIndexCompatibilityIT extends FullClusterRestartIndexCompatibilityTestCase {
29+
30+
static {
31+
clusterConfig = config -> config.setting("xpack.license.self_generated.type", "trial");
32+
}
33+
34+
public FullClusterRestartSystemIndexCompatibilityIT(Version version) {
35+
super(version);
36+
}
37+
38+
// we need a place to store async_search ids across cluster restarts
39+
private static Map<String, String> async_search_ids = new HashMap<>(3);
40+
41+
/**
42+
* 1. creates an index on N-2 and performs async_search on it that is kept in system index
43+
* 2. After update to N-1 (latest) perform a system index migration step, also write block the index
44+
* 3. on N, check that async search results are still retrievable and we can write to the system index
45+
*/
46+
public void testAsyncSearchIndexMigration() throws Exception {
47+
final String index = suffix("index");
48+
final String asyncSearchIndex = ".async-search";
49+
final int numDocs = 2431;
50+
51+
final Request asyncSearchRequest = new Request("POST", "/" + index + "/_async_search?size=100&keep_on_completion=true");
52+
53+
if (isFullyUpgradedTo(VERSION_MINUS_2)) {
54+
createIndex(
55+
client(),
56+
index,
57+
Settings.builder()
58+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
59+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
60+
.build()
61+
);
62+
indexDocs(index, numDocs);
63+
ensureGreen(index);
64+
65+
assertThat(indexVersion(index), equalTo(VERSION_MINUS_2));
66+
String asyncId = searchAsyncAndStoreId(asyncSearchRequest, "n-2_id");
67+
ensureGreen(asyncSearchIndex);
68+
69+
assertAsyncSearchHitCount(asyncId, numDocs);
70+
assertBusy(() -> assertDocCountNoWarnings(client(), asyncSearchIndex, 1));
71+
assertThat(indexVersion(asyncSearchIndex, true), equalTo(VERSION_MINUS_2));
72+
return;
73+
}
74+
75+
if (isFullyUpgradedTo(VERSION_MINUS_1)) {
76+
// check .async-search index is readable
77+
assertThat(indexVersion(asyncSearchIndex, true), equalTo(VERSION_MINUS_2));
78+
assertAsyncSearchHitCount(async_search_ids.get("n-2_id"), numDocs);
79+
80+
// migrate system indices
81+
Request migrateRequest = new Request("POST", "/_migration/system_features");
82+
assertThat(
83+
ObjectPath.createFromResponse(client().performRequest(migrateRequest)).evaluate("features.0.feature_name"),
84+
equalTo("async_search")
85+
);
86+
assertBusy(() -> {
87+
Request checkMigrateProgress = new Request("GET", "/_migration/system_features");
88+
Response resp = null;
89+
try {
90+
assertFalse(
91+
ObjectPath.createFromResponse(client().performRequest(checkMigrateProgress))
92+
.evaluate("migration_status")
93+
.equals("IN_PROGRESS")
94+
);
95+
} catch (IOException e) {
96+
throw new AssertionError("System feature migration failed", e);
97+
}
98+
});
99+
100+
// check search results from n-2 search are still readable
101+
assertAsyncSearchHitCount(async_search_ids.get("n-2_id"), numDocs);
102+
103+
// perform new async search and check its readable
104+
String asyncId = searchAsyncAndStoreId(asyncSearchRequest, "n-1_id");
105+
assertAsyncSearchHitCount(asyncId, numDocs);
106+
assertBusy(() -> assertDocCountNoWarnings(client(), asyncSearchIndex, 2));
107+
108+
// in order to move to current version we need write block for n-2 index
109+
addIndexBlock(index, IndexMetadata.APIBlock.WRITE);
110+
}
111+
112+
if (isFullyUpgradedTo(VERSION_CURRENT)) {
113+
assertThat(indexVersion(index, true), equalTo(VERSION_MINUS_2));
114+
assertAsyncSearchHitCount(async_search_ids.get("n-2_id"), numDocs);
115+
assertAsyncSearchHitCount(async_search_ids.get("n-1_id"), numDocs);
116+
117+
// check system index is still writeable
118+
String asyncId = searchAsyncAndStoreId(asyncSearchRequest, "n_id");
119+
assertAsyncSearchHitCount(asyncId, numDocs);
120+
assertBusy(() -> assertDocCountNoWarnings(client(), asyncSearchIndex, 3));
121+
}
122+
123+
}
124+
125+
private static String searchAsyncAndStoreId(Request asyncSearchRequest, String asyncIdName) throws IOException {
126+
ObjectPath resp = ObjectPath.createFromResponse(client().performRequest(asyncSearchRequest));
127+
String asyncId = resp.evaluate("id");
128+
assertNotNull(asyncId);
129+
async_search_ids.put(asyncIdName, asyncId);
130+
return asyncId;
131+
}
132+
133+
private static void assertAsyncSearchHitCount(String asyncId, int numDocs) throws IOException {
134+
var asyncGet = new Request("GET", "/_async_search/" + asyncId);
135+
ObjectPath resp = ObjectPath.createFromResponse(client().performRequest(asyncGet));
136+
assertEquals(Integer.valueOf(numDocs), resp.evaluate("response.hits.total.value"));
137+
}
138+
139+
/**
140+
* Assert that the index in question has the given number of documents present
141+
*/
142+
private static void assertDocCountNoWarnings(RestClient client, String indexName, long docCount) throws IOException {
143+
Request countReq = new Request("GET", "/" + indexName + "/_count");
144+
RequestOptions.Builder options = countReq.getOptions().toBuilder();
145+
options.setWarningsHandler(WarningsHandler.PERMISSIVE);
146+
countReq.setOptions(options);
147+
ObjectPath resp = ObjectPath.createFromResponse(client.performRequest(countReq));
148+
assertEquals(
149+
"expected " + docCount + " documents but it was a different number",
150+
docCount,
151+
Long.parseLong(resp.evaluate("count").toString())
152+
);
153+
}
154+
}

0 commit comments

Comments
 (0)