@@ -349,16 +349,22 @@ void run(ClusterState state) {
349349 int affectedDataStreams = 0 ;
350350 for (DataStream dataStream : state .metadata ().getProject ().dataStreams ().values ()) {
351351 clearErrorStoreForUnmanagedIndices (dataStream );
352- if (dataStream .getLifecycle () == null ) {
352+ if (dataStream .getDataLifecycle () == null ) {
353353 continue ;
354354 }
355355
356356 // the following indices should not be considered for the remainder of this service run, for various reasons.
357357 Set <Index > indicesToExcludeForRemainingRun = new HashSet <>();
358358
359- // This is the pre-rollover write index. It may or may not be the write index after maybeExecuteRollover has executed,
360- // depending on rollover criteria, for this reason we exclude it for the remaining run.
361- indicesToExcludeForRemainingRun .addAll (maybeExecuteRollover (state , dataStream ));
359+ // These are the pre-rollover write indices. They may or may not be the write indices after maybeExecuteRollover has executed,
360+ // depending on rollover criteria; for this reason we exclude them for the remaining run.
361+ indicesToExcludeForRemainingRun .add (maybeExecuteRollover (state , dataStream , false ));
362+ if (DataStream .isFailureStoreFeatureFlagEnabled ()) {
363+ Index failureStoreWriteIndex = maybeExecuteRollover (state , dataStream , true );
364+ if (failureStoreWriteIndex != null ) {
365+ indicesToExcludeForRemainingRun .add (failureStoreWriteIndex );
366+ }
367+ }
362368
363369 // tsds indices that are still within their time bounds (i.e. now < time_series.end_time) - we don't want these indices to be
364370 // deleted, forcemerged, or downsampled as they're still expected to receive large amounts of writes
@@ -805,23 +811,6 @@ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) {
805811 }
806812 }
807813
808- /**
809- * This method will attempt to roll over the write index of a data stream. The rollover will occur only if the conditions
810- * apply. In any case, we return the write backing index back to the caller, so it can be excluded from the next steps.
811- * @return the write index of this data stream before rollover was requested.
812- */
813- private Set <Index > maybeExecuteRollover (ClusterState state , DataStream dataStream ) {
814- Set <Index > currentRunWriteIndices = new HashSet <>();
815- currentRunWriteIndices .add (maybeExecuteRollover (state , dataStream , false ));
816- if (DataStream .isFailureStoreFeatureFlagEnabled ()) {
817- Index failureStoreWriteIndex = maybeExecuteRollover (state , dataStream , true );
818- if (failureStoreWriteIndex != null ) {
819- currentRunWriteIndices .add (failureStoreWriteIndex );
820- }
821- }
822- return currentRunWriteIndices ;
823- }
824-
825814 @ Nullable
826815 private Index maybeExecuteRollover (ClusterState state , DataStream dataStream , boolean rolloverFailureStore ) {
827816 Index currentRunWriteIndex = rolloverFailureStore ? dataStream .getWriteFailureIndex () : dataStream .getWriteIndex ();
@@ -830,10 +819,11 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
830819 }
831820 try {
832821 if (dataStream .isIndexManagedByDataStreamLifecycle (currentRunWriteIndex , state .metadata ().getProject ()::index )) {
822+ DataStreamLifecycle lifecycle = rolloverFailureStore ? dataStream .getFailuresLifecycle () : dataStream .getDataLifecycle ();
833823 RolloverRequest rolloverRequest = getDefaultRolloverRequest (
834824 rolloverConfiguration ,
835825 dataStream .getName (),
836- dataStream . getLifecycle () .getEffectiveDataRetention (globalRetentionSettings .get (), dataStream .isInternal ()),
826+ lifecycle .getEffectiveDataRetention (globalRetentionSettings .get (), dataStream .isInternal ()),
837827 rolloverFailureStore
838828 );
839829 transportActionsDeduplicator .executeOnce (
@@ -886,45 +876,66 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
886876 Set <Index > maybeExecuteRetention (ClusterState state , DataStream dataStream , Set <Index > indicesToExcludeForRemainingRun ) {
887877 Metadata metadata = state .metadata ();
888878 DataStreamGlobalRetention globalRetention = dataStream .isSystem () ? null : globalRetentionSettings .get ();
889- List <Index > backingIndicesOlderThanRetention = dataStream .getIndicesPastRetention (
879+ List <Index > backingIndicesOlderThanRetention = dataStream .getBackingIndicesPastRetention (
880+ metadata .getProject ()::index ,
881+ nowSupplier ,
882+ globalRetention
883+ );
884+ List <Index > failureIndicesOlderThanRetention = dataStream .getFailureIndicesPastRetention (
890885 metadata .getProject ()::index ,
891886 nowSupplier ,
892887 globalRetention
893888 );
894- if (backingIndicesOlderThanRetention .isEmpty ()) {
889+ if (backingIndicesOlderThanRetention .isEmpty () && failureIndicesOlderThanRetention . isEmpty () ) {
895890 return Set .of ();
896891 }
897892 Set <Index > indicesToBeRemoved = new HashSet <>();
898- // We know that there is lifecycle and retention because there are indices to be deleted
899- assert dataStream .getLifecycle () != null ;
900- TimeValue effectiveDataRetention = dataStream .getLifecycle ().getEffectiveDataRetention (globalRetention , dataStream .isInternal ());
901- for (Index index : backingIndicesOlderThanRetention ) {
902- if (indicesToExcludeForRemainingRun .contains (index ) == false ) {
903- IndexMetadata backingIndex = metadata .getProject ().index (index );
904- assert backingIndex != null : "the data stream backing indices must exist" ;
905-
906- IndexMetadata .DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS .get (backingIndex .getSettings ());
907- // we don't want to delete the source index if they have an in-progress downsampling operation because the
908- // target downsample index will remain in the system as a standalone index
909- if (downsampleStatus == STARTED ) {
910- // there's an opportunity here to cancel downsampling and delete the source index now
911- logger .trace (
912- "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
913- + "because there's a downsampling operation currently in progress for this index. Current downsampling "
914- + "status is [{}]. When downsampling completes, DSL will delete this index." ,
915- index .getName (),
916- effectiveDataRetention ,
917- downsampleStatus
918- );
919- } else {
920- // UNKNOWN is the default value, and has no real use. So index should be deleted
921- // SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete
893+ if (backingIndicesOlderThanRetention .isEmpty () == false ) {
894+ assert dataStream .getDataLifecycle () != null : "data stream should have data lifecycle if we have 'old' indices" ;
895+ TimeValue dataRetention = dataStream .getDataLifecycle ().getEffectiveDataRetention (globalRetention , dataStream .isInternal ());
896+ for (Index index : backingIndicesOlderThanRetention ) {
897+ if (indicesToExcludeForRemainingRun .contains (index ) == false ) {
898+ IndexMetadata backingIndex = metadata .getProject ().index (index );
899+ assert backingIndex != null : "the data stream backing indices must exist" ;
900+
901+ IndexMetadata .DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS .get (backingIndex .getSettings ());
902+ // we don't want to delete the source index if they have an in-progress downsampling operation because the
903+ // target downsample index will remain in the system as a standalone index
904+ if (downsampleStatus == STARTED ) {
905+ // there's an opportunity here to cancel downsampling and delete the source index now
906+ logger .trace (
907+ "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
908+ + "because there's a downsampling operation currently in progress for this index. Current downsampling "
909+ + "status is [{}]. When downsampling completes, DSL will delete this index." ,
910+ index .getName (),
911+ dataRetention ,
912+ downsampleStatus
913+ );
914+ } else {
915+ // UNKNOWN is the default value, and has no real use. So index should be deleted
916+ // SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete
917+ indicesToBeRemoved .add (index );
918+
919+ // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
920+ // let's start simple and reevaluate
921+ String indexName = backingIndex .getIndex ().getName ();
922+ deleteIndexOnce (indexName , "the lapsed [" + dataRetention + "] retention period" );
923+ }
924+ }
925+ }
926+ }
927+ if (failureIndicesOlderThanRetention .isEmpty () == false ) {
928+ assert dataStream .getFailuresLifecycle () != null : "data stream should have failure lifecycle if we have 'old' indices" ;
929+ var failureRetention = dataStream .getFailuresLifecycle ().getEffectiveDataRetention (globalRetention , dataStream .isInternal ());
930+ for (Index index : failureIndicesOlderThanRetention ) {
931+ if (indicesToExcludeForRemainingRun .contains (index ) == false ) {
932+ IndexMetadata failureIndex = metadata .getProject ().index (index );
933+ assert failureIndex != null : "the data stream failure indices must exist" ;
922934 indicesToBeRemoved .add (index );
923-
924935 // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
925936 // let's start simple and reevaluate
926- String indexName = backingIndex .getIndex ().getName ();
927- deleteIndexOnce (indexName , "the lapsed [" + effectiveDataRetention + "] retention period" );
937+ String indexName = failureIndex .getIndex ().getName ();
938+ deleteIndexOnce (indexName , "the lapsed [" + failureRetention + "] retention period" );
928939 }
929940 }
930941 }