Skip to content

Commit fe36c42

Browse files
authored
[Failure store] Introduce default retention for failure indices (elastic#127573)
We introduce a new global retention setting `data_streams.lifecycle.retention.failures_default` which is used by the data stream lifecycle management as the default retention when the failure store lifecycle of the data stream does not specify one. Elasticsearch comes with the default value of 30 days. The value can be changed via the settings API to any time value higher than 10 seconds or -1 to indicate no default retention should apply. The failures default retention can be set to values higher than the max retention, but then the max retention will be effective. The reason for this choice it to ensure that no deployments will be broken, if the user has already set up max retention less than 30 days.
1 parent 8fad975 commit fe36c42

File tree

23 files changed

+412
-89
lines changed

23 files changed

+412
-89
lines changed

docs/changelog/127573.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127573
2+
summary: "[Failure store] Introduce default retention for failure indices"
3+
area: Data streams
4+
type: enhancement
5+
issues: []

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,8 @@ public void testSystemDataStreamRetention() throws Exception {
248248
builder,
249249
withEffectiveRetention,
250250
getDataStreamResponse.getRolloverConfiguration(),
251-
getDataStreamResponse.getGlobalRetention()
251+
getDataStreamResponse.getDataGlobalRetention(),
252+
getDataStreamResponse.getFailuresGlobalRetention()
252253
);
253254
String serialized = Strings.toString(builder);
254255
Map<String, Object> resultMap = XContentHelper.convertToMap(

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,12 @@ public void testDataStreamRetention() throws Exception {
147147
@SuppressWarnings("unchecked")
148148
public void testDefaultRetention() throws Exception {
149149
// Set default global retention
150-
updateClusterSettings(Settings.builder().put("data_streams.lifecycle.retention.default", "10s").build());
150+
updateClusterSettings(
151+
Settings.builder()
152+
.put("data_streams.lifecycle.retention.default", "10s")
153+
.put("data_streams.lifecycle.retention.failures_default", "10s")
154+
.build()
155+
);
151156

152157
// Verify that the effective retention matches the default retention
153158
{
@@ -163,7 +168,7 @@ public void testDefaultRetention() throws Exception {
163168
assertThat(lifecycle.get("data_retention"), nullValue());
164169
Map<String, Object> failuresLifecycle = ((Map<String, Map<String, Object>>) dataStream.get("failure_store")).get("lifecycle");
165170
assertThat(failuresLifecycle.get("effective_retention"), is("10s"));
166-
assertThat(failuresLifecycle.get("retention_determined_by"), is("default_global_retention"));
171+
assertThat(failuresLifecycle.get("retention_determined_by"), is("default_failures_retention"));
167172
assertThat(failuresLifecycle.get("data_retention"), nullValue());
168173
}
169174

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,8 @@ public int compareTo(IndexInfo o) {
370370
return new GetDataStreamAction.Response(
371371
dataStreamInfos,
372372
request.includeDefaults() ? clusterSettings.get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING) : null,
373-
globalRetentionSettings.get()
373+
globalRetentionSettings.get(false),
374+
globalRetentionSettings.get(true)
374375
);
375376
}
376377

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import org.elasticsearch.cluster.SimpleBatchedExecutor;
4646
import org.elasticsearch.cluster.block.ClusterBlockLevel;
4747
import org.elasticsearch.cluster.metadata.DataStream;
48-
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
4948
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
5049
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
5150
import org.elasticsearch.cluster.metadata.IndexAbstraction;
@@ -372,13 +371,18 @@ private void run(ProjectState projectState) {
372371
continue;
373372
}
374373

374+
// Retrieve the effective retention to ensure the same retention is used for this data stream
375+
// through all operations.
376+
var dataRetention = getEffectiveRetention(dataStream, globalRetentionSettings, false);
377+
var failuresRetention = getEffectiveRetention(dataStream, globalRetentionSettings, true);
378+
375379
// the following indices should not be considered for the remainder of this service run, for various reasons.
376380
Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();
377381

378382
// These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed,
379383
// depending on rollover criteria, for this reason we exclude them for the remaining run.
380-
indicesToExcludeForRemainingRun.add(maybeExecuteRollover(project, dataStream, false));
381-
Index failureStoreWriteIndex = maybeExecuteRollover(project, dataStream, true);
384+
indicesToExcludeForRemainingRun.add(maybeExecuteRollover(project, dataStream, dataRetention, false));
385+
Index failureStoreWriteIndex = maybeExecuteRollover(project, dataStream, failuresRetention, true);
382386
if (failureStoreWriteIndex != null) {
383387
indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
384388
}
@@ -394,7 +398,9 @@ private void run(ProjectState projectState) {
394398
);
395399

396400
try {
397-
indicesToExcludeForRemainingRun.addAll(maybeExecuteRetention(project, dataStream, indicesToExcludeForRemainingRun));
401+
indicesToExcludeForRemainingRun.addAll(
402+
maybeExecuteRetention(project, dataStream, dataRetention, failuresRetention, indicesToExcludeForRemainingRun)
403+
);
398404
} catch (Exception e) {
399405
// individual index errors would be reported via the API action listener for every delete call
400406
// we could potentially record errors at a data stream level and expose it via the _data_stream API?
@@ -840,7 +846,12 @@ private void clearErrorStoreForUnmanagedIndices(ProjectMetadata project, DataStr
840846
}
841847

842848
@Nullable
843-
private Index maybeExecuteRollover(ProjectMetadata project, DataStream dataStream, boolean rolloverFailureStore) {
849+
private Index maybeExecuteRollover(
850+
ProjectMetadata project,
851+
DataStream dataStream,
852+
TimeValue effectiveRetention,
853+
boolean rolloverFailureStore
854+
) {
844855
Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
845856
if (currentRunWriteIndex == null) {
846857
return null;
@@ -851,7 +862,7 @@ private Index maybeExecuteRollover(ProjectMetadata project, DataStream dataStrea
851862
RolloverRequest rolloverRequest = getDefaultRolloverRequest(
852863
rolloverConfiguration,
853864
dataStream.getName(),
854-
lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
865+
effectiveRetention,
855866
rolloverFailureStore
856867
);
857868
transportActionsDeduplicator.executeOnce(
@@ -903,10 +914,13 @@ private Index maybeExecuteRollover(ProjectMetadata project, DataStream dataStrea
903914
* @param indicesToExcludeForRemainingRun Indices to exclude from retention even if it would be time for them to be deleted
904915
* @return The set of indices that delete requests have been sent for
905916
*/
906-
Set<Index> maybeExecuteRetention(ProjectMetadata project, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
907-
DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionSettings.get();
908-
var dataRetention = getRetention(dataStream, globalRetention, false);
909-
var failureRetention = getRetention(dataStream, globalRetention, true);
917+
Set<Index> maybeExecuteRetention(
918+
ProjectMetadata project,
919+
DataStream dataStream,
920+
TimeValue dataRetention,
921+
TimeValue failureRetention,
922+
Set<Index> indicesToExcludeForRemainingRun
923+
) {
910924
if (dataRetention == null && failureRetention == null) {
911925
return Set.of();
912926
}
@@ -1361,11 +1375,15 @@ private static boolean isForceMergeComplete(IndexMetadata backingIndex) {
13611375
}
13621376

13631377
@Nullable
1364-
private static TimeValue getRetention(DataStream dataStream, DataStreamGlobalRetention globalRetention, boolean failureStore) {
1378+
private static TimeValue getEffectiveRetention(
1379+
DataStream dataStream,
1380+
DataStreamGlobalRetentionSettings globalRetentionSettings,
1381+
boolean failureStore
1382+
) {
13651383
DataStreamLifecycle lifecycle = failureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle();
13661384
return lifecycle == null || lifecycle.enabled() == false
13671385
? null
1368-
: lifecycle.getEffectiveDataRetention(globalRetention, dataStream.isInternal());
1386+
: lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(failureStore), dataStream.isInternal());
13691387
}
13701388

13711389
/**

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ protected void masterOperation(
123123
new ExplainDataStreamLifecycleAction.Response(
124124
explainIndices,
125125
request.includeDefaults() ? clusterSettings.get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING) : null,
126-
globalRetentionSettings.get()
126+
globalRetentionSettings.get(false),
127+
globalRetentionSettings.get(true)
127128
)
128129
);
129130
}

modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsActionTests.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,8 @@ public void testPassingGlobalRetention() {
362362
new IndexSettingProviders(Set.of()),
363363
null
364364
);
365-
assertThat(response.getGlobalRetention(), nullValue());
366-
DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(
365+
assertThat(response.getDataGlobalRetention(), nullValue());
366+
DataStreamGlobalRetention dataGlobalRetention = new DataStreamGlobalRetention(
367367
TimeValue.timeValueDays(randomIntBetween(1, 5)),
368368
TimeValue.timeValueDays(randomIntBetween(5, 10))
369369
);
@@ -372,9 +372,9 @@ public void testPassingGlobalRetention() {
372372
Settings.builder()
373373
.put(
374374
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING.getKey(),
375-
globalRetention.defaultRetention()
375+
dataGlobalRetention.defaultRetention()
376376
)
377-
.put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), globalRetention.maxRetention())
377+
.put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), dataGlobalRetention.maxRetention())
378378
.build()
379379
)
380380
);
@@ -389,7 +389,9 @@ public void testPassingGlobalRetention() {
389389
new IndexSettingProviders(Set.of()),
390390
null
391391
);
392-
assertThat(response.getGlobalRetention(), equalTo(globalRetention));
392+
assertThat(response.getDataGlobalRetention(), equalTo(dataGlobalRetention));
393+
// We used the default failures retention here which is greater than the max
394+
assertThat(response.getFailuresGlobalRetention(), equalTo(new DataStreamGlobalRetention(null, dataGlobalRetention.maxRetention())));
393395
}
394396

395397
public void testDataStreamIsFailureStoreEffectivelyEnabled_disabled() {

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,9 +1554,16 @@ public void testMaybeExecuteRetentionSuccessfulDownsampledIndex() {
15541554
final var project = state.metadata().getProject(projectId);
15551555
DataStream dataStream = project.dataStreams().get(dataStreamName);
15561556
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
1557+
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
15571558

15581559
// Executing the method to be tested:
1559-
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of());
1560+
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
1561+
project,
1562+
dataStream,
1563+
dataRetention,
1564+
null,
1565+
Set.of()
1566+
);
15601567
assertThat(indicesToBeRemoved, contains(project.index(firstGenIndexName).getIndex()));
15611568
}
15621569

@@ -1566,10 +1573,16 @@ public void testMaybeExecuteRetentionDownsampledIndexInProgress() {
15661573
ClusterState state = downsampleSetup(projectId, dataStreamName, STARTED);
15671574
final var project = state.metadata().getProject(projectId);
15681575
DataStream dataStream = project.dataStreams().get(dataStreamName);
1569-
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
1576+
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
15701577

15711578
// Executing the method to be tested:
1572-
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of());
1579+
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
1580+
project,
1581+
dataStream,
1582+
dataRetention,
1583+
null,
1584+
Set.of()
1585+
);
15731586
assertThat(indicesToBeRemoved, empty());
15741587
}
15751588

@@ -1580,9 +1593,16 @@ public void testMaybeExecuteRetentionDownsampledUnknown() {
15801593
final var project = state.metadata().getProject(projectId);
15811594
DataStream dataStream = project.dataStreams().get(dataStreamName);
15821595
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
1596+
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
15831597

15841598
// Executing the method to be tested:
1585-
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of());
1599+
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
1600+
project,
1601+
dataStream,
1602+
dataRetention,
1603+
null,
1604+
Set.of()
1605+
);
15861606
assertThat(indicesToBeRemoved, contains(project.index(firstGenIndexName).getIndex()));
15871607
}
15881608

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_failure_store_info.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ teardown:
189189
- match: { data_streams.0.template: 'my-template1' }
190190
- match: { data_streams.0.failure_store.enabled: true }
191191
- match: { data_streams.0.failure_store.lifecycle.enabled: false }
192+
- is_false: data_streams.0.failure_store.lifecycle.data_retention
193+
- is_false: data_streams.0.failure_store.lifecycle.effective_retention
194+
- is_false: data_streams.0.failure_store.lifecycle.retention_determined_by
192195
- length: { data_streams.0.failure_store.indices: 1 }
193196
- match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-fs-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
194197
- is_false: data_streams.0.failure_store.indices.0.prefer_ilm
@@ -212,6 +215,9 @@ teardown:
212215
- match: { data_streams.0.template: 'my-template2' }
213216
- match: { data_streams.0.failure_store.enabled: true }
214217
- match: { data_streams.0.failure_store.lifecycle.enabled: true }
218+
- is_false: data_streams.0.failure_store.lifecycle.data_retention
219+
- match: { data_streams.0.failure_store.lifecycle.effective_retention: '30d' }
220+
- match: { data_streams.0.failure_store.lifecycle.retention_determined_by: 'default_failures_retention' }
215221
- match: { data_streams.0.failure_store.indices: [] }
216222

217223
# Initialize failure store
@@ -234,6 +240,9 @@ teardown:
234240
- match: { data_streams.0.template: 'my-template2' }
235241
- match: { data_streams.0.failure_store.enabled: true }
236242
- match: { data_streams.0.failure_store.lifecycle.enabled: true }
243+
- is_false: data_streams.0.failure_store.lifecycle.data_retention
244+
- match: { data_streams.0.failure_store.lifecycle.effective_retention: '30d' }
245+
- match: { data_streams.0.failure_store.lifecycle.retention_determined_by: 'default_failures_retention' }
237246
- length: { data_streams.0.failure_store.indices: 1 }
238247
- match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-fs-default-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
239248
- is_false: data_streams.0.failure_store.indices.0.prefer_ilm

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ static TransportVersion def(int id) {
167167
public static final TransportVersion PINNED_RETRIEVER_8_19 = def(8_841_0_23);
168168
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_BLOCK_8_19 = def(8_841_0_24);
169169
public static final TransportVersion INTRODUCE_FAILURES_LIFECYCLE_BACKPORT_8_19 = def(8_841_0_25);
170+
public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION_BACKPORT_8_19 = def(8_841_0_26);
170171
public static final TransportVersion V_9_0_0 = def(9_000_0_09);
171172
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_1 = def(9_000_0_10);
172173
public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED = def(9_001_0_00);
@@ -239,6 +240,7 @@ static TransportVersion def(int id) {
239240
public static final TransportVersion PINNED_RETRIEVER = def(9_068_0_00);
240241
public static final TransportVersion ML_INFERENCE_SAGEMAKER = def(9_069_0_00);
241242
public static final TransportVersion WRITE_LOAD_INCLUDES_BUFFER_WRITES = def(9_070_00_0);
243+
public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION = def(9_071_0_00);
242244

243245
/*
244246
* STOP! READ THIS FIRST! No, really,

0 commit comments

Comments
 (0)