Skip to content

Commit 013f363

Browse files
[8.x] Add cluster setting to enable failure store (#118662) (#119132)
This setting enables or disables the failure store for data streams by matching the data stream name against a list of patterns. It acts as a default and is overridden if the failure store is explicitly enabled or disabled, either in a component template or via the data stream options API. (See the PR for explanations of some of the changes here.) (Cherry picked from commit 97e6bb6.) Conflicts: server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java, server/src/main/java/org/elasticsearch/node/NodeConstruction.java
1 parent 7ffbcaf commit 013f363

File tree

37 files changed

+1213
-168
lines changed

37 files changed

+1213
-168
lines changed

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DataStreamOptionsIT.java

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
import org.elasticsearch.client.Request;
1313
import org.elasticsearch.client.Response;
1414
import org.elasticsearch.client.ResponseException;
15+
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.rest.RestStatus;
1518
import org.junit.Before;
1619

1720
import java.io.IOException;
@@ -122,13 +125,25 @@ public void testExplicitlyResetDataStreamOptions() throws IOException {
122125
assertOK(client().performRequest(otherRequest));
123126
}
124127

125-
public void testEnableDisableFailureStore() throws IOException {
128+
public void testBehaviorWithEachFailureStoreOptionAndClusterSetting() throws IOException {
126129
{
130+
// Default data stream options
127131
assertAcknowledged(client().performRequest(new Request("DELETE", "/_data_stream/" + DATA_STREAM_NAME + "/_options")));
128-
assertFailureStore(false, 1);
132+
setDataStreamFailureStoreClusterSetting(DATA_STREAM_NAME);
129133
assertDataStreamOptions(null);
134+
assertFailureStoreValuesInGetDataStreamResponse(true, 1);
135+
assertRedirectsDocWithBadMappingToFailureStore();
136+
setDataStreamFailureStoreClusterSetting("does-not-match-failure-data-stream");
137+
assertDataStreamOptions(null);
138+
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
139+
assertFailsDocWithBadMapping();
140+
setDataStreamFailureStoreClusterSetting(null); // should get same behaviour as when we set it to something non-matching
141+
assertDataStreamOptions(null);
142+
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
143+
assertFailsDocWithBadMapping();
130144
}
131145
{
146+
// Data stream options with failure store enabled
132147
Request enableRequest = new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME + "/_options");
133148
enableRequest.setJsonEntity("""
134149
{
@@ -137,11 +152,21 @@ public void testEnableDisableFailureStore() throws IOException {
137152
}
138153
}""");
139154
assertAcknowledged(client().performRequest(enableRequest));
140-
assertFailureStore(true, 1);
155+
setDataStreamFailureStoreClusterSetting(DATA_STREAM_NAME);
156+
assertDataStreamOptions(true);
157+
assertFailureStoreValuesInGetDataStreamResponse(true, 1);
158+
assertRedirectsDocWithBadMappingToFailureStore();
159+
setDataStreamFailureStoreClusterSetting("does-not-match-failure-data-stream"); // should have no effect as enabled in options
141160
assertDataStreamOptions(true);
161+
assertFailureStoreValuesInGetDataStreamResponse(true, 1);
162+
assertRedirectsDocWithBadMappingToFailureStore();
163+
setDataStreamFailureStoreClusterSetting(null); // same as previous
164+
assertDataStreamOptions(true);
165+
assertFailureStoreValuesInGetDataStreamResponse(true, 1);
166+
assertRedirectsDocWithBadMappingToFailureStore();
142167
}
143-
144168
{
169+
// Data stream options with failure store disabled
145170
Request disableRequest = new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME + "/_options");
146171
disableRequest.setJsonEntity("""
147172
{
@@ -150,13 +175,23 @@ public void testEnableDisableFailureStore() throws IOException {
150175
}
151176
}""");
152177
assertAcknowledged(client().performRequest(disableRequest));
153-
assertFailureStore(false, 1);
178+
setDataStreamFailureStoreClusterSetting(DATA_STREAM_NAME); // should have no effect as disabled in options
154179
assertDataStreamOptions(false);
180+
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
181+
assertFailsDocWithBadMapping();
182+
setDataStreamFailureStoreClusterSetting("does-not-match-failure-data-stream");
183+
assertDataStreamOptions(false);
184+
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
185+
assertFailsDocWithBadMapping();
186+
setDataStreamFailureStoreClusterSetting(null);
187+
assertDataStreamOptions(false);
188+
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
189+
assertFailsDocWithBadMapping();
155190
}
156191
}
157192

