Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1400,7 +1400,7 @@ public void testGetDataStream() throws Exception {
assertThat(metricsFooDataStream.getDataStreamStatus(), is(ClusterHealthStatus.YELLOW));
assertThat(metricsFooDataStream.getIndexTemplate(), is("template_for_foo"));
assertThat(metricsFooDataStream.getIlmPolicy(), is(nullValue()));
assertThat(dataStream.getLifecycle(), is(lifecycle));
assertThat(dataStream.getDataLifecycle(), is(lifecycle));
assertThat(metricsFooDataStream.templatePreferIlmValue(), is(true));
GetDataStreamAction.Response.IndexProperties indexProperties = metricsFooDataStream.getIndexSettingsValues()
.get(dataStream.getWriteIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void testPutLifecycle() throws Exception {
).get();
assertThat(response.getDataStreamLifecycles().size(), equalTo(1));
assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("my-data-stream"));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getDataStreamRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().dataRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().isEnabled(), equalTo(true));
}

Expand Down Expand Up @@ -220,7 +220,7 @@ public void testPutLifecycle() throws Exception {
).get();
assertThat(response.getDataStreamLifecycles().size(), equalTo(1));
assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("my-data-stream"));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getDataStreamRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().dataRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().isEnabled(), equalTo(false));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void testExplainLifecycle() throws Exception {
assertThat(explainIndex.isManagedByLifecycle(), is(true));
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue());
assertThat(explainIndex.getLifecycle().dataRetention(), nullValue());
if (internalCluster().numDataNodes() > 1) {
// If the number of nodes is 1 then the cluster will be yellow so forcemerge will report an error if it has run
assertThat(explainIndex.getError(), nullValue());
Expand Down Expand Up @@ -193,7 +193,7 @@ public void testExplainLifecycle() throws Exception {
assertThat(explainIndex.isManagedByLifecycle(), is(true));
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue());
assertThat(explainIndex.getLifecycle().dataRetention(), nullValue());

if (explainIndex.getIndex().equals(DataStream.getDefaultBackingIndexName(dataStreamName, 1))) {
// first generation index was rolled over
Expand Down Expand Up @@ -263,7 +263,7 @@ public void testSystemExplainLifecycle() throws Exception {
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(
explainIndex.getLifecycle().getDataStreamRetention(),
explainIndex.getLifecycle().dataRetention(),
equalTo(TimeValue.timeValueDays(SYSTEM_DATA_STREAM_RETENTION_DAYS))
);
}
Expand Down Expand Up @@ -329,7 +329,7 @@ public void testExplainLifecycleForIndicesWithErrors() throws Exception {
assertThat(explainIndex.isManagedByLifecycle(), is(true));
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue());
assertThat(explainIndex.getLifecycle().dataRetention(), nullValue());
assertThat(explainIndex.getRolloverDate(), nullValue());
assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), nullValue());
// index has not been rolled over yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,16 +347,22 @@ void run(ClusterState state) {
int affectedDataStreams = 0;
for (DataStream dataStream : state.metadata().getProject().dataStreams().values()) {
clearErrorStoreForUnmanagedIndices(dataStream);
if (dataStream.getLifecycle() == null) {
if (dataStream.getDataLifecycle() == null) {
continue;
}

// the following indices should not be considered for the remainder of this service run, for various reasons.
Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();

// This is the pre-rollover write index. It may or may not be the write index after maybeExecuteRollover has executed,
// depending on rollover criteria, for this reason we exclude it for the remaining run.
indicesToExcludeForRemainingRun.addAll(maybeExecuteRollover(state, dataStream));
// These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed,
// depending on rollover criteria, for this reason we exclude them for the remaining run.
indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, false));
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
if (failureStoreWriteIndex != null) {
indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
}
}

// tsds indices that are still within their time bounds (i.e. now < time_series.end_time) - we don't want these indices to be
// deleted, forcemerged, or downsampled as they're still expected to receive large amounts of writes
Expand Down Expand Up @@ -799,23 +805,6 @@ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) {
}
}

