Skip to content

Commit 0253c50

Browse files
author
elasticsearchmachine
committed
Merge remote-tracking branch 'origin/main' into lucene_snapshot
2 parents bd4edfa + 4985a61 commit 0253c50

File tree

28 files changed

+468
-110
lines changed

28 files changed

+468
-110
lines changed

docs/changelog/127573.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127573
2+
summary: "[Failure store] Introduce default retention for failure indices"
3+
area: Data streams
4+
type: enhancement
5+
issues: []

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,8 @@ public void testSystemDataStreamRetention() throws Exception {
248248
builder,
249249
withEffectiveRetention,
250250
getDataStreamResponse.getRolloverConfiguration(),
251-
getDataStreamResponse.getGlobalRetention()
251+
getDataStreamResponse.getDataGlobalRetention(),
252+
getDataStreamResponse.getFailuresGlobalRetention()
252253
);
253254
String serialized = Strings.toString(builder);
254255
Map<String, Object> resultMap = XContentHelper.convertToMap(

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,12 @@ public void testDataStreamRetention() throws Exception {
147147
@SuppressWarnings("unchecked")
148148
public void testDefaultRetention() throws Exception {
149149
// Set default global retention
150-
updateClusterSettings(Settings.builder().put("data_streams.lifecycle.retention.default", "10s").build());
150+
updateClusterSettings(
151+
Settings.builder()
152+
.put("data_streams.lifecycle.retention.default", "10s")
153+
.put("data_streams.lifecycle.retention.failures_default", "10s")
154+
.build()
155+
);
151156

152157
// Verify that the effective retention matches the default retention
153158
{
@@ -163,7 +168,7 @@ public void testDefaultRetention() throws Exception {
163168
assertThat(lifecycle.get("data_retention"), nullValue());
164169
Map<String, Object> failuresLifecycle = ((Map<String, Map<String, Object>>) dataStream.get("failure_store")).get("lifecycle");
165170
assertThat(failuresLifecycle.get("effective_retention"), is("10s"));
166-
assertThat(failuresLifecycle.get("retention_determined_by"), is("default_global_retention"));
171+
assertThat(failuresLifecycle.get("retention_determined_by"), is("default_failures_retention"));
167172
assertThat(failuresLifecycle.get("data_retention"), nullValue());
168173
}
169174

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,8 @@ public int compareTo(IndexInfo o) {
370370
return new GetDataStreamAction.Response(
371371
dataStreamInfos,
372372
request.includeDefaults() ? clusterSettings.get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING) : null,
373-
globalRetentionSettings.get()
373+
globalRetentionSettings.get(false),
374+
globalRetentionSettings.get(true)
374375
);
375376
}
376377

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import org.elasticsearch.cluster.SimpleBatchedExecutor;
4646
import org.elasticsearch.cluster.block.ClusterBlockLevel;
4747
import org.elasticsearch.cluster.metadata.DataStream;
48-
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
4948
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
5049
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
5150
import org.elasticsearch.cluster.metadata.IndexAbstraction;
@@ -372,13 +371,18 @@ private void run(ProjectState projectState) {
372371
continue;
373372
}
374373

374+
// Retrieve the effective retention to ensure the same retention is used for this data stream
375+
// through all operations.
376+
var dataRetention = getEffectiveRetention(dataStream, globalRetentionSettings, false);
377+
var failuresRetention = getEffectiveRetention(dataStream, globalRetentionSettings, true);
378+
375379
// the following indices should not be considered for the remainder of this service run, for various reasons.
376380
Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();
377381

378382
// These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed,
379383
// depending on rollover criteria, for this reason we exclude them for the remaining run.
380-
indicesToExcludeForRemainingRun.add(maybeExecuteRollover(project, dataStream, false));
381-
Index failureStoreWriteIndex = maybeExecuteRollover(project, dataStream, true);
384+
indicesToExcludeForRemainingRun.add(maybeExecuteRollover(project, dataStream, dataRetention, false));
385+
Index failureStoreWriteIndex = maybeExecuteRollover(project, dataStream, failuresRetention, true);
382386
if (failureStoreWriteIndex != null) {
383387
indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
384388
}
@@ -394,7 +398,9 @@ private void run(ProjectState projectState) {
394398
);
395399

396400
try {
397-
indicesToExcludeForRemainingRun.addAll(maybeExecuteRetention(project, dataStream, indicesToExcludeForRemainingRun));
401+
indicesToExcludeForRemainingRun.addAll(
402+
maybeExecuteRetention(project, dataStream, dataRetention, failuresRetention, indicesToExcludeForRemainingRun)
403+
);
398404
} catch (Exception e) {
399405
// individual index errors would be reported via the API action listener for every delete call
400406
// we could potentially record errors at a data stream level and expose it via the _data_stream API?
@@ -840,7 +846,12 @@ private void clearErrorStoreForUnmanagedIndices(ProjectMetadata project, DataStr
840846
}
841847

842848
@Nullable
843-
private Index maybeExecuteRollover(ProjectMetadata project, DataStream dataStream, boolean rolloverFailureStore) {
849+
private Index maybeExecuteRollover(
850+
ProjectMetadata project,
851+
DataStream dataStream,
852+
TimeValue effectiveRetention,
853+
boolean rolloverFailureStore
854+
) {
844855
Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
845856
if (currentRunWriteIndex == null) {
846857
return null;
@@ -851,7 +862,7 @@ private Index maybeExecuteRollover(ProjectMetadata project, DataStream dataStrea
851862
RolloverRequest rolloverRequest = getDefaultRolloverRequest(
852863
rolloverConfiguration,
853864
dataStream.getName(),
854-
lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
865+
effectiveRetention,
855866
rolloverFailureStore
856867
);
857868
transportActionsDeduplicator.executeOnce(
@@ -903,10 +914,13 @@ private Index maybeExecuteRollover(ProjectMetadata project, DataStream dataStrea
903914
* @param indicesToExcludeForRemainingRun Indices to exclude from retention even if it would be time for them to be deleted
904915
* @return The set of indices that delete requests have been sent for
905916
*/
906-
Set<Index> maybeExecuteRetention(ProjectMetadata project, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
907-
DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionSettings.get();
908-
var dataRetention = getRetention(dataStream, globalRetention, false);
909-
var failureRetention = getRetention(dataStream, globalRetention, true);
917+
Set<Index> maybeExecuteRetention(
918+
ProjectMetadata project,
919+
DataStream dataStream,
920+
TimeValue dataRetention,
921+
TimeValue failureRetention,
922+
Set<Index> indicesToExcludeForRemainingRun
923+
) {
910924
if (dataRetention == null && failureRetention == null) {
911925
return Set.of();
912926
}
@@ -1361,11 +1375,15 @@ private static boolean isForceMergeComplete(IndexMetadata backingIndex) {
13611375
}
13621376

13631377
@Nullable
1364-
private static TimeValue getRetention(DataStream dataStream, DataStreamGlobalRetention globalRetention, boolean failureStore) {
1378+
private static TimeValue getEffectiveRetention(
1379+
DataStream dataStream,
1380+
DataStreamGlobalRetentionSettings globalRetentionSettings,
1381+
boolean failureStore
1382+
) {
13651383
DataStreamLifecycle lifecycle = failureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle();
13661384
return lifecycle == null || lifecycle.enabled() == false
13671385
? null
1368-
: lifecycle.getEffectiveDataRetention(globalRetention, dataStream.isInternal());
1386+
: lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(failureStore), dataStream.isInternal());
13691387
}
13701388

13711389
/**

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ protected void masterOperation(
123123
new ExplainDataStreamLifecycleAction.Response(
124124
explainIndices,
125125
request.includeDefaults() ? clusterSettings.get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING) : null,
126-
globalRetentionSettings.get()
126+
globalRetentionSettings.get(false),
127+
globalRetentionSettings.get(true)
127128
)
128129
);
129130
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public class RestGetDataStreamsAction extends BaseRestHandler {
4848
public static final String FAILURES_LIFECYCLE_API_CAPABILITY = "failure_store.lifecycle";
4949
private static final Set<String> CAPABILITIES = Set.of(
5050
DataStreamLifecycle.EFFECTIVE_RETENTION_REST_API_CAPABILITY,
51-
FAILURES_LIFECYCLE_API_CAPABILITY
51+
FAILURES_LIFECYCLE_API_CAPABILITY,
52+
"failure_store.lifecycle.default_retention"
5253
);
5354

5455
@Override

modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsActionTests.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,8 @@ public void testPassingGlobalRetention() {
362362
new IndexSettingProviders(Set.of()),
363363
null
364364
);
365-
assertThat(response.getGlobalRetention(), nullValue());
366-
DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(
365+
assertThat(response.getDataGlobalRetention(), nullValue());
366+
DataStreamGlobalRetention dataGlobalRetention = new DataStreamGlobalRetention(
367367
TimeValue.timeValueDays(randomIntBetween(1, 5)),
368368
TimeValue.timeValueDays(randomIntBetween(5, 10))
369369
);
@@ -372,9 +372,9 @@ public void testPassingGlobalRetention() {
372372
Settings.builder()
373373
.put(
374374
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING.getKey(),
375-
globalRetention.defaultRetention()
375+
dataGlobalRetention.defaultRetention()
376376
)
377-
.put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), globalRetention.maxRetention())
377+
.put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), dataGlobalRetention.maxRetention())
378378
.build()
379379
)
380380
);
@@ -389,7 +389,9 @@ public void testPassingGlobalRetention() {
389389
new IndexSettingProviders(Set.of()),
390390
null
391391
);
392-
assertThat(response.getGlobalRetention(), equalTo(globalRetention));
392+
assertThat(response.getDataGlobalRetention(), equalTo(dataGlobalRetention));
393+
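// The failures default retention was not overridden here, so its default value applies; since that value is greater than the
// configured max retention, only the max is expected in the failures global retention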
394+
assertThat(response.getFailuresGlobalRetention(), equalTo(new DataStreamGlobalRetention(null, dataGlobalRetention.maxRetention())));
393395
}
394396

395397
public void testDataStreamIsFailureStoreEffectivelyEnabled_disabled() {

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,9 +1554,16 @@ public void testMaybeExecuteRetentionSuccessfulDownsampledIndex() {
15541554
final var project = state.metadata().getProject(projectId);
15551555
DataStream dataStream = project.dataStreams().get(dataStreamName);
15561556
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
1557+
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
15571558

15581559
// Executing the method to be tested:
1559-
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of());
1560+
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
1561+
project,
1562+
dataStream,
1563+
dataRetention,
1564+
null,
1565+
Set.of()
1566+
);
15601567
assertThat(indicesToBeRemoved, contains(project.index(firstGenIndexName).getIndex()));
15611568
}
15621569

@@ -1566,10 +1573,16 @@ public void testMaybeExecuteRetentionDownsampledIndexInProgress() {
15661573
ClusterState state = downsampleSetup(projectId, dataStreamName, STARTED);
15671574
final var project = state.metadata().getProject(projectId);
15681575
DataStream dataStream = project.dataStreams().get(dataStreamName);
1569-
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
1576+
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
15701577

15711578
// Executing the method to be tested:
1572-
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of());
1579+
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
1580+
project,
1581+
dataStream,
1582+
dataRetention,
1583+
null,
1584+
Set.of()
1585+
);
15731586
assertThat(indicesToBeRemoved, empty());
15741587
}
15751588

@@ -1580,9 +1593,16 @@ public void testMaybeExecuteRetentionDownsampledUnknown() {
15801593
final var project = state.metadata().getProject(projectId);
15811594
DataStream dataStream = project.dataStreams().get(dataStreamName);
15821595
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
1596+
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
15831597

15841598
// Executing the method to be tested:
1585-
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of());
1599+
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
1600+
project,
1601+
dataStream,
1602+
dataRetention,
1603+
null,
1604+
Set.of()
1605+
);
15861606
assertThat(indicesToBeRemoved, contains(project.index(firstGenIndexName).getIndex()));
15871607
}
15881608

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_failure_store_info.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,23 @@ teardown:
189189
- match: { data_streams.0.template: 'my-template1' }
190190
- match: { data_streams.0.failure_store.enabled: true }
191191
- match: { data_streams.0.failure_store.lifecycle.enabled: false }
192+
- is_false: data_streams.0.failure_store.lifecycle.data_retention
193+
- is_false: data_streams.0.failure_store.lifecycle.effective_retention
194+
- is_false: data_streams.0.failure_store.lifecycle.retention_determined_by
192195
- length: { data_streams.0.failure_store.indices: 1 }
193196
- match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-fs-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
194197
- is_false: data_streams.0.failure_store.indices.0.prefer_ilm
195198
- match: { data_streams.0.failure_store.indices.0.managed_by: 'Unmanaged' }
196199

197200
---
198201
"Get failure store info from cluster setting enabled failure store":
202+
- requires:
203+
test_runner_features: [ capabilities ]
204+
reason: "Default retention for failures was added in 9.1+"
205+
capabilities:
206+
- method: GET
207+
path: /_data_stream/{target}
208+
capabilities: [ 'failure_store.lifecycle.default_retention' ]
199209
- do:
200210
indices.create_data_stream:
201211
name: fs-default-data-stream
@@ -212,6 +222,9 @@ teardown:
212222
- match: { data_streams.0.template: 'my-template2' }
213223
- match: { data_streams.0.failure_store.enabled: true }
214224
- match: { data_streams.0.failure_store.lifecycle.enabled: true }
225+
- is_false: data_streams.0.failure_store.lifecycle.data_retention
226+
- match: { data_streams.0.failure_store.lifecycle.effective_retention: '30d' }
227+
- match: { data_streams.0.failure_store.lifecycle.retention_determined_by: 'default_failures_retention' }
215228
- match: { data_streams.0.failure_store.indices: [] }
216229

217230
# Initialize failure store
@@ -234,6 +247,9 @@ teardown:
234247
- match: { data_streams.0.template: 'my-template2' }
235248
- match: { data_streams.0.failure_store.enabled: true }
236249
- match: { data_streams.0.failure_store.lifecycle.enabled: true }
250+
- is_false: data_streams.0.failure_store.lifecycle.data_retention
251+
- match: { data_streams.0.failure_store.lifecycle.effective_retention: '30d' }
252+
- match: { data_streams.0.failure_store.lifecycle.retention_determined_by: 'default_failures_retention' }
237253
- length: { data_streams.0.failure_store.indices: 1 }
238254
- match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-fs-default-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
239255
- is_false: data_streams.0.failure_store.indices.0.prefer_ilm

0 commit comments

Comments
 (0)