diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index 125d3fedec60b..7c0ccbfc116a9 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -1400,7 +1400,7 @@ public void testGetDataStream() throws Exception { assertThat(metricsFooDataStream.getDataStreamStatus(), is(ClusterHealthStatus.YELLOW)); assertThat(metricsFooDataStream.getIndexTemplate(), is("template_for_foo")); assertThat(metricsFooDataStream.getIlmPolicy(), is(nullValue())); - assertThat(dataStream.getLifecycle(), is(lifecycle.toDataStreamLifecycle())); + assertThat(dataStream.getDataLifecycle(), is(lifecycle.toDataStreamLifecycle())); assertThat(metricsFooDataStream.templatePreferIlmValue(), is(true)); GetDataStreamAction.Response.IndexProperties indexProperties = metricsFooDataStream.getIndexSettingsValues() .get(dataStream.getWriteIndex()); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index 29d5d13dea346..bf57bceab1b2f 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -349,16 +349,22 @@ void run(ClusterState state) { int affectedDataStreams = 0; for (DataStream dataStream : state.metadata().getProject().dataStreams().values()) { clearErrorStoreForUnmanagedIndices(dataStream); - if (dataStream.getLifecycle() == null) { + if (dataStream.getDataLifecycle() == null) { continue; } // the following indices should not be considered for the remainder of this service run, for various reasons. Set indicesToExcludeForRemainingRun = new HashSet<>(); - // This is the pre-rollover write index. It may or may not be the write index after maybeExecuteRollover has executed, - // depending on rollover criteria, for this reason we exclude it for the remaining run. - indicesToExcludeForRemainingRun.addAll(maybeExecuteRollover(state, dataStream)); + // These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed, + // depending on rollover criteria, for this reason we exclude them for the remaining run. + indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, false)); + if (DataStream.isFailureStoreFeatureFlagEnabled()) { + Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true); + if (failureStoreWriteIndex != null) { + indicesToExcludeForRemainingRun.add(failureStoreWriteIndex); + } + } // tsds indices that are still within their time bounds (i.e. now < time_series.end_time) - we don't want these indices to be // deleted, forcemerged, or downsampled as they're still expected to receive large amounts of writes @@ -805,23 +811,6 @@ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) { } } - /** - * This method will attempt to roll over the write index of a data stream. The rollover will occur only if the conditions - * apply. In any case, we return the write backing index back to the caller, so it can be excluded from the next steps. - * @return the write index of this data stream before rollover was requested. - */ - private Set maybeExecuteRollover(ClusterState state, DataStream dataStream) { - Set currentRunWriteIndices = new HashSet<>(); - currentRunWriteIndices.add(maybeExecuteRollover(state, dataStream, false)); - if (DataStream.isFailureStoreFeatureFlagEnabled()) { - Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true); - if (failureStoreWriteIndex != null) { - currentRunWriteIndices.add(failureStoreWriteIndex); - } - } - return currentRunWriteIndices; - } - @Nullable private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) { Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex(); @@ -830,10 +819,11 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo } try { if (dataStream.isIndexManagedByDataStreamLifecycle(currentRunWriteIndex, state.metadata().getProject()::index)) { + DataStreamLifecycle lifecycle = rolloverFailureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle(); RolloverRequest rolloverRequest = getDefaultRolloverRequest( rolloverConfiguration, dataStream.getName(), - dataStream.getLifecycle().getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()), + lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()), rolloverFailureStore ); transportActionsDeduplicator.executeOnce( @@ -886,45 +876,66 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo Set maybeExecuteRetention(ClusterState state, DataStream dataStream, Set indicesToExcludeForRemainingRun) { Metadata metadata = state.metadata(); DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionSettings.get(); - List backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention( + List backingIndicesOlderThanRetention = dataStream.getBackingIndicesPastRetention( + metadata.getProject()::index, + nowSupplier, + globalRetention + ); + List failureIndicesOlderThanRetention = dataStream.getFailureIndicesPastRetention( metadata.getProject()::index, nowSupplier, globalRetention ); - if (backingIndicesOlderThanRetention.isEmpty()) { + if (backingIndicesOlderThanRetention.isEmpty() && failureIndicesOlderThanRetention.isEmpty()) { return Set.of(); } Set indicesToBeRemoved = new HashSet<>(); - // We know that there is lifecycle and retention because there are indices to be deleted - assert dataStream.getLifecycle() != null; - TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal()); - for (Index index : backingIndicesOlderThanRetention) { - if (indicesToExcludeForRemainingRun.contains(index) == false) { - IndexMetadata backingIndex = metadata.getProject().index(index); - assert backingIndex != null : "the data stream backing indices must exist"; - - IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings()); - // we don't want to delete the source index if they have an in-progress downsampling operation because the - // target downsample index will remain in the system as a standalone index - if (downsampleStatus == STARTED) { - // there's an opportunity here to cancel downsampling and delete the source index now - logger.trace( - "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed " - + "because there's a downsampling operation currently in progress for this index. Current downsampling " - + "status is [{}]. When downsampling completes, DSL will delete this index.", - index.getName(), - effectiveDataRetention, - downsampleStatus - ); - } else { - // UNKNOWN is the default value, and has no real use. So index should be deleted - // SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete + if (backingIndicesOlderThanRetention.isEmpty() == false) { + assert dataStream.getDataLifecycle() != null : "data stream should have failure lifecycle if we have 'old' indices"; + TimeValue dataRetention = dataStream.getDataLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal()); + for (Index index : backingIndicesOlderThanRetention) { + if (indicesToExcludeForRemainingRun.contains(index) == false) { + IndexMetadata backingIndex = metadata.getProject().index(index); + assert backingIndex != null : "the data stream backing indices must exist"; + + IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings()); + // we don't want to delete the source index if they have an in-progress downsampling operation because the + // target downsample index will remain in the system as a standalone index + if (downsampleStatus == STARTED) { + // there's an opportunity here to cancel downsampling and delete the source index now + logger.trace( + "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed " + + "because there's a downsampling operation currently in progress for this index. Current downsampling " + + "status is [{}]. When downsampling completes, DSL will delete this index.", + index.getName(), + dataRetention, + downsampleStatus + ); + } else { + // UNKNOWN is the default value, and has no real use. So index should be deleted + // SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete + indicesToBeRemoved.add(index); + + // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request) + // let's start simple and reevaluate + String indexName = backingIndex.getIndex().getName(); + deleteIndexOnce(indexName, "the lapsed [" + dataRetention + "] retention period"); + } + } + } + } + if (failureIndicesOlderThanRetention.isEmpty() == false) { + assert dataStream.getFailuresLifecycle() != null : "data stream should have failure lifecycle if we have 'old' indices"; + var failureRetention = dataStream.getFailuresLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal()); + for (Index index : failureIndicesOlderThanRetention) { + if (indicesToExcludeForRemainingRun.contains(index) == false) { + IndexMetadata failureIndex = metadata.getProject().index(index); + assert failureIndex != null : "the data stream failure indices must exist"; indicesToBeRemoved.add(index); - // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request) // let's start simple and reevaluate - String indexName = backingIndex.getIndex().getName(); - deleteIndexOnce(indexName, "the lapsed [" + effectiveDataRetention + "] retention period"); + String indexName = failureIndex.getIndex().getName(); + deleteIndexOnce(indexName, "the lapsed [" + failureRetention + "] retention period"); } } } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java index db7c70c127bb0..c7e098cdf74da 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java @@ -112,7 +112,7 @@ protected void masterOperation( idxMetadata.getCreationDate(), rolloverInfo == null ? null : rolloverInfo.getTime(), generationDate, - parentDataStream.getLifecycle(), + parentDataStream.getDataLifecycleForIndex(idxMetadata.getIndex()), errorStore.getError(state.projectId(), index) ); explainIndices.add(explainIndexDataStreamLifecycle); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java index a7f05852ed87d..e564ddf8dddae 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java @@ -108,7 +108,7 @@ protected void localClusterStateOperation( .map( dataStream -> new GetDataStreamLifecycleAction.Response.DataStreamLifecycle( dataStream.getName(), - dataStream.getLifecycle(), + dataStream.getDataLifecycle(), dataStream.isSystem() ) ) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java index bc34d0cfed3b9..c81666873920d 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java @@ -78,7 +78,7 @@ GetDataStreamLifecycleStatsAction.Response collectStats(ProjectMetadata project) Set indicesInErrorStore = lifecycleService.getErrorStore().getAllIndices(project.id()); List dataStreamStats = new ArrayList<>(); for (DataStream dataStream : project.dataStreams().values()) { - if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) { + if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) { int total = 0; int inError = 0; for (Index index : dataStream.getIndices()) { diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java index c13f93acaf290..147c34222138e 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java @@ -380,9 +380,10 @@ public XContentBuilder toXContent( if (indexTemplate != null) { builder.field(INDEX_TEMPLATE_FIELD.getPreferredName(), indexTemplate); } - if (dataStream.getLifecycle() != null) { + if (dataStream.getDataLifecycle() != null) { builder.field(LIFECYCLE_FIELD.getPreferredName()); - dataStream.getLifecycle().toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal()); + dataStream.getDataLifecycle() + .toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal()); } if (ilmPolicyName != null) { builder.field(ILM_POLICY_FIELD.getPreferredName(), ilmPolicyName); @@ -471,7 +472,7 @@ private void addAutoShardingEvent(XContentBuilder builder, Params params, DataSt */ public ManagedBy getNextGenerationManagedBy() { // both ILM and DSL are configured so let's check the prefer_ilm setting to see which system takes precedence - if (ilmPolicyName != null && dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) { + if (ilmPolicyName != null && dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) { return templatePreferIlmValue ? ManagedBy.ILM : ManagedBy.LIFECYCLE; } @@ -479,7 +480,7 @@ public ManagedBy getNextGenerationManagedBy() { return ManagedBy.ILM; } - if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) { + if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) { return ManagedBy.LIFECYCLE; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 75296fa2a352f..c09f1d48f1583 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -517,11 +517,37 @@ public IndexMode getIndexMode() { return indexMode; } + /** + * Retrieves the lifecycle configuration meant for the backing indices. + */ @Nullable - public DataStreamLifecycle getLifecycle() { + public DataStreamLifecycle getDataLifecycle() { return lifecycle; } + /** + * Retrieves the lifecycle configuration meant for the failure store. Currently, it's the same with {@link #getDataLifecycle()} + * but it will change. + */ + @Nullable + public DataStreamLifecycle getFailuresLifecycle() { + return lifecycle; + } + + /** + * Retrieves the correct lifecycle for the provided index. Returns null if the index does not belong to this data stream + */ + @Nullable + public DataStreamLifecycle getDataLifecycleForIndex(Index index) { + if (backingIndices.containsIndex(index.getName())) { + return getDataLifecycle(); + } + if (failureIndices.containsIndex(index.getName())) { + return getFailuresLifecycle(); + } + return null; + } + /** * Returns the latest auto sharding event that happened for this data stream */ @@ -935,24 +961,53 @@ private static boolean isAnyIndexMissing(List indices, Metadata.Builder b } /** - * Iterate over the backing indices and return the ones that are managed by the data stream lifecycle and past the configured - * retention in their lifecycle. + * Iterate over the backing indices and return the ones that are managed by the data stream lifecycle and past the + * configured retention in their lifecycle. + * NOTE that this specifically does not return the write index of the data stream as usually retention + * is treated differently for the write index (i.e. they first need to be rolled over) + */ + public List getBackingIndicesPastRetention( + Function indexMetadataSupplier, + LongSupplier nowSupplier, + DataStreamGlobalRetention globalRetention + ) { + if (getDataLifecycle() == null + || getDataLifecycle().enabled() == false + || getDataLifecycle().getEffectiveDataRetention(globalRetention, isInternal()) == null) { + return List.of(); + } + + List indicesPastRetention = getNonWriteIndicesOlderThan( + getIndices(), + getDataLifecycle().getEffectiveDataRetention(globalRetention, isInternal()), + indexMetadataSupplier, + this::isIndexManagedByDataStreamLifecycle, + nowSupplier + ); + return indicesPastRetention; + } + + /** + * Iterate over the failure indices and return the ones that are managed by the data stream lifecycle and past the + * configured retention in their lifecycle. * NOTE that this specifically does not return the write index of the data stream as usually retention * is treated differently for the write index (i.e. they first need to be rolled over) */ - public List getIndicesPastRetention( + public List getFailureIndicesPastRetention( Function indexMetadataSupplier, LongSupplier nowSupplier, DataStreamGlobalRetention globalRetention ) { - if (lifecycle == null - || lifecycle.enabled() == false - || lifecycle.getEffectiveDataRetention(globalRetention, isInternal()) == null) { + if (DataStream.isFailureStoreFeatureFlagEnabled() == false + || getFailuresLifecycle() == null + || getFailuresLifecycle().enabled() == false + || getFailuresLifecycle().getEffectiveDataRetention(globalRetention, isInternal()) == null) { return List.of(); } List indicesPastRetention = getNonWriteIndicesOlderThan( - lifecycle.getEffectiveDataRetention(globalRetention, isInternal()), + getFailureIndices(), + getFailuresLifecycle().getEffectiveDataRetention(globalRetention, isInternal()), indexMetadataSupplier, this::isIndexManagedByDataStreamLifecycle, nowSupplier @@ -998,37 +1053,28 @@ public List getDownsamplingRoundsFor( } /** - * Returns the non-write backing indices and failure store indices that are older than the provided age, + * Filters the given indices that are older than the provided age and populates olderIndices, * excluding the write indices. The index age is calculated from the rollover or index creation date (or * the origination date if present). If an indices predicate is provided the returned list of indices will * be filtered according to the predicate definition. This is useful for things like "return only - * the backing indices that are managed by the data stream lifecycle". + * the indices that are managed by the data stream lifecycle". */ - public List getNonWriteIndicesOlderThan( + private List getNonWriteIndicesOlderThan( + List indices, TimeValue retentionPeriod, Function indexMetadataSupplier, @Nullable Predicate indicesPredicate, LongSupplier nowSupplier ) { + if (indices.isEmpty()) { + return List.of(); + } List olderIndices = new ArrayList<>(); - for (Index index : backingIndices.getIndices()) { + for (Index index : indices) { if (isIndexOlderThan(index, retentionPeriod.getMillis(), nowSupplier.getAsLong(), indicesPredicate, indexMetadataSupplier)) { olderIndices.add(index); } } - if (DataStream.isFailureStoreFeatureFlagEnabled() && failureIndices.getIndices().isEmpty() == false) { - for (Index index : failureIndices.getIndices()) { - if (isIndexOlderThan( - index, - retentionPeriod.getMillis(), - nowSupplier.getAsLong(), - indicesPredicate, - indexMetadataSupplier - )) { - olderIndices.add(index); - } - } - } return olderIndices; } @@ -1095,7 +1141,7 @@ private boolean isIndexManagedByDataStreamLifecycle(IndexMetadata indexMetadata) */ @Nullable public TimeValue getGenerationLifecycleDate(IndexMetadata indexMetadata) { - if (indexMetadata.getIndex().equals(getWriteIndex())) { + if (indexMetadata.getIndex().equals(getWriteIndex()) || indexMetadata.getIndex().equals(getWriteFailureIndex())) { return null; } Long originationDate = indexMetadata.getSettings().getAsLong(LIFECYCLE_ORIGINATION_DATE, null); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java index 85d8a81aa126d..5b8864309f7e1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java @@ -1079,7 +1079,12 @@ public boolean isIndexManagedByILM(IndexMetadata indexMetadata) { } DataStream parentDataStream = indexAbstraction.getParentDataStream(); - if (parentDataStream != null && parentDataStream.getLifecycle() != null && parentDataStream.getLifecycle().enabled()) { + // Only data streams can be managed by data stream lifecycle + if (parentDataStream == null) { + return true; + } + DataStreamLifecycle lifecycle = parentDataStream.getDataLifecycleForIndex(indexMetadata.getIndex()); + if (lifecycle != null && lifecycle.enabled()) { // index has both ILM and data stream lifecycle configured so let's check which is preferred return PREFER_ILM_SETTING.get(indexMetadata.getSettings()); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleWithRetentionWarningsTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleWithRetentionWarningsTests.java index acc2a56794d19..75b504e64117d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleWithRetentionWarningsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleWithRetentionWarningsTests.java @@ -148,7 +148,7 @@ public void testUpdatingLifecycleOnADataStream() { ProjectMetadata after = metadataDataStreamsService.updateDataLifecycle(before, List.of(dataStream), DataStreamLifecycle.DEFAULT); DataStream updatedDataStream = after.dataStreams().get(dataStream); assertNotNull(updatedDataStream); - assertThat(updatedDataStream.getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT)); + assertThat(updatedDataStream.getDataLifecycle(), equalTo(DataStreamLifecycle.DEFAULT)); Map> responseHeaders = threadContext.getResponseHeaders(); assertThat(responseHeaders.size(), is(1)); assertThat( diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 2ed217ee336ee..84841d0caea07 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -49,10 +49,11 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Predicate; +import java.util.function.Function; import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName; +import static org.elasticsearch.cluster.metadata.DataStream.getDefaultFailureStoreName; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.randomGlobalRetention; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.randomIndexInstances; @@ -99,7 +100,7 @@ protected DataStream mutateInstance(DataStream instance) { var isSystem = instance.isSystem(); var allowsCustomRouting = instance.isAllowCustomRouting(); var indexMode = instance.getIndexMode(); - var lifecycle = instance.getLifecycle(); + var lifecycle = instance.getDataLifecycle(); var dataStreamOptions = instance.getDataStreamOptions(); var failureIndices = instance.getFailureIndices(); var rolloverOnWrite = instance.rolloverOnWrite(); @@ -1235,15 +1236,30 @@ public void testGetGenerationLifecycleDate() { { // for a write index that has not been rolled over yet, we get null even if the index has an origination date long originTimeMillis = creationTimeMillis - 3000L; - IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1)) + IndexMetadata.Builder backingIndexMetadataBuilder = IndexMetadata.builder( + DataStream.getDefaultBackingIndexName(dataStreamName, 1) + ) .settings(settings(IndexVersion.current()).put(LIFECYCLE_ORIGINATION_DATE, originTimeMillis)) .numberOfShards(1) .numberOfReplicas(1) .creationDate(creationTimeMillis); - IndexMetadata indexMetadata = indexMetaBuilder.build(); - DataStream dataStream = createDataStream(dataStreamName, List.of(indexMetadata.getIndex())); + IndexMetadata backingIndexMetadata = backingIndexMetadataBuilder.build(); + IndexMetadata.Builder failureIndexMetadataBuilder = IndexMetadata.builder( + DataStream.getDefaultBackingIndexName(dataStreamName, 1) + ) + .settings(settings(IndexVersion.current()).put(LIFECYCLE_ORIGINATION_DATE, originTimeMillis)) + .numberOfShards(1) + .numberOfReplicas(1) + .creationDate(creationTimeMillis); + IndexMetadata failureIndexMetadata = failureIndexMetadataBuilder.build(); + DataStream dataStream = createDataStream( + dataStreamName, + List.of(backingIndexMetadata.getIndex()), + List.of(failureIndexMetadata.getIndex()) + ); - assertNull(dataStream.getGenerationLifecycleDate(indexMetadata)); + assertNull(dataStream.getGenerationLifecycleDate(backingIndexMetadata)); + assertNull(dataStream.getGenerationLifecycleDate(failureIndexMetadata)); } { // If the index is not the write index and has origination date set, we get the origination date even if it has not been @@ -1323,107 +1339,109 @@ private DataStream createDataStream(String name, List indices) { .build(); } - public void testGetIndicesOlderThan() { + private DataStream createDataStream(String name, List backingIndices, List failureIndices) { + return DataStream.builder(name, backingIndices) + .setMetadata(Map.of()) + .setReplicated(randomBoolean()) + .setAllowCustomRouting(randomBoolean()) + .setIndexMode(IndexMode.STANDARD) + .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureIndices).build()) + .build(); + } + + public void testGetBackingIndicesPastRetention() { String dataStreamName = "metrics-foo"; long now = System.currentTimeMillis(); List creationAndRolloverTimes = List.of( - DataStreamMetadata.dataStreamMetadata(now - 5000, now - 4000), - DataStreamMetadata.dataStreamMetadata(now - 4000, now - 3000), - DataStreamMetadata.dataStreamMetadata(now - 3000, now - 2000), - DataStreamMetadata.dataStreamMetadata(now - 2000, now - 1000), + DataStreamMetadata.dataStreamMetadata(now - 5000_000, now - 4000_000), + DataStreamMetadata.dataStreamMetadata(now - 4000_000, now - 3000_000), + DataStreamMetadata.dataStreamMetadata(now - 3000_000, now - 2000_000), + DataStreamMetadata.dataStreamMetadata(now - 2000_000, now - 1000_000), DataStreamMetadata.dataStreamMetadata(now, null) ); + { + { + // no lifecycle configured so we expect an empty list + Metadata.Builder builder = Metadata.builder(); + DataStream dataStream = createDataStream( + builder, + dataStreamName, + creationAndRolloverTimes, + settings(IndexVersion.current()), + null + ); + Metadata metadata = builder.build(); + + assertThat( + dataStream.getBackingIndicesPastRetention(metadata.getProject()::index, () -> now, randomGlobalRetention()).isEmpty(), + is(true) + ); + } + } + Metadata.Builder builder = Metadata.builder(); + AtomicReference retention = new AtomicReference<>(); DataStream dataStream = createDataStream( builder, dataStreamName, creationAndRolloverTimes, settings(IndexVersion.current()), - new DataStreamLifecycle() + new DataStreamLifecycle() { + public TimeValue dataRetention() { + return retention.get(); + } + } ); Metadata metadata = builder.build(); + { - List backingIndices = dataStream.getNonWriteIndicesOlderThan( - TimeValue.timeValueMillis(2500), + // Mix of indices younger and older than retention, data stream retention is effective retention + retention.set(TimeValue.timeValueSeconds(2500)); + List backingIndices = dataStream.getBackingIndicesPastRetention( metadata.getProject()::index, - null, - () -> now + () -> now, + randomBoolean() ? randomGlobalRetention() : null ); assertThat(backingIndices.size(), is(2)); - assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); - assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); + for (int i = 0; i < backingIndices.size(); i++) { + assertThat(backingIndices.get(i).getName(), is(dataStream.getIndices().get(i).getName())); + } } { - List backingIndices = dataStream.getNonWriteIndicesOlderThan( - TimeValue.timeValueMillis(0), - metadata.getProject()::index, - null, - () -> now - ); + // All indices past retention, but we keep the write index + retention.set(TimeValue.timeValueSeconds(0)); + List backingIndices = dataStream.getBackingIndicesPastRetention(metadata.getProject()::index, () -> now, null); assertThat(backingIndices.size(), is(4)); - assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); - assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); - assertThat(backingIndices.get(2).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 3))); - assertThat(backingIndices.get(3).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 4))); + for (int i = 0; i < backingIndices.size(); i++) { + assertThat(backingIndices.get(i).getName(), is(dataStream.getIndices().get(i).getName())); + } } { - List backingIndices = dataStream.getNonWriteIndicesOlderThan( - TimeValue.timeValueMillis(6000), - metadata.getProject()::index, - null, - () -> now - ); + // All indices younger than retention + retention.set(TimeValue.timeValueSeconds(6000)); + List backingIndices = dataStream.getBackingIndicesPastRetention(metadata.getProject()::index, () -> now, null); assertThat(backingIndices.isEmpty(), is(true)); } { - Predicate genThreeAndFivePredicate = indexMetadata -> indexMetadata.getIndex().getName().endsWith("00003") - || indexMetadata.getIndex().getName().endsWith("00005"); - - List backingIndices = dataStream.getNonWriteIndicesOlderThan( - TimeValue.timeValueMillis(0), - metadata.getProject()::index, - genThreeAndFivePredicate, - () -> now - ); + // Test predicate that influences which indices are candidates for a retention check + Function indexMetadataWithSomeLifecycleSupplier = indexName -> { + IndexMetadata indexMetadata = metadata.getProject().index(indexName); + if (indexName.endsWith("00003") || indexName.endsWith("00005")) { + return indexMetadata; + } + return IndexMetadata.builder(indexMetadata) + .settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.LIFECYCLE_NAME, "some-policy").build()) + .build(); + }; + retention.set(TimeValue.timeValueSeconds(0)); + List backingIndices = dataStream.getBackingIndicesPastRetention(indexMetadataWithSomeLifecycleSupplier, () -> now, null); assertThat(backingIndices.size(), is(1)); - assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 3))); - } - - } - - public void testGetIndicesPastRetention() { - String dataStreamName = "metrics-foo"; - long now = System.currentTimeMillis(); - - List creationAndRolloverTimes = List.of( - DataStreamMetadata.dataStreamMetadata(now - 5000_000, now - 4000_000), - DataStreamMetadata.dataStreamMetadata(now - 4000_000, now - 3000_000), - DataStreamMetadata.dataStreamMetadata(now - 3000_000, now - 2000_000), - DataStreamMetadata.dataStreamMetadata(now - 2000_000, now - 1000_000), - DataStreamMetadata.dataStreamMetadata(now, null) - ); - - { - // no lifecycle configured so we expect an empty list - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - creationAndRolloverTimes, - settings(IndexVersion.current()), - null - ); - Metadata metadata = builder.build(); - - assertThat( - dataStream.getIndicesPastRetention(metadata.getProject()::index, () -> now, randomGlobalRetention()).isEmpty(), - is(true) - ); + assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(2).getName())); } { @@ -1432,131 +1450,186 @@ public void testGetIndicesPastRetention() { TimeValue.timeValueSeconds(2500), randomBoolean() ? TimeValue.timeValueSeconds(randomIntBetween(2500, 5000)) : null ); - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - creationAndRolloverTimes, - settings(IndexVersion.current()), - new DataStreamLifecycle() - ); - Metadata metadata = builder.build(); + retention.set(null); - List backingIndices = dataStream.getIndicesPastRetention(metadata.getProject()::index, () -> now, globalRetention); + List backingIndices = dataStream.getBackingIndicesPastRetention( + metadata.getProject()::index, + () -> now, + globalRetention + ); assertThat(backingIndices.size(), is(2)); - assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); - assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); + for (int i = 0; i < backingIndices.size(); i++) { + assertThat(backingIndices.get(i).getName(), is(dataStream.getIndices().get(i).getName())); + } } { - // no retention configured but we have max retention + // no retention or too large retention configured and we have max retention DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueSeconds(2500)); - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - creationAndRolloverTimes, - settings(IndexVersion.current()), - new DataStreamLifecycle() + retention.set(randomBoolean() ? TimeValue.timeValueDays(6000) : null); + List backingIndices = dataStream.getBackingIndicesPastRetention( + metadata.getProject()::index, + () -> now, + globalRetention ); - Metadata metadata = builder.build(); - - List backingIndices = dataStream.getIndicesPastRetention(metadata.getProject()::index, () -> now, globalRetention); assertThat(backingIndices.size(), is(2)); - assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); - assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); + for (int i = 0; i < backingIndices.size(); i++) { + assertThat(backingIndices.get(i).getName(), is(dataStream.getIndices().get(i).getName())); + } } { - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, + // no indices are returned as even though all pass retention age none are managed by data stream lifecycle + Metadata.Builder builderWithIlm = Metadata.builder(); + DataStream dataStreamWithIlm = createDataStream( + builderWithIlm, dataStreamName, creationAndRolloverTimes, - settings(IndexVersion.current()), - DataStreamLifecycle.builder().dataRetention(TimeValue.timeValueSeconds(2500)).build() + settings(IndexVersion.current()).put(IndexMetadata.LIFECYCLE_NAME, "ILM_policy"), + DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO).build() ); - Metadata metadata = builder.build(); + Metadata metadataWithIlm = builderWithIlm.build(); - List backingIndices = dataStream.getIndicesPastRetention( - metadata.getProject()::index, + List backingIndices = dataStreamWithIlm.getBackingIndicesPastRetention( + metadataWithIlm.getProject()::index, () -> now, randomGlobalRetention() ); - assertThat(backingIndices.size(), is(2)); - assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); - assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); + assertThat(backingIndices.isEmpty(), is(true)); } + } + + public void testGetFailureIndicesPastRetention() { + String dataStreamName = "metrics-foo"; + long now = System.currentTimeMillis(); + + List creationAndRolloverTimes = List.of( + DataStreamMetadata.dataStreamMetadata(now - 5000_000, now - 4000_000), + DataStreamMetadata.dataStreamMetadata(now - 4000_000, now - 3000_000), + DataStreamMetadata.dataStreamMetadata(now - 3000_000, now - 2000_000), + DataStreamMetadata.dataStreamMetadata(now - 2000_000, now - 1000_000), + DataStreamMetadata.dataStreamMetadata(now, null) + ); { - // even though all indices match the write index should not be returned - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - creationAndRolloverTimes, - settings(IndexVersion.current()), - DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO).build() - ); - Metadata metadata = builder.build(); + { + // no lifecycle configured so we expect an empty list + Metadata.Builder builder = Metadata.builder(); + DataStream dataStream = createDataStream( + builder, + dataStreamName, + creationAndRolloverTimes, + settings(IndexVersion.current()), + null + ); + Metadata metadata = builder.build(); - List backingIndices = dataStream.getIndicesPastRetention( + assertThat( + dataStream.getFailureIndicesPastRetention(metadata.getProject()::index, () -> now, randomGlobalRetention()).isEmpty(), + is(true) + ); + } + } + + Metadata.Builder builder = Metadata.builder(); + AtomicReference retention = new AtomicReference<>(); + DataStream dataStream = createDataStream( + builder, + dataStreamName, + creationAndRolloverTimes, + settings(IndexVersion.current()), + new DataStreamLifecycle() { + public TimeValue dataRetention() { + return retention.get(); + } + } + ); + Metadata metadata = builder.build(); + + { + // Mix of indices younger and older than retention, data stream retention is effective retention + retention.set(TimeValue.timeValueSeconds(2500)); + List failureIndices = dataStream.getFailureIndicesPastRetention( metadata.getProject()::index, () -> now, - randomGlobalRetention() + randomBoolean() ? randomGlobalRetention() : null ); + assertThat(failureIndices.size(), is(2)); + for (int i = 0; i < failureIndices.size(); i++) { + assertThat(failureIndices.get(i).getName(), is(dataStream.getFailureIndices().get(i).getName())); + } + } - assertThat(backingIndices.size(), is(4)); - assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); - assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); - assertThat(backingIndices.get(2).getName(), is(dataStream.getIndices().get(2).getName())); - assertThat(backingIndices.get(3).getName(), is(dataStream.getIndices().get(3).getName())); + { + // All indices past retention, but we keep the write index + retention.set(TimeValue.timeValueSeconds(0)); + List failureIndices = dataStream.getFailureIndicesPastRetention(metadata.getProject()::index, () -> now, null); + assertThat(failureIndices.size(), is(4)); + for (int i = 0; i < failureIndices.size(); i++) { + assertThat(failureIndices.get(i).getName(), is(dataStream.getFailureIndices().get(i).getName())); + } } { - // no index matches the retention age - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - creationAndRolloverTimes, - settings(IndexVersion.current()), - DataStreamLifecycle.builder().dataRetention(TimeValue.timeValueSeconds(6000)).build() + // All indices younger than retention + retention.set(TimeValue.timeValueSeconds(6000)); + List failureIndices = dataStream.getFailureIndicesPastRetention(metadata.getProject()::index, () -> now, null); + assertThat(failureIndices.isEmpty(), is(true)); + } + + { + // Test predicate that influences which indices are candidates for a retention check + Function indexMetadataWithSomeLifecycleSupplier = indexName -> { + IndexMetadata indexMetadata = metadata.getProject().index(indexName); + if (indexName.endsWith("00003") || indexName.endsWith("00005")) { + return indexMetadata; + } + return IndexMetadata.builder(indexMetadata) + .settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.LIFECYCLE_NAME, "some-policy").build()) + .build(); + }; + retention.set(TimeValue.timeValueSeconds(0)); + List failureIndices = dataStream.getFailureIndicesPastRetention(indexMetadataWithSomeLifecycleSupplier, () -> now, null); + assertThat(failureIndices.size(), is(1)); + assertThat(failureIndices.get(0).getName(), is(dataStream.getFailureIndices().get(2).getName())); + } + + { + // no retention configured but we have default retention + DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention( + TimeValue.timeValueSeconds(2500), + randomBoolean() ? TimeValue.timeValueSeconds(randomIntBetween(2500, 5000)) : null ); - Metadata metadata = builder.build(); + retention.set(null); - List backingIndices = dataStream.getIndicesPastRetention( + List failureIndices = dataStream.getFailureIndicesPastRetention( metadata.getProject()::index, () -> now, - randomGlobalRetention() + globalRetention ); - assertThat(backingIndices.isEmpty(), is(true)); + assertThat(failureIndices.size(), is(2)); + for (int i = 0; i < failureIndices.size(); i++) { + assertThat(failureIndices.get(i).getName(), is(dataStream.getFailureIndices().get(i).getName())); + } } { - // no indices are returned as even though all pass retention age none are managed by data stream lifecycle - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - creationAndRolloverTimes, - Settings.builder() - .put(IndexMetadata.LIFECYCLE_NAME, "ILM_policy") - .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()), - DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO).build() - ); - Metadata metadata = builder.build(); - - List backingIndices = dataStream.getIndicesPastRetention( + // no retention or too large retention configured and we have max retention + DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueSeconds(2500)); + retention.set(randomBoolean() ? TimeValue.timeValueDays(6000) : null); + List failureIndices = dataStream.getFailureIndicesPastRetention( metadata.getProject()::index, () -> now, - randomGlobalRetention() + globalRetention ); - assertThat(backingIndices.isEmpty(), is(true)); + assertThat(failureIndices.size(), is(2)); + for (int i = 0; i < failureIndices.size(); i++) { + assertThat(failureIndices.get(i).getName(), is(dataStream.getFailureIndices().get(i).getName())); + } } } - public void testGetIndicesPastRetentionWithOriginationDate() { + public void testBackingIndicesPastRetentionWithOriginationDate() { // First, build an ordinary data stream: String dataStreamName = "metrics-foo"; long now = System.currentTimeMillis(); @@ -1586,13 +1659,13 @@ public TimeValue dataRetention() { { // no retention configured so we expect an empty list testRetentionReference.set(null); - assertThat(dataStream.getIndicesPastRetention(metadata.getProject()::index, () -> now, null).isEmpty(), is(true)); + assertThat(dataStream.getBackingIndicesPastRetention(metadata.getProject()::index, () -> now, null).isEmpty(), is(true)); } { - // retention period where oldIndex is too old, but newIndex should be retained + // retention period where first and second index is too old, and 5th has old origination date. testRetentionReference.set(TimeValue.timeValueMillis(2500)); - List backingIndices = dataStream.getIndicesPastRetention(metadata.getProject()::index, () -> now, null); + List backingIndices = dataStream.getBackingIndicesPastRetention(metadata.getProject()::index, () -> now, null); assertThat(backingIndices.size(), is(3)); assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); @@ -1600,24 +1673,61 @@ public TimeValue dataRetention() { } { - // even though all indices match the write index should not be returned - testRetentionReference.set(TimeValue.timeValueMillis(0)); - List backingIndices = dataStream.getIndicesPastRetention(metadata.getProject()::index, () -> now, null); + // no index matches the retention age + testRetentionReference.set(TimeValue.timeValueMillis(9000)); + List backingIndices = dataStream.getBackingIndicesPastRetention(metadata.getProject()::index, () -> now, null); + assertThat(backingIndices.isEmpty(), is(true)); + } + } - assertThat(backingIndices.size(), is(6)); - assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); - assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); - assertThat(backingIndices.get(2).getName(), is(dataStream.getIndices().get(2).getName())); - assertThat(backingIndices.get(3).getName(), is(dataStream.getIndices().get(3).getName())); - assertThat(backingIndices.get(4).getName(), is(dataStream.getIndices().get(4).getName())); - assertThat(backingIndices.get(5).getName(), is(dataStream.getIndices().get(5).getName())); + public void testFailureIndicesPastRetentionWithOriginationDate() { + // First, build an ordinary data stream: + String dataStreamName = "metrics-foo"; + long now = System.currentTimeMillis(); + List creationAndRolloverTimes = List.of( + DataStreamMetadata.dataStreamMetadata(now - 5000, now - 4000), + DataStreamMetadata.dataStreamMetadata(now - 4000, now - 3000), + DataStreamMetadata.dataStreamMetadata(now - 3000, now - 2000), + DataStreamMetadata.dataStreamMetadata(now - 2000, now - 1000), + DataStreamMetadata.dataStreamMetadata(now, null, now - 8000), // origination date older than retention + DataStreamMetadata.dataStreamMetadata(now, null, now - 1000), // origination date within retention + DataStreamMetadata.dataStreamMetadata(now, null) + ); + Metadata.Builder metadataBuilder = Metadata.builder(); + AtomicReference testRetentionReference = new AtomicReference<>(null); + DataStream dataStream = createDataStream( + metadataBuilder, + dataStreamName, + creationAndRolloverTimes, + settings(IndexVersion.current()), + new DataStreamLifecycle() { + public TimeValue dataRetention() { + return testRetentionReference.get(); + } + } + ); + Metadata metadata = metadataBuilder.build(); + { + // no retention configured so we expect an empty list + testRetentionReference.set(null); + assertThat(dataStream.getFailureIndicesPastRetention(metadata.getProject()::index, () -> now, null).isEmpty(), is(true)); + } + + { + // retention period where first and second index is too old, and 5th has old origination date. + testRetentionReference.set(TimeValue.timeValueMillis(2500)); + List failureIndices = dataStream.getFailureIndicesPastRetention(metadata.getProject()::index, () -> now, null); + assertThat(failureIndices.size(), is(3)); + assertThat(failureIndices.get(0).getName(), is(dataStream.getFailureIndices().get(0).getName())); + assertThat(failureIndices.get(1).getName(), is(dataStream.getFailureIndices().get(1).getName())); + assertThat(failureIndices.get(2).getName(), is(dataStream.getFailureIndices().get(5).getName())); } { // no index matches the retention age testRetentionReference.set(TimeValue.timeValueMillis(9000)); - List backingIndices = dataStream.getIndicesPastRetention(metadata.getProject()::index, () -> now, null); - assertThat(backingIndices.isEmpty(), is(true)); + List failureIndices = dataStream.getFailureIndicesPastRetention(metadata.getProject()::index, () -> now, null); + assertThat(failureIndices.isEmpty(), is(true)); } } @@ -1884,55 +1994,48 @@ public void testIsIndexManagedByDataStreamLifecycle() { } } - public void testGetIndicesOlderThanWithOriginationDate() { - // First, build an ordinary datastream: - String dataStreamName = "metrics-foo"; - long now = System.currentTimeMillis(); - List creationAndRolloverTimes = List.of( - DataStreamMetadata.dataStreamMetadata(now - 5000, now - 4000), - DataStreamMetadata.dataStreamMetadata(now - 4000, now - 3000), - DataStreamMetadata.dataStreamMetadata(now - 3000, now - 2000), - DataStreamMetadata.dataStreamMetadata(now - 2000, now - 1000), - DataStreamMetadata.dataStreamMetadata(now, null, now - 7000), // origination date older than retention - DataStreamMetadata.dataStreamMetadata(now, null, now - 1000), // origination date within retention - DataStreamMetadata.dataStreamMetadata(now, null, now - 7000) // write index origination date older than retention - ); - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( + private DataStream createDataStream( + Metadata.Builder builder, + String dataStreamName, + List creationAndRolloverTimes, + Settings.Builder backingIndicesSettings, + @Nullable DataStreamLifecycle lifecycle + ) { + int backingIndicesCount = creationAndRolloverTimes.size(); + final List backingIndices = createDataStreamIndices( builder, dataStreamName, creationAndRolloverTimes, - settings(IndexVersion.current()), - new DataStreamLifecycle() + backingIndicesSettings, + backingIndicesCount, + false ); - Metadata metadata = builder.build(); - - List backingIndices = dataStream.getNonWriteIndicesOlderThan( - TimeValue.timeValueMillis(2500), - metadata.getProject()::index, - null, - () -> now + final List failureIndices = createDataStreamIndices( + builder, + dataStreamName, + creationAndRolloverTimes, + backingIndicesSettings, + backingIndicesCount, + true ); - // We expect to see the index with the really old origination date, but not the one with the more recent origination date (and - // not the write index) - assertThat(backingIndices.size(), is(3)); - assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); - assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); - assertThat(backingIndices.get(2).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 6))); + return newInstance(dataStreamName, backingIndices, backingIndicesCount, null, false, lifecycle, failureIndices); } - private DataStream createDataStream( + private static List createDataStreamIndices( Metadata.Builder builder, String dataStreamName, List creationAndRolloverTimes, Settings.Builder backingIndicesSettings, - @Nullable DataStreamLifecycle lifecycle + int backingIndicesCount, + boolean isFailureStore ) { - int backingIndicesCount = creationAndRolloverTimes.size(); - final List backingIndices = new ArrayList<>(); + List indices = new ArrayList<>(backingIndicesCount); for (int k = 1; k <= backingIndicesCount; k++) { DataStreamMetadata creationRolloverTime = creationAndRolloverTimes.get(k - 1); - IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k)) + String indexName = isFailureStore + ? getDefaultFailureStoreName(dataStreamName, k, System.currentTimeMillis()) + : DataStream.getDefaultBackingIndexName(dataStreamName, k); + IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(indexName) .settings(backingIndicesSettings) .numberOfShards(1) .numberOfReplicas(1) @@ -1948,12 +2051,15 @@ private DataStream createDataStream( Long originationTimeInMillis = creationRolloverTime.originationTimeInMillis; if (originationTimeInMillis != null) { backingIndicesSettings.put(LIFECYCLE_ORIGINATION_DATE, originationTimeInMillis); + } else { + // We reuse the backingIndicesSettings, so it's important to reset it + backingIndicesSettings.putNull(LIFECYCLE_ORIGINATION_DATE); } IndexMetadata indexMetadata = indexMetaBuilder.build(); builder.put(indexMetadata, false); - backingIndices.add(indexMetadata.getIndex()); + indices.add(indexMetadata.getIndex()); } - return newInstance(dataStreamName, backingIndices, backingIndicesCount, null, false, lifecycle); + return indices; } public void testXContentSerializationWithRolloverAndEffectiveRetention() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java index 24090c378dc5d..7557f84eaac83 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java @@ -80,7 +80,7 @@ public void testCreateDataStream() throws Exception { assertThat(project.dataStreams().get(dataStreamName).isSystem(), is(false)); assertThat(project.dataStreams().get(dataStreamName).isHidden(), is(false)); assertThat(project.dataStreams().get(dataStreamName).isReplicated(), is(false)); - assertThat(project.dataStreams().get(dataStreamName).getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT)); + assertThat(project.dataStreams().get(dataStreamName).getDataLifecycle(), equalTo(DataStreamLifecycle.DEFAULT)); assertThat(project.dataStreams().get(dataStreamName).getIndexMode(), nullValue()); assertThat(project.index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue()); assertThat( @@ -119,7 +119,7 @@ public void testCreateDataStreamLogsdb() throws Exception { assertThat(project.dataStreams().get(dataStreamName).isHidden(), is(false)); assertThat(project.dataStreams().get(dataStreamName).isReplicated(), is(false)); assertThat(project.dataStreams().get(dataStreamName).getIndexMode(), equalTo(IndexMode.LOGSDB)); - assertThat(project.dataStreams().get(dataStreamName).getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT)); + assertThat(project.dataStreams().get(dataStreamName).getDataLifecycle(), equalTo(DataStreamLifecycle.DEFAULT)); assertThat(project.index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue()); assertThat( project.index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"), diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java index f55ce269f817e..1d650175430ed 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java @@ -432,7 +432,7 @@ public void testUpdateLifecycle() { ProjectMetadata after = service.updateDataLifecycle(before, List.of(dataStream), null); DataStream updatedDataStream = after.dataStreams().get(dataStream); assertNotNull(updatedDataStream); - assertThat(updatedDataStream.getLifecycle(), nullValue()); + assertThat(updatedDataStream.getDataLifecycle(), nullValue()); before = after; } @@ -441,7 +441,7 @@ public void testUpdateLifecycle() { ProjectMetadata after = service.updateDataLifecycle(before, List.of(dataStream), lifecycle); DataStream updatedDataStream = after.dataStreams().get(dataStream); assertNotNull(updatedDataStream); - assertThat(updatedDataStream.getLifecycle(), equalTo(lifecycle)); + assertThat(updatedDataStream.getDataLifecycle(), equalTo(lifecycle)); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportAction.java index f8007503eb6ca..4b110c1c130c9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportAction.java @@ -80,14 +80,14 @@ public static DataStreamLifecycleFeatureSetUsage.LifecycleStats calculateStats( LongSummaryStatistics effectiveRetentionStats = new LongSummaryStatistics(); for (DataStream dataStream : dataStreams) { - if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) { + if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) { dataStreamsWithLifecycles++; // Track data retention - if (dataStream.getLifecycle().dataRetention() != null) { - dataRetentionStats.accept(dataStream.getLifecycle().dataRetention().getMillis()); + if (dataStream.getDataLifecycle().dataRetention() != null) { + dataRetentionStats.accept(dataStream.getDataLifecycle().dataRetention().getMillis()); } // Track effective retention - Tuple effectiveDataRetentionWithSource = dataStream.getLifecycle() + Tuple effectiveDataRetentionWithSource = dataStream.getDataLifecycle() .getEffectiveDataRetentionWithSource(globalRetention, dataStream.isInternal()); // Track global retention usage