Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
package org.elasticsearch.upgrades;

import com.carrotsearch.randomizedtesting.annotations.Name;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
Expand All @@ -32,24 +31,32 @@
import java.util.Map;
import java.util.function.Supplier;

public class LogsIndexModeRollingUpgradeIT extends AbstractRollingUpgradeTestCase {
/**
* This test starts with LogsDB disabled, performs an upgrade, enables LogsDB and indexes some documents.
*/
public class LogsIndexModeRollingUpgradeIT extends AbstractRollingUpgradeWithSecurityTestCase {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Swapped AbstractRollingUpgradeTestCase for AbstractRollingUpgradeWithSecurityTestCase since this test has auth enabled.


private static final String USER = "test_admin";
private static final String PASS = "x-pack-test-password";

private static final String LOGS_TEMPLATE = "logs-template";
private static final String DATA_STREAM = "logs-apache-production";

@ClassRule()
public static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.version(OLD_CLUSTER_VERSION)
.nodes(NODE_NUM)
.user(USER, PASS)
.module("constant-keyword")
.module("data-streams")
.module("mapper-extras")
.module("x-pack-aggregate-metric")
.module("x-pack-stack")
.setting("xpack.security.autoconfiguration.enabled", "false")
.user(USER, PASS)
.setting("xpack.license.self_generated.type", initTestSeed().nextBoolean() ? "trial" : "basic")
// We upgrade from standard to logsdb, so we need to start with logsdb disabled,
// then later cluster.logsdb.enabled gets set to true and next rollover data stream is in logsdb mode.
// LogsDB is enabled by default for data streams matching the logs-*-* pattern, and since we upgrade from standard to logsdb,
// we need to start with logsdb disabled, then later enable it and rollover
.setting("cluster.logsdb.enabled", "false")
.setting("stack.templates.enabled", "false")
.build();
Expand All @@ -63,14 +70,19 @@ protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

@Override
protected ElasticsearchCluster getUpgradeCluster() {
return cluster;
}

protected Settings restClientSettings() {
String token = basicAuthHeaderValue(USER, new SecureString(PASS.toCharArray()));
return Settings.builder().put(super.restClientSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build();
}

private static final String BULK_INDEX_REQUEST = """
private static final String BULK_INDEX_REQUEST_TEMPLATE = """
{ "create": {} }
{ "@timestamp": "%s", "host.name": "%s", "method": "%s", "ip.address": "%s", "message": "%s" }
{ "@timestamp": "$timestamp", "host.name": "$hostname", "method": "$method", "ip.address": "$ip", "message": "$message" }
""";

private static final String STANDARD_TEMPLATE = """
Expand Down Expand Up @@ -103,75 +115,34 @@ protected Settings restClientSettings() {

public void testLogsIndexing() throws IOException {
if (isOldCluster()) {
assertOK(client().performRequest(putTemplate(client(), "logs-template", STANDARD_TEMPLATE)));
assertOK(client().performRequest(createDataStream("logs-apache-production")));
final Response bulkIndexResponse = client().performRequest(bulkIndex("logs-apache-production", () -> {
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < randomIntBetween(10, 20); i++) {
sb.append(
String.format(
BULK_INDEX_REQUEST,
DateFormatter.forPattern(FormatNames.DATE_TIME.getName()).format(Instant.now()),
randomFrom("foo", "bar"),
randomFrom("PUT", "POST", "GET"),
InetAddresses.toAddrString(randomIp(randomBoolean())),
randomIntBetween(20, 50)
)
);
sb.append("\n");
}
return sb.toString();
}));
assertOK(bulkIndexResponse);
assertThat(entityAsMap(bulkIndexResponse).get("errors"), Matchers.is(false));
// given - create a template and data stream
putTemplate();
createDataStream();

// when/then - index some documents and ensure no issues occurred
bulkIndex(this::bulkIndexRequestBody);

// then continued - verify that the created data stream uses the created template
LogsdbIndexingRollingUpgradeIT.assertDataStream(DATA_STREAM, LOGS_TEMPLATE);

} else if (isMixedCluster()) {
assertOK(client().performRequest(rolloverDataStream(client(), "logs-apache-production")));
final Response bulkIndexResponse = client().performRequest(bulkIndex("logs-apache-production", () -> {
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < randomIntBetween(10, 20); i++) {
sb.append(
String.format(
BULK_INDEX_REQUEST,
DateFormatter.forPattern(FormatNames.DATE_TIME.getName()).format(Instant.now()),
randomFrom("foo", "bar"),
randomFrom("PUT", "POST", "GET"),
InetAddresses.toAddrString(randomIp(randomBoolean())),
randomIntBetween(20, 50)
)
);
sb.append("\n");
}
return sb.toString();
}));
assertOK(bulkIndexResponse);
assertThat(entityAsMap(bulkIndexResponse).get("errors"), Matchers.is(false));
// when/then - index more documents
bulkIndex(this::bulkIndexRequestBody);

} else if (isUpgradedCluster()) {
// when/then - index some more documents
bulkIndex(this::bulkIndexRequestBody);

// given - enable logsdb and rollover
enableLogsdbByDefault();
assertOK(client().performRequest(rolloverDataStream(client(), "logs-apache-production")));
final Response bulkIndexResponse = client().performRequest(bulkIndex("logs-apache-production", () -> {
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < randomIntBetween(10, 20); i++) {
sb.append(
String.format(
BULK_INDEX_REQUEST,
DateFormatter.forPattern(FormatNames.DATE_TIME.getName()).format(Instant.now()),
randomFrom("foo", "bar"),
randomFrom("PUT", "POST", "GET"),
InetAddresses.toAddrString(randomIp(randomBoolean())),
randomIntBetween(20, 50)
)
);
sb.append("\n");
}
return sb.toString();
}));
assertOK(bulkIndexResponse);
assertThat(entityAsMap(bulkIndexResponse).get("errors"), Matchers.is(false));
rolloverDataStream();

// when/then
bulkIndex(this::bulkIndexRequestBody);

// then continued - verify that only the latest write index has logsdb enabled
assertIndexSettings(0, Matchers.nullValue());
assertIndexSettings(1, Matchers.nullValue());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we no longer rollover when the cluster is in a mixed state, we have only two indices at the end

assertIndexSettings(2, Matchers.nullValue());
assertIndexSettings(3, Matchers.equalTo("logsdb"));
assertIndexSettings(1, Matchers.equalTo("logsdb"));
}
}

Expand All @@ -187,40 +158,64 @@ static void enableLogsdbByDefault() throws IOException {
assertOK(client().performRequest(request));
}

private String bulkIndexRequestBody() {
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < randomIntBetween(10, 20); i++) {
sb.append(
BULK_INDEX_REQUEST_TEMPLATE
.replace("$timestamp", DateFormatter.forPattern(FormatNames.DATE_TIME.getName()).format(Instant.now()))
.replace("$hostname", randomFrom("potato.host", "tomato.host"))
.replace("$method", randomFrom("PUT", "POST", "GET"))
.replace("$ip", NetworkAddress.format(randomIp(randomBoolean())))
.replace("$message", randomAlphaOfLength(128))
);
sb.append("\n");
}
return sb.toString();
}

