Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/133347.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 133347
summary: Force rollover on write to true when a data stream's failure store indices list is empty
area: Data streams
type: bug
issues:
- 133176
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.upgrades;

import com.carrotsearch.randomizedtesting.annotations.Name;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.test.rest.ObjectPath;

import java.io.IOException;
import java.time.Instant;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.dataStreamIndexEqualTo;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;

public class FailureStoreUpgradeIT extends AbstractRollingUpgradeTestCase {

public FailureStoreUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
super(upgradedNodes);
}

final String INDEX_TEMPLATE = """
{
"index_patterns": ["$PATTERN"],
"data_stream": {},
"template": {
"mappings":{
"properties": {
"@timestamp" : {
"type": "date"
},
"numeral": {
"type": "long"
}
}
}
}
}""";

private static final String VALID_DOC = """
{"@timestamp": "$now", "numeral": 0}
""";

private static final String INVALID_DOC = """
{"@timestamp": "$now", "numeral": "foobar"}
""";

private static final String BULK = """
{"create": {}}
{"@timestamp": "$now", "numeral": 0}
{"create": {}}
{"@timestamp": "$now", "numeral": 1}
{"create": {}}
{"@timestamp": "$now", "numeral": 2}
{"create": {}}
{"@timestamp": "$now", "numeral": 3}
{"create": {}}
{"@timestamp": "$now", "numeral": 4}
""";

private static final String ENABLE_FAILURE_STORE_OPTIONS = """
{
"failure_store": {
"enabled": true
}
}
""";

public void testFailureStoreOnPreviouslyExistingDataStream() throws Exception {
assumeFalse(
"testing migration from data streams created before failure store feature state was added",
oldClusterHasFeature("gte_v8.15.0")
);
String dataStreamName = "fs-ds-upgrade-test";
String failureStoreName = dataStreamName + "::failures";
String templateName = "fs-ds-template";
if (isOldCluster()) {
// Create template
var putIndexTemplateRequest = new Request("POST", "/_index_template/" + templateName);
putIndexTemplateRequest.setJsonEntity(INDEX_TEMPLATE.replace("$PATTERN", dataStreamName));
assertOK(client().performRequest(putIndexTemplateRequest));

// Initialize data stream
executeBulk(dataStreamName);

// Ensure document failure
indexDoc(dataStreamName, INVALID_DOC, false);

// Check data stream state
var dataStreams = getDataStream(dataStreamName);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo(templateName));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
assertThat(firstBackingIndex, backingIndexEqualTo(dataStreamName, 1));

assertDocCount(client(), dataStreamName, 5);
} else if (isMixedCluster()) {
ensureHealth(dataStreamName, request -> request.addParameter("wait_for_status", "yellow"));
if (isFirstMixedCluster()) {
indexDoc(dataStreamName, VALID_DOC, true);
indexDoc(dataStreamName, INVALID_DOC, false);
}
assertDocCount(client(), dataStreamName, 6);
} else if (isUpgradedCluster()) {
ensureGreen(dataStreamName);

// Ensure correct default failure store state for upgraded data stream
var dataStreams = getDataStream(dataStreamName);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(false));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));

// Ensure invalid document is not indexed
indexDoc(dataStreamName, INVALID_DOC, false);

// Enable failure store on upgraded data stream
var putOptionsRequest = new Request("PUT", "/_data_stream/" + dataStreamName + "/_options");
putOptionsRequest.setJsonEntity(ENABLE_FAILURE_STORE_OPTIONS);
assertOK(client().performRequest(putOptionsRequest));

// Ensure correct enabled failure store state
dataStreams = getDataStream(dataStreamName);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));

// Initialize failure store
int expectedFailureDocuments = 0;
if (randomBoolean()) {
// Index a failure via a mapping exception
indexDoc(dataStreamName, INVALID_DOC, true);
expectedFailureDocuments = 1;
} else {
// Manually rollover failure store to force initialization
var failureStoreRolloverRequest = new Request("POST", "/" + failureStoreName + "/_rollover");
assertOK(client().performRequest(failureStoreRolloverRequest));
}

// Ensure correct initialized failure store state
dataStreams = getDataStream(dataStreamName);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(not(empty())));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(false));

String failureIndexName = ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices.0.index_name");
assertThat(failureIndexName, dataStreamIndexEqualTo(dataStreamName, 2, true));

assertDocCount(client(), dataStreamName, 6);
assertDocCount(client(), failureStoreName, expectedFailureDocuments);
}
}

