Skip to content

Commit d739724

Browse files
authored
[8.19] Force rollover on write to true when data stream indices list is empty (#133347) (#133407)
Updates the serialization and construction logic for failure store components to bias rollover on write to true when no indices are present in the failure store index set.
1 parent f17d3d0 commit d739724

File tree

5 files changed

+266
-7
lines changed

5 files changed

+266
-7
lines changed

docs/changelog/133347.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 133347
2+
summary: Force rollover on write to true when data stream indices list is empty
3+
area: Data streams
4+
type: bug
5+
issues:
6+
- 133176
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.upgrades;
11+
12+
import com.carrotsearch.randomizedtesting.annotations.Name;
13+
14+
import org.elasticsearch.client.Request;
15+
import org.elasticsearch.client.Response;
16+
import org.elasticsearch.client.ResponseException;
17+
import org.elasticsearch.common.time.DateFormatter;
18+
import org.elasticsearch.common.time.FormatNames;
19+
import org.elasticsearch.test.rest.ObjectPath;
20+
21+
import java.io.IOException;
22+
import java.time.Instant;
23+
import java.util.Map;
24+
25+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
26+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.dataStreamIndexEqualTo;
27+
import static org.hamcrest.Matchers.anyOf;
28+
import static org.hamcrest.Matchers.empty;
29+
import static org.hamcrest.Matchers.equalTo;
30+
import static org.hamcrest.Matchers.hasSize;
31+
import static org.hamcrest.Matchers.is;
32+
import static org.hamcrest.Matchers.not;
33+
import static org.hamcrest.Matchers.notNullValue;
34+
35+
public class FailureStoreUpgradeIT extends AbstractRollingUpgradeTestCase {
36+
37+
public FailureStoreUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
38+
super(upgradedNodes);
39+
}
40+
41+
final String INDEX_TEMPLATE = """
42+
{
43+
"index_patterns": ["$PATTERN"],
44+
"data_stream": {},
45+
"template": {
46+
"mappings":{
47+
"properties": {
48+
"@timestamp" : {
49+
"type": "date"
50+
},
51+
"numeral": {
52+
"type": "long"
53+
}
54+
}
55+
}
56+
}
57+
}""";
58+
59+
private static final String VALID_DOC = """
60+
{"@timestamp": "$now", "numeral": 0}
61+
""";
62+
63+
private static final String INVALID_DOC = """
64+
{"@timestamp": "$now", "numeral": "foobar"}
65+
""";
66+
67+
private static final String BULK = """
68+
{"create": {}}
69+
{"@timestamp": "$now", "numeral": 0}
70+
{"create": {}}
71+
{"@timestamp": "$now", "numeral": 1}
72+
{"create": {}}
73+
{"@timestamp": "$now", "numeral": 2}
74+
{"create": {}}
75+
{"@timestamp": "$now", "numeral": 3}
76+
{"create": {}}
77+
{"@timestamp": "$now", "numeral": 4}
78+
""";
79+
80+
private static final String ENABLE_FAILURE_STORE_OPTIONS = """
81+
{
82+
"failure_store": {
83+
"enabled": true
84+
}
85+
}
86+
""";
87+
88+
public void testFailureStoreOnPreviouslyExistingDataStream() throws Exception {
89+
assumeFalse(
90+
"testing migration from data streams created before failure store feature state was added",
91+
oldClusterHasFeature("gte_v8.15.0")
92+
);
93+
String dataStreamName = "fs-ds-upgrade-test";
94+
String failureStoreName = dataStreamName + "::failures";
95+
String templateName = "fs-ds-template";
96+
if (isOldCluster()) {
97+
// Create template
98+
var putIndexTemplateRequest = new Request("POST", "/_index_template/" + templateName);
99+
putIndexTemplateRequest.setJsonEntity(INDEX_TEMPLATE.replace("$PATTERN", dataStreamName));
100+
assertOK(client().performRequest(putIndexTemplateRequest));
101+
102+
// Initialize data stream
103+
executeBulk(dataStreamName);
104+
105+
// Ensure document failure
106+
indexDoc(dataStreamName, INVALID_DOC, false);
107+
108+
// Check data stream state
109+
var dataStreams = getDataStream(dataStreamName);
110+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
111+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
112+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
113+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo(templateName));
114+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
115+
String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
116+
assertThat(firstBackingIndex, backingIndexEqualTo(dataStreamName, 1));
117+
118+
assertDocCount(client(), dataStreamName, 5);
119+
} else if (isMixedCluster()) {
120+
ensureHealth(dataStreamName, request -> request.addParameter("wait_for_status", "yellow"));
121+
if (isFirstMixedCluster()) {
122+
indexDoc(dataStreamName, VALID_DOC, true);
123+
indexDoc(dataStreamName, INVALID_DOC, false);
124+
}
125+
assertDocCount(client(), dataStreamName, 6);
126+
} else if (isUpgradedCluster()) {
127+
ensureGreen(dataStreamName);
128+
129+
// Ensure correct default failure store state for upgraded data stream
130+
var dataStreams = getDataStream(dataStreamName);
131+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
132+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
133+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
134+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(false));
135+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
136+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));
137+
138+
// Ensure invalid document is not indexed
139+
indexDoc(dataStreamName, INVALID_DOC, false);
140+
141+
// Enable failure store on upgraded data stream
142+
var putOptionsRequest = new Request("PUT", "/_data_stream/" + dataStreamName + "/_options");
143+
putOptionsRequest.setJsonEntity(ENABLE_FAILURE_STORE_OPTIONS);
144+
assertOK(client().performRequest(putOptionsRequest));
145+
146+
// Ensure correct enabled failure store state
147+
dataStreams = getDataStream(dataStreamName);
148+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
149+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
150+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
151+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
152+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
153+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));
154+
155+
// Initialize failure store
156+
int expectedFailureDocuments = 0;
157+
if (randomBoolean()) {
158+
// Index a failure via a mapping exception
159+
indexDoc(dataStreamName, INVALID_DOC, true);
160+
expectedFailureDocuments = 1;
161+
} else {
162+
// Manually rollover failure store to force initialization
163+
var failureStoreRolloverRequest = new Request("POST", "/" + failureStoreName + "/_rollover");
164+
assertOK(client().performRequest(failureStoreRolloverRequest));
165+
}
166+
167+
// Ensure correct initialized failure store state
168+
dataStreams = getDataStream(dataStreamName);
169+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
170+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
171+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
172+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
173+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(not(empty())));
174+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(false));
175+
176+
String failureIndexName = ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices.0.index_name");
177+
assertThat(failureIndexName, dataStreamIndexEqualTo(dataStreamName, 2, true));
178+
179+
assertDocCount(client(), dataStreamName, 6);
180+
assertDocCount(client(), failureStoreName, expectedFailureDocuments);
181+
}
182+
}
183+
184+
private static void indexDoc(String dataStreamName, String docBody, boolean expectSuccess) throws IOException {
185+
var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
186+
indexRequest.addParameter("refresh", "true");
187+
indexRequest.setJsonEntity(docBody.replace("$now", formatInstant(Instant.now())));
188+
Response response = null;
189+
try {
190+
response = client().performRequest(indexRequest);
191+
} catch (ResponseException re) {
192+
response = re.getResponse();
193+
}
194+
assertNotNull(response);
195+
if (expectSuccess) {
196+
assertOK(response);
197+
} else {
198+
assertThat(response.getStatusLine().getStatusCode(), not(anyOf(equalTo(200), equalTo(201))));
199+
}
200+
}
201+
202+
private static void executeBulk(String dataStreamName) throws IOException {
203+
var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
204+
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
205+
bulkRequest.addParameter("refresh", "true");
206+
Response response = null;
207+
try {
208+
response = client().performRequest(bulkRequest);
209+
} catch (ResponseException re) {
210+
response = re.getResponse();
211+
}
212+
assertNotNull(response);
213+
var responseBody = entityAsMap(response);
214+
assertOK(response);
215+
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
216+
}
217+
218+
static String formatInstant(Instant instant) {
219+
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
220+
}
221+
222+
private static Map<String, Object> getDataStream(String dataStreamName) throws IOException {
223+
var getDataStreamsRequest = new Request("GET", "/_data_stream/" + dataStreamName);
224+
var response = client().performRequest(getDataStreamsRequest);
225+
assertOK(response);
226+
return entityAsMap(response);
227+
}
228+
}

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,12 @@ public DataStream(
191191
lifecycle,
192192
dataStreamOptions,
193193
new DataStreamIndices(BACKING_INDEX_PREFIX, List.copyOf(indices), rolloverOnWrite, autoShardingEvent),
194-
new DataStreamIndices(FAILURE_STORE_PREFIX, List.copyOf(failureIndices), false, null)
194+
new DataStreamIndices(
195+
FAILURE_STORE_PREFIX,
196+
List.copyOf(failureIndices),
197+
(replicated == false && failureIndices.isEmpty()),
198+
null
199+
)
195200
);
196201
}
197202

