Skip to content

Commit 5c4273e

Browse files
committed
Use the failures global retention in the data stream lifecycle
1 parent 88e49f0 commit 5c4273e

File tree

6 files changed

+88
-27
lines changed

6 files changed

+88
-27
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,8 @@ public void testSystemDataStreamRetention() throws Exception {
248248
builder,
249249
withEffectiveRetention,
250250
getDataStreamResponse.getRolloverConfiguration(),
251-
getDataStreamResponse.getGlobalRetention()
251+
getDataStreamResponse.getDataGlobalRetention(),
252+
getDataStreamResponse.getFailuresGlobalRetention()
252253
);
253254
String serialized = Strings.toString(builder);
254255
Map<String, Object> resultMap = XContentHelper.convertToMap(

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import org.elasticsearch.cluster.SimpleBatchedExecutor;
4646
import org.elasticsearch.cluster.block.ClusterBlockLevel;
4747
import org.elasticsearch.cluster.metadata.DataStream;
48-
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
4948
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
5049
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
5150
import org.elasticsearch.cluster.metadata.IndexAbstraction;
@@ -372,13 +371,18 @@ private void run(ProjectState projectState) {
372371
continue;
373372
}
374373

374+
// Retrieve the effective retention to ensure the same retention is used for this data stream
375+
// through all operations.
376+
var dataRetention = getEffectiveRetention(dataStream, globalRetentionSettings, false);
377+
var failuresRetention = getEffectiveRetention(dataStream, globalRetentionSettings, true);
378+
375379
// the following indices should not be considered for the remainder of this service run, for various reasons.
376380
Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();
377381

378382
// These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed,
379383
// depending on rollover criteria, for this reason we exclude them for the remaining run.
380-
indicesToExcludeForRemainingRun.add(maybeExecuteRollover(project, dataStream, false));
381-
Index failureStoreWriteIndex = maybeExecuteRollover(project, dataStream, true);
384+
indicesToExcludeForRemainingRun.add(maybeExecuteRollover(project, dataStream, dataRetention, false));
385+
Index failureStoreWriteIndex = maybeExecuteRollover(project, dataStream, failuresRetention, true);
382386
if (failureStoreWriteIndex != null) {
383387
indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
384388
}
@@ -394,7 +398,9 @@ private void run(ProjectState projectState) {
394398
);
395399

396400
try {
397-
indicesToExcludeForRemainingRun.addAll(maybeExecuteRetention(project, dataStream, indicesToExcludeForRemainingRun));
401+
indicesToExcludeForRemainingRun.addAll(
402+
maybeExecuteRetention(project, dataStream, dataRetention, failuresRetention, indicesToExcludeForRemainingRun)
403+
);
398404
} catch (Exception e) {
399405
// individual index errors would be reported via the API action listener for every delete call
400406
// we could potentially record errors at a data stream level and expose it via the _data_stream API?
@@ -840,7 +846,12 @@ private void clearErrorStoreForUnmanagedIndices(ProjectMetadata project, DataStr
840846
}
841847

842848
@Nullable
843-
private Index maybeExecuteRollover(ProjectMetadata project, DataStream dataStream, boolean rolloverFailureStore) {
849+
private Index maybeExecuteRollover(
850+
ProjectMetadata project,
851+
DataStream dataStream,
852+
TimeValue effectiveRetention,
853+
boolean rolloverFailureStore
854+
) {
844855
Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
845856
if (currentRunWriteIndex == null) {
846857
return null;
@@ -851,7 +862,7 @@ private Index maybeExecuteRollover(ProjectMetadata project, DataStream dataStrea
851862
RolloverRequest rolloverRequest = getDefaultRolloverRequest(
852863
rolloverConfiguration,
853864
dataStream.getName(),
854-
lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
865+
effectiveRetention,
855866
rolloverFailureStore
856867
);
857868
transportActionsDeduplicator.executeOnce(
@@ -903,10 +914,13 @@ private Index maybeExecuteRollover(ProjectMetadata project, DataStream dataStrea
903914
* @param indicesToExcludeForRemainingRun Indices to exclude from retention even if it would be time for them to be deleted
904915
* @return The set of indices that delete requests have been sent for
905916
*/
906-
Set<Index> maybeExecuteRetention(ProjectMetadata project, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
907-
DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionSettings.get();
908-
var dataRetention = getRetention(dataStream, globalRetention, false);
909-
var failureRetention = getRetention(dataStream, globalRetention, true);
917+
Set<Index> maybeExecuteRetention(
918+
ProjectMetadata project,
919+
DataStream dataStream,
920+
TimeValue dataRetention,
921+
TimeValue failureRetention,
922+
Set<Index> indicesToExcludeForRemainingRun
923+
) {
910924
if (dataRetention == null && failureRetention == null) {
911925
return Set.of();
912926
}
@@ -1361,11 +1375,15 @@ private static boolean isForceMergeComplete(IndexMetadata backingIndex) {
13611375
}
13621376

13631377
@Nullable
1364-
private static TimeValue getRetention(DataStream dataStream, DataStreamGlobalRetention globalRetention, boolean failureStore) {
1378+
private static TimeValue getEffectiveRetention(
1379+
DataStream dataStream,
1380+
DataStreamGlobalRetentionSettings globalRetentionSettings,
1381+
boolean failureStore
1382+
) {
13651383
DataStreamLifecycle lifecycle = failureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle();
13661384
return lifecycle == null || lifecycle.enabled() == false
13671385
? null
1368-
: lifecycle.getEffectiveDataRetention(globalRetention, dataStream.isInternal());
1386+
: lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(failureStore), dataStream.isInternal());
13691387
}
13701388

13711389
/**

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,9 +1554,16 @@ public void testMaybeExecuteRetentionSuccessfulDownsampledIndex() {
15541554
final var project = state.metadata().getProject(projectId);
15551555
DataStream dataStream = project.dataStreams().get(dataStreamName);
15561556
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
1557+
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
15571558

15581559
// Executing the method to be tested:
1559-
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of());
1560+
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
1561+
project,
1562+
dataStream,
1563+
dataRetention,
1564+
null,
1565+
Set.of()
1566+
);
15601567
assertThat(indicesToBeRemoved, contains(project.index(firstGenIndexName).getIndex()));
15611568
}
15621569

@@ -1566,10 +1573,16 @@ public void testMaybeExecuteRetentionDownsampledIndexInProgress() {
15661573
ClusterState state = downsampleSetup(projectId, dataStreamName, STARTED);
15671574
final var project = state.metadata().getProject(projectId);
15681575
DataStream dataStream = project.dataStreams().get(dataStreamName);
1569-
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
1576+
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
15701577

15711578
// Executing the method to be tested:
1572-
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of());
1579+
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
1580+
project,
1581+
dataStream,
1582+
dataRetention,
1583+
null,
1584+
Set.of()
1585+
);
15731586
assertThat(indicesToBeRemoved, empty());
15741587
}
15751588

@@ -1580,9 +1593,16 @@ public void testMaybeExecuteRetentionDownsampledUnknown() {
15801593
final var project = state.metadata().getProject(projectId);
15811594
DataStream dataStream = project.dataStreams().get(dataStreamName);
15821595
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
1596+
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
15831597

15841598
// Executing the method to be tested:
1585-
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of());
1599+
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
1600+
project,
1601+
dataStream,
1602+
dataRetention,
1603+
null,
1604+
Set.of()
1605+
);
15861606
assertThat(indicesToBeRemoved, contains(project.index(firstGenIndexName).getIndex()));
15871607
}
15881608

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ ProjectMetadata updateDataLifecycle(ProjectMetadata project, List<String> dataSt
353353
}
354354
if (lifecycle != null) {
355355
// We don't issue any warnings if all data streams are internal data streams
356-
lifecycle.addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(), onlyInternalDataStreams);
356+
lifecycle.addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(false), onlyInternalDataStreams);
357357
}
358358
return builder.build();
359359
}
@@ -378,7 +378,7 @@ ProjectMetadata updateDataStreamOptions(
378378
// We don't issue any warnings if all data streams are internal data streams
379379
dataStreamOptions.failureStore()
380380
.lifecycle()
381-
.addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(), onlyInternalDataStreams);
381+
.addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(true), onlyInternalDataStreams);
382382
}
383383
return builder.build();
384384
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -354,9 +354,14 @@ public ProjectMetadata addComponentTemplate(
354354
tempProjectWithComponentTemplateAdded,
355355
composableTemplateName,
356356
composableTemplate,
357-
globalRetentionSettings.get()
357+
globalRetentionSettings.get(false)
358+
);
359+
validateDataStreamOptions(
360+
tempProjectWithComponentTemplateAdded,
361+
composableTemplateName,
362+
composableTemplate,
363+
globalRetentionSettings.get(true)
358364
);
359-
validateDataStreamOptions(tempProjectWithComponentTemplateAdded, composableTemplateName, composableTemplate);
360365
validateIndexTemplateV2(tempProjectWithComponentTemplateAdded, composableTemplateName, composableTemplate);
361366
} catch (Exception e) {
362367
if (validationFailure == null) {
@@ -383,7 +388,7 @@ public ProjectMetadata addComponentTemplate(
383388
finalComponentTemplate.template()
384389
.lifecycle()
385390
.toDataStreamLifecycle()
386-
.addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(), false);
391+
.addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(false), false);
387392
}
388393

389394
logger.info("{} component template [{}]", existing == null ? "adding" : "updating", name);
@@ -743,8 +748,8 @@ void validateIndexTemplateV2(ProjectMetadata projectMetadata, String name, Compo
743748

744749
validate(name, templateToValidate);
745750
validateDataStreamsStillReferenced(projectMetadata, name, templateToValidate);
746-
validateLifecycle(projectMetadata, name, templateToValidate, globalRetentionSettings.get());
747-
validateDataStreamOptions(projectMetadata, name, templateToValidate);
751+
validateLifecycle(projectMetadata, name, templateToValidate, globalRetentionSettings.get(false));
752+
validateDataStreamOptions(projectMetadata, name, templateToValidate, globalRetentionSettings.get(true));
748753

749754
if (templateToValidate.isDeprecated() == false) {
750755
validateUseOfDeprecatedComponentTemplates(name, templateToValidate, projectMetadata.componentTemplates());
@@ -839,7 +844,12 @@ static void validateLifecycle(
839844
}
840845

841846
// Visible for testing
842-
static void validateDataStreamOptions(ProjectMetadata projectMetadata, String indexTemplateName, ComposableIndexTemplate template) {
847+
static void validateDataStreamOptions(
848+
ProjectMetadata projectMetadata,
849+
String indexTemplateName,
850+
ComposableIndexTemplate template,
851+
DataStreamGlobalRetention globalRetention
852+
) {
843853
DataStreamOptions.Builder dataStreamOptionsBuilder = resolveDataStreamOptions(template, projectMetadata.componentTemplates());
844854
if (dataStreamOptionsBuilder != null) {
845855
if (template.getDataStreamTemplate() == null) {
@@ -849,6 +859,17 @@ static void validateDataStreamOptions(ProjectMetadata projectMetadata, String in
849859
+ "] specifies data stream options that can only be used in combination with a data stream"
850860
);
851861
}
862+
if (globalRetention != null) {
863+
// We cannot know for sure if the template will apply to internal data streams, so we use a simpler heuristic:
864+
// If all the index patterns start with a dot, we consider that all the connected data streams are internal.
865+
boolean isInternalDataStream = template.indexPatterns().stream().allMatch(indexPattern -> indexPattern.charAt(0) == '.');
866+
DataStreamOptions dataStreamOptions = dataStreamOptionsBuilder.build();
867+
if (dataStreamOptions.failureStore() != null && dataStreamOptions.failureStore().lifecycle() != null) {
868+
dataStreamOptions.failureStore()
869+
.lifecycle()
870+
.addWarningHeaderIfDataRetentionNotEffective(globalRetention, isInternalDataStream);
871+
}
872+
}
852873
}
853874
}
854875

server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.stream.Stream;
3838

3939
import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DATA_STREAM_CONFIGURATION;
40+
import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DEFAULT_FAILURES_RETENTION;
4041
import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DEFAULT_GLOBAL_RETENTION;
4142
import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.MAX_GLOBAL_RETENTION;
4243
import static org.hamcrest.Matchers.containsString;
@@ -372,7 +373,7 @@ public void testEffectiveRetention() {
372373
false
373374
);
374375
assertThat(effectiveFailuresRetentionWithSource.v1(), equalTo(defaultRetention));
375-
assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(DEFAULT_GLOBAL_RETENTION));
376+
assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(DEFAULT_FAILURES_RETENTION));
376377
}
377378

378379
// With retention in the data stream lifecycle
@@ -477,7 +478,7 @@ public void testEffectiveRetention() {
477478
assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(MAX_GLOBAL_RETENTION));
478479
} else {
479480
assertThat(effectiveFailuresRetentionWithSource.v1(), equalTo(globalRetention.defaultRetention()));
480-
assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(DEFAULT_GLOBAL_RETENTION));
481+
assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(DEFAULT_FAILURES_RETENTION));
481482
}
482483

483484
// Now verify that internal data streams do not use global retention

0 commit comments

Comments
 (0)