Skip to content

Commit 65a4808

Browse files
jbaieraelasticsearchmachinemasseyke
authored
Force rollover on write to true when data stream indices list is empty (#133347) (#133406)
Updates the serialization and construction logic for failure store components to bias rollover on write to true when no indices are present in the failure store index set. --------- (cherry picked from commit 5ae52bf) Co-authored-by: elasticsearchmachine <[email protected]> Co-authored-by: Keith Massey <[email protected]>
1 parent 179d843 commit 65a4808

File tree

6 files changed

+269
-9
lines changed

6 files changed

+269
-9
lines changed

docs/changelog/133347.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 133347
2+
summary: Force rollover on write to true when data stream indices list is empty
3+
area: Data streams
4+
type: bug
5+
issues:
6+
- 133176
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.upgrades;
11+
12+
import com.carrotsearch.randomizedtesting.annotations.Name;
13+
14+
import org.elasticsearch.client.Request;
15+
import org.elasticsearch.client.Response;
16+
import org.elasticsearch.client.ResponseException;
17+
import org.elasticsearch.cluster.metadata.DataStream;
18+
import org.elasticsearch.common.time.DateFormatter;
19+
import org.elasticsearch.common.time.FormatNames;
20+
import org.elasticsearch.test.rest.ObjectPath;
21+
22+
import java.io.IOException;
23+
import java.time.Instant;
24+
import java.util.Map;
25+
26+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
27+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.dataStreamIndexEqualTo;
28+
import static org.hamcrest.Matchers.anyOf;
29+
import static org.hamcrest.Matchers.empty;
30+
import static org.hamcrest.Matchers.equalTo;
31+
import static org.hamcrest.Matchers.hasSize;
32+
import static org.hamcrest.Matchers.is;
33+
import static org.hamcrest.Matchers.not;
34+
import static org.hamcrest.Matchers.notNullValue;
35+
36+
public class FailureStoreUpgradeIT extends AbstractRollingUpgradeWithSecurityTestCase {
37+
38+
public FailureStoreUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
39+
super(upgradedNodes);
40+
}
41+
42+
final String INDEX_TEMPLATE = """
43+
{
44+
"index_patterns": ["$PATTERN"],
45+
"data_stream": {},
46+
"template": {
47+
"mappings":{
48+
"properties": {
49+
"@timestamp" : {
50+
"type": "date"
51+
},
52+
"numeral": {
53+
"type": "long"
54+
}
55+
}
56+
}
57+
}
58+
}""";
59+
60+
private static final String VALID_DOC = """
61+
{"@timestamp": "$now", "numeral": 0}
62+
""";
63+
64+
private static final String INVALID_DOC = """
65+
{"@timestamp": "$now", "numeral": "foobar"}
66+
""";
67+
68+
private static final String BULK = """
69+
{"create": {}}
70+
{"@timestamp": "$now", "numeral": 0}
71+
{"create": {}}
72+
{"@timestamp": "$now", "numeral": 1}
73+
{"create": {}}
74+
{"@timestamp": "$now", "numeral": 2}
75+
{"create": {}}
76+
{"@timestamp": "$now", "numeral": 3}
77+
{"create": {}}
78+
{"@timestamp": "$now", "numeral": 4}
79+
""";
80+
81+
private static final String ENABLE_FAILURE_STORE_OPTIONS = """
82+
{
83+
"failure_store": {
84+
"enabled": true
85+
}
86+
}
87+
""";
88+
89+
public void testFailureStoreOnPreviouslyExistingDataStream() throws Exception {
90+
assumeFalse(
91+
"testing migration from data streams created before failure store feature existed",
92+
oldClusterHasFeature(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)
93+
);
94+
String dataStreamName = "fs-ds-upgrade-test";
95+
String failureStoreName = dataStreamName + "::failures";
96+
String templateName = "fs-ds-template";
97+
if (isOldCluster()) {
98+
// Create template
99+
var putIndexTemplateRequest = new Request("POST", "/_index_template/" + templateName);
100+
putIndexTemplateRequest.setJsonEntity(INDEX_TEMPLATE.replace("$PATTERN", dataStreamName));
101+
assertOK(client().performRequest(putIndexTemplateRequest));
102+
103+
// Initialize data stream
104+
executeBulk(dataStreamName);
105+
106+
// Ensure document failure
107+
indexDoc(dataStreamName, INVALID_DOC, false);
108+
109+
// Check data stream state
110+
var dataStreams = getDataStream(dataStreamName);
111+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
112+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
113+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
114+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo(templateName));
115+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
116+
String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
117+
assertThat(firstBackingIndex, backingIndexEqualTo(dataStreamName, 1));
118+
119+
assertDocCount(client(), dataStreamName, 5);
120+
} else if (isMixedCluster()) {
121+
ensureHealth(dataStreamName, request -> request.addParameter("wait_for_status", "yellow"));
122+
if (isFirstMixedCluster()) {
123+
indexDoc(dataStreamName, VALID_DOC, true);
124+
indexDoc(dataStreamName, INVALID_DOC, false);
125+
}
126+
assertDocCount(client(), dataStreamName, 6);
127+
} else if (isUpgradedCluster()) {
128+
ensureGreen(dataStreamName);
129+
130+
// Ensure correct default failure store state for upgraded data stream
131+
var dataStreams = getDataStream(dataStreamName);
132+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
133+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
134+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
135+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(false));
136+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
137+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));
138+
139+
// Ensure invalid document is not indexed
140+
indexDoc(dataStreamName, INVALID_DOC, false);
141+
142+
// Enable failure store on upgraded data stream
143+
var putOptionsRequest = new Request("PUT", "/_data_stream/" + dataStreamName + "/_options");
144+
putOptionsRequest.setJsonEntity(ENABLE_FAILURE_STORE_OPTIONS);
145+
assertOK(client().performRequest(putOptionsRequest));
146+
147+
// Ensure correct enabled failure store state
148+
dataStreams = getDataStream(dataStreamName);
149+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
150+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
151+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
152+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
153+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
154+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));
155+
156+
// Initialize failure store
157+
int expectedFailureDocuments = 0;
158+
if (randomBoolean()) {
159+
// Index a failure via a mapping exception
160+
indexDoc(dataStreamName, INVALID_DOC, true);
161+
expectedFailureDocuments = 1;
162+
} else {
163+
// Manually rollover failure store to force initialization
164+
var failureStoreRolloverRequest = new Request("POST", "/" + failureStoreName + "/_rollover");
165+
assertOK(client().performRequest(failureStoreRolloverRequest));
166+
}
167+
168+
// Ensure correct initialized failure store state
169+
dataStreams = getDataStream(dataStreamName);
170+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
171+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
172+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
173+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
174+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(not(empty())));
175+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(false));
176+
177+
String failureIndexName = ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices.0.index_name");
178+
assertThat(failureIndexName, dataStreamIndexEqualTo(dataStreamName, 2, true));
179+
180+
assertDocCount(client(), dataStreamName, 6);
181+
assertDocCount(client(), failureStoreName, expectedFailureDocuments);
182+
}
183+
}
184+
185+
private static void indexDoc(String dataStreamName, String docBody, boolean expectSuccess) throws IOException {
186+
var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
187+
indexRequest.addParameter("refresh", "true");
188+
indexRequest.setJsonEntity(docBody.replace("$now", formatInstant(Instant.now())));
189+
Response response = null;
190+
try {
191+
response = client().performRequest(indexRequest);
192+
} catch (ResponseException re) {
193+
response = re.getResponse();
194+
}
195+
assertNotNull(response);
196+
if (expectSuccess) {
197+
assertOK(response);
198+
} else {
199+
assertThat(response.getStatusLine().getStatusCode(), not(anyOf(equalTo(200), equalTo(201))));
200+
}
201+
}
202+
203+
private static void executeBulk(String dataStreamName) throws IOException {
204+
var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
205+
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
206+
bulkRequest.addParameter("refresh", "true");
207+
Response response = null;
208+
try {
209+
response = client().performRequest(bulkRequest);
210+
} catch (ResponseException re) {
211+
response = re.getResponse();
212+
}
213+
assertNotNull(response);
214+
var responseBody = entityAsMap(response);
215+
assertOK(response);
216+
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
217+
}
218+
219+
static String formatInstant(Instant instant) {
220+
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
221+
}
222+
223+
private static Map<String, Object> getDataStream(String dataStreamName) throws IOException {
224+
var getDataStreamsRequest = new Request("GET", "/_data_stream/" + dataStreamName);
225+
var response = client().performRequest(getDataStreamsRequest);
226+
assertOK(response);
227+
return entityAsMap(response);
228+
}
229+
}

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,12 @@ public DataStream(
211211
lifecycle,
212212
dataStreamOptions,
213213
new DataStreamIndices(BACKING_INDEX_PREFIX, List.copyOf(indices), rolloverOnWrite, autoShardingEvent),
214-
new DataStreamIndices(FAILURE_STORE_PREFIX, List.copyOf(failureIndices), false, null)
214+
new DataStreamIndices(
215+
FAILURE_STORE_PREFIX,
216+
List.copyOf(failureIndices),
217+
(replicated == false && failureIndices.isEmpty()),
218+
null
219+
)
215220
);
216221
}
217222

