Skip to content

Commit 700503a

Browse files
authored
[8.19] [Failure store] Introduce default retention for failure indices (#127573) (#127673)
* [Failure store] Introduce default retention for failure indices (#127573) We introduce a new global retention setting `data_streams.lifecycle.retention.failures_default` which is used by the data stream lifecycle management as the default retention when the failure store lifecycle of the data stream does not specify one. Elasticsearch comes with the default value of 30 days. The value can be changed via the settings API to any time value higher than 10 seconds or -1 to indicate no default retention should apply. The failures default retention can be set to values higher than the max retention, but then the max retention will be effective. The reason for this choice it to ensure that no deployments will be broken, if the user has already set up max retention less than 30 days.
1 parent a2e1925 commit 700503a

File tree

25 files changed

+439
-101
lines changed

25 files changed

+439
-101
lines changed

docs/changelog/127573.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127573
2+
summary: "[Failure store] Introduce default retention for failure indices"
3+
area: Data streams
4+
type: enhancement
5+
issues: []

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,8 @@ public void testSystemDataStreamRetention() throws Exception {
279279
builder,
280280
withEffectiveRetention,
281281
getDataStreamResponse.getRolloverConfiguration(),
282-
getDataStreamResponse.getGlobalRetention()
282+
getDataStreamResponse.getDataGlobalRetention(),
283+
getDataStreamResponse.getFailuresGlobalRetention()
283284
);
284285
String serialized = Strings.toString(builder);
285286
Map<String, Object> resultMap = XContentHelper.convertToMap(

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,12 @@ public void testDataStreamRetention() throws Exception {
147147
@SuppressWarnings("unchecked")
148148
public void testDefaultRetention() throws Exception {
149149
// Set default global retention
150-
updateClusterSettings(Settings.builder().put("data_streams.lifecycle.retention.default", "10s").build());
150+
updateClusterSettings(
151+
Settings.builder()
152+
.put("data_streams.lifecycle.retention.default", "10s")
153+
.put("data_streams.lifecycle.retention.failures_default", "10s")
154+
.build()
155+
);
151156

152157
// Verify that the effective retention matches the default retention
153158
{
@@ -163,7 +168,7 @@ public void testDefaultRetention() throws Exception {
163168
assertThat(lifecycle.get("data_retention"), nullValue());
164169
Map<String, Object> failuresLifecycle = ((Map<String, Map<String, Object>>) dataStream.get("failure_store")).get("lifecycle");
165170
assertThat(failuresLifecycle.get("effective_retention"), is("10s"));
166-
assertThat(failuresLifecycle.get("retention_determined_by"), is("default_global_retention"));
171+
assertThat(failuresLifecycle.get("retention_determined_by"), is("default_failures_retention"));
167172
assertThat(failuresLifecycle.get("data_retention"), nullValue());
168173
}
169174

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public TransportGetDataStreamsAction(
8888
threadPool,
8989
actionFilters,
9090
GetDataStreamAction.Request::new,
91-
GetDataStreamAction.Response::new,
91+
GetDataStreamAction.Response::read,
9292
transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT)
9393
);
9494
this.indexNameExpressionResolver = indexNameExpressionResolver;
@@ -287,7 +287,8 @@ public int compareTo(IndexInfo o) {
287287
return new GetDataStreamAction.Response(
288288
dataStreamInfos,
289289
request.includeDefaults() ? clusterSettings.get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING) : null,
290-
globalRetentionSettings.get()
290+
globalRetentionSettings.get(false),
291+
globalRetentionSettings.get(true)
291292
);
292293
}
293294

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.elasticsearch.cluster.SimpleBatchedExecutor;
4545
import org.elasticsearch.cluster.block.ClusterBlockLevel;
4646
import org.elasticsearch.cluster.metadata.DataStream;
47-
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
4847
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
4948
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
5049
import org.elasticsearch.cluster.metadata.IndexAbstraction;
@@ -354,13 +353,18 @@ void run(ClusterState state) {
354353
continue;
355354
}
356355

356+
// Retrieve the effective retention to ensure the same retention is used for this data stream
357+
// through all operations.
358+
var dataRetention = getEffectiveRetention(dataStream, globalRetentionSettings, false);
359+
var failuresRetention = getEffectiveRetention(dataStream, globalRetentionSettings, true);
360+
357361
// the following indices should not be considered for the remainder of this service run, for various reasons.
358362
Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();
359363

360364
// These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed,
361365
// depending on rollover criteria, for this reason we exclude them for the remaining run.
362-
indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, false));
363-
Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
366+
indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, dataRetention, false));
367+
Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, failuresRetention, true);
364368
if (failureStoreWriteIndex != null) {
365369
indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
366370
}
@@ -376,7 +380,9 @@ void run(ClusterState state) {
376380
);
377381