158193
@SuppressWarnings("unchecked")
159-
private void assertFailureStore(boolean failureStoreEnabled, int failureStoreSize) throws IOException {
194+
private void assertFailureStoreValuesInGetDataStreamResponse(boolean failureStoreEnabled, int failureStoreSize) throws IOException {
160195
final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME));
161196
List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
162197
assertThat(dataStreams.size(), is(1));
@@ -198,4 +233,32 @@ private List<String> getIndices(Map<String, Object> response) {
198233
List<Map<String, String>> indices = (List<Map<String, String>>) response.get("indices");
199234
return indices.stream().map(index -> index.get("index_name")).toList();
200235
}
236+
237+
/**
 * Updates the cluster settings so that the failure store setting
 * ({@code DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING}) holds the given value.
 *
 * @param value the value to store under the setting's key; callers in this test pass either a data stream
 *              name pattern or {@code null} (presumably clearing the setting; the test expects the same
 *              behaviour as for a non-matching pattern)
 * @throws IOException declared for the underlying settings update request
 */
private static void setDataStreamFailureStoreClusterSetting(String value) throws IOException {
238+
updateClusterSettings(
239+
Settings.builder().put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), value).build()
240+
);
241+
}
242+
243+
/**
 * Sends a {@code POST} to {@code DATA_STREAM_NAME + "/_doc"} with a document whose {@code @timestamp}
 * field holds the string "not a timestamp", and returns the response.
 * The helpers that call this rely on the document being either redirected to the failure store or rejected.
 *
 * @return the response of the index request
 * @throws IOException if the request fails; the callers in this class expect a {@code ResponseException}
 *                     when the document is rejected
 */
private Response putDocumentWithBadMapping() throws IOException {
244+
Request request = new Request("POST", DATA_STREAM_NAME + "/_doc");
245+
request.setJsonEntity("""
246+
{
247+
"@timestamp": "not a timestamp",
248+
"foo": "bar"
249+
}
250+
""");
251+
return client().performRequest(request);
252+
}
253+
254+
/**
 * Indexes the badly-mapped document and asserts that the {@code failure_store} field of the response
 * body is {@code "used"}.
 *
 * @throws IOException if the index request itself fails
 */
private void assertRedirectsDocWithBadMappingToFailureStore() throws IOException {
255+
Response response = putDocumentWithBadMapping();
256+
String failureStoreResponse = (String) entityAsMap(response).get("failure_store");
257+
assertThat(failureStoreResponse, is("used"));
258+
}
259+
260+
/**
 * Asserts that indexing the badly-mapped document throws a {@code ResponseException} whose HTTP status
 * code equals {@code RestStatus.BAD_REQUEST}.
 */
private void assertFailsDocWithBadMapping() {
261+
ResponseException e = assertThrows(ResponseException.class, this::putDocumentWithBadMapping);
262+
assertThat(e.getResponse().getStatusLine().getStatusCode(), is(RestStatus.BAD_REQUEST.getStatus()));
263+
}
201264
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2626
import org.elasticsearch.cluster.health.ClusterStateHealth;
2727
import org.elasticsearch.cluster.metadata.DataStream;
28+
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
2829
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
2930
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
3031
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -64,6 +65,7 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadAction
6465
private final SystemIndices systemIndices;
6566
private final ClusterSettings clusterSettings;
6667
private final DataStreamGlobalRetentionSettings globalRetentionSettings;
68+
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
6769
private final Client client;
6870

