Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1400,7 +1400,7 @@ public void testGetDataStream() throws Exception {
assertThat(metricsFooDataStream.getDataStreamStatus(), is(ClusterHealthStatus.YELLOW));
assertThat(metricsFooDataStream.getIndexTemplate(), is("template_for_foo"));
assertThat(metricsFooDataStream.getIlmPolicy(), is(nullValue()));
assertThat(dataStream.getLifecycle(), is(lifecycle.toDataStreamLifecycle()));
assertThat(dataStream.getDataLifecycle(), is(lifecycle.toDataStreamLifecycle()));
assertThat(metricsFooDataStream.templatePreferIlmValue(), is(true));
GetDataStreamAction.Response.IndexProperties indexProperties = metricsFooDataStream.getIndexSettingsValues()
.get(dataStream.getWriteIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,16 +349,22 @@ void run(ClusterState state) {
int affectedDataStreams = 0;
for (DataStream dataStream : state.metadata().getProject().dataStreams().values()) {
clearErrorStoreForUnmanagedIndices(dataStream);
if (dataStream.getLifecycle() == null) {
if (dataStream.getDataLifecycle() == null) {
continue;
}

// the following indices should not be considered for the remainder of this service run, for various reasons.
Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();

// This is the pre-rollover write index. It may or may not be the write index after maybeExecuteRollover has executed,
// depending on rollover criteria, for this reason we exclude it for the remaining run.
indicesToExcludeForRemainingRun.addAll(maybeExecuteRollover(state, dataStream));
// These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed,
// depending on rollover criteria, for this reason we exclude them for the remaining run.
indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, false));
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
if (failureStoreWriteIndex != null) {
indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
}
}

// tsds indices that are still within their time bounds (i.e. now < time_series.end_time) - we don't want these indices to be
// deleted, forcemerged, or downsampled as they're still expected to receive large amounts of writes
Expand Down Expand Up @@ -805,23 +811,6 @@ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) {
}
}

/**
* This method will attempt to roll over the write index of a data stream. The rollover will occur only if the conditions
* apply. In any case, we return the write backing index back to the caller, so it can be excluded from the next steps.
* @return the write index of this data stream before rollover was requested.
*/
private Set<Index> maybeExecuteRollover(ClusterState state, DataStream dataStream) {
Set<Index> currentRunWriteIndices = new HashSet<>();
currentRunWriteIndices.add(maybeExecuteRollover(state, dataStream, false));
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
if (failureStoreWriteIndex != null) {
currentRunWriteIndices.add(failureStoreWriteIndex);
}
}
return currentRunWriteIndices;
}

