Skip to content

Commit ec0efaf

Browse files
authored
Support choosing the downsampling method in data stream lifecycle (#137023)
Following #136813, we expose the downsampling API's new sampling method configuration in the data stream lifecycle. This allows users to configure the sampling method directly in the lifecycle configuration. For example:
```
PUT _data_stream/my-ds/_lifecycle
{
  "data_retention": "10d",
  "downsampling_method": "last_value",
  "downsampling": [
    { "after": "1d", "fixed_interval": "5m" }
  ]
}
```
1 parent 0b8a37a commit ec0efaf

File tree

36 files changed

+871
-389
lines changed

36 files changed

+871
-389
lines changed

docs/changelog/137023.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 137023
2+
summary: Support choosing the downsampling method in data stream lifecycle
3+
area: "Data streams"
4+
type: enhancement
5+
issues: []

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,10 @@ Set<Index> maybeExecuteDownsampling(ProjectState projectState, DataStream dataSt
524524
// - has matching downsample rounds
525525
// - is read-only
526526
// So let's wait for an in-progress downsampling operation to succeed or trigger the last matching round
527-
affectedIndices.addAll(waitForInProgressOrTriggerDownsampling(dataStream, backingIndexMeta, downsamplingRounds, project));
527+
var downsamplingMethod = dataStream.getDataLifecycle().downsamplingMethod();
528+
affectedIndices.addAll(
529+
waitForInProgressOrTriggerDownsampling(dataStream, backingIndexMeta, downsamplingRounds, downsamplingMethod, project)
530+
);
528531
}
529532
}
530533

