Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
64b01b5
Switch interval validation to accept `DateHistogramInterval`
gmarouli Oct 22, 2025
345346d
Switch interval validation to accept `DateHistogramInterval`
gmarouli Oct 22, 2025
097115d
Do not use DownsampleConfig in DataStreamLifecycle
gmarouli Oct 22, 2025
3195327
Rename downsampling to downsampling rounds
gmarouli Oct 23, 2025
1d383f6
Add downsampling method to the data stream lifecycle config
gmarouli Oct 23, 2025
9107d76
Update rest endpoints
gmarouli Oct 23, 2025
bf80af5
Use the sampling method in the data stream lifecycle
gmarouli Oct 23, 2025
dd69548
Update docs/changelog/137023.yaml
gmarouli Oct 23, 2025
a328a4a
Update 137023.yaml
gmarouli Oct 23, 2025
ff2708c
Fix test
gmarouli Oct 23, 2025
23823bf
merge with main
gmarouli Oct 29, 2025
53e7079
Rename transport version
gmarouli Oct 29, 2025
67099d3
Fix and simplify DataStreamLifecycleTests & DataStreamLifecycleTempla…
gmarouli Oct 29, 2025
58a4994
Make random sampling method more dynamic
gmarouli Oct 29, 2025
a1d989c
Improvements based on review
gmarouli Oct 29, 2025
5340685
merge with main
gmarouli Oct 31, 2025
231506c
Add explanation about DLM using the source sampling method
gmarouli Oct 31, 2025
8d4fa3d
Test template composition of data stream lifecycle.
gmarouli Oct 31, 2025
3c10f8b
Fix test
gmarouli Oct 31, 2025
fcd8dd9
Remove unused constructor
gmarouli Oct 31, 2025
67dd8ea
Merge branch 'main' into refactor-downsampling-dlm
gmarouli Oct 31, 2025
e23f767
Fix test adding sampling method when there are no rounds
gmarouli Oct 31, 2025
4ca957f
Merge branch 'main' into refactor-downsampling-dlm
gmarouli Nov 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/137023.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137023
summary: Support choosing the downsampling method in data stream lifecycle
area: "Data streams"
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,10 @@ Set<Index> maybeExecuteDownsampling(ProjectState projectState, DataStream dataSt
// - has matching downsample rounds
// - is read-only
// So let's wait for an in-progress downsampling operation to succeed or trigger the last matching round
affectedIndices.addAll(waitForInProgressOrTriggerDownsampling(dataStream, backingIndexMeta, downsamplingRounds, project));
var downsamplingMethod = dataStream.getDataLifecycle().downsamplingMethod();
affectedIndices.addAll(
waitForInProgressOrTriggerDownsampling(dataStream, backingIndexMeta, downsamplingRounds, downsamplingMethod, project)
);
}
}