@Nullable
private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
Expand All @@ -830,10 +819,11 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
}
try {
if (dataStream.isIndexManagedByDataStreamLifecycle(currentRunWriteIndex, state.metadata().getProject()::index)) {
DataStreamLifecycle lifecycle = rolloverFailureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle();
RolloverRequest rolloverRequest = getDefaultRolloverRequest(
rolloverConfiguration,
dataStream.getName(),
dataStream.getLifecycle().getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
rolloverFailureStore
);
transportActionsDeduplicator.executeOnce(
Expand Down Expand Up @@ -886,45 +876,66 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
Metadata metadata = state.metadata();
DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionSettings.get();
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(
List<Index> backingIndicesOlderThanRetention = dataStream.getBackingIndicesPastRetention(
metadata.getProject()::index,
nowSupplier,
globalRetention
);
List<Index> failureIndicesOlderThanRetention = dataStream.getFailureIndicesPastRetention(
metadata.getProject()::index,
nowSupplier,
globalRetention
);
if (backingIndicesOlderThanRetention.isEmpty()) {
if (backingIndicesOlderThanRetention.isEmpty() && failureIndicesOlderThanRetention.isEmpty()) {
return Set.of();
}
Set<Index> indicesToBeRemoved = new HashSet<>();
// We know that there is lifecycle and retention because there are indices to be deleted
assert dataStream.getLifecycle() != null;
TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
for (Index index : backingIndicesOlderThanRetention) {
if (indicesToExcludeForRemainingRun.contains(index) == false) {
IndexMetadata backingIndex = metadata.getProject().index(index);
assert backingIndex != null : "the data stream backing indices must exist";

IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
// we don't want to delete the source index if they have an in-progress downsampling operation because the
// target downsample index will remain in the system as a standalone index
if (downsampleStatus == STARTED) {
// there's an opportunity here to cancel downsampling and delete the source index now
logger.trace(
"Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
+ "because there's a downsampling operation currently in progress for this index. Current downsampling "
+ "status is [{}]. When downsampling completes, DSL will delete this index.",
index.getName(),
effectiveDataRetention,
downsampleStatus
);
} else {
// UNKNOWN is the default value, and has no real use. So index should be deleted
// SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete
if (backingIndicesOlderThanRetention.isEmpty() == false) {
assert dataStream.getDataLifecycle() != null : "data stream should have failure lifecycle if we have 'old' indices";
TimeValue dataRetention = dataStream.getDataLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
for (Index index : backingIndicesOlderThanRetention) {
if (indicesToExcludeForRemainingRun.contains(index) == false) {
IndexMetadata backingIndex = metadata.getProject().index(index);
assert backingIndex != null : "the data stream backing indices must exist";

IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
// we don't want to delete the source index if they have an in-progress downsampling operation because the
// target downsample index will remain in the system as a standalone index
if (downsampleStatus == STARTED) {
// there's an opportunity here to cancel downsampling and delete the source index now
logger.trace(
"Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
+ "because there's a downsampling operation currently in progress for this index. Current downsampling "
+ "status is [{}]. When downsampling completes, DSL will delete this index.",
index.getName(),
dataRetention,
downsampleStatus
);
} else {
// UNKNOWN is the default value, and has no real use. So index should be deleted
// SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete
indicesToBeRemoved.add(index);

// there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
// let's start simple and reevaluate
String indexName = backingIndex.getIndex().getName();
deleteIndexOnce(indexName, "the lapsed [" + dataRetention + "] retention period");
}
}
}
}
if (failureIndicesOlderThanRetention.isEmpty() == false) {
assert dataStream.getFailuresLifecycle() != null : "data stream should have failure lifecycle if we have 'old' indices";
var failureRetention = dataStream.getFailuresLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
for (Index index : failureIndicesOlderThanRetention) {
if (indicesToExcludeForRemainingRun.contains(index) == false) {
IndexMetadata failureIndex = metadata.getProject().index(index);
assert failureIndex != null : "the data stream failure indices must exist";
indicesToBeRemoved.add(index);

// there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
// let's start simple and reevaluate
String indexName = backingIndex.getIndex().getName();
deleteIndexOnce(indexName, "the lapsed [" + effectiveDataRetention + "] retention period");
String indexName = failureIndex.getIndex().getName();
deleteIndexOnce(indexName, "the lapsed [" + failureRetention + "] retention period");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected void masterOperation(
idxMetadata.getCreationDate(),
rolloverInfo == null ? null : rolloverInfo.getTime(),
generationDate,
parentDataStream.getLifecycle(),
parentDataStream.getDataLifecycleForIndex(idxMetadata.getIndex()),
errorStore.getError(state.projectId(), index)
);
explainIndices.add(explainIndexDataStreamLifecycle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected void localClusterStateOperation(
.map(
dataStream -> new GetDataStreamLifecycleAction.Response.DataStreamLifecycle(
dataStream.getName(),
dataStream.getLifecycle(),
dataStream.getDataLifecycle(),
dataStream.isSystem()
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ GetDataStreamLifecycleStatsAction.Response collectStats(ProjectMetadata project)
Set<String> indicesInErrorStore = lifecycleService.getErrorStore().getAllIndices(project.id());
List<GetDataStreamLifecycleStatsAction.Response.DataStreamStats> dataStreamStats = new ArrayList<>();
for (DataStream dataStream : project.dataStreams().values()) {
if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) {
if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) {
int total = 0;
int inError = 0;
for (Index index : dataStream.getIndices()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,10 @@ public XContentBuilder toXContent(
if (indexTemplate != null) {
builder.field(INDEX_TEMPLATE_FIELD.getPreferredName(), indexTemplate);
}
if (dataStream.getLifecycle() != null) {
if (dataStream.getDataLifecycle() != null) {
builder.field(LIFECYCLE_FIELD.getPreferredName());
dataStream.getLifecycle().toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal());
dataStream.getDataLifecycle()
.toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal());
}
if (ilmPolicyName != null) {
builder.field(ILM_POLICY_FIELD.getPreferredName(), ilmPolicyName);
Expand Down Expand Up @@ -471,15 +472,15 @@ private void addAutoShardingEvent(XContentBuilder builder, Params params, DataSt
*/
public ManagedBy getNextGenerationManagedBy() {
// both ILM and DSL are configured so let's check the prefer_ilm setting to see which system takes precedence
if (ilmPolicyName != null && dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) {
if (ilmPolicyName != null && dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) {
return templatePreferIlmValue ? ManagedBy.ILM : ManagedBy.LIFECYCLE;
}

if (ilmPolicyName != null) {
return ManagedBy.ILM;
}

if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) {
if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) {
return ManagedBy.LIFECYCLE;
}

Expand Down
Loading