Skip to content

Commit d6e3b65

Browse files
committed
Changes to run LogsdbIndexingRollingUpgradeIT in serverless.
* Moving LogsdbIndexingRollingUpgradeIT in a new gradle module, which can then can also be run in serverless. Old module has a dependency issue with `:qa:rolling-upgrade` and `javaRestTest`. The modified LogsdbIndexingRollingUpgradeIT no longer needs these dependencies. * Tests extending from ParameterizedRollingUpgradeTestCase can't run in serverless, so LogsdbIndexingRollingUpgradeIT now extends from ESRestTestCase. * LogsdbIndexingRollingUpgradeIT has one method that also performns the upgrade of nodes.
1 parent 40e5ea3 commit d6e3b65

File tree

4 files changed

+373
-0
lines changed

4 files changed

+373
-0
lines changed

build-tools-internal/src/main/resources/checkstyle_suppressions.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
<suppress files="qa[/\\]rolling-upgrade[/\\]src[/\\]javaRestTest[/\\]java[/\\]org[/\\]elasticsearch[/\\]upgrades[/\\]TsdbIT.java" checks="LineLength" />
3939
<suppress files="qa[/\\]rolling-upgrade[/\\]src[/\\]javaRestTest[/\\]java[/\\]org[/\\]elasticsearch[/\\]upgrades[/\\]TsdbIndexingRollingUpgradeIT.java" checks="LineLength" />
4040
<suppress files="qa[/\\]rolling-upgrade[/\\]src[/\\]javaRestTest[/\\]java[/\\]org[/\\]elasticsearch[/\\]upgrades[/\\]LogsdbIndexingRollingUpgradeIT.java" checks="LineLength" />
41+
<suppress files="qa[/\\]rolling-upgrade2[/\\]src[/\\]javaRestTest[/\\]java[/\\]org[/\\]elasticsearch[/\\]upgrades[/\\]LogsdbIndexingRollingUpgradeIT.java" checks="LineLength" />
4142
<suppress files="plugin[/\\]logsdb[/\\]qa[/\\]rolling-upgrade[/\\]src[/\\]javaRestTest[/\\]java[/\\]org[/\\]elasticsearch[/\\]upgrades[/\\]MatchOnlyTextRollingUpgradeIT.java" checks="LineLength" />
4243