@@ -541,6 +544,7 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
541544
DataStream dataStream,
542545
IndexMetadata backingIndex,
543546
List<DataStreamLifecycle.DownsamplingRound> downsamplingRounds,
547+
DownsampleConfig.SamplingMethod downsamplingMethod,
544548
ProjectMetadata project
545549
) {
546550
assert dataStream.getIndices().contains(backingIndex.getIndex())
@@ -556,7 +560,7 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
556560
String downsampleIndexName = DownsampleConfig.generateDownsampleIndexName(
557561
DOWNSAMPLED_INDEX_PREFIX,
558562
backingIndex,
559-
round.config().getFixedInterval()
563+
round.fixedInterval()
560564
);
561565
IndexMetadata targetDownsampleIndexMeta = project.index(downsampleIndexName);
562566
boolean targetDownsampleIndexExists = targetDownsampleIndexMeta != null;
@@ -568,7 +572,8 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
568572
INDEX_DOWNSAMPLE_STATUS.get(targetDownsampleIndexMeta.getSettings()),
569573
round,
570574
lastRound,
571-
index,
575+
downsamplingMethod,
576+
backingIndex,
572577
targetDownsampleIndexMeta.getIndex()
573578
);
574579
if (downsamplingNotComplete.isEmpty() == false) {
@@ -580,7 +585,7 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
580585
// no maintenance needed for previously started downsampling actions and we are on the last matching round so it's time
581586
// to kick off downsampling
582587
affectedIndices.add(index);
583-
downsampleIndexOnce(round, project.id(), indexName, downsampleIndexName);
588+
downsampleIndexOnce(round, downsamplingMethod, project.id(), backingIndex, downsampleIndexName);
584589
}
585590
}
586591
}
@@ -592,16 +597,30 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
592597
*/
593598
private void downsampleIndexOnce(
594599
DataStreamLifecycle.DownsamplingRound round,
600+
DownsampleConfig.SamplingMethod requestedDownsamplingMethod,
595601
ProjectId projectId,
596-
String sourceIndex,
602+
IndexMetadata sourceIndexMetadata,
597603
String downsampleIndexName
598604
) {
605+
// When an index is already downsampled with a method, we require all later downsampling rounds to use the same method.
606+
// This is necessary to preserve the relation of the downsampled index to the raw data. For example, if an index is already
607+
// downsampled and we downsample it again to 1 hour, we know that a document represents either the aggregated raw data of an hour
608+
// or the last value of the raw data within this hour. If we mix the methods, we cannot derive any meaning from them.
609+
// Furthermore, data stream lifecycle is configured on the data stream level and not on the individual index level, meaning that
610+
// when a user changes downsampling method, some indices would not be able to be downsampled anymore.
611+
// For this reason, when we encounter an already downsampled index, we use the source downsampling method which might be different
612+
// from the requested one.
613+
var sourceIndexSamplingMethod = DownsampleConfig.SamplingMethod.fromIndexMetadata(sourceIndexMetadata);
614+
String sourceIndex = sourceIndexMetadata.getIndex().getName();
599615
DownsampleAction.Request request = new DownsampleAction.Request(
600616
TimeValue.THIRTY_SECONDS /* TODO should this be longer/configurable? */,
601617
sourceIndex,
602618
downsampleIndexName,
603619
null,
604-
round.config()
620+
new DownsampleConfig(
621+
round.fixedInterval(),
622+
sourceIndexSamplingMethod == null ? requestedDownsamplingMethod : sourceIndexSamplingMethod
623+
)
605624
);
606625
transportActionsDeduplicator.executeOnce(
607626
Tuple.tuple(projectId, request),
@@ -632,11 +651,12 @@ private Set<Index> evaluateDownsampleStatus(
632651
IndexMetadata.DownsampleTaskStatus downsampleStatus,
633652
DataStreamLifecycle.DownsamplingRound currentRound,
634653
DataStreamLifecycle.DownsamplingRound lastRound,
635-
Index backingIndex,
654+
DownsampleConfig.SamplingMethod downsamplingMethod,
655+
IndexMetadata backingIndex,
636656
Index downsampleIndex
637657
) {
638658
Set<Index> affectedIndices = new HashSet<>();
639-
String indexName = backingIndex.getName();
659+
String indexName = backingIndex.getIndex().getName();
640660
String downsampleIndexName = downsampleIndex.getName();
641661
return switch (downsampleStatus) {
642662
case UNKNOWN -> {
@@ -683,15 +703,15 @@ private Set<Index> evaluateDownsampleStatus(
683703
// NOTE that the downsample request is made through the deduplicator so it will only really be executed if
684704
// there isn't one already in-flight. This can happen if a previous request timed-out, failed, or there was a
685705
// master failover and data stream lifecycle needed to restart
686-
downsampleIndexOnce(currentRound, projectId, indexName, downsampleIndexName);
687-
affectedIndices.add(backingIndex);
706+
downsampleIndexOnce(currentRound, downsamplingMethod, projectId, backingIndex, downsampleIndexName);
707+
affectedIndices.add(backingIndex.getIndex());
688708
yield affectedIndices;
689709
}
690710
case SUCCESS -> {
691711
if (dataStream.getIndices().contains(downsampleIndex) == false) {
692712
// at this point the source index is part of the data stream and the downsample index is complete but not
693713
// part of the data stream. we need to replace the source index with the downsample index in the data stream
694-
affectedIndices.add(backingIndex);
714+
affectedIndices.add(backingIndex.getIndex());
695715
replaceBackingIndexWithDownsampleIndexOnce(projectId, dataStream, indexName, downsampleIndexName);
696716
}
697717
yield affectedIndices;

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestPutDataStreamLifecycleAction.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.io.IOException;
2323
import java.util.List;
24+
import java.util.Set;
2425

2526
import static org.elasticsearch.rest.RestRequest.Method.PUT;
2627
import static org.elasticsearch.rest.RestUtils.getAckTimeout;
@@ -29,6 +30,9 @@
2930
@ServerlessScope(Scope.PUBLIC)
3031
public class RestPutDataStreamLifecycleAction extends BaseRestHandler {
3132

33+
private static final String SUPPORTS_DOWNSAMPLING_METHOD = "dlm.downsampling_method";
34+
private static final Set<String> CAPABILITIES = Set.of(SUPPORTS_DOWNSAMPLING_METHOD);
35+
3236
@Override
3337
public String getName() {
3438
return "put_data_lifecycles_action";
@@ -44,13 +48,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
4448
try (XContentParser parser = request.contentParser()) {
4549
PutDataStreamLifecycleAction.Request putLifecycleRequest = PutDataStreamLifecycleAction.Request.parseRequest(
4650
parser,
47-
(dataRetention, enabled, downsampling) -> new PutDataStreamLifecycleAction.Request(
51+
(dataRetention, enabled, downsamplingRounds, downsamplingMethod) -> new PutDataStreamLifecycleAction.Request(
4852
getMasterNodeTimeout(request),
4953
getAckTimeout(request),
5054
Strings.splitStringByCommaToArray(request.param("name")),
5155
dataRetention,
5256
enabled,
53-
downsampling
57+
downsamplingRounds,
58+
downsamplingMethod
5459
)
5560
);
5661
putLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, putLifecycleRequest.indicesOptions()));
@@ -61,4 +66,9 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
6166
);
6267
}
6368
}
69+
70+
@Override
71+
public Set<String> supportedCapabilities() {
72+
return CAPABILITIES;
73+
}
6474
}

modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataIndexTemplateServiceTests.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.composeDataLifecycles;
4444
import static org.elasticsearch.common.settings.Settings.builder;
4545
import static org.elasticsearch.datastreams.MetadataDataStreamRolloverServiceTests.createSettingsProvider;
46+
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleFixtures.randomResettable;
47+
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleFixtures.randomSamplingMethod;
4648
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
4749
import static org.hamcrest.Matchers.containsString;
4850
import static org.hamcrest.Matchers.equalTo;
@@ -146,51 +148,63 @@ public void testLifecycleComposition() {
146148
List<DataStreamLifecycle.Template> lifecycles = List.of();
147149
assertThat(composeDataLifecycles(lifecycles), nullValue());
148150
}
149-
// One lifecycle results to this lifecycle as the final
151+
// One lifecycle results in this lifecycle as the final
150152
{
153+
ResettableValue<List<DataStreamLifecycle.DownsamplingRound>> downsamplingRounds = randomDownsampling();
151154
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.createDataLifecycleTemplate(
152155
true,
153156
randomRetention(),
154-
randomDownsampling()
157+
downsamplingRounds,
158+
randomResettable(() -> randomSamplingMethod(downsamplingRounds.get()))
155159
);
156160
List<DataStreamLifecycle.Template> lifecycles = List.of(lifecycle);
157161
DataStreamLifecycle result = composeDataLifecycles(lifecycles).build();
158162
// Defaults to true
159163
assertThat(result.enabled(), equalTo(true));
160164
assertThat(result.dataRetention(), equalTo(lifecycle.dataRetention().get()));
161-
assertThat(result.downsampling(), equalTo(lifecycle.downsampling().get()));
165+
assertThat(result.downsamplingRounds(), equalTo(lifecycle.downsamplingRounds().get()));
166+
assertThat(result.downsamplingMethod(), equalTo(lifecycle.downsamplingMethod().get()));
162167
}
163168
// If the last lifecycle is missing a property (apart from enabled) we keep the latest from the previous ones
164169
// Enabled is always true unless it's explicitly set to false
165170
{
171+
List<DataStreamLifecycle.DownsamplingRound> downsamplingRounds = randomRounds();
166172
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.createDataLifecycleTemplate(
167173
false,
168174
randomPositiveTimeValue(),
169-
randomRounds()
175+
downsamplingRounds,
176+
randomSamplingMethod(downsamplingRounds)
170177
);
171178
List<DataStreamLifecycle.Template> lifecycles = List.of(lifecycle, DataStreamLifecycle.Template.DATA_DEFAULT);
172179
DataStreamLifecycle result = composeDataLifecycles(lifecycles).build();
173180
assertThat(result.enabled(), equalTo(true));
174181
assertThat(result.dataRetention(), equalTo(lifecycle.dataRetention().get()));
175-
assertThat(result.downsampling(), equalTo(lifecycle.downsampling().get()));
182+
assertThat(result.downsamplingRounds(), equalTo(lifecycle.downsamplingRounds().get()));
183+
assertThat(result.downsamplingMethod(), equalTo(lifecycle.downsamplingMethod().get()));
176184
}
177-
// If both lifecycle have all properties, then the latest one overwrites all the others
185+
// If both lifecycles have all properties, then the latest one overwrites all the others
178186
{
187+
DownsampleConfig.SamplingMethod downsamplingMethod1 = randomFrom(DownsampleConfig.SamplingMethod.LAST_VALUE);
179188
DataStreamLifecycle.Template lifecycle1 = DataStreamLifecycle.createDataLifecycleTemplate(
180189
false,
181190
randomPositiveTimeValue(),
182-
randomRounds()
191+
randomRounds(),
192+
downsamplingMethod1
183193
);
184194
DataStreamLifecycle.Template lifecycle2 = DataStreamLifecycle.createDataLifecycleTemplate(
185195
true,
186196
randomPositiveTimeValue(),
187-
randomRounds()
197+
randomRounds(),
198+
downsamplingMethod1 == DownsampleConfig.SamplingMethod.LAST_VALUE
199+
? DownsampleConfig.SamplingMethod.AGGREGATE
200+
: DownsampleConfig.SamplingMethod.LAST_VALUE
188201
);
189202
List<DataStreamLifecycle.Template> lifecycles = List.of(lifecycle1, lifecycle2);
190203
DataStreamLifecycle result = composeDataLifecycles(lifecycles).build();
191204
assertThat(result.enabled(), equalTo(lifecycle2.enabled()));
192205
assertThat(result.dataRetention(), equalTo(lifecycle2.dataRetention().get()));
193-
assertThat(result.downsampling(), equalTo(lifecycle2.downsampling().get()));
206+
assertThat(result.downsamplingRounds(), equalTo(lifecycle2.downsamplingRounds().get()));
207+
assertThat(result.downsamplingMethod(), equalTo(lifecycle2.downsamplingMethod().get()));
194208
}
195209
}
196210

@@ -255,7 +269,7 @@ private static List<DataStreamLifecycle.DownsamplingRound> randomRounds() {
255269
List<DataStreamLifecycle.DownsamplingRound> rounds = new ArrayList<>();
256270
var previous = new DataStreamLifecycle.DownsamplingRound(
257271
TimeValue.timeValueDays(randomIntBetween(1, 365)),
258-
new DownsampleConfig(new DateHistogramInterval(randomIntBetween(1, 24) + "h"))
272+
new DateHistogramInterval(randomIntBetween(1, 24) + "h")
259273
);
260274
rounds.add(previous);
261275
for (int i = 0; i < count; i++) {
@@ -268,9 +282,7 @@ private static List<DataStreamLifecycle.DownsamplingRound> randomRounds() {
268282

269283
private static DataStreamLifecycle.DownsamplingRound nextRound(DataStreamLifecycle.DownsamplingRound previous) {
270284
var after = TimeValue.timeValueDays(previous.after().days() + randomIntBetween(1, 10));
271-
var fixedInterval = new DownsampleConfig(
272-
new DateHistogramInterval((previous.config().getFixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms")
273-
);
285+
var fixedInterval = new DateHistogramInterval((previous.fixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms");
274286
return new DataStreamLifecycle.DownsamplingRound(after, fixedInterval);
275287
}
276288

modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti
154154
.setGeneration(3)
155155
.setAllowCustomRouting(true)
156156
.setIndexMode(IndexMode.STANDARD)
157-
.setLifecycle(DataStreamLifecycle.createDataLifecycle(false, null, null))
157+
.setLifecycle(DataStreamLifecycle.createDataLifecycle(false, null, null, null))
158158
.setDataStreamOptions(DataStreamOptions.FAILURE_STORE_ENABLED)
159159
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
160160
.build();

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleFixtures.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838

3939
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance;
4040
import static org.elasticsearch.test.ESIntegTestCase.client;
41+
import static org.elasticsearch.test.ESTestCase.between;
4142
import static org.elasticsearch.test.ESTestCase.frequently;
43+
import static org.elasticsearch.test.ESTestCase.randomFrom;
4244
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
4345
import static org.junit.Assert.assertTrue;
4446

@@ -143,14 +145,18 @@ static void putComposableIndexTemplate(
143145
}
144146

145147
static DataStreamLifecycle.Template randomDataLifecycleTemplate() {
148+
ResettableValue<List<DataStreamLifecycle.DownsamplingRound>> downsampling = randomResettable(
149+
DataStreamLifecycleFixtures::randomDownsamplingRounds
150+
);
146151
return DataStreamLifecycle.createDataLifecycleTemplate(
147152
frequently(),
148153
randomResettable(ESTestCase::randomTimeValue),
149-
randomResettable(DataStreamLifecycleFixtures::randomDownsamplingRounds)
154+
downsampling,
155+
randomResettable(() -> randomSamplingMethod(downsampling.get()))
150156
);
151157
}
152158

153-
private static <T> ResettableValue<T> randomResettable(Supplier<T> supplier) {
159+
public static <T> ResettableValue<T> randomResettable(Supplier<T> supplier) {
154160
return switch (randomIntBetween(0, 2)) {
155161
case 0 -> ResettableValue.undefined();
156162
case 1 -> ResettableValue.reset();
@@ -164,7 +170,7 @@ private static List<DataStreamLifecycle.DownsamplingRound> randomDownsamplingRou
164170
List<DataStreamLifecycle.DownsamplingRound> rounds = new ArrayList<>();
165171
var previous = new DataStreamLifecycle.DownsamplingRound(
166172
TimeValue.timeValueDays(randomIntBetween(1, 365)),
167-
new DownsampleConfig(new DateHistogramInterval(randomIntBetween(1, 24) + "h"))
173+
new DateHistogramInterval(randomIntBetween(1, 24) + "h")
168174
);
169175
rounds.add(previous);
170176
for (int i = 0; i < count; i++) {
@@ -177,9 +183,19 @@ private static List<DataStreamLifecycle.DownsamplingRound> randomDownsamplingRou
177183

178184
private static DataStreamLifecycle.DownsamplingRound nextRound(DataStreamLifecycle.DownsamplingRound previous) {
179185
var after = TimeValue.timeValueDays(previous.after().days() + randomIntBetween(1, 10));
180-
var fixedInterval = new DownsampleConfig(
181-
new DateHistogramInterval((previous.config().getFixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms")
182-
);
186+
var fixedInterval = new DateHistogramInterval((previous.fixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms");
183187
return new DataStreamLifecycle.DownsamplingRound(after, fixedInterval);
184188
}
189+
190+
/**
191+
* In order to produce valid data stream lifecycle configurations, the sampling method can be defined only when
192+
* the downsampling rounds are also defined.
193+
*/
194+
public static DownsampleConfig.SamplingMethod randomSamplingMethod(List<DataStreamLifecycle.DownsamplingRound> downsamplingRounds) {
195+
if (downsamplingRounds == null || between(0, DownsampleConfig.SamplingMethod.values().length) == 0) {
196+
return null;
197+
} else {
198+
return randomFrom(DownsampleConfig.SamplingMethod.values());
199+
}
200+
}
185201
}

0 commit comments

Comments
 (0)