378382
try {
379-
indicesToExcludeForRemainingRun.addAll(maybeExecuteRetention(state, dataStream, indicesToExcludeForRemainingRun));
383+
indicesToExcludeForRemainingRun.addAll(
384+
maybeExecuteRetention(state, dataStream, dataRetention, failuresRetention, indicesToExcludeForRemainingRun)
385+
);
380386
} catch (Exception e) {
381387
// individual index errors would be reported via the API action listener for every delete call
382388
// we could potentially record errors at a data stream level and expose it via the _data_stream API?
@@ -807,7 +813,12 @@ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) {
807813
}
808814

809815
@Nullable
810-
private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
816+
private Index maybeExecuteRollover(
817+
ClusterState state,
818+
DataStream dataStream,
819+
TimeValue effectiveRetention,
820+
boolean rolloverFailureStore
821+
) {
811822
Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
812823
if (currentRunWriteIndex == null) {
813824
return null;
@@ -818,7 +829,7 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
818829
RolloverRequest rolloverRequest = getDefaultRolloverRequest(
819830
rolloverConfiguration,
820831
dataStream.getName(),
821-
lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
832+
effectiveRetention,
822833
rolloverFailureStore
823834
);
824835
transportActionsDeduplicator.executeOnce(
@@ -868,14 +879,17 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
868879
* @param indicesToExcludeForRemainingRun Indices to exclude from retention even if it would be time for them to be deleted
869880
* @return The set of indices that delete requests have been sent for
870881
*/
871-
Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
872-
Metadata metadata = state.metadata();
873-
DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionSettings.get();
874-
var dataRetention = getRetention(dataStream, globalRetention, false);
875-
var failureRetention = getRetention(dataStream, globalRetention, true);
882+
Set<Index> maybeExecuteRetention(
883+
ClusterState state,
884+
DataStream dataStream,
885+
TimeValue dataRetention,
886+
TimeValue failureRetention,
887+
Set<Index> indicesToExcludeForRemainingRun
888+
) {
876889
if (dataRetention == null && failureRetention == null) {
877890
return Set.of();
878891
}
892+
Metadata metadata = state.metadata();
879893
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(
880894
metadata::index,
881895
nowSupplier,
@@ -1320,11 +1334,15 @@ private static boolean isForceMergeComplete(IndexMetadata backingIndex) {
13201334
}
13211335

13221336
@Nullable
1323-
private static TimeValue getRetention(DataStream dataStream, DataStreamGlobalRetention globalRetention, boolean failureStore) {
1337+
private static TimeValue getEffectiveRetention(
1338+
DataStream dataStream,
1339+
DataStreamGlobalRetentionSettings globalRetentionSettings,
1340+
boolean failureStore
1341+
) {
13241342
DataStreamLifecycle lifecycle = failureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle();
13251343
return lifecycle == null || lifecycle.enabled() == false
13261344
? null
1327-
: lifecycle.getEffectiveDataRetention(globalRetention, dataStream.isInternal());
1345+
: lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(failureStore), dataStream.isInternal());
13281346
}
13291347

13301348
/**

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ protected void masterOperation(
120120
new ExplainDataStreamLifecycleAction.Response(
121121
explainIndices,
122122
request.includeDefaults() ? clusterSettings.get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING) : null,
123-
globalRetentionSettings.get()
123+
globalRetentionSettings.get(false),
124+
globalRetentionSettings.get(true)
124125
)
125126
);
126127
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ public class RestGetDataStreamsAction extends BaseRestHandler {
4747
public static final String FAILURES_LIFECYCLE_API_CAPABILITY = "failure_store.lifecycle";
4848
private static final Set<String> CAPABILITIES = Set.of(
4949
DataStreamLifecycle.EFFECTIVE_RETENTION_REST_API_CAPABILITY,
50-
FAILURES_LIFECYCLE_API_CAPABILITY
50+
FAILURES_LIFECYCLE_API_CAPABILITY,
51+
"failure_store.lifecycle.default_retention"
5152
);
5253

5354
@Override

modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
4545

4646
@Override
4747
protected Writeable.Reader<Response> instanceReader() {
48-
return Response::new;
48+
return Response::read;
4949
}
5050

5151
@Override

modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsActionTests.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -351,8 +351,8 @@ public void testPassingGlobalRetention() {
351351
emptyDataStreamFailureStoreSettings,
352352
null
353353
);
354-
assertThat(response.getGlobalRetention(), nullValue());
355-
DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(
354+
assertThat(response.getDataGlobalRetention(), nullValue());
355+
DataStreamGlobalRetention dataGlobalRetention = new DataStreamGlobalRetention(
356356
TimeValue.timeValueDays(randomIntBetween(1, 5)),
357357
TimeValue.timeValueDays(randomIntBetween(5, 10))
358358
);
@@ -361,9 +361,9 @@ public void testPassingGlobalRetention() {
361361
Settings.builder()
362362
.put(
363363
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING.getKey(),
364-
globalRetention.defaultRetention()
364+
dataGlobalRetention.defaultRetention()
365365
)
366-
.put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), globalRetention.maxRetention())
366+
.put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), dataGlobalRetention.maxRetention())
367367
.build()
368368
)
369369
);
@@ -377,7 +377,9 @@ public void testPassingGlobalRetention() {
377377
emptyDataStreamFailureStoreSettings,
378378
null
379379
);
380-
assertThat(response.getGlobalRetention(), equalTo(globalRetention));
380+
assertThat(response.getDataGlobalRetention(), equalTo(dataGlobalRetention));
381+
// We used the default failures retention here which is greater than the max
382+
assertThat(response.getFailuresGlobalRetention(), equalTo(new DataStreamGlobalRetention(null, dataGlobalRetention.maxRetention())));
381383
}
382384

383385
public void testDataStreamIsFailureStoreEffectivelyEnabled_disabled() {

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1566,20 +1566,33 @@ public void testMaybeExecuteRetentionSuccessfulDownsampledIndex() {
15661566
ClusterState state = downsampleSetup(dataStreamName, SUCCESS);
15671567
DataStream dataStream = state.metadata().dataStreams().get(dataStreamName);
15681568
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
1569+
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
15691570

15701571
// Executing the method to be tested:
1571-
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of());
1572+
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
1573+
clusterService.state(),
1574+
dataStream,
1575+
dataRetention,
1576+
null,
1577+
Set.of()
1578+
);
15721579
assertThat(indicesToBeRemoved, contains(state.getMetadata().index(firstGenIndexName).getIndex()));
15731580
}
15741581

15751582
public void testMaybeExecuteRetentionDownsampledIndexInProgress() {
15761583
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
15771584
ClusterState state = downsampleSetup(dataStreamName, STARTED);
15781585
DataStream dataStream = state.metadata().dataStreams().get(dataStreamName);
1579-
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
1586+
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
15801587

15811588
// Executing the method to be tested:
1582-
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of());
1589+
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
1590+
clusterService.state(),
1591+
dataStream,
1592+
dataRetention,
1593+
null,
1594+
Set.of()
1595+
);
15831596
assertThat(indicesToBeRemoved, empty());
15841597
}
15851598

@@ -1588,9 +1601,16 @@ public void testMaybeExecuteRetentionDownsampledUnknown() {
15881601
ClusterState state = downsampleSetup(dataStreamName, UNKNOWN);
15891602
DataStream dataStream = state.metadata().dataStreams().get(dataStreamName);
15901603
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
1604+
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
15911605

15921606
// Executing the method to be tested:
1593-
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of());
1607+
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
1608+
clusterService.state(),
1609+
dataStream,
1610+
dataRetention,
1611+
null,
1612+
Set.of()
1613+
);
15941614
assertThat(indicesToBeRemoved, contains(state.getMetadata().index(firstGenIndexName).getIndex()));
15951615
}
15961616

0 commit comments

Comments
 (0)