Commit e088eb3

[Failure Store] Conceptually introduce the failure store lifecycle (#125258) (#125657)
* Specify index component when retrieving lifecycle
* Add getters for the failure lifecycle
* Conceptually introduce the failure store lifecycle (even if for now it's the same)

(cherry picked from commit 6503c1b)

# Conflicts:
#   modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
#   modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java
#   modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java
#   server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java
#   server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
#   server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java
1 parent 3e493b3 commit e088eb3

File tree: 13 files changed (+466 additions, -295 deletions)
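
Conceptually, the change means a data stream now exposes two lifecycles, one governing its backing (data) indices and one governing its failure-store indices, even though both currently resolve to the same configuration. A minimal, self-contained sketch of that split, using simplified stand-in types rather than the real org.elasticsearch.cluster.metadata classes:

    import java.time.Duration;

    // Stand-in for DataStreamLifecycle: just enough state for the illustration.
    record Lifecycle(boolean enabled, Duration retention) {}

    class DataStreamSketch {
        private final Lifecycle lifecycle; // single configuration shared by both components today

        DataStreamSketch(Lifecycle lifecycle) {
            this.lifecycle = lifecycle;
        }

        // Lifecycle that governs the backing (data) indices.
        Lifecycle getDataLifecycle() {
            return lifecycle;
        }

        // Lifecycle that governs the failure-store indices. Conceptually a separate
        // concern, even though for now it returns the very same configuration.
        Lifecycle getFailuresLifecycle() {
            return lifecycle;
        }
    }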

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

Lines changed: 1 addition & 1 deletion
@@ -1388,7 +1388,7 @@ public void testGetDataStream() throws Exception {
         assertThat(metricsFooDataStream.getDataStreamStatus(), is(ClusterHealthStatus.YELLOW));
         assertThat(metricsFooDataStream.getIndexTemplate(), is("template_for_foo"));
         assertThat(metricsFooDataStream.getIlmPolicy(), is(nullValue()));
-        assertThat(dataStream.getLifecycle(), is(lifecycle.toDataStreamLifecycle()));
+        assertThat(dataStream.getDataLifecycle(), is(lifecycle.toDataStreamLifecycle()));
         assertThat(metricsFooDataStream.templatePreferIlmValue(), is(true));
         GetDataStreamAction.Response.IndexProperties indexProperties = metricsFooDataStream.getIndexSettingsValues()
             .get(dataStream.getWriteIndex());

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 66 additions & 51 deletions
@@ -347,16 +347,22 @@ void run(ClusterState state) {
         int affectedDataStreams = 0;
         for (DataStream dataStream : state.metadata().dataStreams().values()) {
             clearErrorStoreForUnmanagedIndices(dataStream);
-            if (dataStream.getLifecycle() == null) {
+            if (dataStream.getDataLifecycle() == null) {
                 continue;
             }

             // the following indices should not be considered for the remainder of this service run, for various reasons.
             Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();

-            // This is the pre-rollover write index. It may or may not be the write index after maybeExecuteRollover has executed,
-            // depending on rollover criteria, for this reason we exclude it for the remaining run.
-            indicesToExcludeForRemainingRun.addAll(maybeExecuteRollover(state, dataStream));
+            // These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed,
+            // depending on rollover criteria, for this reason we exclude them for the remaining run.
+            indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, false));
+            if (DataStream.isFailureStoreFeatureFlagEnabled()) {
+                Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
+                if (failureStoreWriteIndex != null) {
+                    indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
+                }
+            }

             // tsds indices that are still within their time bounds (i.e. now < time_series.end_time) - we don't want these indices to be
             // deleted, forcemerged, or downsampled as they're still expected to receive large amounts of writes
@@ -799,23 +805,6 @@ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) {
         }
     }

-    /**
-     * This method will attempt to roll over the write index of a data stream. The rollover will occur only if the conditions
-     * apply. In any case, we return the write backing index back to the caller, so it can be excluded from the next steps.
-     * @return the write index of this data stream before rollover was requested.
-     */
-    private Set<Index> maybeExecuteRollover(ClusterState state, DataStream dataStream) {
-        Set<Index> currentRunWriteIndices = new HashSet<>();
-        currentRunWriteIndices.add(maybeExecuteRollover(state, dataStream, false));
-        if (DataStream.isFailureStoreFeatureFlagEnabled()) {
-            Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
-            if (failureStoreWriteIndex != null) {
-                currentRunWriteIndices.add(failureStoreWriteIndex);
-            }
-        }
-        return currentRunWriteIndices;
-    }
-
     @Nullable
     private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
         Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
@@ -824,10 +813,11 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
         }
         try {
             if (dataStream.isIndexManagedByDataStreamLifecycle(currentRunWriteIndex, state.metadata()::index)) {
+                DataStreamLifecycle lifecycle = rolloverFailureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle();
                 RolloverRequest rolloverRequest = getDefaultRolloverRequest(
                     rolloverConfiguration,
                     dataStream.getName(),
-                    dataStream.getLifecycle().getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
+                    lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
                     rolloverFailureStore
                 );
                 transportActionsDeduplicator.executeOnce(
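
The hunk above selects the lifecycle of the component being rolled over and feeds its effective retention into the rollover request. As a rough illustration of what an effective retention could look like, assuming global retention supplies a default and a maximum that apply only to non-internal data streams (the authoritative logic lives in DataStreamLifecycle#getEffectiveDataRetention and is not shown in this diff; Duration and the GlobalRetention record below are stand-ins, not the real TimeValue and DataStreamGlobalRetention types):

    import java.time.Duration;

    final class RetentionSketch {
        // Hypothetical stand-in for DataStreamGlobalRetention: a default and a maximum retention.
        record GlobalRetention(Duration defaultRetention, Duration maxRetention) {}

        static Duration effectiveRetention(Duration lifecycleRetention, GlobalRetention global, boolean internalDataStream) {
            if (internalDataStream || global == null) {
                return lifecycleRetention;              // global retention does not apply to internal data streams
            }
            Duration base = lifecycleRetention != null ? lifecycleRetention : global.defaultRetention();
            if (global.maxRetention() != null && (base == null || base.compareTo(global.maxRetention()) > 0)) {
                return global.maxRetention();           // cap at the configured maximum
            }
            return base;
        }
    }
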
@@ -880,41 +870,66 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
     Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
         Metadata metadata = state.metadata();
         DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionSettings.get();
-        List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier, globalRetention);
-        if (backingIndicesOlderThanRetention.isEmpty()) {
+        List<Index> backingIndicesOlderThanRetention = dataStream.getBackingIndicesPastRetention(
+            metadata::index,
+            nowSupplier,
+            globalRetention
+        );
+        List<Index> failureIndicesOlderThanRetention = dataStream.getFailureIndicesPastRetention(
+            metadata::index,
+            nowSupplier,
+            globalRetention
+        );
+        if (backingIndicesOlderThanRetention.isEmpty() && failureIndicesOlderThanRetention.isEmpty()) {
             return Set.of();
         }
         Set<Index> indicesToBeRemoved = new HashSet<>();
-        // We know that there is lifecycle and retention because there are indices to be deleted
-        assert dataStream.getLifecycle() != null;
-        TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
-        for (Index index : backingIndicesOlderThanRetention) {
-            if (indicesToExcludeForRemainingRun.contains(index) == false) {
-                IndexMetadata backingIndex = metadata.index(index);
-                assert backingIndex != null : "the data stream backing indices must exist";
-
-                IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
-                // we don't want to delete the source index if they have an in-progress downsampling operation because the
-                // target downsample index will remain in the system as a standalone index
-                if (downsampleStatus == STARTED) {
-                    // there's an opportunity here to cancel downsampling and delete the source index now
-                    logger.trace(
-                        "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
-                            + "because there's a downsampling operation currently in progress for this index. Current downsampling "
-                            + "status is [{}]. When downsampling completes, DSL will delete this index.",
-                        index.getName(),
-                        effectiveDataRetention,
-                        downsampleStatus
-                    );
-                } else {
-                    // UNKNOWN is the default value, and has no real use. So index should be deleted
-                    // SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete
+        if (backingIndicesOlderThanRetention.isEmpty() == false) {
+            assert dataStream.getDataLifecycle() != null : "data stream should have failure lifecycle if we have 'old' indices";
+            TimeValue dataRetention = dataStream.getDataLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
+            for (Index index : backingIndicesOlderThanRetention) {
+                if (indicesToExcludeForRemainingRun.contains(index) == false) {
+                    IndexMetadata backingIndex = metadata.index(index);
+                    assert backingIndex != null : "the data stream backing indices must exist";
+
+                    IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
+                    // we don't want to delete the source index if they have an in-progress downsampling operation because the
+                    // target downsample index will remain in the system as a standalone index
+                    if (downsampleStatus == STARTED) {
+                        // there's an opportunity here to cancel downsampling and delete the source index now
+                        logger.trace(
+                            "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
+                                + "because there's a downsampling operation currently in progress for this index. Current downsampling "
+                                + "status is [{}]. When downsampling completes, DSL will delete this index.",
+                            index.getName(),
+                            dataRetention,
+                            downsampleStatus
+                        );
+                    } else {
+                        // UNKNOWN is the default value, and has no real use. So index should be deleted
+                        // SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete
+                        indicesToBeRemoved.add(index);
+
+                        // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
+                        // let's start simple and reevaluate
+                        String indexName = backingIndex.getIndex().getName();
+                        deleteIndexOnce(indexName, "the lapsed [" + dataRetention + "] retention period");
+                    }
+                }
+            }
+        }
+        if (failureIndicesOlderThanRetention.isEmpty() == false) {
+            assert dataStream.getFailuresLifecycle() != null : "data stream should have failure lifecycle if we have 'old' indices";
+            var failureRetention = dataStream.getFailuresLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
+            for (Index index : failureIndicesOlderThanRetention) {
+                if (indicesToExcludeForRemainingRun.contains(index) == false) {
+                    IndexMetadata failureIndex = metadata.index(index);
+                    assert failureIndex != null : "the data stream failure indices must exist";
                     indicesToBeRemoved.add(index);
-
                     // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
                     // let's start simple and reevaluate
-                    String indexName = backingIndex.getIndex().getName();
-                    deleteIndexOnce(indexName, "the lapsed [" + effectiveDataRetention + "] retention period");
+                    String indexName = failureIndex.getIndex().getName();
+                    deleteIndexOnce(indexName, "the lapsed [" + failureRetention + "] retention period");
                 }
             }
         }
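
Both retention loops above share the same core: an index whose component retention has lapsed is collected for deletion unless this run already touched it, and the downsampling guard applies only to backing indices, since failure-store indices are not downsampled. A condensed sketch of that shared core, not the actual service code:

    import java.util.HashSet;
    import java.util.Set;

    // Condensed illustration only: pick the indices past retention that are safe to delete,
    // skipping indices this run already touched and (for backing indices) any index that is
    // still the source of an in-progress downsampling operation.
    final class RetentionDeletionSketch {
        static Set<String> eligibleForDeletion(Set<String> pastRetention, Set<String> excluded, Set<String> downsamplingInProgress) {
            Set<String> toDelete = new HashSet<>();
            for (String index : pastRetention) {
                if (excluded.contains(index) == false && downsamplingInProgress.contains(index) == false) {
                    toDelete.add(index);
                }
            }
            return toDelete;
        }
    }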

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java

Lines changed: 1 addition & 1 deletion
@@ -109,7 +109,7 @@ protected void masterOperation(
                 idxMetadata.getCreationDate(),
                 rolloverInfo == null ? null : rolloverInfo.getTime(),
                 generationDate,
-                parentDataStream.getLifecycle(),
+                parentDataStream.getDataLifecycleForIndex(idxMetadata.getIndex()),
                 errorStore.getError(index)
             );
             explainIndices.add(explainIndexDataStreamLifecycle);
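
The explain action now resolves the lifecycle per index component via getDataLifecycleForIndex. Its implementation is not part of this excerpt; presumably it returns the failures lifecycle for failure-store indices and the data lifecycle otherwise. A hypothetical sketch of that resolution, reusing the stand-in types from the first sketch above (the failure-index membership check is an assumption, not something this diff confirms):

    // Hypothetical sketch of the per-index resolution; the real DataStream implementation
    // is not shown in this commit excerpt.
    class DataStreamWithFailureStore extends DataStreamSketch {
        private final java.util.Set<String> failureIndexNames;

        DataStreamWithFailureStore(Lifecycle lifecycle, java.util.Set<String> failureIndexNames) {
            super(lifecycle);
            this.failureIndexNames = failureIndexNames;
        }

        // Assumption: membership in the failure store decides which lifecycle applies.
        Lifecycle getDataLifecycleForIndex(String indexName) {
            return failureIndexNames.contains(indexName) ? getFailuresLifecycle() : getDataLifecycle();
        }
    }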

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java

Lines changed: 1 addition & 1 deletion
@@ -91,7 +91,7 @@ protected void masterOperation(
             .map(
                 dataStream -> new GetDataStreamLifecycleAction.Response.DataStreamLifecycle(
                     dataStream.getName(),
-                    dataStream.getLifecycle(),
+                    dataStream.getDataLifecycle(),
                     dataStream.isSystem()
                 )
             )

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java

Lines changed: 1 addition & 1 deletion
@@ -76,7 +76,7 @@ GetDataStreamLifecycleStatsAction.Response collectStats(ClusterState state) {
         Set<String> indicesInErrorStore = lifecycleService.getErrorStore().getAllIndices();
         List<GetDataStreamLifecycleStatsAction.Response.DataStreamStats> dataStreamStats = new ArrayList<>();
         for (DataStream dataStream : state.metadata().dataStreams().values()) {
-            if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) {
+            if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) {
                 int total = 0;
                 int inError = 0;
                 for (Index index : dataStream.getIndices()) {

server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java

Lines changed: 5 additions & 4 deletions
@@ -381,9 +381,10 @@ public XContentBuilder toXContent(
             if (indexTemplate != null) {
                 builder.field(INDEX_TEMPLATE_FIELD.getPreferredName(), indexTemplate);
             }
-            if (dataStream.getLifecycle() != null) {
+            if (dataStream.getDataLifecycle() != null) {
                 builder.field(LIFECYCLE_FIELD.getPreferredName());
-                dataStream.getLifecycle().toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal());
+                dataStream.getDataLifecycle()
+                    .toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal());
             }
             if (ilmPolicyName != null) {
                 builder.field(ILM_POLICY_FIELD.getPreferredName(), ilmPolicyName);

@@ -468,15 +469,15 @@ private void addAutoShardingEvent(XContentBuilder builder, Params params, DataSt
          */
         public ManagedBy getNextGenerationManagedBy() {
             // both ILM and DSL are configured so let's check the prefer_ilm setting to see which system takes precedence
-            if (ilmPolicyName != null && dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) {
+            if (ilmPolicyName != null && dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) {
                 return templatePreferIlmValue ? ManagedBy.ILM : ManagedBy.LIFECYCLE;
             }

             if (ilmPolicyName != null) {
                 return ManagedBy.ILM;
             }

-            if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) {
+            if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) {
                 return ManagedBy.LIFECYCLE;
             }

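
For reference, the precedence encoded in getNextGenerationManagedBy, unchanged by this commit apart from the getter rename, can be summarised with a small stand-in. The final fallthrough is an assumption, since the closing branch of the method is outside this excerpt:

    // Simplified stand-in for the precedence above; illustration only, not the Elasticsearch class.
    enum ManagedBySketch { ILM, LIFECYCLE, UNMANAGED }

    final class ManagedByResolver {
        static ManagedBySketch resolve(boolean hasIlmPolicy, boolean dataLifecycleEnabled, boolean templatePreferIlm) {
            if (hasIlmPolicy && dataLifecycleEnabled) {
                return templatePreferIlm ? ManagedBySketch.ILM : ManagedBySketch.LIFECYCLE; // prefer_ilm breaks the tie
            }
            if (hasIlmPolicy) {
                return ManagedBySketch.ILM;
            }
            if (dataLifecycleEnabled) {
                return ManagedBySketch.LIFECYCLE;
            }
            return ManagedBySketch.UNMANAGED; // assumption: the fallthrough is not shown in the excerpt
        }
    }

For example, a data stream with both an ILM policy and an enabled data lifecycle is reported as managed by ILM only when its backing index template sets prefer_ilm to true.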