4344
<!-- Gradle requires inputs to be seriablizable -->
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
import org.elasticsearch.gradle.testclusters.StandaloneRestIntegTestTask
8+
9+
apply plugin: 'elasticsearch.internal-java-rest-test'
10+
apply plugin: 'elasticsearch.internal-test-artifact-base'
11+
apply plugin: 'elasticsearch.bwc-test'
12+
apply plugin: 'elasticsearch.fwc-test'
13+
apply plugin: 'elasticsearch.bc-upgrade-test'
14+
apply plugin: 'elasticsearch.internal-test-artifact' // needed to run logsdb rest tests in serverless
15+
16+
dependencies {
17+
javaRestTestImplementation project(xpackModule('logsdb'))
18+
}
19+
20+
buildParams.bwcVersions.withWireCompatible { bwcVersion, baseName ->
21+
tasks.register(bwcTaskName(bwcVersion), StandaloneRestIntegTestTask) {
22+
usesBwcDistribution(bwcVersion)
23+
systemProperty("tests.old_cluster_version", bwcVersion)
24+
}
25+
}
26+
27+
tasks.withType(Test).configureEach {
28+
// CI doesn't like it when there's multiple clusters running at once
29+
maxParallelForks = 1
30+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.logsdb;
9+
10+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
11+
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
12+
import org.elasticsearch.test.cluster.util.Version;
13+
14+
public class Clusters {
15+
public static ElasticsearchCluster oldVersionCluster(String user, String pass) {
16+
String oldVersionString = System.getProperty("tests.old_cluster_version");
17+
Version oldVersion = Version.fromString(oldVersionString);
18+
boolean isDetachedVersion = System.getProperty("tests.bwc.refspec.main") != null;
19+
var cluster = ElasticsearchCluster.local()
20+
.distribution(DistributionType.DEFAULT)
21+
.withNode(node -> node.version(oldVersionString, isDetachedVersion))
22+
.withNode(node -> node.version(oldVersionString, isDetachedVersion))
23+
.setting("xpack.security.enabled", "true")
24+
.user(user, pass)
25+
.keystore("bootstrap.password", pass)
26+
.setting("xpack.license.self_generated.type", "trial");
27+
28+
if (supportRetryOnShardFailures(oldVersion) == false) {
29+
cluster.setting("cluster.routing.rebalance.enable", "none");
30+
}
31+
return cluster.build();
32+
}
33+
34+
private static boolean supportRetryOnShardFailures(Version version) {
35+
return version.onOrAfter(Version.fromString("9.1.0"))
36+
|| (version.onOrAfter(Version.fromString("8.19.0")) && version.before(Version.fromString("9.0.0")));
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.logsdb;
8+
9+
import org.elasticsearch.client.Request;
10+
import org.elasticsearch.client.Response;
11+
import org.elasticsearch.client.RestClient;
12+
import org.elasticsearch.common.network.NetworkAddress;
13+
import org.elasticsearch.common.settings.SecureString;
14+
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.common.time.DateFormatter;
16+
import org.elasticsearch.common.time.FormatNames;
17+
import org.elasticsearch.common.util.concurrent.ThreadContext;
18+
import org.elasticsearch.common.xcontent.XContentHelper;
19+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
20+
import org.elasticsearch.test.cluster.util.Version;
21+
import org.elasticsearch.test.rest.ESRestTestCase;
22+
import org.elasticsearch.test.rest.ObjectPath;
23+
import org.elasticsearch.xcontent.XContentType;
24+
import org.junit.ClassRule;
25+
26+
import java.io.IOException;
27+
import java.io.InputStream;
28+
import java.time.Instant;
29+
import java.util.List;
30+
import java.util.Locale;
31+
import java.util.Map;
32+
33+
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.greaterThan;
35+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
36+
import static org.hamcrest.Matchers.hasSize;
37+
import static org.hamcrest.Matchers.notNullValue;
38+
39+
public class LogsdbIndexingRollingUpgradeIT extends ESRestTestCase {
40+
41+
static String BULK_ITEM_TEMPLATE =
42+
"""
43+
{"@timestamp": "$now", "host.name": "$host", "method": "$method", "ip": "$ip", "message": "$message", "length": $length, "factor": $factor}
44+
""";
45+
46+
private static final String TEMPLATE = """
47+
{
48+
"mappings": {
49+
"properties": {
50+
"@timestamp" : {
51+
"type": "date"
52+
},
53+
"method": {
54+
"type": "keyword"
55+
},
56+
"message": {
57+
"type": "text"
58+
},
59+
"ip": {
60+
"type": "ip"
61+
},
62+
"length": {
63+
"type": "long"
64+
},
65+
"factor": {
66+
"type": "double"
67+
}
68+
}
69+
}
70+
}""";
71+
72+
private static final String USER = "admin-user";
73+
private static final String PASS = "x-pack-test-password";
74+
75+
@ClassRule
76+
public static final ElasticsearchCluster cluster = Clusters.oldVersionCluster(USER, PASS);
77+
78+
@Override
79+
protected String getTestRestCluster() {
80+
return cluster.getHttpAddresses();
81+
}
82+
83+
protected Settings restClientSettings() {
84+
String token = basicAuthHeaderValue(USER, new SecureString(PASS.toCharArray()));
85+
return Settings.builder().put(super.restClientSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build();
86+
}
87+
88+
public void testIndexing() throws Exception {
89+
String dataStreamName = "logs-bwc-test";
90+
{
91+
String templateId = getClass().getSimpleName().toLowerCase(Locale.ROOT);
92+
createTemplate(dataStreamName, templateId, TEMPLATE);
93+
94+
Instant startTime = Instant.now().minusSeconds(60 * 60);
95+
bulkIndex(dataStreamName, 4, 1024, startTime);
96+
97+
String firstBackingIndex = getWriteBackingIndex(client(), dataStreamName, 0);
98+
var settings = (Map<?, ?>) getIndexSettingsWithDefaults(firstBackingIndex).get(firstBackingIndex);
99+
assertThat(((Map<?, ?>) settings.get("settings")).get("index.mode"), equalTo("logsdb"));
100+
assertThat(((Map<?, ?>) settings.get("defaults")).get("index.mapping.source.mode"), equalTo("SYNTHETIC"));
101+
102+
// check prior to rollover
103+
assertDataStream(dataStreamName, templateId);
104+
ensureGreen(dataStreamName);
105+
search(dataStreamName);
106+
query(dataStreamName);
107+
upgradeNode(0);
108+
}
109+
{
110+
Instant startTime = Instant.now().minusSeconds(60 * 30);
111+
bulkIndex(dataStreamName, 4, 1024, startTime);
112+
113+
ensureGreen(dataStreamName);
114+
search(dataStreamName);
115+
query(dataStreamName);
116+
upgradeNode(1);
117+
}
118+
{
119+
Instant startTime = Instant.now();
120+
bulkIndex(dataStreamName, 4, 1024, startTime);
121+
search(dataStreamName);
122+
query(dataStreamName);
123+
124+
var forceMergeRequest = new Request("POST", "/" + dataStreamName + "/_forcemerge");
125+
forceMergeRequest.addParameter("max_num_segments", "1");
126+
assertOK(client().performRequest(forceMergeRequest));
127+
128+
ensureGreen(dataStreamName);
129+
search(dataStreamName);
130+
query(dataStreamName);
131+
}
132+
}
133+
134+
private void upgradeNode(int n) throws IOException {
135+
closeClients();
136+
var upgradeVersion = Version.CURRENT;
137+
logger.info("Upgrading node {} to version {}", n, upgradeVersion);
138+
cluster.upgradeNodeToVersion(n, upgradeVersion);
139+
initClient();
140+
}
141+
142+
static void assertDataStream(String dataStreamName, String templateId) throws IOException {
143+
var getDataStreamsRequest = new Request("GET", "/_data_stream/" + dataStreamName);
144+
var getDataStreamResponse = client().performRequest(getDataStreamsRequest);
145+
assertOK(getDataStreamResponse);
146+
var dataStreams = entityAsMap(getDataStreamResponse);
147+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
148+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
149+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo(templateId));
150+
}
151+
152+
static void createTemplate(String dataStreamName, String id, String template) throws IOException {
153+
final String INDEX_TEMPLATE = """
154+
{
155+
"priority": 200,
156+
"index_patterns": ["$DATASTREAM"],
157+
"template": $TEMPLATE,
158+
"data_stream": {
159+
}
160+
}""";
161+
var putIndexTemplateRequest = new Request("POST", "/_index_template/" + id);
162+
putIndexTemplateRequest.setJsonEntity(INDEX_TEMPLATE.replace("$TEMPLATE", template).replace("$DATASTREAM", dataStreamName));
163+
assertOK(client().performRequest(putIndexTemplateRequest));
164+
}
165+
166+
static String bulkIndex(String dataStreamName, int numRequest, int numDocs, Instant startTime) throws Exception {
167+
String firstIndex = null;
168+
for (int i = 0; i < numRequest; i++) {
169+
var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
170+
StringBuilder requestBody = new StringBuilder();
171+
for (int j = 0; j < numDocs; j++) {
172+
String hostName = "host" + j % 50; // Not realistic, but makes asserting search / query response easier.
173+
String methodName = "method" + j % 5;
174+
String ip = NetworkAddress.format(randomIp(true));
175+
String message = randomAlphaOfLength(128);
176+
long length = randomLong();
177+
double factor = randomDouble();
178+
179+
requestBody.append("{\"create\": {}}");
180+
requestBody.append('\n');
181+
requestBody.append(
182+
BULK_ITEM_TEMPLATE.replace("$now", formatInstant(startTime))
183+
.replace("$host", hostName)
184+
.replace("$method", methodName)
185+
.replace("$ip", ip)
186+
.replace("$message", message)
187+
.replace("$length", Long.toString(length))
188+
.replace("$factor", Double.toString(factor))
189+
);
190+
requestBody.append('\n');
191+
192+
startTime = startTime.plusMillis(1);
193+
}
194+
bulkRequest.setJsonEntity(requestBody.toString());
195+
bulkRequest.addParameter("refresh", "true");
196+
var response = client().performRequest(bulkRequest);
197+
assertOK(response);
198+
var responseBody = entityAsMap(response);
199+
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
200+
if (firstIndex == null) {
201+
firstIndex = (String) ((Map<?, ?>) ((Map<?, ?>) ((List<?>) responseBody.get("items")).get(0)).get("create")).get("_index");
202+
}
203+
}
204+
return firstIndex;
205+
}
206+
207+
void search(String dataStreamName) throws Exception {
208+
var searchRequest = new Request("POST", "/" + dataStreamName + "/_search");
209+
searchRequest.addParameter("pretty", "true");
210+
searchRequest.setJsonEntity("""
211+
{
212+
"size": 0,
213+
"aggs": {
214+
"host_name": {
215+
"terms": {
216+
"field": "host.name",
217+
"order": { "_key": "asc" }
218+
},
219+
"aggs": {
220+
"max_length": {
221+
"max": {
222+
"field": "length"
223+
}
224+
},
225+
"max_factor": {
226+
"max": {
227+
"field": "factor"
228+
}
229+
}
230+
}
231+
}
232+
}
233+
}
234+
""");
235+
var response = client().performRequest(searchRequest);
236+
assertOK(response);
237+
var responseBody = entityAsMap(response);
238+
239+
Integer totalCount = ObjectPath.evaluate(responseBody, "hits.total.value");
240+
assertThat(totalCount, greaterThanOrEqualTo(4096));
241+
String key = ObjectPath.evaluate(responseBody, "aggregations.host_name.buckets.0.key");
242+
assertThat(key, equalTo("host0"));
243+
Integer docCount = ObjectPath.evaluate(responseBody, "aggregations.host_name.buckets.0.doc_count");
244+
assertThat(docCount, greaterThan(0));
245+
Double maxTx = ObjectPath.evaluate(responseBody, "aggregations.host_name.buckets.0.max_length.value");
246+
assertThat(maxTx, notNullValue());
247+
Double maxRx = ObjectPath.evaluate(responseBody, "aggregations.host_name.buckets.0.max_factor.value");
248+
assertThat(maxRx, notNullValue());
249+
}
250+
251+
void query(String dataStreamName) throws Exception {
252+
var queryRequest = new Request("POST", "/_query");
253+
queryRequest.addParameter("pretty", "true");
254+
queryRequest.setJsonEntity("""
255+
{
256+
"query": "FROM $ds | STATS max(length), max(factor) BY host.name | SORT host.name | LIMIT 5"
257+
}
258+
""".replace("$ds", dataStreamName));
259+
var response = client().performRequest(queryRequest);
260+
assertOK(response);
261+
var responseBody = entityAsMap(response);
262+
263+
String column1 = ObjectPath.evaluate(responseBody, "columns.0.name");
264+
String column2 = ObjectPath.evaluate(responseBody, "columns.1.name");
265+
String column3 = ObjectPath.evaluate(responseBody, "columns.2.name");
266+
assertThat(column1, equalTo("max(length)"));
267+
assertThat(column2, equalTo("max(factor)"));
268+
assertThat(column3, equalTo("host.name"));
269+
270+
String key = ObjectPath.evaluate(responseBody, "values.0.2");
271+
assertThat(key, equalTo("host0"));
272+
Long maxRx = ObjectPath.evaluate(responseBody, "values.0.0");
273+
assertThat(maxRx, notNullValue());
274+
Double maxTx = ObjectPath.evaluate(responseBody, "values.0.1");
275+
assertThat(maxTx, notNullValue());
276+
}
277+
278+
static Map<String, Object> getIndexSettingsWithDefaults(String index) throws IOException {
279+
Request request = new Request("GET", "/" + index + "/_settings");
280+
request.addParameter("flat_settings", "true");
281+
request.addParameter("include_defaults", "true");
282+
Response response = client().performRequest(request);
283+
try (InputStream is = response.getEntity().getContent()) {
284+
return XContentHelper.convertToMap(
285+
XContentType.fromMediaType(response.getEntity().getContentType().getValue()).xContent(),
286+
is,
287+
true
288+
);
289+
}
290+
}
291+
292+
static String formatInstant(Instant instant) {
293+
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
294+
}
295+
296+
static String getWriteBackingIndex(final RestClient client, final String dataStreamName, int backingIndex) throws IOException {
297+
final Request request = new Request("GET", "_data_stream/" + dataStreamName);
298+
final List<?> dataStreams = (List<?>) entityAsMap(client.performRequest(request)).get("data_streams");
299+
final Map<?, ?> dataStream = (Map<?, ?>) dataStreams.getFirst();
300+
final List<?> backingIndices = (List<?>) dataStream.get("indices");
301+
return (String) ((Map<?, ?>)backingIndices.get(backingIndex)).get("index_name");
302+
}
303+
304+
}

0 commit comments

Comments
 (0)