Skip to content

Commit c534b97

Browse files
Add WildcardRollingUpgradeIT (#137348)
Factor existing TextRollingUpgradeIT and MatchOnlyTextRollingUpgradeIT into an abstract parent class. Add WildcardRollingUpgradeIT as a new test for "wildcard" field type. Related to #137139
1 parent 4ae62d6 commit c534b97

File tree

4 files changed

+373
-612
lines changed

4 files changed

+373
-612
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.upgrades;
9+
10+
import com.carrotsearch.randomizedtesting.annotations.Name;
11+
12+
import org.elasticsearch.client.Request;
13+
import org.elasticsearch.client.Response;
14+
import org.elasticsearch.client.ResponseException;
15+
import org.elasticsearch.common.network.NetworkAddress;
16+
import org.elasticsearch.common.time.DateFormatter;
17+
import org.elasticsearch.common.time.FormatNames;
18+
import org.elasticsearch.common.xcontent.XContentHelper;
19+
import org.elasticsearch.test.rest.ObjectPath;
20+
import org.elasticsearch.xcontent.XContentType;
21+
22+
import java.io.IOException;
23+
import java.io.InputStream;
24+
import java.time.Instant;
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.Locale;
28+
import java.util.Map;
29+
30+
import static org.elasticsearch.upgrades.StandardToLogsDbIndexModeRollingUpgradeIT.enableLogsdbByDefault;
31+
import static org.elasticsearch.upgrades.StandardToLogsDbIndexModeRollingUpgradeIT.getWriteBackingIndex;
32+
import static org.hamcrest.Matchers.containsString;
33+
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
35+
import static org.hamcrest.Matchers.notNullValue;
36+
37+
public abstract class AbstractStringTypeRollingUpgradeIT extends AbstractRollingUpgradeWithSecurityTestCase {
38+
39+
private static final String DATA_STREAM = "logs-bwc-test";
40+
41+
private static final int IGNORE_ABOVE_MAX = 256;
42+
private static final int NUM_REQUESTS = 4;
43+
private static final int NUM_DOCS_PER_REQUEST = 1024;
44+
45+
static String BULK_ITEM_TEMPLATE =
46+
"""
47+
{ "create": {} }
48+
{"@timestamp": "$now", "host.name": "$host", "method": "$method", "ip": "$ip", "message": "$message", "length": $length, "factor": $factor}
49+
""";
50+
51+
private static final String TEMPLATE = """
52+
{
53+
"mappings": {
54+
"properties": {
55+
"@timestamp" : {
56+
"type": "date"
57+
},
58+
"method": {
59+
"type": "keyword"
60+
},
61+
"message": {
62+
"type": "$STRING_TYPE",
63+
"fields": {
64+
"keyword": {
65+
"ignore_above": $IGNORE_ABOVE,
66+
"type": "keyword"
67+
}
68+
}
69+
},
70+
"ip": {
71+
"type": "ip"
72+
},
73+
"length": {
74+
"type": "long"
75+
},
76+
"factor": {
77+
"type": "double"
78+
}
79+
}
80+
}
81+
}""";
82+
83+
// when sorted, this message will appear at the top and hence can be used to validate query results
84+
private static String smallestMessage;
85+
86+
public AbstractStringTypeRollingUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
87+
super(upgradedNodes);
88+
}
89+
90+
abstract String stringType();
91+
92+
public void testIndexing() throws Exception {
93+
if (isOldCluster()) {
94+
// given - enable logsdb and create a template
95+
startTrial();
96+
enableLogsdbByDefault();
97+
String templateId = getClass().getSimpleName().toLowerCase(Locale.ROOT);
98+
createTemplate(DATA_STREAM, templateId, prepareTemplate());
99+
100+
// when - index some documents
101+
bulkIndex(NUM_REQUESTS, NUM_DOCS_PER_REQUEST);
102+
103+
// then - verify that logsdb and synthetic source are both enabled
104+
String firstBackingIndex = getWriteBackingIndex(client(), DATA_STREAM, 0);
105+
var settings = (Map<?, ?>) getIndexSettingsWithDefaults(firstBackingIndex).get(firstBackingIndex);
106+
assertThat(((Map<?, ?>) settings.get("settings")).get("index.mode"), equalTo("logsdb"));
107+
assertThat(((Map<?, ?>) settings.get("defaults")).get("index.mapping.source.mode"), equalTo("SYNTHETIC"));
108+
109+
// then continued - verify that the created data stream using the created template
110+
LogsdbIndexingRollingUpgradeIT.assertDataStream(DATA_STREAM, templateId);
111+
112+
// when/then - run some queries and verify results
113+
ensureGreen(DATA_STREAM);
114+
search(DATA_STREAM);
115+
phraseSearch(DATA_STREAM);
116+
query(DATA_STREAM);
117+
} else if (isMixedCluster()) {
118+
// when
119+
bulkIndex(NUM_REQUESTS, NUM_DOCS_PER_REQUEST);
120+
121+
// when/then
122+
ensureGreen(DATA_STREAM);
123+
search(DATA_STREAM);
124+
phraseSearch(DATA_STREAM);
125+
query(DATA_STREAM);
126+
} else if (isUpgradedCluster()) {
127+
// when/then
128+
ensureGreen(DATA_STREAM);
129+
bulkIndex(NUM_REQUESTS, NUM_DOCS_PER_REQUEST);
130+
search(DATA_STREAM);
131+
phraseSearch(DATA_STREAM);
132+
query(DATA_STREAM);
133+
134+
// when/then continued - force merge all shard segments into one
135+
var forceMergeRequest = new Request("POST", "/" + DATA_STREAM + "/_forcemerge");
136+
forceMergeRequest.addParameter("max_num_segments", "1");
137+
assertOK(client().performRequest(forceMergeRequest));
138+
139+
// then continued
140+
ensureGreen(DATA_STREAM);
141+
search(DATA_STREAM);
142+
query(DATA_STREAM);
143+
}
144+
}
145+
146+
private String prepareTemplate() {
147+
boolean shouldSetIgnoreAbove = randomBoolean();
148+
String templateWithType = TEMPLATE.replace("$STRING_TYPE", stringType());
149+
if (shouldSetIgnoreAbove) {
150+
return templateWithType.replace("$IGNORE_ABOVE", String.valueOf(randomInt(IGNORE_ABOVE_MAX)));
151+
}
152+
153+
// removes the entire line that defines ignore_above
154+
return templateWithType.replaceAll("(?m)^\\s*\"ignore_above\":\\s*\\$IGNORE_ABOVE\\s*,?\\s*\\n?", "");
155+
}
156+
157+
static void createTemplate(String dataStreamName, String id, String template) throws IOException {
158+
final String INDEX_TEMPLATE = """
159+
{
160+
"priority": 500,
161+
"index_patterns": ["$DATASTREAM"],
162+
"template": $TEMPLATE,
163+
"data_stream": {
164+
}
165+
}""";
166+
var putIndexTemplateRequest = new Request("POST", "/_index_template/" + id);
167+
putIndexTemplateRequest.setJsonEntity(INDEX_TEMPLATE.replace("$TEMPLATE", template).replace("$DATASTREAM", dataStreamName));
168+
assertOK(client().performRequest(putIndexTemplateRequest));
169+
}
170+
171+
private void bulkIndex(int numRequest, int numDocs) throws Exception {
172+
String firstIndex = null;
173+
Instant startTime = Instant.now().minusSeconds(60 * 60);
174+
175+
for (int i = 0; i < numRequest; i++) {
176+
var bulkRequest = new Request("POST", "/" + DATA_STREAM + "/_bulk");
177+
bulkRequest.setJsonEntity(bulkIndexRequestBody(numDocs, startTime));
178+
bulkRequest.addParameter("refresh", "true");
179+
180+
var response = client().performRequest(bulkRequest);
181+
var responseBody = entityAsMap(response);
182+
183+
assertOK(response);
184+
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
185+
if (firstIndex == null) {
186+
firstIndex = (String) ((Map<?, ?>) ((Map<?, ?>) ((List<?>) responseBody.get("items")).get(0)).get("create")).get("_index");
187+
}
188+
}
189+
}
190+
191+
private String bulkIndexRequestBody(int numDocs, Instant startTime) {
192+
StringBuilder requestBody = new StringBuilder();
193+
194+
for (int j = 0; j < numDocs; j++) {
195+
String hostName = "host" + j % 50; // Not realistic, but makes asserting search / query response easier.
196+
String methodName = "method" + j % 5;
197+
String ip = NetworkAddress.format(randomIp(true));
198+
String message = randomAlphasDelimitedBySpace(10, 1, 15);
199+
recordSmallestMessage(message);
200+
long length = randomLong();
201+
double factor = randomDouble();
202+
203+
requestBody.append(
204+
BULK_ITEM_TEMPLATE.replace("$now", formatInstant(startTime))
205+
.replace("$host", hostName)
206+
.replace("$method", methodName)
207+
.replace("$ip", ip)
208+
.replace("$message", message)
209+
.replace("$length", Long.toString(length))
210+
.replace("$factor", Double.toString(factor))
211+
);
212+
requestBody.append('\n');
213+
214+
startTime = startTime.plusMillis(1);
215+
}
216+
217+
return requestBody.toString();
218+
}
219+
220+
/**
221+
* Generates a string containing a random number of random length alphas, all delimited by space.
222+
*/
223+
public static String randomAlphasDelimitedBySpace(int maxAlphas, int minCodeUnits, int maxCodeUnits) {
224+
int numAlphas = randomIntBetween(1, maxAlphas);
225+
List<String> alphas = new ArrayList<>(numAlphas);
226+
for (int i = 0; i < numAlphas; i++) {
227+
alphas.add(randomAlphaOfLengthBetween(minCodeUnits, maxCodeUnits));
228+
}
229+
return String.join(" ", alphas);
230+
}
231+
232+
private void recordSmallestMessage(final String message) {
233+
if (smallestMessage == null || message.compareTo(smallestMessage) < 0) {
234+
smallestMessage = message;
235+
}
236+
}
237+
238+
private void search(String dataStreamName) throws Exception {
239+
var searchRequest = new Request("POST", "/" + dataStreamName + "/_search");
240+
searchRequest.addParameter("pretty", "true");
241+
searchRequest.setJsonEntity("""
242+
{
243+
"size": 500
244+
}
245+
""");
246+
var response = client().performRequest(searchRequest);
247+
assertOK(response);
248+
var responseBody = entityAsMap(response);
249+
logger.info("{}", responseBody);
250+
251+
Integer totalCount = ObjectPath.evaluate(responseBody, "hits.total.value");
252+
assertThat(totalCount, greaterThanOrEqualTo(NUM_REQUESTS * NUM_DOCS_PER_REQUEST));
253+
}
254+
255+
private void phraseSearch(String dataStreamName) throws Exception {
256+
var searchRequest = new Request("POST", "/" + dataStreamName + "/_search");
257+
searchRequest.addParameter("pretty", "true");
258+
searchRequest.setJsonEntity("""
259+
{
260+
"query": {
261+
"match_phrase": {
262+
"message": "$smallestMessage"
263+
}
264+
}
265+
}
266+
""".replace("$smallestMessage", smallestMessage));
267+
var response = client().performRequest(searchRequest);
268+
assertOK(response);
269+
var responseBody = entityAsMap(response);
270+
logger.info("{}", responseBody);
271+
assertThat(ObjectPath.evaluate(responseBody, "hits.total.value"), greaterThanOrEqualTo(1));
272+
}
273+
274+
private void query(String dataStreamName) throws Exception {
275+
var queryRequest = new Request("POST", "/_query");
276+
queryRequest.addParameter("pretty", "true");
277+
queryRequest.setJsonEntity("""
278+
{
279+
"query": "FROM $ds | STATS max(length), max(factor) BY message | SORT message | LIMIT 5"
280+
}
281+
""".replace("$ds", dataStreamName));
282+
var response = client().performRequest(queryRequest);
283+
assertOK(response);
284+
var responseBody = entityAsMap(response);
285+
logger.info("{}", responseBody);
286+
287+
String column1 = ObjectPath.evaluate(responseBody, "columns.0.name");
288+
assertThat(column1, equalTo("max(length)"));
289+
String column2 = ObjectPath.evaluate(responseBody, "columns.1.name");
290+
assertThat(column2, equalTo("max(factor)"));
291+
String column3 = ObjectPath.evaluate(responseBody, "columns.2.name");
292+
assertThat(column3, equalTo("message"));
293+
294+
Long maxRx = ObjectPath.evaluate(responseBody, "values.0.0");
295+
assertThat(maxRx, notNullValue());
296+
Double maxTx = ObjectPath.evaluate(responseBody, "values.0.1");
297+
assertThat(maxTx, notNullValue());
298+
String key = ObjectPath.evaluate(responseBody, "values.0.2");
299+
assertThat(key, equalTo(smallestMessage));
300+
}
301+
302+
protected static void startTrial() throws IOException {
303+
Request startTrial = new Request("POST", "/_license/start_trial");
304+
startTrial.addParameter("acknowledge", "true");
305+
try {
306+
assertOK(client().performRequest(startTrial));
307+
} catch (ResponseException e) {
308+
var responseBody = entityAsMap(e.getResponse());
309+
String error = ObjectPath.evaluate(responseBody, "error_message");
310+
assertThat(error, containsString("Trial was already activated."));
311+
}
312+
}
313+
314+
static Map<String, Object> getIndexSettingsWithDefaults(String index) throws IOException {
315+
Request request = new Request("GET", "/" + index + "/_settings");
316+
request.addParameter("flat_settings", "true");
317+
request.addParameter("include_defaults", "true");
318+
Response response = client().performRequest(request);
319+
try (InputStream is = response.getEntity().getContent()) {
320+
return XContentHelper.convertToMap(
321+
XContentType.fromMediaType(response.getEntity().getContentType().getValue()).xContent(),
322+
is,
323+
true
324+
);
325+
}
326+
}
327+
328+
static String formatInstant(Instant instant) {
329+
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
330+
}
331+
332+
}

0 commit comments

Comments
 (0)