Skip to content

Commit 5ae52bf

Browse files
jbaieraelasticsearchmachinemasseyke
authored
Force rollover on write to true when data stream indices list is empty (elastic#133347)
Updates the serialization and construction logic for failure store components to bias rollover on write to true when no indices are present in the failure store index set. --------- Co-authored-by: elasticsearchmachine <[email protected]> Co-authored-by: Keith Massey <[email protected]>
1 parent acdc6a4 commit 5ae52bf

File tree

6 files changed

+269
-9
lines changed

6 files changed

+269
-9
lines changed

docs/changelog/133347.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 133347
2+
summary: Force rollover on write to true when data stream indices list is empty
3+
area: Data streams
4+
type: bug
5+
issues:
6+
- 133176
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.upgrades;
11+
12+
import com.carrotsearch.randomizedtesting.annotations.Name;
13+
14+
import org.elasticsearch.client.Request;
15+
import org.elasticsearch.client.Response;
16+
import org.elasticsearch.client.ResponseException;
17+
import org.elasticsearch.cluster.metadata.DataStream;
18+
import org.elasticsearch.common.time.DateFormatter;
19+
import org.elasticsearch.common.time.FormatNames;
20+
import org.elasticsearch.test.rest.ObjectPath;
21+
22+
import java.io.IOException;
23+
import java.time.Instant;
24+
import java.util.Map;
25+
26+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
27+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.dataStreamIndexEqualTo;
28+
import static org.hamcrest.Matchers.anyOf;
29+
import static org.hamcrest.Matchers.empty;
30+
import static org.hamcrest.Matchers.equalTo;
31+
import static org.hamcrest.Matchers.hasSize;
32+
import static org.hamcrest.Matchers.is;
33+
import static org.hamcrest.Matchers.not;
34+
import static org.hamcrest.Matchers.notNullValue;
35+
36+
public class FailureStoreUpgradeIT extends AbstractRollingUpgradeWithSecurityTestCase {
37+
38+
public FailureStoreUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
39+
super(upgradedNodes);
40+
}
41+
42+
final String INDEX_TEMPLATE = """
43+
{
44+
"index_patterns": ["$PATTERN"],
45+
"data_stream": {},
46+
"template": {
47+
"mappings":{
48+
"properties": {
49+
"@timestamp" : {
50+
"type": "date"
51+
},
52+
"numeral": {
53+
"type": "long"
54+
}
55+
}
56+
}
57+
}
58+
}""";
59+
60+
private static final String VALID_DOC = """
61+
{"@timestamp": "$now", "numeral": 0}
62+
""";
63+
64+
private static final String INVALID_DOC = """
65+
{"@timestamp": "$now", "numeral": "foobar"}
66+
""";
67+
68+
private static final String BULK = """
69+
{"create": {}}
70+
{"@timestamp": "$now", "numeral": 0}
71+
{"create": {}}
72+
{"@timestamp": "$now", "numeral": 1}
73+
{"create": {}}
74+
{"@timestamp": "$now", "numeral": 2}
75+
{"create": {}}
76+
{"@timestamp": "$now", "numeral": 3}
77+
{"create": {}}
78+
{"@timestamp": "$now", "numeral": 4}
79+
""";
80+
81+
private static final String ENABLE_FAILURE_STORE_OPTIONS = """
82+
{
83+
"failure_store": {
84+
"enabled": true
85+
}
86+
}
87+
""";
88+
89+
public void testFailureStoreOnPreviouslyExistingDataStream() throws Exception {
90+
assumeFalse(
91+
"testing migration from data streams created before failure store feature existed",
92+
oldClusterHasFeature(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)
93+
);
94+
String dataStreamName = "fs-ds-upgrade-test";
95+
String failureStoreName = dataStreamName + "::failures";
96+
String templateName = "fs-ds-template";
97+
if (isOldCluster()) {
98+
// Create template
99+
var putIndexTemplateRequest = new Request("POST", "/_index_template/" + templateName);
100+
putIndexTemplateRequest.setJsonEntity(INDEX_TEMPLATE.replace("$PATTERN", dataStreamName));
101+
assertOK(client().performRequest(putIndexTemplateRequest));
102+
103+
// Initialize data stream
104+
executeBulk(dataStreamName);
105+
106+
// Ensure document failure
107+
indexDoc(dataStreamName, INVALID_DOC, false);
108+
109+
// Check data stream state
110+
var dataStreams = getDataStream(dataStreamName);
111+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
112+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
113+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
114+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo(templateName));
115+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
116+
String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
117+
assertThat(firstBackingIndex, backingIndexEqualTo(dataStreamName, 1));
118+
119+
assertDocCount(client(), dataStreamName, 5);
120+
} else if (isMixedCluster()) {
121+
ensureHealth(dataStreamName, request -> request.addParameter("wait_for_status", "yellow"));
122+
if (isFirstMixedCluster()) {
123+
indexDoc(dataStreamName, VALID_DOC, true);
124+
indexDoc(dataStreamName, INVALID_DOC, false);
125+
}
126+
assertDocCount(client(), dataStreamName, 6);
127+
} else if (isUpgradedCluster()) {
128+
ensureGreen(dataStreamName);
129+
130+
// Ensure correct default failure store state for upgraded data stream
131+
var dataStreams = getDataStream(dataStreamName);
132+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
133+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
134+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
135+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(false));
136+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
137+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));
138+
139+
// Ensure invalid document is not indexed
140+
indexDoc(dataStreamName, INVALID_DOC, false);
141+
142+
// Enable failure store on upgraded data stream
143+
var putOptionsRequest = new Request("PUT", "/_data_stream/" + dataStreamName + "/_options");
144+
putOptionsRequest.setJsonEntity(ENABLE_FAILURE_STORE_OPTIONS);
145+
assertOK(client().performRequest(putOptionsRequest));
146+
147+
// Ensure correct enabled failure store state
148+
dataStreams = getDataStream(dataStreamName);
149+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
150+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
151+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
152+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
153+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
154+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));
155+
156+
// Initialize failure store
157+
int expectedFailureDocuments = 0;
158+
if (randomBoolean()) {
159+
// Index a failure via a mapping exception
160+
indexDoc(dataStreamName, INVALID_DOC, true);
161+
expectedFailureDocuments = 1;
162+
} else {
163+
// Manually rollover failure store to force initialization
164+
var failureStoreRolloverRequest = new Request("POST", "/" + failureStoreName + "/_rollover");
165+
assertOK(client().performRequest(failureStoreRolloverRequest));
166+
}
167+
168+
// Ensure correct initialized failure store state
169+
dataStreams = getDataStream(dataStreamName);
170+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
171+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
172+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
173+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
174+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(not(empty())));
175+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(false));
176+
177+
String failureIndexName = ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices.0.index_name");
178+
assertThat(failureIndexName, dataStreamIndexEqualTo(dataStreamName, 2, true));
179+
180+
assertDocCount(client(), dataStreamName, 6);
181+
assertDocCount(client(), failureStoreName, expectedFailureDocuments);
182+
}
183+
}
184+
185+
private static void indexDoc(String dataStreamName, String docBody, boolean expectSuccess) throws IOException {
186+
var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
187+
indexRequest.addParameter("refresh", "true");
188+
indexRequest.setJsonEntity(docBody.replace("$now", formatInstant(Instant.now())));
189+
Response response = null;
190+
try {
191+
response = client().performRequest(indexRequest);
192+
} catch (ResponseException re) {
193+
response = re.getResponse();
194+
}
195+
assertNotNull(response);
196+
if (expectSuccess) {
197+
assertOK(response);
198+
} else {
199+
assertThat(response.getStatusLine().getStatusCode(), not(anyOf(equalTo(200), equalTo(201))));
200+
}
201+
}
202+
203+
private static void executeBulk(String dataStreamName) throws IOException {
204+
var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
205+
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
206+
bulkRequest.addParameter("refresh", "true");
207+
Response response = null;
208+
try {
209+
response = client().performRequest(bulkRequest);
210+
} catch (ResponseException re) {
211+
response = re.getResponse();
212+
}
213+
assertNotNull(response);
214+
var responseBody = entityAsMap(response);
215+
assertOK(response);
216+
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
217+
}
218+
219+
static String formatInstant(Instant instant) {
220+
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
221+
}
222+
223+
private static Map<String, Object> getDataStream(String dataStreamName) throws IOException {
224+
var getDataStreamsRequest = new Request("GET", "/_data_stream/" + dataStreamName);
225+
var response = client().performRequest(getDataStreamsRequest);
226+
assertOK(response);
227+
return entityAsMap(response);
228+
}
229+
}

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,12 @@ public DataStream(
214214
lifecycle,
215215
dataStreamOptions,
216216
new DataStreamIndices(BACKING_INDEX_PREFIX, List.copyOf(indices), rolloverOnWrite, autoShardingEvent),
217-
new DataStreamIndices(FAILURE_STORE_PREFIX, List.copyOf(failureIndices), false, null)
217+
new DataStreamIndices(
218+
FAILURE_STORE_PREFIX,
219+
List.copyOf(failureIndices),
220+
(replicated == false && failureIndices.isEmpty()),
221+
null
222+
)
218223
);
219224
}
220225