/**
* This method will attempt to roll over the write index of a data stream. The rollover will occur only if the conditions
* apply. In any case, we return the write backing index back to the caller, so it can be excluded from the next steps.
* @return the write index of this data stream before rollover was requested.
*/
private Set<Index> maybeExecuteRollover(ClusterState state, DataStream dataStream) {
Set<Index> currentRunWriteIndices = new HashSet<>();
currentRunWriteIndices.add(maybeExecuteRollover(state, dataStream, false));
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
if (failureStoreWriteIndex != null) {
currentRunWriteIndices.add(failureStoreWriteIndex);
}
}
return currentRunWriteIndices;
}

@Nullable
private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
Expand All @@ -827,7 +816,8 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
RolloverRequest rolloverRequest = getDefaultRolloverRequest(
rolloverConfiguration,
dataStream.getName(),
dataStream.getLifecycle().getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
dataStream.getDataLifecycleForIndex(currentRunWriteIndex)
.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
rolloverFailureStore
);
transportActionsDeduplicator.executeOnce(
Expand Down Expand Up @@ -889,9 +879,6 @@ Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<
return Set.of();
}
Set<Index> indicesToBeRemoved = new HashSet<>();
// We know that there is lifecycle and retention because there are indices to be deleted
assert dataStream.getLifecycle() != null;
TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
for (Index index : backingIndicesOlderThanRetention) {
if (indicesToExcludeForRemainingRun.contains(index) == false) {
IndexMetadata backingIndex = metadata.getProject().index(index);
Expand All @@ -907,7 +894,7 @@ Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<
+ "because there's a downsampling operation currently in progress for this index. Current downsampling "
+ "status is [{}]. When downsampling completes, DSL will delete this index.",
index.getName(),
effectiveDataRetention,
dataStream.getDataLifecycleForIndex(index).getEffectiveDataRetention(globalRetention, dataStream.isInternal()),
downsampleStatus
);
} else {
Expand All @@ -918,7 +905,12 @@ Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<
// there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
// let's start simple and reevaluate
String indexName = backingIndex.getIndex().getName();
deleteIndexOnce(indexName, "the lapsed [" + effectiveDataRetention + "] retention period");
deleteIndexOnce(
indexName,
"the lapsed ["
+ dataStream.getDataLifecycleForIndex(index).getEffectiveDataRetention(globalRetention, dataStream.isInternal())
+ "] retention period"
);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected void masterOperation(
idxMetadata.getCreationDate(),
rolloverInfo == null ? null : rolloverInfo.getTime(),
generationDate,
parentDataStream.getLifecycle(),
parentDataStream.getDataLifecycleForIndex(idxMetadata.getIndex()),
errorStore.getError(index)
);
explainIndices.add(explainIndexDataStreamLifecycle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected void masterOperation(
.map(
dataStream -> new GetDataStreamLifecycleAction.Response.DataStreamLifecycle(
dataStream.getName(),
dataStream.getLifecycle(),
dataStream.getDataLifecycle(),
dataStream.isSystem()
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ GetDataStreamLifecycleStatsAction.Response collectStats(ClusterState state) {
Set<String> indicesInErrorStore = lifecycleService.getErrorStore().getAllIndices();
List<GetDataStreamLifecycleStatsAction.Response.DataStreamStats> dataStreamStats = new ArrayList<>();
for (DataStream dataStream : state.metadata().getProject().dataStreams().values()) {
if (dataStream.getLifecycle() != null && dataStream.getLifecycle().isEnabled()) {
if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().isEnabled()) {
int total = 0;
int inError = 0;
for (Index index : dataStream.getIndices()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.BasicDataStreamLifecycle;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
Expand Down Expand Up @@ -62,6 +62,6 @@ public boolean allowSystemIndexAccessByDefault() {

@Override
public Set<String> supportedCapabilities() {
return Set.of(DataStreamLifecycle.EFFECTIVE_RETENTION_REST_API_CAPABILITY);
return Set.of(BasicDataStreamLifecycle.EFFECTIVE_RETENTION_REST_API_CAPABILITY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.BasicDataStreamLifecycle;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
Expand Down Expand Up @@ -60,6 +60,6 @@ public boolean allowSystemIndexAccessByDefault() {

@Override
public Set<String> supportedCapabilities() {
return Set.of(DataStreamLifecycle.EFFECTIVE_RETENTION_REST_API_CAPABILITY, "data_stream_global_retention");
return Set.of(BasicDataStreamLifecycle.EFFECTIVE_RETENTION_REST_API_CAPABILITY, "data_stream_global_retention");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.BasicDataStreamLifecycle;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -74,7 +74,7 @@ public boolean allowSystemIndexAccessByDefault() {

@Override
public Set<String> supportedCapabilities() {
return Set.of(DataStreamLifecycle.EFFECTIVE_RETENTION_REST_API_CAPABILITY);
return Set.of(BasicDataStreamLifecycle.EFFECTIVE_RETENTION_REST_API_CAPABILITY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void testLifecycleComposition() {
DataStreamLifecycle result = composeDataLifecycles(lifecycles);
// Defaults to true
assertThat(result.isEnabled(), equalTo(true));
assertThat(result.getDataStreamRetention(), equalTo(lifecycle.getDataStreamRetention()));
assertThat(result.dataRetention(), equalTo(lifecycle.dataRetention()));
assertThat(result.getDownsamplingRounds(), equalTo(lifecycle.getDownsamplingRounds()));
}
// If the last lifecycle is missing a property (apart from enabled) we keep the latest from the previous ones
Expand All @@ -166,7 +166,7 @@ public void testLifecycleComposition() {
List<DataStreamLifecycle> lifecycles = List.of(lifecycle, new DataStreamLifecycle());
DataStreamLifecycle result = composeDataLifecycles(lifecycles);
assertThat(result.isEnabled(), equalTo(true));
assertThat(result.getDataStreamRetention(), equalTo(lifecycle.getDataStreamRetention()));
assertThat(result.dataRetention(), equalTo(lifecycle.dataRetention()));
assertThat(result.getDownsamplingRounds(), equalTo(lifecycle.getDownsamplingRounds()));
}
// If both lifecycle have all properties, then the latest one overwrites all the others
Expand All @@ -184,7 +184,7 @@ public void testLifecycleComposition() {
List<DataStreamLifecycle> lifecycles = List.of(lifecycle1, lifecycle2);
DataStreamLifecycle result = composeDataLifecycles(lifecycles);
assertThat(result.isEnabled(), equalTo(lifecycle2.isEnabled()));
assertThat(result.getDataStreamRetention(), equalTo(lifecycle2.getDataStreamRetention()));
assertThat(result.dataRetention(), equalTo(lifecycle2.dataRetention()));
assertThat(result.getDownsamplingRounds(), equalTo(lifecycle2.getDownsamplingRounds()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.BasicDataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAutoShardingEvent;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -397,9 +397,10 @@ public XContentBuilder toXContent(
if (indexTemplate != null) {
builder.field(INDEX_TEMPLATE_FIELD.getPreferredName(), indexTemplate);
}
if (dataStream.getLifecycle() != null) {
if (dataStream.getDataLifecycle() != null) {
builder.field(LIFECYCLE_FIELD.getPreferredName());
dataStream.getLifecycle().toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal());
dataStream.getDataLifecycle()
.toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal());
}
if (ilmPolicyName != null) {
builder.field(ILM_POLICY_FIELD.getPreferredName(), ilmPolicyName);
Expand Down Expand Up @@ -488,15 +489,15 @@ private void addAutoShardingEvent(XContentBuilder builder, Params params, DataSt
*/
public ManagedBy getNextGenerationManagedBy() {
// both ILM and DSL are configured so let's check the prefer_ilm setting to see which system takes precedence
if (ilmPolicyName != null && dataStream.getLifecycle() != null && dataStream.getLifecycle().isEnabled()) {
if (ilmPolicyName != null && dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().isEnabled()) {
return templatePreferIlmValue ? ManagedBy.ILM : ManagedBy.LIFECYCLE;
}

if (ilmPolicyName != null) {
return ManagedBy.ILM;
}

if (dataStream.getLifecycle() != null && dataStream.getLifecycle().isEnabled()) {
if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().isEnabled()) {
return ManagedBy.LIFECYCLE;
}

Expand Down Expand Up @@ -660,7 +661,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
for (DataStreamInfo dataStream : dataStreams) {
dataStream.toXContent(
builder,
DataStreamLifecycle.addEffectiveRetentionParams(params),
BasicDataStreamLifecycle.addEffectiveRetentionParams(params),
rolloverConfiguration,
globalRetention
);
Expand Down
Loading