Skip to content

Commit 804f2fe

Browse files
committed
Conceptually introduce the failure store lifecycle (even though for now it's the same)
1 parent 1036ff8 commit 804f2fe

File tree

5 files changed

+430
-285
lines changed

5 files changed

+430
-285
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 61 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -356,9 +356,15 @@ void run(ClusterState state) {
356356
// the following indices should not be considered for the remainder of this service run, for various reasons.
357357
Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();
358358

359-
// This is the pre-rollover write index. It may or may not be the write index after maybeExecuteRollover has executed,
360-
// depending on rollover criteria, for this reason we exclude it for the remaining run.
361-
indicesToExcludeForRemainingRun.addAll(maybeExecuteRollover(state, dataStream));
359+
// These are the pre-rollover write indices. They may or may not be the write indices after maybeExecuteRollover has executed,
360+
// depending on rollover criteria, for this reason we exclude them for the remaining run.
361+
indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, false));
362+
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
363+
Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
364+
if (failureStoreWriteIndex != null) {
365+
indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
366+
}
367+
}
362368

363369
// tsds indices that are still within their time bounds (i.e. now < time_series.end_time) - we don't want these indices to be
364370
// deleted, forcemerged, or downsampled as they're still expected to receive large amounts of writes
@@ -805,23 +811,6 @@ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) {
805811
}
806812
}
807813

