Skip to content
6 changes: 6 additions & 0 deletions docs/changelog/133347.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 133347
summary: Force rollover on write to true when data stream indices list is empty
area: Data streams
type: bug
issues:
- 133176
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.upgrades;

import com.carrotsearch.randomizedtesting.annotations.Name;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.test.rest.ObjectPath;

import java.io.IOException;
import java.time.Instant;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.dataStreamIndexEqualTo;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;

public class FailureStoreUpgradeIT extends AbstractRollingUpgradeWithSecurityTestCase {

public FailureStoreUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
super(upgradedNodes);
}

final String INDEX_TEMPLATE = """
{
"index_patterns": ["$PATTERN"],
"data_stream": {},
"template": {
"mappings":{
"properties": {
"@timestamp" : {
"type": "date"
},
"numeral": {
"type": "long"
}
}
}
}
}""";

private static final String VALID_DOC = """
{"@timestamp": "$now", "numeral": 0}
""";

private static final String INVALID_DOC = """
{"@timestamp": "$now", "numeral": "foobar"}
""";

private static final String BULK = """
{"create": {}}
{"@timestamp": "$now", "numeral": 0}
{"create": {}}
{"@timestamp": "$now", "numeral": 1}
{"create": {}}
{"@timestamp": "$now", "numeral": 2}
{"create": {}}
{"@timestamp": "$now", "numeral": 3}
{"create": {}}
{"@timestamp": "$now", "numeral": 4}
""";

private static final String ENABLE_FAILURE_STORE_OPTIONS = """
{
"failure_store": {
"enabled": true
}
}
""";

public void testFailureStoreOnPreviouslyExistingDataStream() throws Exception {
assumeFalse(
"testing migration from data streams created before failure store feature existed",
oldClusterHasFeature(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)
);
String dataStreamName = "fs-ds-upgrade-test";
String failureStoreName = dataStreamName + "::failures";
String templateName = "fs-ds-template";
if (isOldCluster()) {
// Create template
var putIndexTemplateRequest = new Request("POST", "/_index_template/" + templateName);
putIndexTemplateRequest.setJsonEntity(INDEX_TEMPLATE.replace("$PATTERN", dataStreamName));
assertOK(client().performRequest(putIndexTemplateRequest));

// Initialize data stream
executeBulk(dataStreamName);

// Ensure document failure
indexDoc(dataStreamName, INVALID_DOC, false);

// Check data stream state
var dataStreams = getDataStream(dataStreamName);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo(templateName));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
assertThat(firstBackingIndex, backingIndexEqualTo(dataStreamName, 1));

assertDocCount(client(), dataStreamName, 5);
} else if (isMixedCluster()) {
ensureHealth(dataStreamName, request -> request.addParameter("wait_for_status", "yellow"));
if (isFirstMixedCluster()) {
indexDoc(dataStreamName, VALID_DOC, true);
indexDoc(dataStreamName, INVALID_DOC, false);
}
assertDocCount(client(), dataStreamName, 6);
} else if (isUpgradedCluster()) {
ensureGreen(dataStreamName);

// Ensure correct default failure store state for upgraded data stream
var dataStreams = getDataStream(dataStreamName);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(false));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));

// Ensure invalid document is not indexed
indexDoc(dataStreamName, INVALID_DOC, false);

// Enable failure store on upgraded data stream
var putOptionsRequest = new Request("PUT", "/_data_stream/" + dataStreamName + "/_options");
putOptionsRequest.setJsonEntity(ENABLE_FAILURE_STORE_OPTIONS);
assertOK(client().performRequest(putOptionsRequest));

// Ensure correct enabled failure store state
dataStreams = getDataStream(dataStreamName);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));

// Initialize failure store
int expectedFailureDocuments = 0;
if (randomBoolean()) {
// Index a failure via a mapping exception
indexDoc(dataStreamName, INVALID_DOC, true);
expectedFailureDocuments = 1;
} else {
// Manually rollover failure store to force initialization
var failureStoreRolloverRequest = new Request("POST", "/" + failureStoreName + "/_rollover");
assertOK(client().performRequest(failureStoreRolloverRequest));
}

// Ensure correct initialized failure store state
dataStreams = getDataStream(dataStreamName);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(not(empty())));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(false));

String failureIndexName = ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices.0.index_name");
assertThat(failureIndexName, dataStreamIndexEqualTo(dataStreamName, 2, true));

assertDocCount(client(), dataStreamName, 6);
assertDocCount(client(), failureStoreName, expectedFailureDocuments);
}
}

private static void indexDoc(String dataStreamName, String docBody, boolean expectSuccess) throws IOException {
var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity(docBody.replace("$now", formatInstant(Instant.now())));
Response response = null;
try {
response = client().performRequest(indexRequest);
} catch (ResponseException re) {
response = re.getResponse();
}
assertNotNull(response);
if (expectSuccess) {
assertOK(response);
} else {
assertThat(response.getStatusLine().getStatusCode(), not(anyOf(equalTo(200), equalTo(201))));
}
}