@@ -283,8 +288,15 @@ public static DataStream read(StreamInput in) throws IOException {
283288
backingIndicesBuilder.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
284289
}
285290
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
286-
failureIndicesBuilder.setRolloverOnWrite(in.readBoolean())
291+
// Read the rollover on write flag from the stream, but force it on if the failure indices are empty and we're not replicating
292+
boolean failureStoreRolloverOnWrite = in.readBoolean() || (replicated == false && failureIndices.isEmpty());
293+
failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite)
287294
.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
295+
} else {
296+
// If we are reading from an older version that does not have these fields, just default
297+
// to a reasonable value for rollover on write for the failure store
298+
boolean failureStoreRolloverOnWrite = replicated == false && failureIndices.isEmpty();
299+
failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite);
288300
}
289301
DataStreamOptions dataStreamOptions;
290302
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
@@ -1490,7 +1502,11 @@ public void writeTo(StreamOutput out) throws IOException {
14901502
new DataStreamIndices(
14911503
FAILURE_STORE_PREFIX,
14921504
args[13] != null ? (List<Index>) args[13] : List.of(),
1493-
args[14] != null && (boolean) args[14],
1505+
// If replicated (args[5]) is null or exists and is false, and the failure index list (args[13]) is null or
1506+
// exists and is empty, then force the rollover on write field to true. If none of those conditions are met,
1507+
// then use the rollover on write value (args[14]) present in the parser.
1508+
((args[5] == null || ((boolean) args[5] == false)) && (args[13] == null || ((List<Index>) args[13]).isEmpty()))
1509+
|| (args[14] != null && (boolean) args[14]),
14941510
(DataStreamAutoShardingEvent) args[15]
14951511
)
14961512
)