808-
/**
809-
* This method will attempt to roll over the write index of a data stream. The rollover will occur only if the conditions
810-
* apply. In any case, we return the write backing index back to the caller, so it can be excluded from the next steps.
811-
* @return the write index of this data stream before rollover was requested.
812-
*/
813-
private Set<Index> maybeExecuteRollover(ClusterState state, DataStream dataStream) {
814-
Set<Index> currentRunWriteIndices = new HashSet<>();
815-
currentRunWriteIndices.add(maybeExecuteRollover(state, dataStream, false));
816-
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
817-
Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
818-
if (failureStoreWriteIndex != null) {
819-
currentRunWriteIndices.add(failureStoreWriteIndex);
820-
}
821-
}
822-
return currentRunWriteIndices;
823-
}
824-
825814
@Nullable
826815
private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
827816
Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
@@ -830,10 +819,11 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
830819
}
831820
try {
832821
if (dataStream.isIndexManagedByDataStreamLifecycle(currentRunWriteIndex, state.metadata().getProject()::index)) {
822+
DataStreamLifecycle lifecycle = rolloverFailureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle();
833823
RolloverRequest rolloverRequest = getDefaultRolloverRequest(
834824
rolloverConfiguration,
835825
dataStream.getName(),
836-
dataStream.getDataLifecycle().getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
826+
lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
837827
rolloverFailureStore
838828
);
839829
transportActionsDeduplicator.executeOnce(
@@ -886,44 +876,66 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
886876
Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
887877
Metadata metadata = state.metadata();
888878
DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionSettings.get();
889-
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(
879+
List<Index> backingIndicesOlderThanRetention = dataStream.getBackingIndicesPastRetention(
880+
metadata.getProject()::index,
881+
nowSupplier,
882+
globalRetention
883+
);
884+
List<Index> failureIndicesOlderThanRetention = dataStream.getFailureIndicesPastRetention(
890885
metadata.getProject()::index,
891886
nowSupplier,
892887
globalRetention
893888
);
894-
if (backingIndicesOlderThanRetention.isEmpty()) {
889+
if (backingIndicesOlderThanRetention.isEmpty() && failureIndicesOlderThanRetention.isEmpty()) {
895890
return Set.of();
896891
}
897892
Set<Index> indicesToBeRemoved = new HashSet<>();
898-
TimeValue effectiveDataRetention = dataStream.getDataLifecycle()
899-
.getEffectiveDataRetention(globalRetention, dataStream.isInternal());
900-
for (Index index : backingIndicesOlderThanRetention) {
901-
if (indicesToExcludeForRemainingRun.contains(index) == false) {
902-
IndexMetadata backingIndex = metadata.getProject().index(index);
903-
assert backingIndex != null : "the data stream backing indices must exist";
904-
905-
IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
906-
// we don't want to delete the source index if they have an in-progress downsampling operation because the
907-
// target downsample index will remain in the system as a standalone index
908-
if (downsampleStatus == STARTED) {
909-
// there's an opportunity here to cancel downsampling and delete the source index now
910-
logger.trace(
911-
"Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
912-
+ "because there's a downsampling operation currently in progress for this index. Current downsampling "
913-
+ "status is [{}]. When downsampling completes, DSL will delete this index.",
914-
index.getName(),
915-
effectiveDataRetention,
916-
downsampleStatus
917-
);
918-
} else {
919-
// UNKNOWN is the default value, and has no real use. So index should be deleted
920-
// SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete
893+
if (backingIndicesOlderThanRetention.isEmpty() == false) {
894+
assert dataStream.getDataLifecycle() != null : "data stream should have failure lifecycle if we have 'old' indices";
895+
TimeValue dataRetention = dataStream.getDataLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
896+
for (Index index : backingIndicesOlderThanRetention) {
897+
if (indicesToExcludeForRemainingRun.contains(index) == false) {
898+
IndexMetadata backingIndex = metadata.getProject().index(index);
899+
assert backingIndex != null : "the data stream backing indices must exist";
900+
901+
IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
902+
// we don't want to delete the source index if they have an in-progress downsampling operation because the
903+
// target downsample index will remain in the system as a standalone index
904+
if (downsampleStatus == STARTED) {
905+
// there's an opportunity here to cancel downsampling and delete the source index now
906+
logger.trace(
907+
"Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
908+
+ "because there's a downsampling operation currently in progress for this index. Current downsampling "
909+
+ "status is [{}]. When downsampling completes, DSL will delete this index.",
910+
index.getName(),
911+
dataRetention,
912+
downsampleStatus
913+
);
914+
} else {
915+
// UNKNOWN is the default value, and has no real use. So index should be deleted
916+
// SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete
917+
indicesToBeRemoved.add(index);
918+
919+
// there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
920+
// let's start simple and reevaluate
921+
String indexName = backingIndex.getIndex().getName();
922+
deleteIndexOnce(indexName, "the lapsed [" + dataRetention + "] retention period");
923+
}
924+
}
925+
}
926+
}
927+
if (failureIndicesOlderThanRetention.isEmpty() == false) {
928+
assert dataStream.getFailuresLifecycle() != null : "data stream should have failure lifecycle if we have 'old' indices";
929+
var failureRetention = dataStream.getFailuresLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
930+
for (Index index : failureIndicesOlderThanRetention) {
931+
if (indicesToExcludeForRemainingRun.contains(index) == false) {
932+
IndexMetadata failureIndex = metadata.getProject().index(index);
933+
assert failureIndex != null : "the data stream failure indices must exist";
921934
indicesToBeRemoved.add(index);
922-
923935
// there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
924936
// let's start simple and reevaluate
925-
String indexName = backingIndex.getIndex().getName();
926-
deleteIndexOnce(indexName, "the lapsed [" + effectiveDataRetention + "] retention period");
937+
String indexName = failureIndex.getIndex().getName();
938+
deleteIndexOnce(indexName, "the lapsed [" + failureRetention + "] retention period");
927939
}
928940
}
929941
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ protected void masterOperation(
112112
idxMetadata.getCreationDate(),
113113
rolloverInfo == null ? null : rolloverInfo.getTime(),
114114
generationDate,
115-
parentDataStream.getDataLifecycle(),
115+
parentDataStream.getDataLifecycleForIndex(idxMetadata.getIndex()),
116116
errorStore.getError(state.projectId(), index)
117117
);
118118
explainIndices.add(explainIndexDataStreamLifecycle);

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -961,24 +961,53 @@ private static boolean isAnyIndexMissing(List<Index> indices, Metadata.Builder b
961961
}
962962

963963
/**
964-
* Iterate over the backing indices and return the ones that are managed by the data stream lifecycle and past the configured
965-
* retention in their lifecycle.
964+
* Iterate over the backing indices and return the ones that are managed by the data stream lifecycle and past the
965+
* configured retention in their lifecycle.
966966
* NOTE that this specifically does not return the write index of the data stream as usually retention
967967
* is treated differently for the write index (i.e. they first need to be rolled over)
968968
*/
969-
public List<Index> getIndicesPastRetention(
969+
public List<Index> getBackingIndicesPastRetention(
970970
Function<String, IndexMetadata> indexMetadataSupplier,
971971
LongSupplier nowSupplier,
972972
DataStreamGlobalRetention globalRetention
973973
) {
974-
if (lifecycle == null
975-
|| lifecycle.enabled() == false
976-
|| lifecycle.getEffectiveDataRetention(globalRetention, isInternal()) == null) {
974+
if (getDataLifecycle() == null
975+
|| getDataLifecycle().enabled() == false
976+
|| getDataLifecycle().getEffectiveDataRetention(globalRetention, isInternal()) == null) {
977977
return List.of();
978978
}
979979

980980
List<Index> indicesPastRetention = getNonWriteIndicesOlderThan(
981-
lifecycle.getEffectiveDataRetention(globalRetention, isInternal()),
981+
getIndices(),
982+
getDataLifecycle().getEffectiveDataRetention(globalRetention, isInternal()),
983+
indexMetadataSupplier,
984+
this::isIndexManagedByDataStreamLifecycle,
985+
nowSupplier
986+
);
987+
return indicesPastRetention;
988+
}
989+
990+
/**
991+
* Iterate over the failure indices and return the ones that are managed by the data stream lifecycle and past the
992+
* configured retention in their lifecycle.
993+
* NOTE that this specifically does not return the failure store write index of the data stream as usually retention
994+
* is treated differently for the write index (i.e. it first needs to be rolled over)
995+
*/
996+
public List<Index> getFailureIndicesPastRetention(
997+
Function<String, IndexMetadata> indexMetadataSupplier,
998+
LongSupplier nowSupplier,
999+
DataStreamGlobalRetention globalRetention
1000+
) {
1001+
if (DataStream.isFailureStoreFeatureFlagEnabled() == false
1002+
|| getFailuresLifecycle() == null
1003+
|| getFailuresLifecycle().enabled() == false
1004+
|| getFailuresLifecycle().getEffectiveDataRetention(globalRetention, isInternal()) == null) {
1005+
return List.of();
1006+
}
1007+
1008+
List<Index> indicesPastRetention = getNonWriteIndicesOlderThan(
1009+
getFailureIndices(),
1010+
getFailuresLifecycle().getEffectiveDataRetention(globalRetention, isInternal()),
9821011
indexMetadataSupplier,
9831012
this::isIndexManagedByDataStreamLifecycle,
9841013
nowSupplier
@@ -1024,37 +1053,28 @@ public List<DownsamplingRound> getDownsamplingRoundsFor(
10241053
}
10251054

10261055
/**
1027-
* Returns the non-write backing indices and failure store indices that are older than the provided age,
1056+
* Collects the given <code>indices</code> that are older than the provided age into the returned <code>olderIndices</code> list,
10281057
* excluding the write indices. The index age is calculated from the rollover or index creation date (or
10291058
* the origination date if present). If an indices predicate is provided the returned list of indices will
10301059
* be filtered according to the predicate definition. This is useful for things like "return only
1031-
* the backing indices that are managed by the data stream lifecycle".
1060+
* the indices that are managed by the data stream lifecycle".
10321061
*/
1033-
public List<Index> getNonWriteIndicesOlderThan(
1062+
private List<Index> getNonWriteIndicesOlderThan(
1063+
List<Index> indices,
10341064
TimeValue retentionPeriod,
10351065
Function<String, IndexMetadata> indexMetadataSupplier,
10361066
@Nullable Predicate<IndexMetadata> indicesPredicate,
10371067
LongSupplier nowSupplier
10381068
) {
1069+
if (indices.isEmpty()) {
1070+
return List.of();
1071+
}
10391072
List<Index> olderIndices = new ArrayList<>();
1040-
for (Index index : backingIndices.getIndices()) {
1073+
for (Index index : indices) {
10411074
if (isIndexOlderThan(index, retentionPeriod.getMillis(), nowSupplier.getAsLong(), indicesPredicate, indexMetadataSupplier)) {
10421075
olderIndices.add(index);
10431076
}
10441077
}
1045-
if (DataStream.isFailureStoreFeatureFlagEnabled() && failureIndices.getIndices().isEmpty() == false) {
1046-
for (Index index : failureIndices.getIndices()) {
1047-
if (isIndexOlderThan(
1048-
index,
1049-
retentionPeriod.getMillis(),
1050-
nowSupplier.getAsLong(),
1051-
indicesPredicate,
1052-
indexMetadataSupplier
1053-
)) {
1054-
olderIndices.add(index);
1055-
}
1056-
}
1057-
}
10581078
return olderIndices;
10591079
}
10601080

@@ -1121,7 +1141,7 @@ private boolean isIndexManagedByDataStreamLifecycle(IndexMetadata indexMetadata)
11211141
*/
11221142
@Nullable
11231143
public TimeValue getGenerationLifecycleDate(IndexMetadata indexMetadata) {
1124-
if (indexMetadata.getIndex().equals(getWriteIndex())) {
1144+
if (indexMetadata.getIndex().equals(getWriteIndex()) || indexMetadata.getIndex().equals(getWriteFailureIndex())) {
11251145
return null;
11261146
}
11271147
Long originationDate = indexMetadata.getSettings().getAsLong(LIFECYCLE_ORIGINATION_DATE, null);

server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1079,7 +1079,12 @@ public boolean isIndexManagedByILM(IndexMetadata indexMetadata) {
10791079
}
10801080

10811081
DataStream parentDataStream = indexAbstraction.getParentDataStream();
1082-
if (parentDataStream != null && parentDataStream.getDataLifecycle() != null && parentDataStream.getDataLifecycle().enabled()) {
1082+
// Only data streams can be managed by data stream lifecycle
1083+
if (parentDataStream == null) {
1084+
return true;
1085+
}
1086+
DataStreamLifecycle lifecycle = parentDataStream.getDataLifecycleForIndex(indexMetadata.getIndex());
1087+
if (lifecycle != null && lifecycle.enabled()) {
10831088
// index has both ILM and data stream lifecycle configured so let's check which is preferred
10841089
return PREFER_ILM_SETTING.get(indexMetadata.getSettings());
10851090
}

0 commit comments

Comments
 (0)