Merged
20 commits, all by gmarouli:
- 7ba1925 Move DownsamplingRounds out of the wrapper class. (Mar 10, 2025)
- 3117ab8 Introduce DataStreamLifecycle dedicated Template class and use it. (Mar 11, 2025)
- bbf6507 Remove the need for explicit nulls from the DataStreamLifecycle (Mar 11, 2025)
- cefb4c9 Fix effective retention bug (Mar 11, 2025)
- f4a570d Revert enabled flag to boolean instead of Boolean. (Mar 11, 2025)
- 7c5c277 Revert enabled flag to boolean instead of Boolean (also from the temp… (Mar 11, 2025)
- ebd743f Bug fix in XContent parser of the DataStreamLifecycle (downsampling f… (Mar 11, 2025)
- 934d155 Fix bwc serialisation errors (Mar 12, 2025)
- 67d10d1 Merge branch 'main' into refactor-data-stream-lifecycle (Mar 12, 2025)
- f4ef99f Fix tests (Mar 12, 2025)
- 21bbb31 Use getters in methods with heavier logic to facilitate testing (Mar 12, 2025)
- 16f48e9 Convert to the template `DataStreamLifecycle` in assertions when needed (Mar 12, 2025)
- 9797a7b Merge with main (Mar 12, 2025)
- f9cb602 Prepare backport patch to 8.19.0 (Mar 12, 2025)
- 7ed4c67 Merge with main (Mar 12, 2025)
- 298a1cd Remove incorrect nullable annotations (Mar 12, 2025)
- b955541 Merge with main (Mar 18, 2025)
- ebc6794 Remove the 8.19 patch until all previous backport versions have been … (Mar 18, 2025)
- b025cb9 Make legacy reader and writer more explicit and readable (Mar 18, 2025)
- 4f75227 Merge branch 'main' into refactor-data-stream-lifecycle (Mar 18, 2025)
@@ -1384,7 +1384,7 @@ public void testSearchAllResolvesDataStreams() throws Exception {

public void testGetDataStream() throws Exception {
Settings settings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, maximumNumberOfReplicas() + 2).build();
DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder().dataRetention(randomMillisUpToYear9999()).build();
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.builder().dataRetention(randomPositiveTimeValue()).build();
putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings, null, null, lifecycle, false);
int numDocsFoo = randomIntBetween(2, 16);
indexDocs("metrics-foo", numDocsFoo);
@@ -2450,7 +2450,7 @@ static void putComposableIndexTemplate(
@Nullable Settings settings,
@Nullable Map<String, Object> metadata,
@Nullable Map<String, AliasMetadata> aliases,
@Nullable DataStreamLifecycle lifecycle,
@Nullable DataStreamLifecycle.Template lifecycle,
boolean withFailureStore
) throws IOException {
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id);
@@ -25,7 +25,7 @@
import java.util.List;

import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleFixtures.putComposableIndexTemplate;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleFixtures.randomLifecycle;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleFixtures.randomLifecycleTemplate;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -39,7 +39,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

public void testGetLifecycle() throws Exception {
DataStreamLifecycle lifecycle = randomLifecycle();
DataStreamLifecycle.Template lifecycle = randomLifecycleTemplate();
putComposableIndexTemplate("id1", null, List.of("with-lifecycle*"), null, null, lifecycle);
putComposableIndexTemplate("id2", null, List.of("without-lifecycle*"), null, null, null);
{
@@ -192,8 +192,8 @@ public void testPutLifecycle() throws Exception {
).get();
assertThat(response.getDataStreamLifecycles().size(), equalTo(1));
assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("my-data-stream"));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getDataStreamRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().isEnabled(), equalTo(true));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().dataRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().enabled(), equalTo(true));
}

// Disable the lifecycle
@@ -220,13 +220,13 @@ public void testPutLifecycle() throws Exception {
).get();
assertThat(response.getDataStreamLifecycles().size(), equalTo(1));
assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("my-data-stream"));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getDataStreamRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().isEnabled(), equalTo(false));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().dataRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().enabled(), equalTo(false));
}
}

public void testDeleteLifecycle() throws Exception {
DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder().dataRetention(randomMillisUpToYear9999()).build();
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.builder().dataRetention(randomPositiveTimeValue()).build();
putComposableIndexTemplate("id1", null, List.of("with-lifecycle*"), null, null, lifecycle);
putComposableIndexTemplate("id2", null, List.of("without-lifecycle*"), null, null, null);
{
@@ -207,7 +207,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
Template.builder()
.settings(Settings.EMPTY)
.mappings(mappings)
.lifecycle(DataStreamLifecycle.newBuilder().dataRetention(randomMillisUpToYear9999()).build())
.lifecycle(DataStreamLifecycle.Template.builder().dataRetention(randomPositiveTimeValue()).build())
)
.dataStreamTemplate(new DataStreamTemplate())
.build(),
@@ -146,7 +146,7 @@ public void cleanup() {

public void testRolloverLifecycle() throws Exception {
// empty lifecycle contains the default rollover
DataStreamLifecycle lifecycle = new DataStreamLifecycle();
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.EMPTY;

putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle, false);
String dataStreamName = "metrics-foo";
@@ -178,7 +178,7 @@ public void testRolloverLifecycle() throws Exception {
}

public void testRolloverAndRetention() throws Exception {
DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder().dataRetention(0).build();
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.builder().dataRetention(TimeValue.ZERO).build();

putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle, false);

@@ -321,7 +321,7 @@ public void testOriginationDate() throws Exception {
* days ago, and one with an origination date 1 day ago. After data stream lifecycle runs, we expect the one with the old
* origination date to have been deleted, and the one with the newer origination date to remain.
*/
DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder().dataRetention(TimeValue.timeValueDays(7)).build();
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.builder().dataRetention(TimeValue.timeValueDays(7)).build();

putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle, false);

@@ -393,7 +393,7 @@ public void testOriginationDate() throws Exception {
}

public void testUpdatingLifecycleAppliesToAllBackingIndices() throws Exception {
DataStreamLifecycle lifecycle = new DataStreamLifecycle();
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.DEFAULT;

putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle, false);

@@ -437,7 +437,7 @@ public void testAutomaticForceMerge() throws Exception {
* because all necessary merging has already happened automatically. So in order to detect whether forcemerge has been called, we
* use a SendRequestBehavior in the MockTransportService to detect it.
*/
DataStreamLifecycle lifecycle = new DataStreamLifecycle();
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.DEFAULT;
disableDataStreamLifecycle();
String dataStreamName = "metrics-foo";
putComposableIndexTemplate(
@@ -539,7 +539,7 @@ private static void disableDataStreamLifecycle() {

public void testErrorRecordingOnRollover() throws Exception {
// empty lifecycle contains the default rollover
DataStreamLifecycle lifecycle = new DataStreamLifecycle();
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.EMPTY;
/*
* We set index.auto_expand_replicas to 0-1 so that if we get a single-node cluster it is not yellow. The cluster being yellow
* could result in data stream lifecycle's automatic forcemerge failing, which would result in an unexpected error in the error
@@ -697,7 +697,7 @@ public void testErrorRecordingOnRollover() throws Exception {
public void testErrorRecordingOnRetention() throws Exception {
// starting with a lifecycle without retention so we can rollover the data stream and manipulate the second generation index such
// that its retention execution fails
DataStreamLifecycle lifecycle = new DataStreamLifecycle();
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.DEFAULT;

/*
* We set index.auto_expand_replicas to 0-1 so that if we get a single-node cluster it is not yellow. The cluster being yellow
@@ -871,7 +871,7 @@ public void testErrorRecordingOnRetention() throws Exception {
}

public void testDataLifecycleServiceConfiguresTheMergePolicy() throws Exception {
DataStreamLifecycle lifecycle = new DataStreamLifecycle();
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.DEFAULT;

putComposableIndexTemplate(
"id1",
@@ -972,7 +972,7 @@ public void testDataLifecycleServiceConfiguresTheMergePolicy() throws Exception

public void testReenableDataStreamLifecycle() throws Exception {
// start with a lifecycle that's not enabled
DataStreamLifecycle lifecycle = new DataStreamLifecycle(null, null, false);
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.builder().enabled(false).build();

putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle, false);
String dataStreamName = "metrics-foo";
@@ -1031,15 +1031,13 @@ public void testReenableDataStreamLifecycle() throws Exception {

public void testLifecycleAppliedToFailureStore() throws Exception {
// We configure a lifecycle with downsampling to ensure it doesn't fail
DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder()
.dataRetention(20_000)
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.builder()
.dataRetention(TimeValue.timeValueSeconds(20))
.downsampling(
new DataStreamLifecycle.Downsampling(
List.of(
new DataStreamLifecycle.Downsampling.Round(
TimeValue.timeValueMillis(10),
new DownsampleConfig(new DateHistogramInterval("10m"))
)
List.of(
new DataStreamLifecycle.DownsamplingRound(
TimeValue.timeValueMillis(10),
new DownsampleConfig(new DateHistogramInterval("10m"))
)
)
)
@@ -1205,7 +1203,7 @@ static void putComposableIndexTemplate(
List<String> patterns,
@Nullable Settings settings,
@Nullable Map<String, Object> metadata,
@Nullable DataStreamLifecycle lifecycle,
@Nullable DataStreamLifecycle.Template lifecycle,
boolean withFailureStore
) throws IOException {
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id);
@@ -1268,7 +1266,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
Template.builder()
.settings(Settings.EMPTY)
.lifecycle(
DataStreamLifecycle.newBuilder()
DataStreamLifecycle.Template.builder()
.dataRetention(TimeValue.timeValueDays(SYSTEM_DATA_STREAM_RETENTION_DAYS))
)
)
@@ -86,7 +86,7 @@ public void cleanup() {

public void testExplainLifecycle() throws Exception {
// empty lifecycle contains the default rollover
DataStreamLifecycle lifecycle = new DataStreamLifecycle();
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.EMPTY;

putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle);
String dataStreamName = "metrics-foo";
@@ -134,7 +134,7 @@ public void testExplainLifecycle() throws Exception {
assertThat(explainIndex.isManagedByLifecycle(), is(true));
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue());
assertThat(explainIndex.getLifecycle().dataRetention(), nullValue());
if (internalCluster().numDataNodes() > 1) {
// If the number of nodes is 1 then the cluster will be yellow so forcemerge will report an error if it has run
assertThat(explainIndex.getError(), nullValue());
@@ -193,7 +193,7 @@ public void testExplainLifecycle() throws Exception {
assertThat(explainIndex.isManagedByLifecycle(), is(true));
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue());
assertThat(explainIndex.getLifecycle().dataRetention(), nullValue());

if (explainIndex.getIndex().equals(DataStream.getDefaultBackingIndexName(dataStreamName, 1))) {
// first generation index was rolled over
@@ -263,7 +263,7 @@ public void testSystemExplainLifecycle() throws Exception {
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(
explainIndex.getLifecycle().getDataStreamRetention(),
explainIndex.getLifecycle().dataRetention(),
equalTo(TimeValue.timeValueDays(SYSTEM_DATA_STREAM_RETENTION_DAYS))
);
}
@@ -274,7 +274,7 @@

public void testExplainLifecycleForIndicesWithErrors() throws Exception {
// empty lifecycle contains the default rollover
DataStreamLifecycle lifecycle = new DataStreamLifecycle();
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.EMPTY;

putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle);

@@ -329,7 +329,7 @@ public void testExplainLifecycleForIndicesWithErrors() throws Exception {
assertThat(explainIndex.isManagedByLifecycle(), is(true));
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue());
assertThat(explainIndex.getLifecycle().dataRetention(), nullValue());
assertThat(explainIndex.getRolloverDate(), nullValue());
assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), nullValue());
// index has not been rolled over yet
@@ -374,7 +374,7 @@ public void testExplainDataStreamLifecycleForUnmanagedIndices() throws Exception
List.of("metrics-foo*"),
null,
null,
DataStreamLifecycle.newBuilder().enabled(false).build()
DataStreamLifecycle.Template.builder().enabled(false).build()
);
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
@@ -439,7 +439,7 @@ static void putComposableIndexTemplate(
List<String> patterns,
@Nullable Settings settings,
@Nullable Map<String, Object> metadata,
@Nullable DataStreamLifecycle lifecycle
@Nullable DataStreamLifecycle.Template lifecycle
) throws IOException {
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id);
request.indexTemplate(
@@ -477,7 +477,7 @@ Set<Index> maybeExecuteDownsampling(ClusterState state, DataStream dataStream, L
for (Index index : targetIndices) {
IndexMetadata backingIndexMeta = metadata.getProject().index(index);
assert backingIndexMeta != null : "the data stream backing indices must exist";
List<DataStreamLifecycle.Downsampling.Round> downsamplingRounds = dataStream.getDownsamplingRoundsFor(
List<DataStreamLifecycle.DownsamplingRound> downsamplingRounds = dataStream.getDownsamplingRoundsFor(
index,
metadata.getProject()::index,
nowSupplier
@@ -515,18 +515,18 @@ Set<Index> maybeExecuteDownsampling(ClusterState state, DataStream dataStream, L
private Set<Index> waitForInProgressOrTriggerDownsampling(
DataStream dataStream,
IndexMetadata backingIndex,
List<DataStreamLifecycle.Downsampling.Round> downsamplingRounds,
List<DataStreamLifecycle.DownsamplingRound> downsamplingRounds,
Metadata metadata
) {
assert dataStream.getIndices().contains(backingIndex.getIndex())
: "the provided backing index must be part of data stream:" + dataStream.getName();
assert downsamplingRounds.isEmpty() == false : "the index should be managed and have matching downsampling rounds";
Set<Index> affectedIndices = new HashSet<>();
DataStreamLifecycle.Downsampling.Round lastRound = downsamplingRounds.get(downsamplingRounds.size() - 1);
DataStreamLifecycle.DownsamplingRound lastRound = downsamplingRounds.get(downsamplingRounds.size() - 1);

Index index = backingIndex.getIndex();
String indexName = index.getName();
for (DataStreamLifecycle.Downsampling.Round round : downsamplingRounds) {
for (DataStreamLifecycle.DownsamplingRound round : downsamplingRounds) {
// the downsample index name for each round is deterministic
String downsampleIndexName = DownsampleConfig.generateDownsampleIndexName(
DOWNSAMPLED_INDEX_PREFIX,
@@ -564,7 +564,7 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
/**
* Issues a request downsample the source index to the downsample index for the specified round.
*/
private void downsampleIndexOnce(DataStreamLifecycle.Downsampling.Round round, String sourceIndex, String downsampleIndexName) {
private void downsampleIndexOnce(DataStreamLifecycle.DownsamplingRound round, String sourceIndex, String downsampleIndexName) {
DownsampleAction.Request request = new DownsampleAction.Request(
TimeValue.THIRTY_SECONDS /* TODO should this be longer/configurable? */,
sourceIndex,
@@ -597,8 +597,8 @@ private void downsampleIndexOnce(DataStreamLifecycle.Downsampling.Round round, S
private Set<Index> evaluateDownsampleStatus(
DataStream dataStream,
IndexMetadata.DownsampleTaskStatus downsampleStatus,
DataStreamLifecycle.Downsampling.Round currentRound,
DataStreamLifecycle.Downsampling.Round lastRound,
DataStreamLifecycle.DownsamplingRound currentRound,
DataStreamLifecycle.DownsamplingRound lastRound,
Index backingIndex,
Index downsampleIndex
) {
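
For code that reads lifecycle values back (for example from the GET data stream lifecycle and explain responses exercised above), the accessors move from bean-style to record-style names: `getDataStreamRetention()` becomes `dataRetention()` and `isEnabled()` becomes `enabled()`. A small sketch adapted from the assertion hunks above; the `response` and `dataRetention` variables are the ones defined in those tests.

```java
// Old accessors (pre-refactor):
// assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getDataStreamRetention(), equalTo(dataRetention));
// assertThat(response.getDataStreamLifecycles().get(0).lifecycle().isEnabled(), equalTo(true));

// New accessors (this PR):
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().dataRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().enabled(), equalTo(true));
```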