private void assertIndexSettings(int backingIndex, final Matcher<Object> indexModeMatcher) throws IOException {
assertThat(
getSettings(client(), getWriteBackingIndex(client(), "logs-apache-production", backingIndex)).get("index.mode"),
indexModeMatcher
);
assertThat(getSettings(client(), getWriteBackingIndex(client(), DATA_STREAM, backingIndex)).get("index.mode"), indexModeMatcher);
}

private static Request createDataStream(final String dataStreamName) {
return new Request("PUT", "/_data_stream/" + dataStreamName);
private static void createDataStream() throws IOException {
final Request request = new Request("PUT", "/_data_stream/" + DATA_STREAM);
final Response response = client().performRequest(request);
assertOK(response);
}

private static Request bulkIndex(final String dataStreamName, final Supplier<String> bulkIndexRequestSupplier) {
final Request request = new Request("POST", dataStreamName + "/_bulk");
private static void bulkIndex(final Supplier<String> bulkIndexRequestSupplier) throws IOException {
final Request request = new Request("POST", DATA_STREAM + "/_bulk");
request.setJsonEntity(bulkIndexRequestSupplier.get());
request.addParameter("refresh", "true");
return request;

final Response response = client().performRequest(request);
final var responseBody = entityAsMap(response);

// then - ensure no issues
assertOK(response);
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), Matchers.is(false));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

printing the error in response here makes finding the errors significantly easier. Previously, I had to log dive to understand what happened

}

private static Request putTemplate(final RestClient client, final String templateName, final String mappings) throws IOException {
final Request request = new Request("PUT", "/_index_template/" + templateName);
request.setJsonEntity(mappings);
return request;
private static void putTemplate() throws IOException {
final Request request = new Request("PUT", "/_index_template/" + LOGS_TEMPLATE);
request.setJsonEntity(STANDARD_TEMPLATE);
final Response response = client().performRequest(request);
assertOK(response);
}

private static Request rolloverDataStream(final RestClient client, final String dataStreamName) throws IOException {
return new Request("POST", "/" + dataStreamName + "/_rollover");
private static void rolloverDataStream() throws IOException {
final Request request = new Request("POST", "/" + DATA_STREAM + "/_rollover");
final Response response = client().performRequest(request);
assertOK(response);
}

@SuppressWarnings("unchecked")
static String getWriteBackingIndex(final RestClient client, final String dataStreamName, int backingIndex) throws IOException {
final Request request = new Request("GET", "_data_stream/" + dataStreamName);
final List<Object> dataStreams = (List<Object>) entityAsMap(client.performRequest(request)).get("data_streams");
final Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
final List<Map<String, String>> backingIndices = (List<Map<String, String>>) dataStream.get("indices");
final List<Object> DATA_STREAMs = (List<Object>) entityAsMap(client.performRequest(request)).get("data_streams");
final Map<String, Object> DATA_STREAM = (Map<String, Object>) DATA_STREAMs.get(0);
final List<Map<String, String>> backingIndices = (List<Map<String, String>>) DATA_STREAM.get("indices");
return backingIndices.get(backingIndex).get("index_name");
}

Expand Down