diff --git a/docs/changelog/133347.yaml b/docs/changelog/133347.yaml
new file mode 100644
index 0000000000000..dd047bccf217a
--- /dev/null
+++ b/docs/changelog/133347.yaml
@@ -0,0 +1,6 @@
+pr: 133347
+summary: Force rollover on write to true when data stream indices list is empty
+area: Data streams
+type: bug
+issues:
+ - 133176
diff --git a/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/FailureStoreUpgradeIT.java b/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/FailureStoreUpgradeIT.java
new file mode 100644
index 0000000000000..66caca9c1c51a
--- /dev/null
+++ b/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/FailureStoreUpgradeIT.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.upgrades;
+
+import com.carrotsearch.randomizedtesting.annotations.Name;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.common.time.FormatNames;
+import org.elasticsearch.test.rest.ObjectPath;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Map;
+
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.dataStreamIndexEqualTo;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+
+public class FailureStoreUpgradeIT extends AbstractRollingUpgradeWithSecurityTestCase {
+
+    public FailureStoreUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
+        super(upgradedNodes);
+    }
+
+    final String INDEX_TEMPLATE = """
+        {
+            "index_patterns": ["$PATTERN"],
+            "data_stream": {},
+            "template": {
+                "mappings":{
+                    "properties": {
+                        "@timestamp" : {
+                            "type": "date"
+                        },
+                        "numeral": {
+                            "type": "long"
+                        }
+                    }
+                }
+            }
+        }""";
+
+    private static final String VALID_DOC = """
+        {"@timestamp": "$now", "numeral": 0}
+        """;
+
+    private static final String INVALID_DOC = """
+        {"@timestamp": "$now", "numeral": "foobar"}
+        """;
+
+    private static final String BULK = """
+        {"create": {}}
+        {"@timestamp": "$now", "numeral": 0}
+        {"create": {}}
+        {"@timestamp": "$now", "numeral": 1}
+        {"create": {}}
+        {"@timestamp": "$now", "numeral": 2}
+        {"create": {}}
+        {"@timestamp": "$now", "numeral": 3}
+        {"create": {}}
+        {"@timestamp": "$now", "numeral": 4}
+        """;
+
+    private static final String ENABLE_FAILURE_STORE_OPTIONS = """
+        {
+          "failure_store": {
+            "enabled": true
+          }
+        }
+        """;
+
+    public void testFailureStoreOnPreviouslyExistingDataStream() throws Exception {
+        assumeFalse(
+            "testing migration from data streams created before failure store feature existed",
+            oldClusterHasFeature(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)
+        );
+        String dataStreamName = "fs-ds-upgrade-test";
+        String failureStoreName = dataStreamName + "::failures";
+        String templateName = "fs-ds-template";
+        if (isOldCluster()) {
+            // Create template
+            var putIndexTemplateRequest = new Request("POST", "/_index_template/" + templateName);
+            putIndexTemplateRequest.setJsonEntity(INDEX_TEMPLATE.replace("$PATTERN", dataStreamName));
+            assertOK(client().performRequest(putIndexTemplateRequest));
+
+            // Initialize data stream
+            executeBulk(dataStreamName);
+
+            // Ensure document failure
+            indexDoc(dataStreamName, INVALID_DOC, false);
+
+            // Check data stream state
+            var dataStreams = getDataStream(dataStreamName);
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo(templateName));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
+            String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
+            assertThat(firstBackingIndex, backingIndexEqualTo(dataStreamName, 1));
+
+            assertDocCount(client(), dataStreamName, 5);
+        } else if (isMixedCluster()) {
+            ensureHealth(dataStreamName, request -> request.addParameter("wait_for_status", "yellow"));
+            if (isFirstMixedCluster()) {
+                indexDoc(dataStreamName, VALID_DOC, true);
+                indexDoc(dataStreamName, INVALID_DOC, false);
+            }
+            assertDocCount(client(), dataStreamName, 6);
+        } else if (isUpgradedCluster()) {
+            ensureGreen(dataStreamName);
+
+            // Ensure correct default failure store state for upgraded data stream
+            var dataStreams = getDataStream(dataStreamName);
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(false));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));
+
+            // Ensure invalid document is not indexed
+            indexDoc(dataStreamName, INVALID_DOC, false);
+
+            // Enable failure store on upgraded data stream
+            var putOptionsRequest = new Request("PUT", "/_data_stream/" + dataStreamName + "/_options");
+            putOptionsRequest.setJsonEntity(ENABLE_FAILURE_STORE_OPTIONS);
+            assertOK(client().performRequest(putOptionsRequest));
+
+            // Ensure correct enabled failure store state
+            dataStreams = getDataStream(dataStreamName);
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));
+
+            // Initialize failure store
+            int expectedFailureDocuments = 0;
+            if (randomBoolean()) {
+                // Index a failure via a mapping exception
+                indexDoc(dataStreamName, INVALID_DOC, true);
+                expectedFailureDocuments = 1;
+            } else {
+                // Manually rollover failure store to force initialization
+                var failureStoreRolloverRequest = new Request("POST", "/" + failureStoreName + "/_rollover");
+                assertOK(client().performRequest(failureStoreRolloverRequest));
+            }
+
+            // Ensure correct initialized failure store state
+            dataStreams = getDataStream(dataStreamName);
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(not(empty())));
+            assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(false));
+
+            String failureIndexName = ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices.0.index_name");
+            assertThat(failureIndexName, dataStreamIndexEqualTo(dataStreamName, 2, true));
+
+            assertDocCount(client(), dataStreamName, 6);
+            assertDocCount(client(), failureStoreName, expectedFailureDocuments);
+        }
+    }
+
+    private static void indexDoc(String dataStreamName, String docBody, boolean expectSuccess) throws IOException {
+        var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
+        indexRequest.addParameter("refresh", "true");
+        indexRequest.setJsonEntity(docBody.replace("$now", formatInstant(Instant.now())));
+        Response response = null;
+        try {
+            response = client().performRequest(indexRequest);
+        } catch (ResponseException re) {
+            response = re.getResponse();
+        }
+        assertNotNull(response);
+        if (expectSuccess) {
+            assertOK(response);
+        } else {
+            assertThat(response.getStatusLine().getStatusCode(), not(anyOf(equalTo(200), equalTo(201))));
+        }
+    }
+
+    private static void executeBulk(String dataStreamName) throws IOException {
+        var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
+        bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
+        bulkRequest.addParameter("refresh", "true");
+        Response response = null;
+        try {
+            response = client().performRequest(bulkRequest);
+        } catch (ResponseException re) {
+            response = re.getResponse();
+        }
+        assertNotNull(response);
+        var responseBody = entityAsMap(response);
+        assertOK(response);
+        assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
+    }
+
+    static String formatInstant(Instant instant) {
+        return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
+    }
+
+    private static Map<String, Object> getDataStream(String dataStreamName) throws IOException {
+        var getDataStreamsRequest = new Request("GET", "/_data_stream/" + dataStreamName);
+        var response = client().performRequest(getDataStreamsRequest);
+        assertOK(response);
+        return entityAsMap(response);
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
index c318159c9ca4c..3d5dd4ce9e17c 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
@@ -214,7 +214,12 @@ public DataStream(
             lifecycle,
             dataStreamOptions,
             new DataStreamIndices(BACKING_INDEX_PREFIX, List.copyOf(indices), rolloverOnWrite, autoShardingEvent),
-            new DataStreamIndices(FAILURE_STORE_PREFIX, List.copyOf(failureIndices), false, null)
+            new DataStreamIndices(
+                FAILURE_STORE_PREFIX,
+                List.copyOf(failureIndices),
+                (replicated == false && failureIndices.isEmpty()),
+                null
+            )
         );
     }
 
@@ -283,8 +288,15 @@ public static DataStream read(StreamInput in) throws IOException {
             backingIndicesBuilder.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
         }
         if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
-            failureIndicesBuilder.setRolloverOnWrite(in.readBoolean())
+            // Read the rollover on write flag from the stream, but force it on if the failure indices are empty and we're not replicating
+            boolean failureStoreRolloverOnWrite = in.readBoolean() || (replicated == false && failureIndices.isEmpty());
+            failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite)
                 .setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
+        } else {
+            // If we are reading from an older version that does not have these fields, just default
+            // to a reasonable value for rollover on write for the failure store
+            boolean failureStoreRolloverOnWrite = replicated == false && failureIndices.isEmpty();
+            failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite);
         }
         DataStreamOptions dataStreamOptions;
         if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
@@ -1490,7 +1502,11 @@ public void writeTo(StreamOutput out) throws IOException {
                 new DataStreamIndices(
                     FAILURE_STORE_PREFIX,
                     args[13] != null ? (List<Index>) args[13] : List.of(),
-                    args[14] != null && (boolean) args[14],
+                    // If replicated (args[5]) is null or exists and is false, and the failure index list (args[13]) is null or
+                    // exists and is empty, then force the rollover on write field to true. If none of those conditions are met,
+                    // then use the rollover on write value (args[14]) present in the parser.
+                    ((args[5] == null || ((boolean) args[5] == false)) && (args[13] == null || ((List<Index>) args[13]).isEmpty()))
+                        || (args[14] != null && (boolean) args[14]),
                     (DataStreamAutoShardingEvent) args[15]
                 )
             )
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexAbstractionTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexAbstractionTests.java
index e847273cd660a..268c9065fe7ea 100644
--- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexAbstractionTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexAbstractionTests.java
@@ -201,13 +201,18 @@ private IndexMetadata newIndexMetadata(String indexName, AliasMetadata aliasMeta
 
     private static DataStream newDataStreamInstance(List<Index> backingIndices, List<Index> failureStoreIndices) {
         boolean isSystem = randomBoolean();
+        boolean isReplicated = randomBoolean();
         return DataStream.builder(randomAlphaOfLength(50), backingIndices)
-            .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices).build())
+            .setFailureIndices(
+                DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices)
+                    .setRolloverOnWrite(isReplicated == false && failureStoreIndices.isEmpty())
+                    .build()
+            )
             .setGeneration(randomLongBetween(1, 1000))
             .setMetadata(Map.of())
             .setSystem(isSystem)
             .setHidden(isSystem || randomBoolean())
-            .setReplicated(randomBoolean())
+            .setReplicated(isReplicated)
             .build();
     }
 }
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/ProjectMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/ProjectMetadataTests.java
index bd63b6e70371d..b92c9ada0bc28 100644
--- a/server/src/test/java/org/elasticsearch/cluster/metadata/ProjectMetadataTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/metadata/ProjectMetadataTests.java
@@ -2473,7 +2473,7 @@ public void testToXContent() throws IOException {
                   "system": false,
                   "allow_custom_routing": false,
                   "settings" : { },
-                  "failure_rollover_on_write": false,
+                  "failure_rollover_on_write": true,
                   "rollover_on_write": false
                 }
               },
@@ -2740,7 +2740,7 @@ public void testToXContentMultiProject() throws IOException {
                   "system": false,
                   "allow_custom_routing": false,
                   "settings" : { },
-                  "failure_rollover_on_write": false,
+                  "failure_rollover_on_write": true,
                   "rollover_on_write": false
                 }
              },
diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java
index e31d3912fa722..abaf363dfebe5 100644
--- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java
+++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java
@@ -184,7 +184,11 @@ public static DataStream newInstance(
             .setReplicated(replicated)
             .setLifecycle(lifecycle)
             .setDataStreamOptions(dataStreamOptions)
-            .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
+            .setFailureIndices(
+                DataStream.DataStreamIndices.failureIndicesBuilder(failureStores)
+                    .setRolloverOnWrite((replicated == false) && (failureStores.isEmpty()))
+                    .build()
+            )
             .build();
     }
 
@@ -391,7 +395,7 @@ public static DataStream randomInstance(String dataStreamName, LongSupplier time
                 )
                 .build(),
             DataStream.DataStreamIndices.failureIndicesBuilder(failureIndices)
-                .setRolloverOnWrite(failureStore && replicated == false && randomBoolean())
+                .setRolloverOnWrite(replicated == false && (failureIndices.isEmpty() || randomBoolean()))
                 .setAutoShardingEvent(
                     failureStore && randomBoolean()
                         ? new DataStreamAutoShardingEvent(
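
The three DataStream.java hunks above (the constructor, DataStream.read, and the XContent parser) encode the same rule: force the failure store's rollover-on-write flag to true when the data stream is not replicated and its failure index list is still empty, and otherwise fall back to whatever flag was stored (the constructor previously hard-coded false). The standalone sketch below is illustrative only and not part of this PR; the class and method names are hypothetical and it deliberately avoids Elasticsearch types.

// Illustrative sketch only -- not from the PR. Restates the failure store
// rollover-on-write rule applied in DataStream's constructor, read(StreamInput),
// and the XContent parser.
import java.util.List;

class FailureStoreRolloverRuleSketch {

    // storedFlag: the value read from the wire or parsed from XContent (false when absent).
    static boolean failureRolloverOnWrite(boolean storedFlag, boolean replicated, List<String> failureIndices) {
        return storedFlag || (replicated == false && failureIndices.isEmpty());
    }

    public static void main(String[] args) {
        // Empty failure store on a non-replicated data stream: forced to true.
        System.out.println(failureRolloverOnWrite(false, false, List.of()));                     // true
        // Replicated (follower) data stream: never forced, the stored flag is kept.
        System.out.println(failureRolloverOnWrite(false, true, List.of()));                      // false
        // Failure store already initialized: the stored flag is used as-is.
        System.out.println(failureRolloverOnWrite(false, false, List.of(".fs-example-000002"))); // false
    }
}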