server/src/test/java/org/elasticsearch/cluster/metadata/IndexAbstractionTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,18 @@ private IndexMetadata newIndexMetadata(String indexName, AliasMetadata aliasMeta
201201

202202
private static DataStream newDataStreamInstance(List<Index> backingIndices, List<Index> failureStoreIndices) {
203203
boolean isSystem = randomBoolean();
204+
boolean isReplicated = randomBoolean();
204205
return DataStream.builder(randomAlphaOfLength(50), backingIndices)
205-
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices).build())
206+
.setFailureIndices(
207+
DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices)
208+
.setRolloverOnWrite(isReplicated == false && failureStoreIndices.isEmpty())
209+
.build()
210+
)
206211
.setGeneration(randomLongBetween(1, 1000))
207212
.setMetadata(Map.of())
208213
.setSystem(isSystem)
209214
.setHidden(isSystem || randomBoolean())
210-
.setReplicated(randomBoolean())
215+
.setReplicated(isReplicated)
211216
.build();
212217
}
213218
}

server/src/test/java/org/elasticsearch/cluster/metadata/ProjectMetadataTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2473,7 +2473,7 @@ public void testToXContent() throws IOException {
24732473
"system": false,
24742474
"allow_custom_routing": false,
24752475
"settings" : { },
2476-
"failure_rollover_on_write": false,
2476+
"failure_rollover_on_write": true,
24772477
"rollover_on_write": false
24782478
}
24792479
},
@@ -2740,7 +2740,7 @@ public void testToXContentMultiProject() throws IOException {
27402740
"system": false,
27412741
"allow_custom_routing": false,
27422742
"settings" : { },
2743-
"failure_rollover_on_write": false,
2743+
"failure_rollover_on_write": true,
27442744
"rollover_on_write": false
27452745
}
27462746
},

test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,11 @@ public static DataStream newInstance(
184184
.setReplicated(replicated)
185185
.setLifecycle(lifecycle)
186186
.setDataStreamOptions(dataStreamOptions)
187-
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
187+
.setFailureIndices(
188+
DataStream.DataStreamIndices.failureIndicesBuilder(failureStores)
189+
.setRolloverOnWrite((replicated == false) && (failureStores.isEmpty()))
190+
.build()
191+
)
188192
.build();
189193
}
190194

@@ -391,7 +395,7 @@ public static DataStream randomInstance(String dataStreamName, LongSupplier time
391395
)
392396
.build(),
393397
DataStream.DataStreamIndices.failureIndicesBuilder(failureIndices)
394-
.setRolloverOnWrite(failureStore && replicated == false && randomBoolean())
398+
.setRolloverOnWrite(replicated == false && (failureIndices.isEmpty() || randomBoolean()))
395399
.setAutoShardingEvent(
396400
failureStore && randomBoolean()
397401
? new DataStreamAutoShardingEvent(

0 commit comments

Comments
 (0)