diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index 4657b559e5a8e..a220182180c9e 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -1388,7 +1388,7 @@ public void testGetDataStream() throws Exception { assertThat(metricsFooDataStream.getDataStreamStatus(), is(ClusterHealthStatus.YELLOW)); assertThat(metricsFooDataStream.getIndexTemplate(), is("template_for_foo")); assertThat(metricsFooDataStream.getIlmPolicy(), is(nullValue())); - assertThat(dataStream.getLifecycle(), is(lifecycle.toDataStreamLifecycle())); + assertThat(dataStream.getDataLifecycle(), is(lifecycle.toDataStreamLifecycle())); assertThat(metricsFooDataStream.templatePreferIlmValue(), is(true)); GetDataStreamAction.Response.IndexProperties indexProperties = metricsFooDataStream.getIndexSettingsValues() .get(dataStream.getWriteIndex()); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index aa0e808b470af..ee83cb13af294 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -347,16 +347,22 @@ void run(ClusterState state) { int affectedDataStreams = 0; for (DataStream dataStream : state.metadata().dataStreams().values()) { clearErrorStoreForUnmanagedIndices(dataStream); - if (dataStream.getLifecycle() == null) { + if (dataStream.getDataLifecycle() == null) { continue; } // the following indices should not be considered for the remainder of this service run, for various reasons. Set indicesToExcludeForRemainingRun = new HashSet<>(); - // This is the pre-rollover write index. It may or may not be the write index after maybeExecuteRollover has executed, - // depending on rollover criteria, for this reason we exclude it for the remaining run. - indicesToExcludeForRemainingRun.addAll(maybeExecuteRollover(state, dataStream)); + // These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed, + // depending on rollover criteria, for this reason we exclude them for the remaining run. + indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, false)); + if (DataStream.isFailureStoreFeatureFlagEnabled()) { + Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true); + if (failureStoreWriteIndex != null) { + indicesToExcludeForRemainingRun.add(failureStoreWriteIndex); + } + } // tsds indices that are still within their time bounds (i.e. now < time_series.end_time) - we don't want these indices to be // deleted, forcemerged, or downsampled as they're still expected to receive large amounts of writes @@ -799,23 +805,6 @@ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) { } } - /** - * This method will attempt to roll over the write index of a data stream. The rollover will occur only if the conditions - * apply. In any case, we return the write backing index back to the caller, so it can be excluded from the next steps. 
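The run() change above now collects two pre-rollover write indices per data stream: the regular write index and, when the failure store feature flag is enabled, the failure-store write index. A minimal sketch of that exclusion pattern, using only the DataStream accessors referenced in this patch; the class and method names here are illustrative, not part of the change:

    import java.util.HashSet;
    import java.util.Set;

    import org.elasticsearch.cluster.metadata.DataStream;
    import org.elasticsearch.index.Index;

    final class WriteIndexExclusionSketch {
        // Collect the current write indices so the remainder of a lifecycle pass can skip them,
        // mirroring what run(...) does with indicesToExcludeForRemainingRun after rollover.
        static Set<Index> preRolloverWriteIndices(DataStream dataStream) {
            Set<Index> excluded = new HashSet<>();
            excluded.add(dataStream.getWriteIndex());
            if (DataStream.isFailureStoreFeatureFlagEnabled()) {
                Index failureStoreWriteIndex = dataStream.getWriteFailureIndex();
                if (failureStoreWriteIndex != null) {
                    // a data stream may not have a failure-store write index yet
                    excluded.add(failureStoreWriteIndex);
                }
            }
            return excluded;
        }
    }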
- * @return the write index of this data stream before rollover was requested. - */ - private Set maybeExecuteRollover(ClusterState state, DataStream dataStream) { - Set currentRunWriteIndices = new HashSet<>(); - currentRunWriteIndices.add(maybeExecuteRollover(state, dataStream, false)); - if (DataStream.isFailureStoreFeatureFlagEnabled()) { - Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true); - if (failureStoreWriteIndex != null) { - currentRunWriteIndices.add(failureStoreWriteIndex); - } - } - return currentRunWriteIndices; - } - @Nullable private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) { Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex(); @@ -824,10 +813,11 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo } try { if (dataStream.isIndexManagedByDataStreamLifecycle(currentRunWriteIndex, state.metadata()::index)) { + DataStreamLifecycle lifecycle = rolloverFailureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle(); RolloverRequest rolloverRequest = getDefaultRolloverRequest( rolloverConfiguration, dataStream.getName(), - dataStream.getLifecycle().getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()), + lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()), rolloverFailureStore ); transportActionsDeduplicator.executeOnce( @@ -880,41 +870,66 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo Set maybeExecuteRetention(ClusterState state, DataStream dataStream, Set indicesToExcludeForRemainingRun) { Metadata metadata = state.metadata(); DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? 
null : globalRetentionSettings.get();
-        List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier, globalRetention);
-        if (backingIndicesOlderThanRetention.isEmpty()) {
+        List<Index> backingIndicesOlderThanRetention = dataStream.getBackingIndicesPastRetention(
+            metadata::index,
+            nowSupplier,
+            globalRetention
+        );
+        List<Index> failureIndicesOlderThanRetention = dataStream.getFailureIndicesPastRetention(
+            metadata::index,
+            nowSupplier,
+            globalRetention
+        );
+        if (backingIndicesOlderThanRetention.isEmpty() && failureIndicesOlderThanRetention.isEmpty()) {
             return Set.of();
         }
         Set<Index> indicesToBeRemoved = new HashSet<>();
-        // We know that there is lifecycle and retention because there are indices to be deleted
-        assert dataStream.getLifecycle() != null;
-        TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
-        for (Index index : backingIndicesOlderThanRetention) {
-            if (indicesToExcludeForRemainingRun.contains(index) == false) {
-                IndexMetadata backingIndex = metadata.index(index);
-                assert backingIndex != null : "the data stream backing indices must exist";
-
-                IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
-                // we don't want to delete the source index if they have an in-progress downsampling operation because the
-                // target downsample index will remain in the system as a standalone index
-                if (downsampleStatus == STARTED) {
-                    // there's an opportunity here to cancel downsampling and delete the source index now
-                    logger.trace(
-                        "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
-                            + "because there's a downsampling operation currently in progress for this index. Current downsampling "
-                            + "status is [{}]. When downsampling completes, DSL will delete this index.",
-                        index.getName(),
-                        effectiveDataRetention,
-                        downsampleStatus
-                    );
-                } else {
-                    // UNKNOWN is the default value, and has no real use. So index should be deleted
-                    // SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete
+        if (backingIndicesOlderThanRetention.isEmpty() == false) {
+            assert dataStream.getDataLifecycle() != null : "data stream should have a data lifecycle if we have 'old' indices";
+            TimeValue dataRetention = dataStream.getDataLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
+            for (Index index : backingIndicesOlderThanRetention) {
+                if (indicesToExcludeForRemainingRun.contains(index) == false) {
+                    IndexMetadata backingIndex = metadata.index(index);
+                    assert backingIndex != null : "the data stream backing indices must exist";
+
+                    IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
+                    // we don't want to delete the source index if they have an in-progress downsampling operation because the
+                    // target downsample index will remain in the system as a standalone index
+                    if (downsampleStatus == STARTED) {
+                        // there's an opportunity here to cancel downsampling and delete the source index now
+                        logger.trace(
+                            "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
+                                + "because there's a downsampling operation currently in progress for this index. Current downsampling "
+                                + "status is [{}]. 
When downsampling completes, DSL will delete this index.", + index.getName(), + dataRetention, + downsampleStatus + ); + } else { + // UNKNOWN is the default value, and has no real use. So index should be deleted + // SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete + indicesToBeRemoved.add(index); + + // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request) + // let's start simple and reevaluate + String indexName = backingIndex.getIndex().getName(); + deleteIndexOnce(indexName, "the lapsed [" + dataRetention + "] retention period"); + } + } + } + } + if (failureIndicesOlderThanRetention.isEmpty() == false) { + assert dataStream.getFailuresLifecycle() != null : "data stream should have failure lifecycle if we have 'old' indices"; + var failureRetention = dataStream.getFailuresLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal()); + for (Index index : failureIndicesOlderThanRetention) { + if (indicesToExcludeForRemainingRun.contains(index) == false) { + IndexMetadata failureIndex = metadata.index(index); + assert failureIndex != null : "the data stream failure indices must exist"; indicesToBeRemoved.add(index); - // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request) // let's start simple and reevaluate - String indexName = backingIndex.getIndex().getName(); - deleteIndexOnce(indexName, "the lapsed [" + effectiveDataRetention + "] retention period"); + String indexName = failureIndex.getIndex().getName(); + deleteIndexOnce(indexName, "the lapsed [" + failureRetention + "] retention period"); } } } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java index 84fc4ba29c9ff..67152a0d324f3 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java @@ -109,7 +109,7 @@ protected void masterOperation( idxMetadata.getCreationDate(), rolloverInfo == null ? 
null : rolloverInfo.getTime(), generationDate, - parentDataStream.getLifecycle(), + parentDataStream.getDataLifecycleForIndex(idxMetadata.getIndex()), errorStore.getError(index) ); explainIndices.add(explainIndexDataStreamLifecycle); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java index dab43f00b910f..deaf1b0a9601d 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java @@ -91,7 +91,7 @@ protected void masterOperation( .map( dataStream -> new GetDataStreamLifecycleAction.Response.DataStreamLifecycle( dataStream.getName(), - dataStream.getLifecycle(), + dataStream.getDataLifecycle(), dataStream.isSystem() ) ) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java index 924b61a2b8864..7aa084c530d91 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java @@ -76,7 +76,7 @@ GetDataStreamLifecycleStatsAction.Response collectStats(ClusterState state) { Set indicesInErrorStore = lifecycleService.getErrorStore().getAllIndices(); List dataStreamStats = new ArrayList<>(); for (DataStream dataStream : state.metadata().dataStreams().values()) { - if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) { + if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) { int total = 0; int inError = 0; for (Index index : dataStream.getIndices()) { diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java index 775effdf0ab00..189e426403894 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java @@ -381,9 +381,10 @@ public XContentBuilder toXContent( if (indexTemplate != null) { builder.field(INDEX_TEMPLATE_FIELD.getPreferredName(), indexTemplate); } - if (dataStream.getLifecycle() != null) { + if (dataStream.getDataLifecycle() != null) { builder.field(LIFECYCLE_FIELD.getPreferredName()); - dataStream.getLifecycle().toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal()); + dataStream.getDataLifecycle() + .toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal()); } if (ilmPolicyName != null) { builder.field(ILM_POLICY_FIELD.getPreferredName(), ilmPolicyName); @@ -468,7 +469,7 @@ private void addAutoShardingEvent(XContentBuilder builder, Params params, DataSt */ public ManagedBy getNextGenerationManagedBy() { // both ILM and DSL are configured so let's check the prefer_ilm setting to see which system takes precedence - if (ilmPolicyName != null && dataStream.getLifecycle() != null && 
dataStream.getLifecycle().enabled()) {
+        if (ilmPolicyName != null && dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) {
             return templatePreferIlmValue ? ManagedBy.ILM : ManagedBy.LIFECYCLE;
         }
@@ -476,7 +477,7 @@ public ManagedBy getNextGenerationManagedBy() {
             return ManagedBy.ILM;
         }
 
-        if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) {
+        if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) {
             return ManagedBy.LIFECYCLE;
         }
 
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
index c0a83bfcf5e6e..97f2503eb9011 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
@@ -510,11 +510,37 @@ public IndexMode getIndexMode() {
         return indexMode;
     }
 
+    /**
+     * Retrieves the lifecycle configuration meant for the backing indices.
+     */
     @Nullable
-    public DataStreamLifecycle getLifecycle() {
+    public DataStreamLifecycle getDataLifecycle() {
         return lifecycle;
     }
 
+    /**
+     * Retrieves the lifecycle configuration meant for the failure store. Currently, it is the same as {@link #getDataLifecycle()},
+     * but it will change.
+     */
+    @Nullable
+    public DataStreamLifecycle getFailuresLifecycle() {
+        return lifecycle;
+    }
+
+    /**
+     * Retrieves the correct lifecycle for the provided index. Returns null if the index does not belong to this data stream.
+     */
+    @Nullable
+    public DataStreamLifecycle getDataLifecycleForIndex(Index index) {
+        if (backingIndices.containsIndex(index.getName())) {
+            return getDataLifecycle();
+        }
+        if (failureIndices.containsIndex(index.getName())) {
+            return getFailuresLifecycle();
+        }
+        return null;
+    }
+
     /**
      * Returns the latest auto sharding event that happened for this data stream
      */
@@ -928,24 +954,53 @@ private static boolean isAnyIndexMissing(List<Index> indices, Metadata.Builder b
     }
 
     /**
-     * Iterate over the backing indices and return the ones that are managed by the data stream lifecycle and past the configured
-     * retention in their lifecycle.
+     * Iterate over the backing indices and return the ones that are managed by the data stream lifecycle and past the
+     * configured retention in their lifecycle.
+     * NOTE that this specifically does not return the write index of the data stream as usually retention
+     * is treated differently for the write index (i.e. they first need to be rolled over)
+     */
+    public List<Index> getBackingIndicesPastRetention(
+        Function<String, IndexMetadata> indexMetadataSupplier,
+        LongSupplier nowSupplier,
+        DataStreamGlobalRetention globalRetention
+    ) {
+        if (getDataLifecycle() == null
+            || getDataLifecycle().enabled() == false
+            || getDataLifecycle().getEffectiveDataRetention(globalRetention, isInternal()) == null) {
+            return List.of();
+        }
+
+        List<Index> indicesPastRetention = getNonWriteIndicesOlderThan(
+            getIndices(),
+            getDataLifecycle().getEffectiveDataRetention(globalRetention, isInternal()),
+            indexMetadataSupplier,
+            this::isIndexManagedByDataStreamLifecycle,
+            nowSupplier
+        );
+        return indicesPastRetention;
+    }
+
+    /**
+     * Iterate over the failure indices and return the ones that are managed by the data stream lifecycle and past the
+     * configured retention in their lifecycle.
      * NOTE that this specifically does not return the write index of the data stream as usually retention
      * is treated differently for the write index (i.e.
they first need to be rolled over) */ - public List getIndicesPastRetention( + public List getFailureIndicesPastRetention( Function indexMetadataSupplier, LongSupplier nowSupplier, DataStreamGlobalRetention globalRetention ) { - if (lifecycle == null - || lifecycle.enabled() == false - || lifecycle.getEffectiveDataRetention(globalRetention, isInternal()) == null) { + if (DataStream.isFailureStoreFeatureFlagEnabled() == false + || getFailuresLifecycle() == null + || getFailuresLifecycle().enabled() == false + || getFailuresLifecycle().getEffectiveDataRetention(globalRetention, isInternal()) == null) { return List.of(); } List indicesPastRetention = getNonWriteIndicesOlderThan( - lifecycle.getEffectiveDataRetention(globalRetention, isInternal()), + getFailureIndices(), + getFailuresLifecycle().getEffectiveDataRetention(globalRetention, isInternal()), indexMetadataSupplier, this::isIndexManagedByDataStreamLifecycle, nowSupplier @@ -991,37 +1046,28 @@ public List getDownsamplingRoundsFor( } /** - * Returns the non-write backing indices and failure store indices that are older than the provided age, + * Filters the given indices that are older than the provided age and populates olderIndices, * excluding the write indices. The index age is calculated from the rollover or index creation date (or * the origination date if present). If an indices predicate is provided the returned list of indices will * be filtered according to the predicate definition. This is useful for things like "return only - * the backing indices that are managed by the data stream lifecycle". + * the indices that are managed by the data stream lifecycle". */ - public List getNonWriteIndicesOlderThan( + private List getNonWriteIndicesOlderThan( + List indices, TimeValue retentionPeriod, Function indexMetadataSupplier, @Nullable Predicate indicesPredicate, LongSupplier nowSupplier ) { + if (indices.isEmpty()) { + return List.of(); + } List olderIndices = new ArrayList<>(); - for (Index index : backingIndices.getIndices()) { + for (Index index : indices) { if (isIndexOlderThan(index, retentionPeriod.getMillis(), nowSupplier.getAsLong(), indicesPredicate, indexMetadataSupplier)) { olderIndices.add(index); } } - if (DataStream.isFailureStoreFeatureFlagEnabled() && failureIndices.getIndices().isEmpty() == false) { - for (Index index : failureIndices.getIndices()) { - if (isIndexOlderThan( - index, - retentionPeriod.getMillis(), - nowSupplier.getAsLong(), - indicesPredicate, - indexMetadataSupplier - )) { - olderIndices.add(index); - } - } - } return olderIndices; } @@ -1088,7 +1134,7 @@ private boolean isIndexManagedByDataStreamLifecycle(IndexMetadata indexMetadata) */ @Nullable public TimeValue getGenerationLifecycleDate(IndexMetadata indexMetadata) { - if (indexMetadata.getIndex().equals(getWriteIndex())) { + if (indexMetadata.getIndex().equals(getWriteIndex()) || indexMetadata.getIndex().equals(getWriteFailureIndex())) { return null; } Long originationDate = indexMetadata.getSettings().getAsLong(LIFECYCLE_ORIGINATION_DATE, null); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index d00d677be1e26..ccf5d4b31f002 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -1370,7 +1370,7 @@ public boolean isIndexManagedByILM(IndexMetadata indexMetadata) { } DataStream parentDataStream = 
indexAbstraction.getParentDataStream(); - if (parentDataStream != null && parentDataStream.getLifecycle() != null && parentDataStream.getLifecycle().enabled()) { + if (parentDataStream != null && parentDataStream.getDataLifecycle() != null && parentDataStream.getDataLifecycle().enabled()) { // index has both ILM and data stream lifecycle configured so let's check which is preferred return PREFER_ILM_SETTING.get(indexMetadata.getSettings()); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleWithRetentionWarningsTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleWithRetentionWarningsTests.java index 521c96c84781c..3629c85fbaa21 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleWithRetentionWarningsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleWithRetentionWarningsTests.java @@ -147,7 +147,7 @@ public void testUpdatingLifecycleOnADataStream() { ClusterState after = metadataDataStreamsService.updateDataLifecycle(before, List.of(dataStream), DataStreamLifecycle.DEFAULT); DataStream updatedDataStream = after.metadata().dataStreams().get(dataStream); assertNotNull(updatedDataStream); - assertThat(updatedDataStream.getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT)); + assertThat(updatedDataStream.getDataLifecycle(), equalTo(DataStreamLifecycle.DEFAULT)); Map> responseHeaders = threadContext.getResponseHeaders(); assertThat(responseHeaders.size(), is(1)); assertThat( diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 60314b344583f..259c7867c5a0d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -49,10 +49,11 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Predicate; +import java.util.function.Function; import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName; +import static org.elasticsearch.cluster.metadata.DataStream.getDefaultFailureStoreName; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.randomGlobalRetention; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.randomIndexInstances; @@ -99,7 +100,7 @@ protected DataStream mutateInstance(DataStream instance) { var isSystem = instance.isSystem(); var allowsCustomRouting = instance.isAllowCustomRouting(); var indexMode = instance.getIndexMode(); - var lifecycle = instance.getLifecycle(); + var lifecycle = instance.getDataLifecycle(); var dataStreamOptions = instance.getDataStreamOptions(); var failureIndices = instance.getFailureIndices(); var rolloverOnWrite = instance.rolloverOnWrite(); @@ -1218,15 +1219,30 @@ public void testGetGenerationLifecycleDate() { { // for a write index that has not been rolled over yet, we get null even if the index has an origination date long originTimeMillis = creationTimeMillis - 3000L; - IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1)) + IndexMetadata.Builder backingIndexMetadataBuilder = IndexMetadata.builder( + DataStream.getDefaultBackingIndexName(dataStreamName, 1) + ) 
                .settings(settings(IndexVersion.current()).put(LIFECYCLE_ORIGINATION_DATE, originTimeMillis))
                 .numberOfShards(1)
                 .numberOfReplicas(1)
                 .creationDate(creationTimeMillis);
-            IndexMetadata indexMetadata = indexMetaBuilder.build();
-            DataStream dataStream = createDataStream(dataStreamName, List.of(indexMetadata.getIndex()));
+            IndexMetadata backingIndexMetadata = backingIndexMetadataBuilder.build();
+            IndexMetadata.Builder failureIndexMetadataBuilder = IndexMetadata.builder(
+                getDefaultFailureStoreName(dataStreamName, 1, creationTimeMillis)
+            )
+                .settings(settings(IndexVersion.current()).put(LIFECYCLE_ORIGINATION_DATE, originTimeMillis))
+                .numberOfShards(1)
+                .numberOfReplicas(1)
+                .creationDate(creationTimeMillis);
+            IndexMetadata failureIndexMetadata = failureIndexMetadataBuilder.build();
+            DataStream dataStream = createDataStream(
+                dataStreamName,
+                List.of(backingIndexMetadata.getIndex()),
+                List.of(failureIndexMetadata.getIndex())
+            );
-            assertNull(dataStream.getGenerationLifecycleDate(indexMetadata));
+            assertNull(dataStream.getGenerationLifecycleDate(backingIndexMetadata));
+            assertNull(dataStream.getGenerationLifecycleDate(failureIndexMetadata));
         }
         {
             // If the index is not the write index and has origination date set, we get the origination date even if it has not been
@@ -1306,80 +1322,159 @@ private DataStream createDataStream(String name, List<Index> indices) {
             .build();
     }
 
-    public void testGetIndicesOlderThan() {
+    private DataStream createDataStream(String name, List<Index> backingIndices, List<Index> failureIndices) {
+        return DataStream.builder(name, backingIndices)
+            .setMetadata(Map.of())
+            .setReplicated(randomBoolean())
+            .setAllowCustomRouting(randomBoolean())
+            .setIndexMode(IndexMode.STANDARD)
+            .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureIndices).build())
+            .build();
+    }
+
+    public void testGetBackingIndicesPastRetention() {
         String dataStreamName = "metrics-foo";
         long now = System.currentTimeMillis();
         List<DataStreamMetadata> creationAndRolloverTimes = List.of(
-            DataStreamMetadata.dataStreamMetadata(now - 5000, now - 4000),
-            DataStreamMetadata.dataStreamMetadata(now - 4000, now - 3000),
-            DataStreamMetadata.dataStreamMetadata(now - 3000, now - 2000),
-            DataStreamMetadata.dataStreamMetadata(now - 2000, now - 1000),
+            DataStreamMetadata.dataStreamMetadata(now - 5000_000, now - 4000_000),
+            DataStreamMetadata.dataStreamMetadata(now - 4000_000, now - 3000_000),
+            DataStreamMetadata.dataStreamMetadata(now - 3000_000, now - 2000_000),
+            DataStreamMetadata.dataStreamMetadata(now - 2000_000, now - 1000_000),
             DataStreamMetadata.dataStreamMetadata(now, null)
         );
+        {
+            {
+                // no lifecycle configured so we expect an empty list
+                Metadata.Builder builder = Metadata.builder();
+                DataStream dataStream = createDataStream(
+                    builder,
+                    dataStreamName,
+                    creationAndRolloverTimes,
+                    settings(IndexVersion.current()),
+                    null
+                );
+                Metadata metadata = builder.build();
+
+                assertThat(
+                    dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()).isEmpty(),
+                    is(true)
+                );
+            }
+        }
+        Metadata.Builder builder = Metadata.builder();
+        AtomicReference<TimeValue> retention = new AtomicReference<>();
         DataStream dataStream = createDataStream(
             builder,
             dataStreamName,
             creationAndRolloverTimes,
             settings(IndexVersion.current()),
-            new DataStreamLifecycle()
+            new DataStreamLifecycle() {
+                public TimeValue dataRetention() {
+                    return retention.get();
+                }
+            }
         );
         Metadata metadata = builder.build();
+
         {
-            List<Index> backingIndices = dataStream.getNonWriteIndicesOlderThan(
-
TimeValue.timeValueMillis(2500), + // Mix of indices younger and older than retention, data stream retention is effective retention + retention.set(TimeValue.timeValueSeconds(2500)); + List backingIndices = dataStream.getBackingIndicesPastRetention( metadata::index, - null, - () -> now + () -> now, + randomBoolean() ? randomGlobalRetention() : null ); assertThat(backingIndices.size(), is(2)); - assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); - assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); + for (int i = 0; i < backingIndices.size(); i++) { + assertThat(backingIndices.get(i).getName(), is(dataStream.getIndices().get(i).getName())); + } } { - List backingIndices = dataStream.getNonWriteIndicesOlderThan( - TimeValue.timeValueMillis(0), - metadata::index, - null, - () -> now - ); + // All indices past retention, but we keep the write index + retention.set(TimeValue.timeValueSeconds(0)); + List backingIndices = dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, null); assertThat(backingIndices.size(), is(4)); - assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); - assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); - assertThat(backingIndices.get(2).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 3))); - assertThat(backingIndices.get(3).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 4))); + for (int i = 0; i < backingIndices.size(); i++) { + assertThat(backingIndices.get(i).getName(), is(dataStream.getIndices().get(i).getName())); + } } { - List backingIndices = dataStream.getNonWriteIndicesOlderThan( - TimeValue.timeValueMillis(6000), - metadata::index, - null, - () -> now - ); + // All indices younger than retention + retention.set(TimeValue.timeValueSeconds(6000)); + List backingIndices = dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, null); assertThat(backingIndices.isEmpty(), is(true)); } { - Predicate genThreeAndFivePredicate = indexMetadata -> indexMetadata.getIndex().getName().endsWith("00003") - || indexMetadata.getIndex().getName().endsWith("00005"); + // Test predicate that influences which indices are candidates for a retention check + Function indexMetadataWithSomeLifecycleSupplier = indexName -> { + IndexMetadata indexMetadata = metadata.index(indexName); + if (indexName.endsWith("00003") || indexName.endsWith("00005")) { + return indexMetadata; + } + return IndexMetadata.builder(indexMetadata) + .settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.LIFECYCLE_NAME, "some-policy").build()) + .build(); + }; + retention.set(TimeValue.timeValueSeconds(0)); + List backingIndices = dataStream.getBackingIndicesPastRetention(indexMetadataWithSomeLifecycleSupplier, () -> now, null); + assertThat(backingIndices.size(), is(1)); + assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(2).getName())); + } - List backingIndices = dataStream.getNonWriteIndicesOlderThan( - TimeValue.timeValueMillis(0), - metadata::index, - genThreeAndFivePredicate, - () -> now + { + // no retention configured but we have default retention + DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention( + TimeValue.timeValueSeconds(2500), + randomBoolean() ? 
TimeValue.timeValueSeconds(randomIntBetween(2500, 5000)) : null ); - assertThat(backingIndices.size(), is(1)); - assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 3))); + retention.set(null); + + List backingIndices = dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, globalRetention); + assertThat(backingIndices.size(), is(2)); + for (int i = 0; i < backingIndices.size(); i++) { + assertThat(backingIndices.get(i).getName(), is(dataStream.getIndices().get(i).getName())); + } + } + + { + // no retention or too large retention configured and we have max retention + DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueSeconds(2500)); + retention.set(randomBoolean() ? TimeValue.timeValueDays(6000) : null); + List backingIndices = dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, globalRetention); + assertThat(backingIndices.size(), is(2)); + for (int i = 0; i < backingIndices.size(); i++) { + assertThat(backingIndices.get(i).getName(), is(dataStream.getIndices().get(i).getName())); + } } + { + // no indices are returned as even though all pass retention age none are managed by data stream lifecycle + Metadata.Builder builderWithIlm = Metadata.builder(); + DataStream dataStreamWithIlm = createDataStream( + builderWithIlm, + dataStreamName, + creationAndRolloverTimes, + settings(IndexVersion.current()).put(IndexMetadata.LIFECYCLE_NAME, "ILM_policy"), + DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO).build() + ); + Metadata metadataWithIlm = builderWithIlm.build(); + + List backingIndices = dataStreamWithIlm.getBackingIndicesPastRetention( + metadataWithIlm::index, + () -> now, + randomGlobalRetention() + ); + assertThat(backingIndices.isEmpty(), is(true)); + } } - public void testGetIndicesPastRetention() { + public void testGetFailureIndicesPastRetention() { String dataStreamName = "metrics-foo"; long now = System.currentTimeMillis(); @@ -1392,135 +1487,116 @@ public void testGetIndicesPastRetention() { ); { - // no lifecycle configured so we expect an empty list - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - creationAndRolloverTimes, - settings(IndexVersion.current()), - null - ); - Metadata metadata = builder.build(); + { + // no lifecycle configured so we expect an empty list + Metadata.Builder builder = Metadata.builder(); + DataStream dataStream = createDataStream( + builder, + dataStreamName, + creationAndRolloverTimes, + settings(IndexVersion.current()), + null + ); + Metadata metadata = builder.build(); - assertThat(dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()).isEmpty(), is(true)); + assertThat( + dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()).isEmpty(), + is(true) + ); + } } + Metadata.Builder builder = Metadata.builder(); + AtomicReference retention = new AtomicReference<>(); + DataStream dataStream = createDataStream( + builder, + dataStreamName, + creationAndRolloverTimes, + settings(IndexVersion.current()), + new DataStreamLifecycle() { + public TimeValue dataRetention() { + return retention.get(); + } + } + ); + Metadata metadata = builder.build(); + { - // no retention configured but we have default retention - DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention( - TimeValue.timeValueSeconds(2500), - randomBoolean() ? 
TimeValue.timeValueSeconds(randomIntBetween(2500, 5000)) : null - ); - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - creationAndRolloverTimes, - settings(IndexVersion.current()), - new DataStreamLifecycle() + // Mix of indices younger and older than retention, data stream retention is effective retention + retention.set(TimeValue.timeValueSeconds(2500)); + List failureIndices = dataStream.getFailureIndicesPastRetention( + metadata::index, + () -> now, + randomBoolean() ? randomGlobalRetention() : null ); - Metadata metadata = builder.build(); - - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, globalRetention); - assertThat(backingIndices.size(), is(2)); - assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); - assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); + assertThat(failureIndices.size(), is(2)); + for (int i = 0; i < failureIndices.size(); i++) { + assertThat(failureIndices.get(i).getName(), is(dataStream.getFailureIndices().get(i).getName())); + } } { - // no retention configured but we have max retention - DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueSeconds(2500)); - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - creationAndRolloverTimes, - settings(IndexVersion.current()), - new DataStreamLifecycle() - ); - Metadata metadata = builder.build(); - - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, globalRetention); - assertThat(backingIndices.size(), is(2)); - assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); - assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); + // All indices past retention, but we keep the write index + retention.set(TimeValue.timeValueSeconds(0)); + List failureIndices = dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, null); + assertThat(failureIndices.size(), is(4)); + for (int i = 0; i < failureIndices.size(); i++) { + assertThat(failureIndices.get(i).getName(), is(dataStream.getFailureIndices().get(i).getName())); + } } { - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - creationAndRolloverTimes, - settings(IndexVersion.current()), - DataStreamLifecycle.builder().dataRetention(TimeValue.timeValueSeconds(2500)).build() - ); - Metadata metadata = builder.build(); - - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()); - assertThat(backingIndices.size(), is(2)); - assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); - assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); + // All indices younger than retention + retention.set(TimeValue.timeValueSeconds(6000)); + List failureIndices = dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, null); + assertThat(failureIndices.isEmpty(), is(true)); } { - // even though all indices match the write index should not be returned - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - creationAndRolloverTimes, - settings(IndexVersion.current()), - 
DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO).build() - ); - Metadata metadata = builder.build(); - - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()); - - assertThat(backingIndices.size(), is(4)); - assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); - assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); - assertThat(backingIndices.get(2).getName(), is(dataStream.getIndices().get(2).getName())); - assertThat(backingIndices.get(3).getName(), is(dataStream.getIndices().get(3).getName())); + // Test predicate that influences which indices are candidates for a retention check + Function indexMetadataWithSomeLifecycleSupplier = indexName -> { + IndexMetadata indexMetadata = metadata.index(indexName); + if (indexName.endsWith("00003") || indexName.endsWith("00005")) { + return indexMetadata; + } + return IndexMetadata.builder(indexMetadata) + .settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.LIFECYCLE_NAME, "some-policy").build()) + .build(); + }; + retention.set(TimeValue.timeValueSeconds(0)); + List failureIndices = dataStream.getFailureIndicesPastRetention(indexMetadataWithSomeLifecycleSupplier, () -> now, null); + assertThat(failureIndices.size(), is(1)); + assertThat(failureIndices.get(0).getName(), is(dataStream.getFailureIndices().get(2).getName())); } { - // no index matches the retention age - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - creationAndRolloverTimes, - settings(IndexVersion.current()), - DataStreamLifecycle.builder().dataRetention(TimeValue.timeValueSeconds(6000)).build() + // no retention configured but we have default retention + DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention( + TimeValue.timeValueSeconds(2500), + randomBoolean() ? TimeValue.timeValueSeconds(randomIntBetween(2500, 5000)) : null ); - Metadata metadata = builder.build(); + retention.set(null); - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()); - assertThat(backingIndices.isEmpty(), is(true)); + List failureIndices = dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, globalRetention); + assertThat(failureIndices.size(), is(2)); + for (int i = 0; i < failureIndices.size(); i++) { + assertThat(failureIndices.get(i).getName(), is(dataStream.getFailureIndices().get(i).getName())); + } } { - // no indices are returned as even though all pass retention age none are managed by data stream lifecycle - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - creationAndRolloverTimes, - Settings.builder() - .put(IndexMetadata.LIFECYCLE_NAME, "ILM_policy") - .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()), - DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO).build() - ); - Metadata metadata = builder.build(); - - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()); - assertThat(backingIndices.isEmpty(), is(true)); + // no retention or too large retention configured and we have max retention + DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueSeconds(2500)); + retention.set(randomBoolean() ? 
TimeValue.timeValueDays(6000) : null); + List failureIndices = dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, globalRetention); + assertThat(failureIndices.size(), is(2)); + for (int i = 0; i < failureIndices.size(); i++) { + assertThat(failureIndices.get(i).getName(), is(dataStream.getFailureIndices().get(i).getName())); + } } } - public void testGetIndicesPastRetentionWithOriginationDate() { + public void testBackingIndicesPastRetentionWithOriginationDate() { // First, build an ordinary data stream: String dataStreamName = "metrics-foo"; long now = System.currentTimeMillis(); @@ -1550,13 +1626,13 @@ public TimeValue dataRetention() { { // no retention configured so we expect an empty list testRetentionReference.set(null); - assertThat(dataStream.getIndicesPastRetention(metadata::index, () -> now, null).isEmpty(), is(true)); + assertThat(dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, null).isEmpty(), is(true)); } { - // retention period where oldIndex is too old, but newIndex should be retained + // retention period where first and second index is too old, and 5th has old origination date. testRetentionReference.set(TimeValue.timeValueMillis(2500)); - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, null); + List backingIndices = dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, null); assertThat(backingIndices.size(), is(3)); assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); @@ -1564,24 +1640,61 @@ public TimeValue dataRetention() { } { - // even though all indices match the write index should not be returned - testRetentionReference.set(TimeValue.timeValueMillis(0)); - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, null); + // no index matches the retention age + testRetentionReference.set(TimeValue.timeValueMillis(9000)); + List backingIndices = dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, null); + assertThat(backingIndices.isEmpty(), is(true)); + } + } - assertThat(backingIndices.size(), is(6)); - assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); - assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); - assertThat(backingIndices.get(2).getName(), is(dataStream.getIndices().get(2).getName())); - assertThat(backingIndices.get(3).getName(), is(dataStream.getIndices().get(3).getName())); - assertThat(backingIndices.get(4).getName(), is(dataStream.getIndices().get(4).getName())); - assertThat(backingIndices.get(5).getName(), is(dataStream.getIndices().get(5).getName())); + public void testFailureIndicesPastRetentionWithOriginationDate() { + // First, build an ordinary data stream: + String dataStreamName = "metrics-foo"; + long now = System.currentTimeMillis(); + List creationAndRolloverTimes = List.of( + DataStreamMetadata.dataStreamMetadata(now - 5000, now - 4000), + DataStreamMetadata.dataStreamMetadata(now - 4000, now - 3000), + DataStreamMetadata.dataStreamMetadata(now - 3000, now - 2000), + DataStreamMetadata.dataStreamMetadata(now - 2000, now - 1000), + DataStreamMetadata.dataStreamMetadata(now, null, now - 8000), // origination date older than retention + DataStreamMetadata.dataStreamMetadata(now, null, now - 1000), // origination date within retention + DataStreamMetadata.dataStreamMetadata(now, null) + ); + 
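The rewritten retention tests above, and the origination-date tests that follow, drive several assertion blocks off a single data stream by overriding dataRetention() on an anonymous DataStreamLifecycle backed by an AtomicReference, so the retention can change between blocks without rebuilding the metadata. A sketch of that pattern in isolation (the values are examples, not the tests' exact sequence):

    import java.util.concurrent.atomic.AtomicReference;

    import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
    import org.elasticsearch.core.TimeValue;

    class MutableRetentionSketch {
        private final AtomicReference<TimeValue> retention = new AtomicReference<>();

        // Passed once to the test's createDataStream(...); every later read sees the current reference value.
        final DataStreamLifecycle lifecycle = new DataStreamLifecycle() {
            @Override
            public TimeValue dataRetention() {
                return retention.get();
            }
        };

        void vary() {
            retention.set(TimeValue.timeValueSeconds(2500)); // only the older generations are past retention
            retention.set(TimeValue.ZERO);                   // everything except the write index is past retention
            retention.set(null);                             // defers to global retention, if configured
        }
    }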
Metadata.Builder metadataBuilder = Metadata.builder(); + AtomicReference testRetentionReference = new AtomicReference<>(null); + DataStream dataStream = createDataStream( + metadataBuilder, + dataStreamName, + creationAndRolloverTimes, + settings(IndexVersion.current()), + new DataStreamLifecycle() { + public TimeValue dataRetention() { + return testRetentionReference.get(); + } + } + ); + Metadata metadata = metadataBuilder.build(); + { + // no retention configured so we expect an empty list + testRetentionReference.set(null); + assertThat(dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, null).isEmpty(), is(true)); + } + + { + // retention period where first and second index is too old, and 5th has old origination date. + testRetentionReference.set(TimeValue.timeValueMillis(2500)); + List failureIndices = dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, null); + assertThat(failureIndices.size(), is(3)); + assertThat(failureIndices.get(0).getName(), is(dataStream.getFailureIndices().get(0).getName())); + assertThat(failureIndices.get(1).getName(), is(dataStream.getFailureIndices().get(1).getName())); + assertThat(failureIndices.get(2).getName(), is(dataStream.getFailureIndices().get(5).getName())); } { // no index matches the retention age testRetentionReference.set(TimeValue.timeValueMillis(9000)); - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, null); - assertThat(backingIndices.isEmpty(), is(true)); + List failureIndices = dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, null); + assertThat(failureIndices.isEmpty(), is(true)); } } @@ -1842,55 +1955,48 @@ public void testIsIndexManagedByDataStreamLifecycle() { } } - public void testGetIndicesOlderThanWithOriginationDate() { - // First, build an ordinary datastream: - String dataStreamName = "metrics-foo"; - long now = System.currentTimeMillis(); - List creationAndRolloverTimes = List.of( - DataStreamMetadata.dataStreamMetadata(now - 5000, now - 4000), - DataStreamMetadata.dataStreamMetadata(now - 4000, now - 3000), - DataStreamMetadata.dataStreamMetadata(now - 3000, now - 2000), - DataStreamMetadata.dataStreamMetadata(now - 2000, now - 1000), - DataStreamMetadata.dataStreamMetadata(now, null, now - 7000), // origination date older than retention - DataStreamMetadata.dataStreamMetadata(now, null, now - 1000), // origination date within retention - DataStreamMetadata.dataStreamMetadata(now, null, now - 7000) // write index origination date older than retention - ); - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( + private DataStream createDataStream( + Metadata.Builder builder, + String dataStreamName, + List creationAndRolloverTimes, + Settings.Builder backingIndicesSettings, + @Nullable DataStreamLifecycle lifecycle + ) { + int backingIndicesCount = creationAndRolloverTimes.size(); + final List backingIndices = createDataStreamIndices( builder, dataStreamName, creationAndRolloverTimes, - settings(IndexVersion.current()), - new DataStreamLifecycle() + backingIndicesSettings, + backingIndicesCount, + false ); - Metadata metadata = builder.build(); - - List backingIndices = dataStream.getNonWriteIndicesOlderThan( - TimeValue.timeValueMillis(2500), - metadata::index, - null, - () -> now + final List failureIndices = createDataStreamIndices( + builder, + dataStreamName, + creationAndRolloverTimes, + backingIndicesSettings, + backingIndicesCount, + true ); - // We expect to see the index with the 
really old origination date, but not the one with the more recent origination date (and - // not the write index) - assertThat(backingIndices.size(), is(3)); - assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); - assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); - assertThat(backingIndices.get(2).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 6))); + return newInstance(dataStreamName, backingIndices, backingIndicesCount, null, false, lifecycle, failureIndices); } - private DataStream createDataStream( + private static List createDataStreamIndices( Metadata.Builder builder, String dataStreamName, List creationAndRolloverTimes, Settings.Builder backingIndicesSettings, - @Nullable DataStreamLifecycle lifecycle + int backingIndicesCount, + boolean isFailureStore ) { - int backingIndicesCount = creationAndRolloverTimes.size(); - final List backingIndices = new ArrayList<>(); + List indices = new ArrayList<>(backingIndicesCount); for (int k = 1; k <= backingIndicesCount; k++) { DataStreamMetadata creationRolloverTime = creationAndRolloverTimes.get(k - 1); - IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k)) + String indexName = isFailureStore + ? getDefaultFailureStoreName(dataStreamName, k, System.currentTimeMillis()) + : DataStream.getDefaultBackingIndexName(dataStreamName, k); + IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(indexName) .settings(backingIndicesSettings) .numberOfShards(1) .numberOfReplicas(1) @@ -1906,12 +2012,15 @@ private DataStream createDataStream( Long originationTimeInMillis = creationRolloverTime.originationTimeInMillis; if (originationTimeInMillis != null) { backingIndicesSettings.put(LIFECYCLE_ORIGINATION_DATE, originationTimeInMillis); + } else { + // We reuse the backingIndicesSettings, so it's important to reset it + backingIndicesSettings.putNull(LIFECYCLE_ORIGINATION_DATE); } IndexMetadata indexMetadata = indexMetaBuilder.build(); builder.put(indexMetadata, false); - backingIndices.add(indexMetadata.getIndex()); + indices.add(indexMetadata.getIndex()); } - return newInstance(dataStreamName, backingIndices, backingIndicesCount, null, false, lifecycle); + return indices; } public void testXContentSerializationWithRolloverAndEffectiveRetention() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java index d5cd14b7fd96f..89a665f54550d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java @@ -77,7 +77,7 @@ public void testCreateDataStream() throws Exception { assertThat(newState.metadata().dataStreams().get(dataStreamName).isSystem(), is(false)); assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false)); assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false)); - assertThat(newState.metadata().dataStreams().get(dataStreamName).getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).getDataLifecycle(), equalTo(DataStreamLifecycle.DEFAULT)); 
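A side note on the createDataStreamIndices helper above: it reuses a single Settings.Builder for every generated index, which is why the origination-date key is cleared with putNull(...) whenever a generation has no origination date; otherwise the value from the previous iteration would leak into the next index. A standalone illustration of that builder behaviour, with the setting key spelled out literally for clarity:

    import org.elasticsearch.common.settings.Settings;

    final class SettingsBuilderReuseSketch {
        private static final String ORIGINATION_DATE = "index.lifecycle.origination_date";

        static void example() {
            Settings.Builder reused = Settings.builder().put("index.number_of_shards", 1);

            // generation 1 defines an origination date
            reused.put(ORIGINATION_DATE, 1234L);
            Settings first = reused.build();
            assert first.getAsLong(ORIGINATION_DATE, null) == 1234L;

            // generation 2 does not; without the reset, the stale value above would be carried over
            reused.putNull(ORIGINATION_DATE);
            Settings second = reused.build();
            assert second.getAsLong(ORIGINATION_DATE, null) == null;
        }
    }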
assertThat(newState.metadata().dataStreams().get(dataStreamName).getIndexMode(), nullValue()); assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue()); assertThat( @@ -114,7 +114,7 @@ public void testCreateDataStreamLogsdb() throws Exception { assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false)); assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false)); assertThat(newState.metadata().dataStreams().get(dataStreamName).getIndexMode(), equalTo(IndexMode.LOGSDB)); - assertThat(newState.metadata().dataStreams().get(dataStreamName).getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).getDataLifecycle(), equalTo(DataStreamLifecycle.DEFAULT)); assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue()); assertThat( newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"), diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java index fe546bd0d49af..73489073c4f06 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java @@ -478,7 +478,7 @@ public void testUpdateLifecycle() { ClusterState after = service.updateDataLifecycle(before, List.of(dataStream), null); DataStream updatedDataStream = after.metadata().dataStreams().get(dataStream); assertNotNull(updatedDataStream); - assertThat(updatedDataStream.getLifecycle(), nullValue()); + assertThat(updatedDataStream.getDataLifecycle(), nullValue()); before = after; } @@ -487,7 +487,7 @@ public void testUpdateLifecycle() { ClusterState after = service.updateDataLifecycle(before, List.of(dataStream), lifecycle); DataStream updatedDataStream = after.metadata().dataStreams().get(dataStream); assertNotNull(updatedDataStream); - assertThat(updatedDataStream.getLifecycle(), equalTo(lifecycle)); + assertThat(updatedDataStream.getDataLifecycle(), equalTo(lifecycle)); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportAction.java index 92d1bfbb16f72..862d66c875a61 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportAction.java @@ -80,14 +80,14 @@ public static DataStreamLifecycleFeatureSetUsage.LifecycleStats calculateStats( LongSummaryStatistics effectiveRetentionStats = new LongSummaryStatistics(); for (DataStream dataStream : dataStreams) { - if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) { + if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) { dataStreamsWithLifecycles++; // Track data retention - if (dataStream.getLifecycle().dataRetention() != null) { - dataRetentionStats.accept(dataStream.getLifecycle().dataRetention().getMillis()); + if (dataStream.getDataLifecycle().dataRetention() != null) { + 
dataRetentionStats.accept(dataStream.getDataLifecycle().dataRetention().getMillis()); } // Track effective retention - Tuple effectiveDataRetentionWithSource = dataStream.getLifecycle() + Tuple effectiveDataRetentionWithSource = dataStream.getDataLifecycle() .getEffectiveDataRetentionWithSource(globalRetention, dataStream.isInternal()); // Track global retention usage
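The usage action above now reads everything through getDataLifecycle(). A condensed sketch of the explicit-retention statistic it accumulates, assuming the list of DataStream instances has already been pulled from the cluster state; only accessors that appear in this patch are used:

    import java.util.List;
    import java.util.LongSummaryStatistics;

    import org.elasticsearch.cluster.metadata.DataStream;
    import org.elasticsearch.cluster.metadata.DataStreamLifecycle;

    final class LifecycleUsageSketch {
        static LongSummaryStatistics explicitDataRetentionMillis(List<DataStream> dataStreams) {
            LongSummaryStatistics stats = new LongSummaryStatistics();
            for (DataStream dataStream : dataStreams) {
                DataStreamLifecycle lifecycle = dataStream.getDataLifecycle();
                if (lifecycle != null && lifecycle.enabled() && lifecycle.dataRetention() != null) {
                    // explicitly configured retention (not the effective value), in milliseconds
                    stats.accept(lifecycle.dataRetention().getMillis());
                }
            }
            return stats; // min, max and average feed the data stream lifecycle usage payload
        }
    }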