@@ -280,8 +285,15 @@ public static DataStream read(StreamInput in) throws IOException {
280285
backingIndicesBuilder.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
281286
}
282287
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
283-
failureIndicesBuilder.setRolloverOnWrite(in.readBoolean())
288+
// Read the rollover on write flag from the stream, but force it on if the failure indices are empty and we're not replicating
289+
boolean failureStoreRolloverOnWrite = in.readBoolean() || (replicated == false && failureIndices.isEmpty());
290+
failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite)
284291
.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
292+
} else {
293+
// If we are reading from an older version that does not have these fields, just default
294+
// to a reasonable value for rollover on write for the failure store
295+
boolean failureStoreRolloverOnWrite = replicated == false && failureIndices.isEmpty();
296+
failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite);
285297
}
286298
DataStreamOptions dataStreamOptions;
287299
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
@@ -1434,7 +1446,11 @@ public void writeTo(StreamOutput out) throws IOException {
14341446
new DataStreamIndices(
14351447
FAILURE_STORE_PREFIX,
14361448
args[13] != null ? (List<Index>) args[13] : List.of(),
1437-
args[14] != null && (boolean) args[14],
1449+
// If replicated (args[5]) is null or exists and is false, and the failure index list (args[13]) is null or
1450+
// exists and is empty, then force the rollover on write field to true. If none of those conditions are met,
1451+
// then use the rollover on write value (args[14]) present in the parser.
1452+
((args[5] == null || ((boolean) args[5] == false)) && (args[13] == null || ((List<Index>) args[13]).isEmpty()))
1453+
|| (args[14] != null && (boolean) args[14]),
14381454
(DataStreamAutoShardingEvent) args[15]
14391455
)
14401456
)

