@@ -347,16 +347,22 @@ void run(ClusterState state) {
         int affectedDataStreams = 0;
         for (DataStream dataStream : state.metadata().dataStreams().values()) {
             clearErrorStoreForUnmanagedIndices(dataStream);
-            if (dataStream.getLifecycle() == null) {
+            if (dataStream.getDataLifecycle() == null) {
                 continue;
             }

             // the following indices should not be considered for the remainder of this service run, for various reasons.
             Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();

-            // This is the pre-rollover write index. It may or may not be the write index after maybeExecuteRollover has executed,
-            // depending on rollover criteria, for this reason we exclude it for the remaining run.
-            indicesToExcludeForRemainingRun.addAll(maybeExecuteRollover(state, dataStream));
+            // These are the pre-rollover write indices. They may or may not be the write indices after maybeExecuteRollover has
+            // executed, depending on the rollover criteria; for this reason we exclude them from the remaining run.
+            indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, false));
+            if (DataStream.isFailureStoreFeatureFlagEnabled()) {
+                Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
+                if (failureStoreWriteIndex != null) {
+                    indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
+                }
+            }

             // tsds indices that are still within their time bounds (i.e. now < time_series.end_time) - we don't want these indices to be
             // deleted, forcemerged, or downsampled as they're still expected to receive large amounts of writes
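Taken together, the added lines above make the run exclude the pre-rollover write index of the failure store in addition to the regular write index, but only when the failure store feature flag is enabled. Below is a minimal, self-contained sketch of that exclusion pattern; all names in it (FakeIndex, rolloverTarget, FAILURE_STORE_ENABLED) are hypothetical stand-ins, not Elasticsearch APIs.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the exclusion pattern: roll over the regular write index and, when the failure
// store feature is enabled, the failure-store write index, and keep both pre-rollover write
// indices out of the rest of the run.
class RolloverExclusionSketch {

    record FakeIndex(String name) {}

    // stands in for DataStream.isFailureStoreFeatureFlagEnabled()
    static final boolean FAILURE_STORE_ENABLED = true;

    public static void main(String[] args) {
        Set<FakeIndex> excludedForRemainingRun = new HashSet<>();

        // the pre-rollover data write index is always excluded, whether or not the rollover conditions were met
        excludedForRemainingRun.add(rolloverTarget("logs-app", false));

        // the failure-store write index is only considered when the feature flag is on, and only
        // excluded if the data stream actually had one to roll over
        if (FAILURE_STORE_ENABLED) {
            FakeIndex failureStoreWriteIndex = rolloverTarget("logs-app", true);
            if (failureStoreWriteIndex != null) {
                excludedForRemainingRun.add(failureStoreWriteIndex);
            }
        }

        System.out.println("excluded from the remaining run: " + excludedForRemainingRun);
    }

    // pretends to request a rollover and returns the pre-rollover write index for the chosen target
    // (a real implementation could return null, e.g. for a data stream without a failure store)
    static FakeIndex rolloverTarget(String dataStreamName, boolean failureStore) {
        String prefix = failureStore ? ".fs-" : ".ds-";
        return new FakeIndex(prefix + dataStreamName + "-000001");
    }
}
```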
@@ -799,23 +805,6 @@ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) {
         }
     }

-    /**
-     * This method will attempt to roll over the write index of a data stream. The rollover will occur only if the conditions
-     * apply. In any case, we return the write backing index back to the caller, so it can be excluded from the next steps.
-     * @return the write index of this data stream before rollover was requested.
-     */
-    private Set<Index> maybeExecuteRollover(ClusterState state, DataStream dataStream) {
-        Set<Index> currentRunWriteIndices = new HashSet<>();
-        currentRunWriteIndices.add(maybeExecuteRollover(state, dataStream, false));
-        if (DataStream.isFailureStoreFeatureFlagEnabled()) {
-            Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
-            if (failureStoreWriteIndex != null) {
-                currentRunWriteIndices.add(failureStoreWriteIndex);
-            }
-        }
-        return currentRunWriteIndices;
-    }
-
     @Nullable
     private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
         Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
@@ -824,10 +813,11 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
         }
         try {
             if (dataStream.isIndexManagedByDataStreamLifecycle(currentRunWriteIndex, state.metadata()::index)) {
+                DataStreamLifecycle lifecycle = rolloverFailureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle();
                 RolloverRequest rolloverRequest = getDefaultRolloverRequest(
                     rolloverConfiguration,
                     dataStream.getName(),
-                    dataStream.getLifecycle().getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
+                    lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
                     rolloverFailureStore
                 );
                 transportActionsDeduplicator.executeOnce(
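The new `lifecycle` local picks which retention feeds the rollover request: the failures lifecycle when rolling the failure store, the data lifecycle otherwise. The sketch below shows one plausible reading of how an effective retention could be resolved from a per-lifecycle value plus global default/maximum settings; the rules in it are assumptions for illustration, not the actual getEffectiveDataRetention implementation.

```java
import java.time.Duration;
import java.util.Optional;

// Hedged sketch of resolving an "effective retention" for the selected lifecycle.
// Assumed rules: internal data streams ignore global retention; otherwise a missing
// per-lifecycle retention falls back to the global default, and the result is capped
// by the global maximum.
final class EffectiveRetentionSketch {

    record GlobalRetention(Optional<Duration> defaultRetention, Optional<Duration> maxRetention) {}

    static Duration effectiveRetention(Optional<Duration> lifecycleRetention, GlobalRetention global, boolean internalDataStream) {
        if (internalDataStream || global == null) {
            // internal streams keep their own (possibly unbounded) retention
            return lifecycleRetention.orElse(null);
        }
        Duration candidate = lifecycleRetention.or(global::defaultRetention).orElse(null);
        Optional<Duration> max = global.maxRetention();
        if (max.isPresent() && (candidate == null || candidate.compareTo(max.get()) > 0)) {
            return max.get(); // cap at the global maximum
        }
        return candidate;
    }

    public static void main(String[] args) {
        GlobalRetention global = new GlobalRetention(Optional.of(Duration.ofDays(30)), Optional.of(Duration.ofDays(90)));
        System.out.println(effectiveRetention(Optional.empty(), global, false));                  // PT720H  (global default)
        System.out.println(effectiveRetention(Optional.of(Duration.ofDays(365)), global, false)); // PT2160H (capped at max)
        System.out.println(effectiveRetention(Optional.of(Duration.ofDays(365)), global, true));  // PT8760H (internal: uncapped)
    }
}
```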
@@ -880,41 +870,66 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
     Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
         Metadata metadata = state.metadata();
         DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionSettings.get();
-        List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier, globalRetention);
-        if (backingIndicesOlderThanRetention.isEmpty()) {
+        List<Index> backingIndicesOlderThanRetention = dataStream.getBackingIndicesPastRetention(
+            metadata::index,
+            nowSupplier,
+            globalRetention
+        );
+        List<Index> failureIndicesOlderThanRetention = dataStream.getFailureIndicesPastRetention(
+            metadata::index,
+            nowSupplier,
+            globalRetention
+        );
+        if (backingIndicesOlderThanRetention.isEmpty() && failureIndicesOlderThanRetention.isEmpty()) {
             return Set.of();
         }
         Set<Index> indicesToBeRemoved = new HashSet<>();
-        // We know that there is lifecycle and retention because there are indices to be deleted
-        assert dataStream.getLifecycle() != null;
-        TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
-        for (Index index : backingIndicesOlderThanRetention) {
-            if (indicesToExcludeForRemainingRun.contains(index) == false) {
-                IndexMetadata backingIndex = metadata.index(index);
-                assert backingIndex != null : "the data stream backing indices must exist";
-
-                IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
-                // we don't want to delete the source index if they have an in-progress downsampling operation because the
-                // target downsample index will remain in the system as a standalone index
-                if (downsampleStatus == STARTED) {
-                    // there's an opportunity here to cancel downsampling and delete the source index now
-                    logger.trace(
-                        "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
-                            + "because there's a downsampling operation currently in progress for this index. Current downsampling "
-                            + "status is [{}]. When downsampling completes, DSL will delete this index.",
-                        index.getName(),
-                        effectiveDataRetention,
-                        downsampleStatus
-                    );
-                } else {
-                    // UNKNOWN is the default value, and has no real use. So index should be deleted
-                    // SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete
+        if (backingIndicesOlderThanRetention.isEmpty() == false) {
+            assert dataStream.getDataLifecycle() != null : "data stream should have data lifecycle if we have 'old' indices";
+            TimeValue dataRetention = dataStream.getDataLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
+            for (Index index : backingIndicesOlderThanRetention) {
+                if (indicesToExcludeForRemainingRun.contains(index) == false) {
+                    IndexMetadata backingIndex = metadata.index(index);
+                    assert backingIndex != null : "the data stream backing indices must exist";
+
+                    IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
+                    // we don't want to delete the source index if it has an in-progress downsampling operation because the
+                    // target downsample index will remain in the system as a standalone index
+                    if (downsampleStatus == STARTED) {
+                        // there's an opportunity here to cancel downsampling and delete the source index now
+                        logger.trace(
+                            "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
+                                + "because there's a downsampling operation currently in progress for this index. Current downsampling "
+                                + "status is [{}]. When downsampling completes, DSL will delete this index.",
+                            index.getName(),
+                            dataRetention,
+                            downsampleStatus
+                        );
+                    } else {
+                        // UNKNOWN is the default value, and has no real use. So the index should be deleted
+                        // SUCCESS means downsampling completed successfully and there is nothing in progress, so we can also delete
+                        indicesToBeRemoved.add(index);
+
+                        // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
+                        // let's start simple and reevaluate
+                        String indexName = backingIndex.getIndex().getName();
+                        deleteIndexOnce(indexName, "the lapsed [" + dataRetention + "] retention period");
+                    }
+                }
+            }
+        }
+        if (failureIndicesOlderThanRetention.isEmpty() == false) {
+            assert dataStream.getFailuresLifecycle() != null : "data stream should have failure lifecycle if we have 'old' indices";
+            var failureRetention = dataStream.getFailuresLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
+            for (Index index : failureIndicesOlderThanRetention) {
+                if (indicesToExcludeForRemainingRun.contains(index) == false) {
+                    IndexMetadata failureIndex = metadata.index(index);
+                    assert failureIndex != null : "the data stream failure indices must exist";
                     indicesToBeRemoved.add(index);
-
                     // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
                     // let's start simple and reevaluate
-                    String indexName = backingIndex.getIndex().getName();
-                    deleteIndexOnce(indexName, "the lapsed [" + effectiveDataRetention + "] retention period");
+                    String indexName = failureIndex.getIndex().getName();
+                    deleteIndexOnce(indexName, "the lapsed [" + failureRetention + "] retention period");
                 }
             }
         }
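The retention pass now runs the same delete-or-skip decision over two lists: backing indices (which still get the downsampling guard) and failure indices (which do not need one). Below is a standalone sketch of that per-index decision, under the simplifying assumption that an index is past retention once the time since it was rolled over exceeds the effective retention; CandidateIndex and DownsampleStatus are illustrative types, not Elasticsearch classes.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Sketch of the per-index deletion decision: skip indices that are not past retention,
// skip backing indices with an in-progress downsample, delete the rest.
final class RetentionSweepSketch {

    enum DownsampleStatus { UNKNOWN, STARTED, SUCCESS }

    record CandidateIndex(String name, Instant rolledOverAt, DownsampleStatus downsample) {}

    static void sweep(List<CandidateIndex> indices, Duration effectiveRetention, Instant now) {
        for (CandidateIndex index : indices) {
            boolean pastRetention = Duration.between(index.rolledOverAt(), now).compareTo(effectiveRetention) > 0;
            if (pastRetention == false) {
                continue;
            }
            if (index.downsample() == DownsampleStatus.STARTED) {
                // mirror of the STARTED guard above: leave the source index alone while downsampling runs
                System.out.println("skipping " + index.name() + " (downsampling in progress)");
            } else {
                System.out.println("deleting " + index.name() + " (lapsed [" + effectiveRetention + "] retention period)");
            }
        }
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-06-01T00:00:00Z");
        sweep(
            List.of(
                new CandidateIndex(".ds-logs-app-000001", Instant.parse("2024-01-01T00:00:00Z"), DownsampleStatus.SUCCESS),
                new CandidateIndex(".ds-logs-app-000002", Instant.parse("2024-05-20T00:00:00Z"), DownsampleStatus.UNKNOWN),
                new CandidateIndex(".ds-logs-app-000003", Instant.parse("2024-02-01T00:00:00Z"), DownsampleStatus.STARTED)
            ),
            Duration.ofDays(90),
            now
        );
    }
}
```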