Expand All @@ -541,6 +544,7 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
DataStream dataStream,
IndexMetadata backingIndex,
List<DataStreamLifecycle.DownsamplingRound> downsamplingRounds,
DownsampleConfig.SamplingMethod downsamplingMethod,
ProjectMetadata project
) {
assert dataStream.getIndices().contains(backingIndex.getIndex())
Expand All @@ -556,7 +560,7 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
String downsampleIndexName = DownsampleConfig.generateDownsampleIndexName(
DOWNSAMPLED_INDEX_PREFIX,
backingIndex,
round.config().getFixedInterval()
round.fixedInterval()
);
IndexMetadata targetDownsampleIndexMeta = project.index(downsampleIndexName);
boolean targetDownsampleIndexExists = targetDownsampleIndexMeta != null;
Expand All @@ -568,7 +572,8 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
INDEX_DOWNSAMPLE_STATUS.get(targetDownsampleIndexMeta.getSettings()),
round,
lastRound,
index,
downsamplingMethod,
backingIndex,
targetDownsampleIndexMeta.getIndex()
);
if (downsamplingNotComplete.isEmpty() == false) {
Expand All @@ -580,7 +585,7 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
// no maintenance needed for previously started downsampling actions and we are on the last matching round so it's time
// to kick off downsampling
affectedIndices.add(index);
downsampleIndexOnce(round, project.id(), indexName, downsampleIndexName);
downsampleIndexOnce(round, downsamplingMethod, project.id(), backingIndex, downsampleIndexName);
}
}
}
Expand All @@ -592,16 +597,22 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
*/
private void downsampleIndexOnce(
DataStreamLifecycle.DownsamplingRound round,
DownsampleConfig.SamplingMethod requestedDownsamplingMethod,
ProjectId projectId,
String sourceIndex,
IndexMetadata sourceIndexMetadata,
String downsampleIndexName
) {
var sourceIndexSamplingMethod = DownsampleConfig.SamplingMethod.fromIndexMetadata(sourceIndexMetadata);
String sourceIndex = sourceIndexMetadata.getIndex().getName();
DownsampleAction.Request request = new DownsampleAction.Request(
TimeValue.THIRTY_SECONDS /* TODO should this be longer/configurable? */,
sourceIndex,
downsampleIndexName,
null,
round.config()
new DownsampleConfig(
round.fixedInterval(),
sourceIndexSamplingMethod == null ? requestedDownsamplingMethod : sourceIndexSamplingMethod
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly is the reasoning behind this sampling method logic? Why are we specifying the sampling method from the source index metadata? Is there a reason we can't just always pass the requestedDownsamplingMethod? I'm not saying it's wrong, it's just unexpected to me. Could you explain your thinking at least in a comment in the code and optionally here in the PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal of this is to make data stream lifecycle handle changes of the sampling method gracefully. Consider the following scenario.

  1. User has a data stream with a couple of downsampling rounds, and there are backing indices that are halfway through their downsampling journey, for example they have been downsampled once, but there are still more rounds that apply to them.
  2. User decides to change the sampling method of their lifecycle for this data stream.
  3. If we use the requested method to downsample an already downsampled index, this will fail. DLM will report the errors for these indices until it's time to delete them.

The same thing applies for ILM, but in ILM a user can create a new policy with the new sampling method so it will only be applied to the newest backing indices.

Considering that we want data stream lifecycle to work seamlessly, we thought that using the downsampling method of the source index when available is closer to how the user expects it to work (considering the limitations).

Does this make sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that definitely makes sense. Thanks for the explanation :). Could you put that explanation somewhere in this method?

)
);
transportActionsDeduplicator.executeOnce(
Tuple.tuple(projectId, request),
Expand Down Expand Up @@ -632,11 +643,12 @@ private Set<Index> evaluateDownsampleStatus(
IndexMetadata.DownsampleTaskStatus downsampleStatus,
DataStreamLifecycle.DownsamplingRound currentRound,
DataStreamLifecycle.DownsamplingRound lastRound,
Index backingIndex,
DownsampleConfig.SamplingMethod downsamplingMethod,
IndexMetadata backingIndex,
Index downsampleIndex
) {
Set<Index> affectedIndices = new HashSet<>();
String indexName = backingIndex.getName();
String indexName = backingIndex.getIndex().getName();
String downsampleIndexName = downsampleIndex.getName();
return switch (downsampleStatus) {
case UNKNOWN -> {
Expand Down Expand Up @@ -683,15 +695,15 @@ private Set<Index> evaluateDownsampleStatus(
// NOTE that the downsample request is made through the deduplicator so it will only really be executed if
// there isn't one already in-flight. This can happen if a previous request timed-out, failed, or there was a
// master failover and data stream lifecycle needed to restart
downsampleIndexOnce(currentRound, projectId, indexName, downsampleIndexName);
affectedIndices.add(backingIndex);
downsampleIndexOnce(currentRound, downsamplingMethod, projectId, backingIndex, downsampleIndexName);
affectedIndices.add(backingIndex.getIndex());
yield affectedIndices;
}
case SUCCESS -> {
if (dataStream.getIndices().contains(downsampleIndex) == false) {
// at this point the source index is part of the data stream and the downsample index is complete but not
// part of the data stream. we need to replace the source index with the downsample index in the data stream
affectedIndices.add(backingIndex);
affectedIndices.add(backingIndex.getIndex());
replaceBackingIndexWithDownsampleIndexOnce(projectId, dataStream, indexName, downsampleIndexName);
}
yield affectedIndices;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.rest.RestRequest.Method.PUT;
import static org.elasticsearch.rest.RestUtils.getAckTimeout;
Expand All @@ -29,6 +30,9 @@
@ServerlessScope(Scope.PUBLIC)
public class RestPutDataStreamLifecycleAction extends BaseRestHandler {

private static final String SUPPORTS_DOWNSAMPLING_METHOD = "dlm.downsampling_method";
private static final Set<String> CAPABILITIES = Set.of(SUPPORTS_DOWNSAMPLING_METHOD);

@Override
public String getName() {
    // Handler identifier; presumably used by the REST controller for
    // registration and usage stats — confirm against BaseRestHandler.
    return "put_data_lifecycles_action";
}
Expand All @@ -44,13 +48,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
try (XContentParser parser = request.contentParser()) {
PutDataStreamLifecycleAction.Request putLifecycleRequest = PutDataStreamLifecycleAction.Request.parseRequest(
parser,
(dataRetention, enabled, downsampling) -> new PutDataStreamLifecycleAction.Request(
(dataRetention, enabled, downsamplingRounds, downsamplingMethod) -> new PutDataStreamLifecycleAction.Request(
getMasterNodeTimeout(request),
getAckTimeout(request),
Strings.splitStringByCommaToArray(request.param("name")),
dataRetention,
enabled,
downsampling
downsamplingRounds,
downsamplingMethod
)
);
putLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, putLifecycleRequest.indicesOptions()));
Expand All @@ -61,4 +66,9 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
);
}
}

@Override
public Set<String> supportedCapabilities() {
    // Advertises the "dlm.downsampling_method" capability so callers can
    // detect that this endpoint accepts the new downsampling-method field.
    return CAPABILITIES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.elasticsearch.datastreams;

import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
Expand All @@ -25,6 +24,7 @@
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleFixtures;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexSettingProviders;
import org.elasticsearch.indices.EmptySystemIndices;
Expand Down Expand Up @@ -151,46 +151,50 @@ public void testLifecycleComposition() {
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.createDataLifecycleTemplate(
true,
randomRetention(),
randomDownsampling()
randomDownsampling(),
ResettableValue.create(DataStreamLifecycleFixtures.randomSamplingMethod())
);
List<DataStreamLifecycle.Template> lifecycles = List.of(lifecycle);
DataStreamLifecycle result = composeDataLifecycles(lifecycles).build();
// Defaults to true
assertThat(result.enabled(), equalTo(true));
assertThat(result.dataRetention(), equalTo(lifecycle.dataRetention().get()));
assertThat(result.downsampling(), equalTo(lifecycle.downsampling().get()));
assertThat(result.downsamplingRounds(), equalTo(lifecycle.downsamplingRounds().get()));
}
// If the last lifecycle is missing a property (apart from enabled) we keep the latest from the previous ones
// Enabled is always true unless it's explicitly set to false
{
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.createDataLifecycleTemplate(
false,
randomPositiveTimeValue(),
randomRounds()
randomRounds(),
DataStreamLifecycleFixtures.randomSamplingMethod()
);
List<DataStreamLifecycle.Template> lifecycles = List.of(lifecycle, DataStreamLifecycle.Template.DATA_DEFAULT);
DataStreamLifecycle result = composeDataLifecycles(lifecycles).build();
assertThat(result.enabled(), equalTo(true));
assertThat(result.dataRetention(), equalTo(lifecycle.dataRetention().get()));
assertThat(result.downsampling(), equalTo(lifecycle.downsampling().get()));
assertThat(result.downsamplingRounds(), equalTo(lifecycle.downsamplingRounds().get()));
}
// If both lifecycle have all properties, then the latest one overwrites all the others
{
DataStreamLifecycle.Template lifecycle1 = DataStreamLifecycle.createDataLifecycleTemplate(
false,
randomPositiveTimeValue(),
randomRounds()
randomRounds(),
DataStreamLifecycleFixtures.randomSamplingMethod()
);
DataStreamLifecycle.Template lifecycle2 = DataStreamLifecycle.createDataLifecycleTemplate(
true,
randomPositiveTimeValue(),
randomRounds()
randomRounds(),
DataStreamLifecycleFixtures.randomSamplingMethod()
);
List<DataStreamLifecycle.Template> lifecycles = List.of(lifecycle1, lifecycle2);
DataStreamLifecycle result = composeDataLifecycles(lifecycles).build();
assertThat(result.enabled(), equalTo(lifecycle2.enabled()));
assertThat(result.dataRetention(), equalTo(lifecycle2.dataRetention().get()));
assertThat(result.downsampling(), equalTo(lifecycle2.downsampling().get()));
assertThat(result.downsamplingRounds(), equalTo(lifecycle2.downsamplingRounds().get()));
}
}

Expand Down Expand Up @@ -255,7 +259,7 @@ private static List<DataStreamLifecycle.DownsamplingRound> randomRounds() {
List<DataStreamLifecycle.DownsamplingRound> rounds = new ArrayList<>();
var previous = new DataStreamLifecycle.DownsamplingRound(
TimeValue.timeValueDays(randomIntBetween(1, 365)),
new DownsampleConfig(new DateHistogramInterval(randomIntBetween(1, 24) + "h"))
new DateHistogramInterval(randomIntBetween(1, 24) + "h")
);
rounds.add(previous);
for (int i = 0; i < count; i++) {
Expand All @@ -268,9 +272,7 @@ private static List<DataStreamLifecycle.DownsamplingRound> randomRounds() {

private static DataStreamLifecycle.DownsamplingRound nextRound(DataStreamLifecycle.DownsamplingRound previous) {
var after = TimeValue.timeValueDays(previous.after().days() + randomIntBetween(1, 10));
var fixedInterval = new DownsampleConfig(
new DateHistogramInterval((previous.config().getFixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms")
);
var fixedInterval = new DateHistogramInterval((previous.fixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms");
return new DataStreamLifecycle.DownsamplingRound(after, fixedInterval);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti
.setGeneration(3)
.setAllowCustomRouting(true)
.setIndexMode(IndexMode.STANDARD)
.setLifecycle(DataStreamLifecycle.createDataLifecycle(false, null, null))
.setLifecycle(DataStreamLifecycle.createDataLifecycle(false, null, null, null))
.setDataStreamOptions(DataStreamOptions.FAILURE_STORE_ENABLED)
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance;
import static org.elasticsearch.test.ESIntegTestCase.client;
import static org.elasticsearch.test.ESTestCase.between;
import static org.elasticsearch.test.ESTestCase.frequently;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -146,7 +147,8 @@ static DataStreamLifecycle.Template randomDataLifecycleTemplate() {
return DataStreamLifecycle.createDataLifecycleTemplate(
frequently(),
randomResettable(ESTestCase::randomTimeValue),
randomResettable(DataStreamLifecycleFixtures::randomDownsamplingRounds)
randomResettable(DataStreamLifecycleFixtures::randomDownsamplingRounds),
randomResettable(DataStreamLifecycleFixtures::randomSamplingMethod)
);
}

Expand All @@ -164,7 +166,7 @@ private static List<DataStreamLifecycle.DownsamplingRound> randomDownsamplingRou
List<DataStreamLifecycle.DownsamplingRound> rounds = new ArrayList<>();
var previous = new DataStreamLifecycle.DownsamplingRound(
TimeValue.timeValueDays(randomIntBetween(1, 365)),
new DownsampleConfig(new DateHistogramInterval(randomIntBetween(1, 24) + "h"))
new DateHistogramInterval(randomIntBetween(1, 24) + "h")
);
rounds.add(previous);
for (int i = 0; i < count; i++) {
Expand All @@ -177,9 +179,16 @@ private static List<DataStreamLifecycle.DownsamplingRound> randomDownsamplingRou

private static DataStreamLifecycle.DownsamplingRound nextRound(DataStreamLifecycle.DownsamplingRound previous) {
var after = TimeValue.timeValueDays(previous.after().days() + randomIntBetween(1, 10));
var fixedInterval = new DownsampleConfig(
new DateHistogramInterval((previous.config().getFixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms")
);
var fixedInterval = new DateHistogramInterval((previous.fixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms");
return new DataStreamLifecycle.DownsamplingRound(after, fixedInterval);
}

/**
 * Randomly selects a sampling method for test fixtures: either no explicit
 * method ({@code null}), {@code AGGREGATE}, or {@code LAST_VALUE}, each with
 * equal probability.
 */
public static DownsampleConfig.SamplingMethod randomSamplingMethod() {
    // Table-driven pick: index 0 means "no explicit sampling method".
    DownsampleConfig.SamplingMethod[] options = {
        null,
        DownsampleConfig.SamplingMethod.AGGREGATE,
        DownsampleConfig.SamplingMethod.LAST_VALUE };
    return options[between(0, options.length - 1)];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1228,9 +1228,7 @@ public void testDownsampling() throws Exception {
.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
.put("index.routing_path", "@timestamp"),
DataStreamLifecycle.dataLifecycleBuilder()
.downsampling(
List.of(new DownsamplingRound(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m"))))
)
.downsamplingRounds(List.of(new DownsamplingRound(TimeValue.timeValueMillis(0), new DateHistogramInterval("5m"))))
.dataRetention(TimeValue.MAX_VALUE)
.build(),
now
Expand Down Expand Up @@ -1377,9 +1375,7 @@ public void testDownsamplingWhenTargetIndexNameClashYieldsException() throws Exc
.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
.put("index.routing_path", "@timestamp"),
DataStreamLifecycle.dataLifecycleBuilder()
.downsampling(
List.of(new DownsamplingRound(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m"))))
)
.downsamplingRounds(List.of(new DownsamplingRound(TimeValue.timeValueMillis(0), new DateHistogramInterval("5m"))))
.dataRetention(TimeValue.MAX_VALUE)
.build(),
now
Expand Down Expand Up @@ -1662,9 +1658,7 @@ private ClusterState downsampleSetup(ProjectId projectId, String dataStreamName,
settings(IndexVersion.current()).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
.put("index.routing_path", "@timestamp"),
DataStreamLifecycle.dataLifecycleBuilder()
.downsampling(
List.of(new DownsamplingRound(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m"))))
)
.downsamplingRounds(List.of(new DownsamplingRound(TimeValue.timeValueMillis(0), new DateHistogramInterval("5m"))))
.dataRetention(TimeValue.timeValueMillis(1))
.build(),
now
Expand Down
Loading