private static void executeBulk(String dataStreamName) throws IOException {
var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
bulkRequest.addParameter("refresh", "true");
Response response = null;
try {
response = client().performRequest(bulkRequest);
} catch (ResponseException re) {
response = re.getResponse();
}
assertNotNull(response);
var responseBody = entityAsMap(response);
assertOK(response);
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
}

static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

private static Map<String, Object> getDataStream(String dataStreamName) throws IOException {
var getDataStreamsRequest = new Request("GET", "/_data_stream/" + dataStreamName);
var response = client().performRequest(getDataStreamsRequest);
assertOK(response);
return entityAsMap(response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,12 @@ public DataStream(
lifecycle,
dataStreamOptions,
new DataStreamIndices(BACKING_INDEX_PREFIX, List.copyOf(indices), rolloverOnWrite, autoShardingEvent),
new DataStreamIndices(FAILURE_STORE_PREFIX, List.copyOf(failureIndices), false, null)
new DataStreamIndices(
FAILURE_STORE_PREFIX,
List.copyOf(failureIndices),
(replicated == false && failureIndices.isEmpty()),
null
)
);
}

Expand Down Expand Up @@ -283,8 +288,15 @@ public static DataStream read(StreamInput in) throws IOException {
backingIndicesBuilder.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
failureIndicesBuilder.setRolloverOnWrite(in.readBoolean())
// Read the rollover on write flag from the stream, but force it on if the failure indices are empty and we're not replicating
boolean failureStoreRolloverOnWrite = in.readBoolean() || (replicated == false && failureIndices.isEmpty());
failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite)
.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
} else {
// If we are reading from an older version that does not have these fields, just default
// to a reasonable value for rollover on write for the failure store
boolean failureStoreRolloverOnWrite = replicated == false && failureIndices.isEmpty();
failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite);
}
DataStreamOptions dataStreamOptions;
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
Expand Down Expand Up @@ -1490,7 +1502,11 @@ public void writeTo(StreamOutput out) throws IOException {
new DataStreamIndices(
FAILURE_STORE_PREFIX,
args[13] != null ? (List<Index>) args[13] : List.of(),
args[14] != null && (boolean) args[14],
// If replicated (args[5]) is null or exists and is false, and the failure index list (args[13]) is null or
// exists and is empty, then force the rollover on write field to true. If none of those conditions are met,
// then use the rollover on write value (args[14]) present in the parser.
((args[5] == null || ((boolean) args[5] == false)) && (args[13] == null || ((List<Index>) args[13]).isEmpty()))
|| (args[14] != null && (boolean) args[14]),
(DataStreamAutoShardingEvent) args[15]
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,18 @@ private IndexMetadata newIndexMetadata(String indexName, AliasMetadata aliasMeta

private static DataStream newDataStreamInstance(List<Index> backingIndices, List<Index> failureStoreIndices) {
boolean isSystem = randomBoolean();
boolean isReplicated = randomBoolean();
return DataStream.builder(randomAlphaOfLength(50), backingIndices)
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices).build())
.setFailureIndices(
DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices)
.setRolloverOnWrite(isReplicated == false && failureStoreIndices.isEmpty())
.build()
)
.setGeneration(randomLongBetween(1, 1000))
.setMetadata(Map.of())
.setSystem(isSystem)
.setHidden(isSystem || randomBoolean())
.setReplicated(randomBoolean())
.setReplicated(isReplicated)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2473,7 +2473,7 @@ public void testToXContent() throws IOException {
"system": false,
"allow_custom_routing": false,
"settings" : { },
"failure_rollover_on_write": false,
"failure_rollover_on_write": true,
"rollover_on_write": false
}
},
Expand Down Expand Up @@ -2740,7 +2740,7 @@ public void testToXContentMultiProject() throws IOException {
"system": false,
"allow_custom_routing": false,
"settings" : { },
"failure_rollover_on_write": false,
"failure_rollover_on_write": true,
"rollover_on_write": false
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,11 @@ public static DataStream newInstance(
.setReplicated(replicated)
.setLifecycle(lifecycle)
.setDataStreamOptions(dataStreamOptions)
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
.setFailureIndices(
DataStream.DataStreamIndices.failureIndicesBuilder(failureStores)
.setRolloverOnWrite((replicated == false) && (failureStores.isEmpty()))
.build()
)
.build();
}

Expand Down Expand Up @@ -391,7 +395,7 @@ public static DataStream randomInstance(String dataStreamName, LongSupplier time
)
.build(),
DataStream.DataStreamIndices.failureIndicesBuilder(failureIndices)
.setRolloverOnWrite(failureStore && replicated == false && randomBoolean())
.setRolloverOnWrite(replicated == false && (failureIndices.isEmpty() || randomBoolean()))
.setAutoShardingEvent(
failureStore && randomBoolean()
? new DataStreamAutoShardingEvent(
Expand Down