private static void indexDoc(String dataStreamName, String docBody, boolean expectSuccess) throws IOException {
var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity(docBody.replace("$now", formatInstant(Instant.now())));
Response response = null;
try {
response = client().performRequest(indexRequest);
} catch (ResponseException re) {
response = re.getResponse();
}
assertNotNull(response);
if (expectSuccess) {
assertOK(response);
} else {
assertThat(response.getStatusLine().getStatusCode(), not(anyOf(equalTo(200), equalTo(201))));
}
}

private static void executeBulk(String dataStreamName) throws IOException {
var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
bulkRequest.addParameter("refresh", "true");
Response response = null;
try {
response = client().performRequest(bulkRequest);
} catch (ResponseException re) {
response = re.getResponse();
}
assertNotNull(response);
var responseBody = entityAsMap(response);
assertOK(response);
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
}

static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

private static Map<String, Object> getDataStream(String dataStreamName) throws IOException {
var getDataStreamsRequest = new Request("GET", "/_data_stream/" + dataStreamName);
var response = client().performRequest(getDataStreamsRequest);
assertOK(response);
return entityAsMap(response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,12 @@ public DataStream(
lifecycle,
dataStreamOptions,
new DataStreamIndices(BACKING_INDEX_PREFIX, List.copyOf(indices), rolloverOnWrite, autoShardingEvent),
new DataStreamIndices(FAILURE_STORE_PREFIX, List.copyOf(failureIndices), false, null)
new DataStreamIndices(
FAILURE_STORE_PREFIX,
List.copyOf(failureIndices),
(replicated == false && failureIndices.isEmpty()),
null
)
);
}

Expand Down Expand Up @@ -260,8 +265,15 @@ public static DataStream read(StreamInput in) throws IOException {
backingIndicesBuilder.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
failureIndicesBuilder.setRolloverOnWrite(in.readBoolean())
// Read the rollover on write flag from the stream, but force it on if the failure indices are empty and we're not replicating
boolean failureStoreRolloverOnWrite = in.readBoolean() || (replicated == false && failureIndices.isEmpty());
failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite)
.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
} else {
// If we are reading from an older version that does not have these fields, just default
// to a reasonable value for rollover on write for the failure store
boolean failureStoreRolloverOnWrite = replicated == false && failureIndices.isEmpty();
failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite);
}
DataStreamOptions dataStreamOptions;
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
Expand Down Expand Up @@ -1371,7 +1383,11 @@ public void writeTo(StreamOutput out) throws IOException {
new DataStreamIndices(
FAILURE_STORE_PREFIX,
args[13] != null ? (List<Index>) args[13] : List.of(),
args[14] != null && (boolean) args[14],
// If replicated (args[5]) is null or exists and is false, and the failure index list (args[13]) is null or
// exists and is empty, then force the rollover on write field to true. If none of those conditions are met,
// then use the rollover on write value (args[14]) present in the parser.
((args[5] == null || ((boolean) args[5] == false)) && (args[13] == null || ((List<Index>) args[13]).isEmpty()))
|| (args[14] != null && (boolean) args[14]),
(DataStreamAutoShardingEvent) args[15]
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,18 @@ private IndexMetadata newIndexMetadata(String indexName, AliasMetadata aliasMeta

private static DataStream newDataStreamInstance(List<Index> backingIndices, List<Index> failureStoreIndices) {
boolean isSystem = randomBoolean();
boolean isReplicated = randomBoolean();
return DataStream.builder(randomAlphaOfLength(50), backingIndices)
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices).build())
.setFailureIndices(
DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices)
.setRolloverOnWrite(isReplicated == false && failureStoreIndices.isEmpty())
.build()
)
.setGeneration(randomLongBetween(1, 1000))
.setMetadata(Map.of())
.setSystem(isSystem)
.setHidden(isSystem || randomBoolean())
.setReplicated(randomBoolean())
.setReplicated(isReplicated)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ public static DataStream newInstance(
.setReplicated(replicated)
.setLifecycle(lifecycle)
.setDataStreamOptions(dataStreamOptions)
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
.setFailureIndices(
DataStream.DataStreamIndices.failureIndicesBuilder(failureStores)
.setRolloverOnWrite((replicated == false) && (failureStores.isEmpty()))
.build()
)
.build();
}

Expand Down Expand Up @@ -394,7 +398,7 @@ public static DataStream randomInstance(String dataStreamName, LongSupplier time
)
.build(),
DataStream.DataStreamIndices.failureIndicesBuilder(failureIndices)
.setRolloverOnWrite(failureStore && replicated == false && randomBoolean())
.setRolloverOnWrite(replicated == false && (failureIndices.isEmpty() || randomBoolean()))
.setAutoShardingEvent(
failureStore && randomBoolean()
? new DataStreamAutoShardingEvent(
Expand Down