@@ -260,8 +265,15 @@ public static DataStream read(StreamInput in) throws IOException {
260265
backingIndicesBuilder.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
261266
}
262267
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
263-
failureIndicesBuilder.setRolloverOnWrite(in.readBoolean())
268+
// Read the rollover on write flag from the stream, but force it on if the failure indices are empty and we're not replicating
269+
boolean failureStoreRolloverOnWrite = in.readBoolean() || (replicated == false && failureIndices.isEmpty());
270+
failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite)
264271
.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
272+
} else {
273+
// If we are reading from an older version that does not have these fields, just default
274+
// to a reasonable value for rollover on write for the failure store
275+
boolean failureStoreRolloverOnWrite = replicated == false && failureIndices.isEmpty();
276+
failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite);
265277
}
266278
DataStreamOptions dataStreamOptions;
267279
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
@@ -1371,7 +1383,11 @@ public void writeTo(StreamOutput out) throws IOException {
13711383
new DataStreamIndices(
13721384
FAILURE_STORE_PREFIX,
13731385
args[13] != null ? (List<Index>) args[13] : List.of(),
1374-
args[14] != null && (boolean) args[14],
1386+
// If replicated (args[5]) is null or exists and is false, and the failure index list (args[13]) is null or
1387+
// exists and is empty, then force the rollover on write field to true. If none of those conditions are met,
1388+
// then use the rollover on write value (args[14]) present in the parser.
1389+
((args[5] == null || ((boolean) args[5] == false)) && (args[13] == null || ((List<Index>) args[13]).isEmpty()))
1390+
|| (args[14] != null && (boolean) args[14]),
13751391
(DataStreamAutoShardingEvent) args[15]
13761392
)
13771393
)