server/src/test/java/org/elasticsearch/cluster/metadata/IndexAbstractionTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,18 @@ private IndexMetadata newIndexMetadata(String indexName, AliasMetadata aliasMeta
201201

202202
private static DataStream newDataStreamInstance(List<Index> backingIndices, List<Index> failureStoreIndices) {
203203
boolean isSystem = randomBoolean();
204+
boolean isReplicated = randomBoolean();
204205
return DataStream.builder(randomAlphaOfLength(50), backingIndices)
205-
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices).build())
206+
.setFailureIndices(
207+
DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices)
208+
.setRolloverOnWrite(isReplicated == false && failureStoreIndices.isEmpty())
209+
.build()
210+
)
206211
.setGeneration(randomLongBetween(1, 1000))
207212
.setMetadata(Map.of())
208213
.setSystem(isSystem)
209214
.setHidden(isSystem || randomBoolean())
210-
.setReplicated(randomBoolean())
215+
.setReplicated(isReplicated)
211216
.build();
212217
}
213218
}

server/src/test/java/org/elasticsearch/cluster/metadata/ProjectMetadataTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ public void testToXContent() throws IOException {
286286
"system": false,
287287
"allow_custom_routing": false,
288288
"settings" : { },
289-
"failure_rollover_on_write": false,
289+
"failure_rollover_on_write": true,
290290
"rollover_on_write": false
291291
}
292292
},
@@ -553,7 +553,7 @@ public void testToXContentMultiProject() throws IOException {
553553
"system": false,
554554
"allow_custom_routing": false,
555555
"settings" : { },
556-
"failure_rollover_on_write": false,
556+
"failure_rollover_on_write": true,
557557
"rollover_on_write": false
558558
}
559559
},

test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,11 @@ public static DataStream newInstance(
183183
.setReplicated(replicated)
184184
.setLifecycle(lifecycle)
185185
.setDataStreamOptions(dataStreamOptions)
186-
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
186+
.setFailureIndices(
187+
DataStream.DataStreamIndices.failureIndicesBuilder(failureStores)
188+
.setRolloverOnWrite((replicated == false) && (failureStores.isEmpty()))
189+
.build()
190+
)
187191
.build();
188192
}
189193

@@ -390,7 +394,7 @@ public static DataStream randomInstance(String dataStreamName, LongSupplier time
390394
)
391395
.build(),
392396
DataStream.DataStreamIndices.failureIndicesBuilder(failureIndices)
393-
.setRolloverOnWrite(failureStore && replicated == false && randomBoolean())
397+
.setRolloverOnWrite(replicated == false && (failureIndices.isEmpty() || randomBoolean()))
394398
.setAutoShardingEvent(
395399
failureStore && randomBoolean()
396400
? new DataStreamAutoShardingEvent(

0 commit comments

Comments
 (0)