
Commit a2e1925

gmarouli and elasticsearchmachine authored
[8.19] [Failure store] Introduce dedicated failure store lifecycle configuration (#127314) (#127577)
* [Failure store] Introduce dedicated failure store lifecycle configuration (#127314)

The failure store is a set of data stream indices that are used to store certain types of ingestion failures. Until now they shared the configuration of the backing indices, but the two data sets have different lifecycle needs: failures typically need to be retained for a much shorter time than the data. The lifecycle needs of the failures are therefore more limited and fit the simplicity of the data stream lifecycle feature better. This allows the user to set only the desired retention, and we perform the rollover and other maintenance tasks without the user having to think about them. Furthermore, having only one lifecycle management feature allows us to ensure that this data is managed by default.

This PR introduces the following:

Configuration

We extend the failure store configuration to also allow lifecycle configuration. This configuration reflects only the user's configuration, as shown below:

PUT _data_stream/*/options
{
  "failure_store": {
    "lifecycle": {
      "data_retention": "5d"
    }
  }
}

GET _data_stream/*/options
{
  "data_streams": [
    {
      "name": "my-ds",
      "options": {
        "failure_store": {
          "lifecycle": {
            "data_retention": "5d"
          }
        }
      }
    }
  ]
}

To retrieve the effective configuration you need to use the GET data streams API, see #126668.

Functionality

The data stream lifecycle (DLM) will manage the failure indices regardless of whether the failure store is enabled or not. This ensures that if the failure store gets disabled we will not have stagnant data. The data stream options APIs reflect only the user's configuration; the GET data stream API should be used to check the current state of the effective failure store configuration.

Telemetry

We extend the data stream failure store telemetry to also include the lifecycle telemetry:

{
  "data_streams": {
    "available": true,
    "enabled": true,
    "data_streams": 10,
    "indices_count": 50,
    "failure_store": {
      "explicitly_enabled_count": 1,
      "effectively_enabled_count": 15,
      "failure_indices_count": 30,
      "lifecycle": {
        "explicitly_enabled_count": 5,
        "effectively_enabled_count": 20,
        "data_retention": {
          "configured_data_streams": 5,
          "minimum_millis": X,
          "maximum_millis": Y,
          "average_millis": Z
        },
        "effective_retention": {
          "retained_data_streams": 20,
          "minimum_millis": X,
          "maximum_millis": Y,
          "average_millis": Z
        },
        "global_retention": {
          "max": {
            "defined": false
          },
          "default": {
            "defined": true, <------ this is the default value applicable for the failure store
            "millis": X
          }
        }
      }
    }
  }
}

Implementation details

- We ensure that a partially reset failure store will create a valid failure store configuration.
- We ensure that when a node communicates with a node on a previous version, it will not send an invalid failure store configuration (enabled: null).

(cherry picked from commit 03d7781)

# Conflicts:
# modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
# modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
# modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java
# server/src/main/java/org/elasticsearch/TransportVersions.java
# server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java
# server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

* [CI] Auto commit changes from spotless

---------

Co-authored-by: elasticsearchmachine <[email protected]>
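The GET data stream API reports an effective retention alongside a "retention_determined_by" field (values seen in the tests below: "data_stream_configuration", "default_global_retention", "max_global_retention"). A minimal, illustrative sketch of how that resolution could work, not Elasticsearch source:

```python
def effective_retention(configured=None, default_global=None, max_global=None):
    """Return (retention_millis, determined_by); all inputs in milliseconds.

    Illustrative model: a retention configured on the data stream (or its
    failure store) wins over the default global retention, and the max
    global retention caps whichever value was chosen.
    """
    if configured is not None:
        retention, source = configured, "data_stream_configuration"
    elif default_global is not None:
        retention, source = default_global, "default_global_retention"
    else:
        # No configured, no default, no cap to apply: unmanaged retention.
        return None, None
    if max_global is not None and retention > max_global:
        return max_global, "max_global_retention"
    return retention, source
```

For example, a 30d configured retention under a 10s max global retention resolves to 10s determined by "max_global_retention", matching the expectations in DataStreamGlobalRetentionIT further down.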
1 parent ea83e41 commit a2e1925

File tree

45 files changed (+1913, −659 lines)


docs/changelog/127314.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 127314
+summary: "[Failure store] Introduce dedicated failure store lifecycle configuration"
+area: Data streams
+type: enhancement
+issues: []

modules/data-streams/build.gradle

Lines changed: 1 addition & 0 deletions
@@ -78,4 +78,5 @@ tasks.named("yamlRestTestV7CompatTransform").configure({ task ->
   task.skipTest("data_stream/210_rollover_failure_store/Lazily roll over a data stream's failure store after an ingest failure", "Rolling over a data stream using target_failure_store is no longer supported.")
   task.skipTest("data_stream/210_rollover_failure_store/Lazily roll over a data stream's failure store after a shard failure", "Rolling over a data stream using target_failure_store is no longer supported.")
   task.skipTest("data_stream/210_rollover_failure_store/Roll over a data stream's failure store without conditions", "Rolling over a data stream using target_failure_store is no longer supported.")
+  task.skipTest("data_stream/240_failure_store_info/Get failure store info from explicitly enabled failure store and disabled lifecycle", "failure store lifecycle is not using anymore the data stream lifecycle")
 })

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

Lines changed: 34 additions & 18 deletions
@@ -28,19 +28,20 @@
 import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
 import org.elasticsearch.action.datastreams.GetDataStreamAction;
 import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
+import org.elasticsearch.action.datastreams.PutDataStreamOptionsAction;
 import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
 import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction;
 import org.elasticsearch.action.datastreams.lifecycle.ExplainIndexDataStreamLifecycle;
 import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
-import org.elasticsearch.action.downsample.DownsampleConfig;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.broadcast.BroadcastResponse;
 import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.DataStreamAction;
+import org.elasticsearch.cluster.metadata.DataStreamFailureStore;
 import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
-import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
+import org.elasticsearch.cluster.metadata.DataStreamOptions;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Template;
 import org.elasticsearch.cluster.node.DiscoveryNode;

@@ -70,7 +71,6 @@
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.SystemIndexPlugin;
 import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.xcontent.ToXContent;

@@ -1030,17 +1030,8 @@ public void testReenableDataStreamLifecycle() throws Exception {
     }

     public void testLifecycleAppliedToFailureStore() throws Exception {
-        // We configure a lifecycle with downsampling to ensure it doesn't fail
-        DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.dataLifecycleBuilder()
+        DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.failuresLifecycleBuilder()
             .dataRetention(TimeValue.timeValueSeconds(20))
-            .downsampling(
-                List.of(
-                    new DataStreamLifecycle.DownsamplingRound(
-                        TimeValue.timeValueMillis(10),
-                        new DownsampleConfig(new DateHistogramInterval("10m"))
-                    )
-                )
-            )
             .buildTemplate();

         putComposableIndexTemplate("id1", """

@@ -1054,7 +1045,7 @@ public void testLifecycleAppliedToFailureStore() throws Exception {
                   "type": "boolean"
                 }
               }
-            }""", List.of("metrics-fs*"), Settings.builder().put("index.number_of_replicas", 0).build(), null, lifecycle, true);
+            }""", List.of("metrics-fs*"), Settings.builder().put("index.number_of_replicas", 0).build(), null, null, lifecycle, true);

         String dataStreamName = "metrics-fs";
         CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(

@@ -1109,7 +1100,7 @@ public void testLifecycleAppliedToFailureStore() throws Exception {
             );
         });

-        updateLifecycle(dataStreamName, TimeValue.timeValueSeconds(1));
+        updateFailureStoreConfiguration(dataStreamName, true, TimeValue.timeValueSeconds(1));

         // And finally apply retention
         assertBusy(() -> {

@@ -1202,7 +1193,20 @@ static void putComposableIndexTemplate(
         List<String> patterns,
         @Nullable Settings settings,
         @Nullable Map<String, Object> metadata,
-        @Nullable DataStreamLifecycle.Template lifecycle,
+        @Nullable DataStreamLifecycle.Template dataLifecycle,
+        boolean withFailureStore
+    ) throws IOException {
+        putComposableIndexTemplate(id, mappings, patterns, settings, metadata, dataLifecycle, null, withFailureStore);
+    }
+
+    static void putComposableIndexTemplate(
+        String id,
+        @Nullable String mappings,
+        List<String> patterns,
+        @Nullable Settings settings,
+        @Nullable Map<String, Object> metadata,
+        @Nullable DataStreamLifecycle.Template dataLifecycle,
+        @Nullable DataStreamLifecycle.Template failuresLifecycle,
         boolean withFailureStore
     ) throws IOException {
         TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id);

@@ -1213,8 +1217,10 @@ static void putComposableIndexTemplate(
                 Template.builder()
                     .settings(settings)
                     .mappings(mappings == null ? null : CompressedXContent.fromJSON(mappings))
-                    .lifecycle(lifecycle)
-                    .dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(withFailureStore))
+                    .lifecycle(dataLifecycle)
+                    .dataStreamOptions(
+                        new DataStreamOptions.Template(new DataStreamFailureStore.Template(withFailureStore, failuresLifecycle))
+                    )
             )
             .metadata(metadata)
             .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())

@@ -1233,6 +1239,16 @@ static void updateLifecycle(String dataStreamName, TimeValue dataRetention) {
         assertAcked(client().execute(PutDataStreamLifecycleAction.INSTANCE, putDataLifecycleRequest));
     }

+    static void updateFailureStoreConfiguration(String dataStreamName, boolean enabled, TimeValue retention) {
+        PutDataStreamOptionsAction.Request putDataOptionsRequest = new PutDataStreamOptionsAction.Request(
+            TEST_REQUEST_TIMEOUT,
+            TEST_REQUEST_TIMEOUT,
+            new String[] { dataStreamName },
+            new DataStreamFailureStore(enabled, DataStreamLifecycle.failuresLifecycleBuilder().dataRetention(retention).build())
+        );
+        assertAcked(client().execute(PutDataStreamOptionsAction.INSTANCE, putDataOptionsRequest));
+    }
+
     /*
      * This test plugin adds `.system-test` as a known system data stream. The data stream is not created by this plugin. But if it is
      * created, it will be a system data stream.
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java

Lines changed: 24 additions & 20 deletions
@@ -264,7 +264,7 @@ public void testExplainFailuresLifecycle() throws Exception {
             List.of("metrics-foo*"),
             null,
             null,
-            DataStreamLifecycle.Template.DATA_DEFAULT,
+            null,
             new DataStreamOptions.Template(DataStreamFailureStore.builder().enabled(true).buildTemplate())
         );
         String dataStreamName = "metrics-foo";

@@ -354,26 +354,30 @@ public void testExplainFailuresLifecycle() throws Exception {
         ).actionGet();
         assertThat(response.getIndices().size(), is(1));
         for (ExplainIndexDataStreamLifecycle explainIndex : response.getIndices()) {
-            assertThat(explainIndex.isManagedByLifecycle(), is(true));
-            assertThat(explainIndex.getIndexCreationDate(), notNullValue());
-            assertThat(explainIndex.getLifecycle(), notNullValue());
-            assertThat(explainIndex.getLifecycle().dataRetention(), nullValue());
-
-            if (internalCluster().numDataNodes() > 1) {
-                // If the number of nodes is 1 then the cluster will be yellow so forcemerge will report an error if it has run
-                assertThat(explainIndex.getError(), nullValue());
-            }
-
-            if (explainIndex.getIndex().equals(firstGenerationIndex)) {
-                // first generation index was rolled over
-                assertThat(explainIndex.getRolloverDate(), notNullValue());
-                assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), notNullValue());
-                assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), notNullValue());
+            if (explainIndex.getIndex().startsWith(DataStream.BACKING_INDEX_PREFIX)) {
+                assertThat(explainIndex.isManagedByLifecycle(), is(false));
             } else {
-                // the write index has not been rolled over yet
-                assertThat(explainIndex.getRolloverDate(), nullValue());
-                assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), nullValue());
-                assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), nullValue());
+                assertThat(explainIndex.isManagedByLifecycle(), is(true));
+                assertThat(explainIndex.getIndexCreationDate(), notNullValue());
+                assertThat(explainIndex.getLifecycle(), notNullValue());
+                assertThat(explainIndex.getLifecycle().dataRetention(), nullValue());
+
+                if (internalCluster().numDataNodes() > 1) {
+                    // If the number of nodes is 1 then the cluster will be yellow so forcemerge will report an error if it has run
+                    assertThat(explainIndex.getError(), nullValue());
+                }
+
+                if (explainIndex.getIndex().equals(firstGenerationIndex)) {
+                    // first generation index was rolled over
+                    assertThat(explainIndex.getRolloverDate(), notNullValue());
+                    assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), notNullValue());
+                    assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), notNullValue());
+                } else {
+                    // the write index has not been rolled over yet
+                    assertThat(explainIndex.getRolloverDate(), nullValue());
+                    assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), nullValue());
+                    assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), nullValue());
+                }
             }
         }
     }
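The updated test above asserts a management split: with a lifecycle configured only for the failure store, indices whose names start with DataStream.BACKING_INDEX_PREFIX are not managed by the data stream lifecycle, while failure indices are. An illustrative sketch of that predicate (the ".ds-"/".fs-" prefix values are assumptions here, not taken from this diff):

```python
# Assumed value of DataStream.BACKING_INDEX_PREFIX (backing index names
# like ".ds-<data-stream>-<date>-<generation>").
BACKING_INDEX_PREFIX = ".ds-"

def managed_by_lifecycle(index_name: str) -> bool:
    """Mirror the test's branch: backing indices are unmanaged when only the
    failures lifecycle is configured; any other index (the failure indices
    in this test) is managed."""
    return not index_name.startswith(BACKING_INDEX_PREFIX)
```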

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java

Lines changed: 80 additions & 6 deletions
@@ -42,16 +42,35 @@ public void setup() throws IOException {
               "index_patterns": ["my-data-stream*"],
               "data_stream": {},
               "template": {
-                "lifecycle": {}
+                "settings": {
+                  "number_of_replicas": 0
+                },
+                "mappings": {
+                  "properties": {
+                    "count": {
+                      "type": "long"
+                    }
+                  }
+                },
+                "lifecycle": {},
+                "data_stream_options": {
+                  "failure_store": {
+                    "enabled": true
+                  }
+                }
               }
             }
             """);
         assertOK(client().performRequest(putComposableIndexTemplateRequest));

-        // Create a data streams with one doc
+        // Index one doc, this will trigger a rollover
         Request createDocRequest = new Request("POST", "/my-data-stream/_doc?refresh=true");
         createDocRequest.setJsonEntity("{ \"@timestamp\": \"2022-12-12\"}");
         assertOK(client().performRequest(createDocRequest));
+        // Index one doc that will fail, this will create the failure store
+        createDocRequest = new Request("POST", "/my-data-stream/_doc?refresh=true");
+        createDocRequest.setJsonEntity("{ \"@timestamp\": \"2022-12-12\", \"count\": \"not-a-number\"}");
+        assertOK(client().performRequest(createDocRequest));
     }

     @After

@@ -64,7 +83,7 @@ public void cleanUp() throws IOException {

     @SuppressWarnings("unchecked")
     public void testDataStreamRetention() throws Exception {
-        // Set global retention and add retention to the data stream
+        // Set global retention and add retention to the data stream & failure store
         {
             updateClusterSettings(
                 Settings.builder()

@@ -78,6 +97,18 @@ public void testDataStreamRetention() throws Exception {
                   "data_retention": "10s"
                 }""");
             assertAcknowledged(client().performRequest(request));
+
+            request = new Request("PUT", "_data_stream/my-data-stream/_options");
+            request.setJsonEntity("""
+                {
+                  "failure_store": {
+                    "enabled": true,
+                    "lifecycle": {
+                      "data_retention": "10s"
+                    }
+                  }
+                }""");
+            assertAcknowledged(client().performRequest(request));
         }

         // Verify that the effective retention matches the default retention

@@ -92,6 +123,10 @@ public void testDataStreamRetention() throws Exception {
             assertThat(lifecycle.get("effective_retention"), is("10s"));
             assertThat(lifecycle.get("retention_determined_by"), is("data_stream_configuration"));
             assertThat(lifecycle.get("data_retention"), is("10s"));
+            Map<String, Object> failuresLifecycle = ((Map<String, Map<String, Object>>) dataStream.get("failure_store")).get("lifecycle");
+            assertThat(failuresLifecycle.get("effective_retention"), is("10s"));
+            assertThat(failuresLifecycle.get("retention_determined_by"), is("data_stream_configuration"));
+            assertThat(failuresLifecycle.get("data_retention"), is("10s"));
         }

         // Verify that the first generation index was removed

@@ -101,8 +136,11 @@ public void testDataStreamRetention() throws Exception {
             assertThat(dataStream.get("name"), is("my-data-stream"));
             List<Object> backingIndices = (List<Object>) dataStream.get("indices");
             assertThat(backingIndices.size(), is(1));
+            List<Object> failureIndices = (List<Object>) ((Map<String, Object>) dataStream.get("failure_store")).get("indices");
+            assertThat(failureIndices.size(), is(1));
             // 2 backing indices created + 1 for the deleted index
-            assertThat(dataStream.get("generation"), is(3));
+            // 2 failure indices created + 1 for the deleted failure index
+            assertThat(dataStream.get("generation"), is(6));
         }, 20, TimeUnit.SECONDS);
     }

@@ -123,6 +161,10 @@ public void testDefaultRetention() throws Exception {
             assertThat(lifecycle.get("effective_retention"), is("10s"));
             assertThat(lifecycle.get("retention_determined_by"), is("default_global_retention"));
             assertThat(lifecycle.get("data_retention"), nullValue());
+            Map<String, Object> failuresLifecycle = ((Map<String, Map<String, Object>>) dataStream.get("failure_store")).get("lifecycle");
+            assertThat(failuresLifecycle.get("effective_retention"), is("10s"));
+            assertThat(failuresLifecycle.get("retention_determined_by"), is("default_global_retention"));
+            assertThat(failuresLifecycle.get("data_retention"), nullValue());
         }

         // Verify that the first generation index was removed

@@ -132,8 +174,11 @@ public void testDefaultRetention() throws Exception {
             assertThat(dataStream.get("name"), is("my-data-stream"));
             List<Object> backingIndices = (List<Object>) dataStream.get("indices");
             assertThat(backingIndices.size(), is(1));
+            List<Object> failureIndices = (List<Object>) ((Map<String, Object>) dataStream.get("failure_store")).get("indices");
+            assertThat(failureIndices.size(), is(1));
             // 2 backing indices created + 1 for the deleted index
-            assertThat(dataStream.get("generation"), is(3));
+            // 2 failure indices created + 1 for the deleted failure index
+            assertThat(dataStream.get("generation"), is(6));
         }, 20, TimeUnit.SECONDS);
     }

@@ -157,6 +202,24 @@ public void testMaxRetention() throws Exception {
                 containsString("The retention provided [30d] is exceeding the max allowed data retention of this project [10s]")
             );
         }
+        try {
+            Request request = new Request("PUT", "_data_stream/my-data-stream/_options");
+            request.setJsonEntity("""
+                {
+                  "failure_store": {
+                    "lifecycle": {
+                      "data_retention": "30d"
+                    }
+                  }
+                }""");
+            assertAcknowledged(client().performRequest(request));
+            fail("Should have returned a warning about data retention exceeding the max retention");
+        } catch (WarningFailureException warningFailureException) {
+            assertThat(
+                warningFailureException.getMessage(),
+                containsString("The retention provided [30d] is exceeding the max allowed data retention of this project [10s]")
+            );
+        }
     }

     // Verify that the effective retention matches the max retention

@@ -175,6 +238,14 @@ public void testMaxRetention() throws Exception {
         } else {
             assertThat(lifecycle.get("data_retention"), nullValue());
         }
+        Map<String, Object> failuresLifecycle = ((Map<String, Map<String, Object>>) dataStream.get("failure_store")).get("lifecycle");
+        assertThat(failuresLifecycle.get("effective_retention"), is("10s"));
+        assertThat(failuresLifecycle.get("retention_determined_by"), is("max_global_retention"));
+        if (withDataStreamLevelRetention) {
+            assertThat(failuresLifecycle.get("data_retention"), is("30d"));
+        } else {
+            assertThat(failuresLifecycle.get("data_retention"), nullValue());
+        }
     }

     // Verify that the first generation index was removed

@@ -184,8 +255,11 @@ public void testMaxRetention() throws Exception {
         assertThat(dataStream.get("name"), is("my-data-stream"));
         List<Object> backingIndices = (List<Object>) dataStream.get("indices");
         assertThat(backingIndices.size(), is(1));
+        List<Object> failureIndices = (List<Object>) ((Map<String, Object>) dataStream.get("failure_store")).get("indices");
+        assertThat(failureIndices.size(), is(1));
         // 2 backing indices created + 1 for the deleted index
-        assertThat(dataStream.get("generation"), is(3));
+        // 2 failure indices created + 1 for the deleted failure index
+        assertThat(dataStream.get("generation"), is(6));
     }, 20, TimeUnit.SECONDS);
 }
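The generation assertions above go from 3 to 6 once the failure store participates. A simplified bookkeeping model of that arithmetic, per the test's own comments (each index created or deleted bumps the data stream generation, across both the backing and the failure tier); this is an illustration, not Elasticsearch source:

```python
def generation(backing_created, backing_deleted, failure_created, failure_deleted):
    # Per the test comments: "2 backing indices created + 1 for the deleted
    # index" yields 3; adding "2 failure indices created + 1 for the deleted
    # failure index" yields 6.
    return backing_created + backing_deleted + failure_created + failure_deleted
```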