server/src/test/java/org/elasticsearch/cluster/metadata/IndexAbstractionTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,13 +199,18 @@ private IndexMetadata newIndexMetadata(String indexName, AliasMetadata aliasMeta
199199

200200
private static DataStream newDataStreamInstance(List<Index> backingIndices, List<Index> failureStoreIndices) {
201201
boolean isSystem = randomBoolean();
202+
boolean isReplicated = randomBoolean();
202203
return DataStream.builder(randomAlphaOfLength(50), backingIndices)
203-
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices).build())
204+
.setFailureIndices(
205+
DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices)
206+
.setRolloverOnWrite(isReplicated == false && failureStoreIndices.isEmpty())
207+
.build()
208+
)
204209
.setGeneration(randomLongBetween(1, 1000))
205210
.setMetadata(Map.of())
206211
.setSystem(isSystem)
207212
.setHidden(isSystem || randomBoolean())
208-
.setReplicated(randomBoolean())
213+
.setReplicated(isReplicated)
209214
.build();
210215
}
211216
}

test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,11 @@ public static DataStream newInstance(
179179
.setReplicated(replicated)
180180
.setLifecycle(lifecycle)
181181
.setDataStreamOptions(dataStreamOptions)
182-
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
182+
.setFailureIndices(
183+
DataStream.DataStreamIndices.failureIndicesBuilder(failureStores)
184+
.setRolloverOnWrite((replicated == false) && (failureStores.isEmpty()))
185+
.build()
186+
)
183187
.build();
184188
}
185189

@@ -394,7 +398,7 @@ public static DataStream randomInstance(String dataStreamName, LongSupplier time
394398
)
395399
.build(),
396400
DataStream.DataStreamIndices.failureIndicesBuilder(failureIndices)
397-
.setRolloverOnWrite(failureStore && replicated == false && randomBoolean())
401+
.setRolloverOnWrite(replicated == false && (failureIndices.isEmpty() || randomBoolean()))
398402
.setAutoShardingEvent(
399403
failureStore && randomBoolean()
400404
? new DataStreamAutoShardingEvent(

0 commit comments

Comments
 (0)