6971
@Inject
@@ -75,6 +77,7 @@ public TransportGetDataStreamsAction(
7577
IndexNameExpressionResolver indexNameExpressionResolver,
7678
SystemIndices systemIndices,
7779
DataStreamGlobalRetentionSettings globalRetentionSettings,
80+
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
7881
Client client
7982
) {
8083
super(
@@ -91,6 +94,7 @@ public TransportGetDataStreamsAction(
9194
this.systemIndices = systemIndices;
9295
this.globalRetentionSettings = globalRetentionSettings;
9396
clusterSettings = clusterService.getClusterSettings();
97+
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
9498
this.client = new OriginSettingClient(client, "stack");
9599
}
96100

@@ -122,6 +126,7 @@ public void onResponse(DataStreamsStatsAction.Response response) {
122126
systemIndices,
123127
clusterSettings,
124128
globalRetentionSettings,
129+
dataStreamFailureStoreSettings,
125130
maxTimestamps
126131
)
127132
);
@@ -134,7 +139,16 @@ public void onFailure(Exception e) {
134139
});
135140
} else {
136141
listener.onResponse(
137-
innerOperation(state, request, indexNameExpressionResolver, systemIndices, clusterSettings, globalRetentionSettings, null)
142+
innerOperation(
143+
state,
144+
request,
145+
indexNameExpressionResolver,
146+
systemIndices,
147+
clusterSettings,
148+
globalRetentionSettings,
149+
dataStreamFailureStoreSettings,
150+
null
151+
)
138152
);
139153
}
140154
}
@@ -146,11 +160,16 @@ static GetDataStreamAction.Response innerOperation(
146160
SystemIndices systemIndices,
147161
ClusterSettings clusterSettings,
148162
DataStreamGlobalRetentionSettings globalRetentionSettings,
163+
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
149164
@Nullable Map<String, Long> maxTimestamps
150165
) {
151166
List<DataStream> dataStreams = getDataStreams(state, indexNameExpressionResolver, request);
152167
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = new ArrayList<>(dataStreams.size());
153168
for (DataStream dataStream : dataStreams) {
169+
// For this action, we are returning whether the failure store is effectively enabled, either in metadata or by cluster setting.
170+
// Users can use the get data stream options API to find out whether it is explicitly enabled in metadata.
171+
boolean failureStoreEffectivelyEnabled = DataStream.isFailureStoreFeatureFlagEnabled()
172+
&& dataStream.isFailureStoreEffectivelyEnabled(dataStreamFailureStoreSettings);
154173
final String indexTemplate;
155174
boolean indexTemplatePreferIlmValue = true;
156175
String ilmPolicyName = null;
@@ -254,6 +273,7 @@ public int compareTo(IndexInfo o) {
254273
dataStreamInfos.add(
255274
new GetDataStreamAction.Response.DataStreamInfo(
256275
dataStream,
276+
failureStoreEffectivelyEnabled,
257277
streamHealth.getStatus(),
258278
indexTemplate,
259279
ilmPolicyName,

modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti
102102

103103
Response.DataStreamInfo dataStreamInfo = new Response.DataStreamInfo(
104104
logs,
105+
true,
105106
ClusterHealthStatus.GREEN,
106107
"index-template",
107108
null,
@@ -205,6 +206,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti
205206

206207
Response.DataStreamInfo dataStreamInfo = new Response.DataStreamInfo(
207208
logs,
209+
true,
208210
ClusterHealthStatus.GREEN,
209211
"index-template",
210212
null,
@@ -282,14 +284,15 @@ public void testManagedByDisplayValuesDontAccidentalyChange() {
282284

283285
private Response.DataStreamInfo mutateInstance(Response.DataStreamInfo instance) {
284286
var dataStream = instance.getDataStream();
287+
var failureStoreEffectivelyEnabled = instance.isFailureStoreEffectivelyEnabled();
285288
var status = instance.getDataStreamStatus();
286289
var indexTemplate = instance.getIndexTemplate();
287290
var ilmPolicyName = instance.getIlmPolicy();
288291
var timeSeries = instance.getTimeSeries();
289292
var indexSettings = instance.getIndexSettingsValues();
290293
var templatePreferIlm = instance.templatePreferIlmValue();
291294
var maximumTimestamp = instance.getMaximumTimestamp();
292-
switch (randomIntBetween(0, 7)) {
295+
switch (randomIntBetween(0, 8)) {
293296
case 0 -> dataStream = randomValueOtherThan(dataStream, DataStreamTestHelper::randomInstance);
294297
case 1 -> status = randomValueOtherThan(status, () -> randomFrom(ClusterHealthStatus.values()));
295298
case 2 -> indexTemplate = randomBoolean() && indexTemplate != null ? null : randomAlphaOfLengthBetween(2, 10);
@@ -314,9 +317,11 @@ private Response.DataStreamInfo mutateInstance(Response.DataStreamInfo instance)
314317
case 7 -> maximumTimestamp = (maximumTimestamp == null)
315318
? randomNonNegativeLong()
316319
: (usually() ? randomValueOtherThan(maximumTimestamp, ESTestCase::randomNonNegativeLong) : null);
320+
case 8 -> failureStoreEffectivelyEnabled = failureStoreEffectivelyEnabled ? false : true;
317321
}
318322
return new Response.DataStreamInfo(
319323
dataStream,
324+
failureStoreEffectivelyEnabled,
320325
status,
321326
indexTemplate,
322327
ilmPolicyName,
@@ -355,6 +360,7 @@ private Response.DataStreamInfo generateRandomDataStreamInfo() {
355360
List<Tuple<Instant, Instant>> timeSeries = randomBoolean() ? generateRandomTimeSeries() : null;
356361
return new Response.DataStreamInfo(
357362
DataStreamTestHelper.randomInstance(),
363+
randomBoolean(),
358364
ClusterHealthStatus.GREEN,
359365
randomAlphaOfLengthBetween(2, 10),
360366
randomAlphaOfLengthBetween(2, 10),

0 commit comments

Comments
 (0)