Skip to content

Commit 2aaaefd

Browse files
committed
Fixed LogsIndexModeRollingUpgradeIT
1 parent b43ec87 commit 2aaaefd

File tree

1 file changed

+85
-90
lines changed

1 file changed

+85
-90
lines changed

x-pack/plugin/logsdb/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/LogsIndexModeRollingUpgradeIT.java

Lines changed: 85 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,10 @@
1010
package org.elasticsearch.upgrades;
1111

1212
import com.carrotsearch.randomizedtesting.annotations.Name;
13-
1413
import org.elasticsearch.client.Request;
1514
import org.elasticsearch.client.Response;
1615
import org.elasticsearch.client.RestClient;
17-
import org.elasticsearch.common.network.InetAddresses;
16+
import org.elasticsearch.common.network.NetworkAddress;
1817
import org.elasticsearch.common.settings.SecureString;
1918
import org.elasticsearch.common.settings.Settings;
2019
import org.elasticsearch.common.time.DateFormatter;
@@ -32,24 +31,32 @@
3231
import java.util.Map;
3332
import java.util.function.Supplier;
3433

35-
public class LogsIndexModeRollingUpgradeIT extends AbstractRollingUpgradeTestCase {
34+
/**
35+
* This test starts with LogsDB disabled, performs an upgrade, enables LogsDB and indexes some documents.
36+
*/
37+
public class LogsIndexModeRollingUpgradeIT extends AbstractRollingUpgradeWithSecurityTestCase {
3638

3739
private static final String USER = "test_admin";
3840
private static final String PASS = "x-pack-test-password";
3941

42+
private static final String LOGS_TEMPLATE = "logs-template";
43+
private static final String DATA_STREAM = "logs-apache-production";
44+
4045
@ClassRule()
4146
public static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
4247
.distribution(DistributionType.DEFAULT)
48+
.version(OLD_CLUSTER_VERSION)
49+
.nodes(NODE_NUM)
50+
.user(USER, PASS)
4351
.module("constant-keyword")
4452
.module("data-streams")
4553
.module("mapper-extras")
4654
.module("x-pack-aggregate-metric")
4755
.module("x-pack-stack")
4856
.setting("xpack.security.autoconfiguration.enabled", "false")
49-
.user(USER, PASS)
5057
.setting("xpack.license.self_generated.type", initTestSeed().nextBoolean() ? "trial" : "basic")
51-
// We upgrade from standard to logsdb, so we need to start with logsdb disabled,
52-
// then later cluster.logsdb.enabled gets set to true and next rollover data stream is in logsdb mode.
58+
// LogsDB is enabled by default for data streams matching the logs-*-* pattern, and since we upgrade from standard to logsdb,
59+
// we need to start with logsdb disabled, then later enable it and rollover
5360
.setting("cluster.logsdb.enabled", "false")
5461
.setting("stack.templates.enabled", "false")
5562
.build();
@@ -63,14 +70,19 @@ protected String getTestRestCluster() {
6370
return cluster.getHttpAddresses();
6471
}
6572

73+
@Override
74+
protected ElasticsearchCluster getUpgradeCluster() {
75+
return cluster;
76+
}
77+
6678
protected Settings restClientSettings() {
6779
String token = basicAuthHeaderValue(USER, new SecureString(PASS.toCharArray()));
6880
return Settings.builder().put(super.restClientSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build();
6981
}
7082

71-
private static final String BULK_INDEX_REQUEST = """
83+
private static final String BULK_INDEX_REQUEST_TEMPLATE = """
7284
{ "create": {} }
73-
{ "@timestamp": "%s", "host.name": "%s", "method": "%s", "ip.address": "%s", "message": "%s" }
85+
{ "@timestamp": "$timestamp", "host.name": "$hostname", "method": "$method", "ip.address": "$ip", "message": "$message" }
7486
""";
7587

7688
private static final String STANDARD_TEMPLATE = """
@@ -103,75 +115,34 @@ protected Settings restClientSettings() {
103115

104116
public void testLogsIndexing() throws IOException {
105117
if (isOldCluster()) {
106-
assertOK(client().performRequest(putTemplate(client(), "logs-template", STANDARD_TEMPLATE)));
107-
assertOK(client().performRequest(createDataStream("logs-apache-production")));
108-
final Response bulkIndexResponse = client().performRequest(bulkIndex("logs-apache-production", () -> {
109-
final StringBuilder sb = new StringBuilder();
110-
for (int i = 0; i < randomIntBetween(10, 20); i++) {
111-
sb.append(
112-
String.format(
113-
BULK_INDEX_REQUEST,
114-
DateFormatter.forPattern(FormatNames.DATE_TIME.getName()).format(Instant.now()),
115-
randomFrom("foo", "bar"),
116-
randomFrom("PUT", "POST", "GET"),
117-
InetAddresses.toAddrString(randomIp(randomBoolean())),
118-
randomIntBetween(20, 50)
119-
)
120-
);
121-
sb.append("\n");
122-
}
123-
return sb.toString();
124-
}));
125-
assertOK(bulkIndexResponse);
126-
assertThat(entityAsMap(bulkIndexResponse).get("errors"), Matchers.is(false));
118+
// given - create a template and data stream
119+
putTemplate();
120+
createDataStream();
121+
122+
// when/then - index some documents and ensure no issues occurred
123+
bulkIndex(this::bulkIndexRequestBody);
124+
125+
// then continued - verify that the created data stream uses the created template
126+
LogsdbIndexingRollingUpgradeIT.assertDataStream(DATA_STREAM, LOGS_TEMPLATE);
127+
127128
} else if (isMixedCluster()) {
128-
assertOK(client().performRequest(rolloverDataStream(client(), "logs-apache-production")));
129-
final Response bulkIndexResponse = client().performRequest(bulkIndex("logs-apache-production", () -> {
130-
final StringBuilder sb = new StringBuilder();
131-
for (int i = 0; i < randomIntBetween(10, 20); i++) {
132-
sb.append(
133-
String.format(
134-
BULK_INDEX_REQUEST,
135-
DateFormatter.forPattern(FormatNames.DATE_TIME.getName()).format(Instant.now()),
136-
randomFrom("foo", "bar"),
137-
randomFrom("PUT", "POST", "GET"),
138-
InetAddresses.toAddrString(randomIp(randomBoolean())),
139-
randomIntBetween(20, 50)
140-
)
141-
);
142-
sb.append("\n");
143-
}
144-
return sb.toString();
145-
}));
146-
assertOK(bulkIndexResponse);
147-
assertThat(entityAsMap(bulkIndexResponse).get("errors"), Matchers.is(false));
129+
// when/then - index more documents
130+
bulkIndex(this::bulkIndexRequestBody);
131+
148132
} else if (isUpgradedCluster()) {
133+
// when/then - index some more documents
134+
bulkIndex(this::bulkIndexRequestBody);
135+
136+
// given - enable logsdb and rollover
149137
enableLogsdbByDefault();
150-
assertOK(client().performRequest(rolloverDataStream(client(), "logs-apache-production")));
151-
final Response bulkIndexResponse = client().performRequest(bulkIndex("logs-apache-production", () -> {
152-
final StringBuilder sb = new StringBuilder();
153-
for (int i = 0; i < randomIntBetween(10, 20); i++) {
154-
sb.append(
155-
String.format(
156-
BULK_INDEX_REQUEST,
157-
DateFormatter.forPattern(FormatNames.DATE_TIME.getName()).format(Instant.now()),
158-
randomFrom("foo", "bar"),
159-
randomFrom("PUT", "POST", "GET"),
160-
InetAddresses.toAddrString(randomIp(randomBoolean())),
161-
randomIntBetween(20, 50)
162-
)
163-
);
164-
sb.append("\n");
165-
}
166-
return sb.toString();
167-
}));
168-
assertOK(bulkIndexResponse);
169-
assertThat(entityAsMap(bulkIndexResponse).get("errors"), Matchers.is(false));
138+
rolloverDataStream();
139+
140+
// when/then
141+
bulkIndex(this::bulkIndexRequestBody);
170142

143+
// then continued - verify that only the latest write index has logsdb enabled
171144
assertIndexSettings(0, Matchers.nullValue());
172-
assertIndexSettings(1, Matchers.nullValue());
173-
assertIndexSettings(2, Matchers.nullValue());
174-
assertIndexSettings(3, Matchers.equalTo("logsdb"));
145+
assertIndexSettings(1, Matchers.equalTo("logsdb"));
175146
}
176147
}
177148

@@ -187,40 +158,64 @@ static void enableLogsdbByDefault() throws IOException {
187158
assertOK(client().performRequest(request));
188159
}
189160

161+
private String bulkIndexRequestBody() {
162+
final StringBuilder sb = new StringBuilder();
163+
for (int i = 0; i < randomIntBetween(10, 20); i++) {
164+
sb.append(
165+
BULK_INDEX_REQUEST_TEMPLATE
166+
.replace("$timestamp", DateFormatter.forPattern(FormatNames.DATE_TIME.getName()).format(Instant.now()))
167+
.replace("$hostname", randomFrom("potato.host", "tomato.host"))
168+
.replace("$method", randomFrom("PUT", "POST", "GET"))
169+
.replace("$ip", NetworkAddress.format(randomIp(randomBoolean())))
170+
.replace("$message", randomAlphaOfLength(128))
171+
);
172+
sb.append("\n");
173+
}
174+
return sb.toString();
175+
}
176+
190177
private void assertIndexSettings(int backingIndex, final Matcher<Object> indexModeMatcher) throws IOException {
191-
assertThat(
192-
getSettings(client(), getWriteBackingIndex(client(), "logs-apache-production", backingIndex)).get("index.mode"),
193-
indexModeMatcher
194-
);
178+
assertThat(getSettings(client(), getWriteBackingIndex(client(), DATA_STREAM, backingIndex)).get("index.mode"), indexModeMatcher);
195179
}
196180

197-
private static Request createDataStream(final String dataStreamName) {
198-
return new Request("PUT", "/_data_stream/" + dataStreamName);
181+
private static void createDataStream() throws IOException {
182+
final Request request = new Request("PUT", "/_data_stream/" + DATA_STREAM);
183+
final Response response = client().performRequest(request);
184+
assertOK(response);
199185
}
200186

201-
private static Request bulkIndex(final String dataStreamName, final Supplier<String> bulkIndexRequestSupplier) {
202-
final Request request = new Request("POST", dataStreamName + "/_bulk");
187+
private static void bulkIndex(final Supplier<String> bulkIndexRequestSupplier) throws IOException {
188+
final Request request = new Request("POST", DATA_STREAM + "/_bulk");
203189
request.setJsonEntity(bulkIndexRequestSupplier.get());
204190
request.addParameter("refresh", "true");
205-
return request;
191+
192+
final Response response = client().performRequest(request);
193+
final var responseBody = entityAsMap(response);
194+
195+
// then - ensure no issues
196+
assertOK(response);
197+
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), Matchers.is(false));
206198
}
207199

208-
private static Request putTemplate(final RestClient client, final String templateName, final String mappings) throws IOException {
209-
final Request request = new Request("PUT", "/_index_template/" + templateName);
210-
request.setJsonEntity(mappings);
211-
return request;
200+
private static void putTemplate() throws IOException {
201+
final Request request = new Request("PUT", "/_index_template/" + LOGS_TEMPLATE);
202+
request.setJsonEntity(STANDARD_TEMPLATE);
203+
final Response response = client().performRequest(request);
204+
assertOK(response);
212205
}
213206

214-
private static Request rolloverDataStream(final RestClient client, final String dataStreamName) throws IOException {
215-
return new Request("POST", "/" + dataStreamName + "/_rollover");
207+
private static void rolloverDataStream() throws IOException {
208+
final Request request = new Request("POST", "/" + DATA_STREAM + "/_rollover");
209+
final Response response = client().performRequest(request);
210+
assertOK(response);
216211
}
217212

218213
@SuppressWarnings("unchecked")
219214
static String getWriteBackingIndex(final RestClient client, final String dataStreamName, int backingIndex) throws IOException {
220215
final Request request = new Request("GET", "_data_stream/" + dataStreamName);
221-
final List<Object> dataStreams = (List<Object>) entityAsMap(client.performRequest(request)).get("data_streams");
222-
final Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
223-
final List<Map<String, String>> backingIndices = (List<Map<String, String>>) dataStream.get("indices");
216+
final List<Object> DATA_STREAMs = (List<Object>) entityAsMap(client.performRequest(request)).get("data_streams");
217+
final Map<String, Object> DATA_STREAM = (Map<String, Object>) DATA_STREAMs.get(0);
218+
final List<Map<String, String>> backingIndices = (List<Map<String, String>>) DATA_STREAM.get("indices");
224219
return backingIndices.get(backingIndex).get("index_name");
225220
}
226221

0 commit comments

Comments
 (0)