Skip to content

Commit dcb47e6

Browse files
committed
Force rollover on write to true when failure indices list is empty.
1 parent a8d6582 commit dcb47e6

File tree

2 files changed

+237
-1
lines changed

2 files changed

+237
-1
lines changed
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.upgrades;
11+
12+
import com.carrotsearch.randomizedtesting.annotations.Name;
13+
14+
import org.elasticsearch.client.Request;
15+
import org.elasticsearch.client.Response;
16+
import org.elasticsearch.client.ResponseException;
17+
import org.elasticsearch.cluster.metadata.DataStream;
18+
import org.elasticsearch.common.time.DateFormatter;
19+
import org.elasticsearch.common.time.FormatNames;
20+
import org.elasticsearch.test.rest.ObjectPath;
21+
22+
import java.io.IOException;
23+
import java.time.Instant;
24+
import java.util.Map;
25+
26+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
27+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.dataStreamIndexEqualTo;
28+
import static org.hamcrest.Matchers.anyOf;
29+
import static org.hamcrest.Matchers.empty;
30+
import static org.hamcrest.Matchers.equalTo;
31+
import static org.hamcrest.Matchers.hasSize;
32+
import static org.hamcrest.Matchers.is;
33+
import static org.hamcrest.Matchers.not;
34+
import static org.hamcrest.Matchers.notNullValue;
35+
36+
public class FailureStoreUpgradeIT extends AbstractRollingUpgradeWithSecurityTestCase {
37+
38+
public FailureStoreUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
39+
super(upgradedNodes);
40+
}
41+
42+
final String INDEX_TEMPLATE = """
43+
{
44+
"index_patterns": ["$PATTERN"],
45+
"data_stream": {},
46+
"template": {
47+
"mappings":{
48+
"properties": {
49+
"@timestamp" : {
50+
"type": "date"
51+
},
52+
"numeral": {
53+
"type": "long"
54+
}
55+
}
56+
}
57+
}
58+
}""";
59+
60+
private static final String VALID_DOC =
61+
"""
62+
{"@timestamp": "$now", "numeral": 0}
63+
""";
64+
65+
private static final String INVALID_DOC =
66+
"""
67+
{"@timestamp": "$now", "numeral": "foobar"}
68+
""";
69+
70+
private static final String BULK =
71+
"""
72+
{"create": {}}
73+
{"@timestamp": "$now", "numeral": 0}
74+
{"create": {}}
75+
{"@timestamp": "$now", "numeral": 1}
76+
{"create": {}}
77+
{"@timestamp": "$now", "numeral": 2}
78+
{"create": {}}
79+
{"@timestamp": "$now", "numeral": 3}
80+
{"create": {}}
81+
{"@timestamp": "$now", "numeral": 4}
82+
""";
83+
84+
private static final String ENABLE_FAILURE_STORE_OPTIONS =
85+
"""
86+
{
87+
"failure_store": {
88+
"enabled": true
89+
}
90+
}
91+
""";
92+
93+
public void testFailureStoreOnPreviouslyExistingDataStream() throws Exception {
94+
assumeFalse(
95+
"testing migration from data streams created before failure store feature existed",
96+
oldClusterHasFeature(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)
97+
);
98+
String dataStreamName = "fs-ds-upgrade-test";
99+
String failureStoreName = dataStreamName + "::failures";
100+
String templateName = "fs-ds-template";
101+
if (isOldCluster()) {
102+
// Create template
103+
var putIndexTemplateRequest = new Request("POST", "/_index_template/" + templateName);
104+
putIndexTemplateRequest.setJsonEntity(INDEX_TEMPLATE.replace("$PATTERN", dataStreamName));
105+
assertOK(client().performRequest(putIndexTemplateRequest));
106+
107+
// Initialize data stream
108+
executeBulk(dataStreamName);
109+
110+
// Ensure document failure
111+
indexDoc(dataStreamName, INVALID_DOC, false);
112+
113+
// Check data stream state
114+
var dataStreams = getDataStream(dataStreamName);
115+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
116+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
117+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
118+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo(templateName));
119+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
120+
String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
121+
assertThat(firstBackingIndex, backingIndexEqualTo(dataStreamName, 1));
122+
123+
assertDocCount(client(), dataStreamName, 5);
124+
} else if (isMixedCluster()) {
125+
ensureHealth(dataStreamName, request -> request.addParameter("wait_for_status", "yellow"));
126+
if (isFirstMixedCluster()) {
127+
indexDoc(dataStreamName, VALID_DOC, true);
128+
indexDoc(dataStreamName, INVALID_DOC, false);
129+
}
130+
assertDocCount(client(), dataStreamName, 6);
131+
} else if (isUpgradedCluster()) {
132+
ensureGreen(dataStreamName);
133+
134+
// Ensure correct default failure store state for upgraded data stream
135+
var dataStreams = getDataStream(dataStreamName);
136+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
137+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
138+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
139+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(false));
140+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
141+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));
142+
143+
// Ensure invalid document is not indexed
144+
indexDoc(dataStreamName, INVALID_DOC, false);
145+
146+
// Enable failure store on upgraded data stream
147+
var putOptionsRequest = new Request("PUT", "/_data_stream/" + dataStreamName + "/_options");
148+
putOptionsRequest.setJsonEntity(ENABLE_FAILURE_STORE_OPTIONS);
149+
assertOK(client().performRequest(putOptionsRequest));
150+
151+
// Ensure correct enabled failure store state
152+
dataStreams = getDataStream(dataStreamName);
153+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
154+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
155+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
156+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
157+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(empty()));
158+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(true));
159+
160+
// Initialize failure store
161+
int expectedFailureDocuments = 0;
162+
if (randomBoolean()) {
163+
// Index a failure via a mapping exception
164+
indexDoc(dataStreamName, INVALID_DOC, true);
165+
expectedFailureDocuments = 1;
166+
} else {
167+
// Manually rollover failure store to force initialization
168+
var failureStoreRolloverRequest = new Request("POST", "/" + failureStoreName + "/_rollover");
169+
assertOK(client().performRequest(failureStoreRolloverRequest));
170+
}
171+
172+
// Ensure correct initialized failure store state
173+
dataStreams = getDataStream(dataStreamName);
174+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
175+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
176+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
177+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
178+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(not(empty())));
179+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(false));
180+
181+
String failureIndexName = ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices.0.index_name");
182+
assertThat(failureIndexName, dataStreamIndexEqualTo(dataStreamName, 2, true));
183+
184+
assertDocCount(client(), dataStreamName, 6);
185+
assertDocCount(client(), failureStoreName, expectedFailureDocuments);
186+
}
187+
}
188+
189+
private static void indexDoc(String dataStreamName, String docBody, boolean expectSuccess) throws IOException {
190+
var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
191+
indexRequest.addParameter("refresh", "true");
192+
indexRequest.setJsonEntity(docBody.replace("$now", formatInstant(Instant.now())));
193+
Response response = null;
194+
try {
195+
response = client().performRequest(indexRequest);
196+
} catch (ResponseException re) {
197+
response = re.getResponse();
198+
}
199+
assertNotNull(response);
200+
System.out.println(response);
201+
System.out.println(responseAsMap(response));
202+
if (expectSuccess) {
203+
assertOK(response);
204+
} else {
205+
assertThat(response.getStatusLine().getStatusCode(), not(anyOf(equalTo(200), equalTo(201))));
206+
}
207+
}
208+
209+
private static void executeBulk(String dataStreamName) throws IOException {
210+
var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
211+
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
212+
bulkRequest.addParameter("refresh", "true");
213+
Response response = null;
214+
try {
215+
response = client().performRequest(bulkRequest);
216+
} catch (ResponseException re) {
217+
response = re.getResponse();
218+
}
219+
assertNotNull(response);
220+
var responseBody = entityAsMap(response);
221+
assertOK(response);
222+
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
223+
}
224+
225+
static String formatInstant(Instant instant) {
226+
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
227+
}
228+
229+
private static Map<String, Object> getDataStream(String dataStreamName) throws IOException {
230+
var getDataStreamsRequest = new Request("GET", "/_data_stream/" + dataStreamName);
231+
var response = client().performRequest(getDataStreamsRequest);
232+
assertOK(response);
233+
return entityAsMap(response);
234+
}
235+
}

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1859,7 +1859,8 @@ protected DataStreamIndices(
18591859
// The list of indices is expected to be an immutable list. We don't create an immutable copy here, as it might have
18601860
// impact on the performance on some usages.
18611861
this.indices = indices;
1862-
this.rolloverOnWrite = rolloverOnWrite;
1862+
// There should never be a point where rollover on write is false if there are no indices present for this set
1863+
this.rolloverOnWrite = indices.isEmpty() || rolloverOnWrite;
18631864
this.autoShardingEvent = autoShardingEvent;
18641865

18651866
assert getLookup().size() == indices.size() : "found duplicate index entries in " + indices;

0 commit comments

Comments
 (0)