Skip to content

Commit c9dcc3b

Browse files
authored
Support different downsampling methods through ILM (elastic#136951)
Following elastic#136813, we expose to ILM the new sampling method config in the downsampling API. This will allow users to configure the sampling method in their downsample action of their ILM policies. For example: ``` PUT _ilm/policy/datastream_policy { "policy": { "phases": { "hot": { "actions": { "rollover": { "max_docs": 1 }, "downsample": { "fixed_interval": "1h", "force_merge_index": false, "sampling_method": "aggregate" } } } } } } ```
1 parent c527bbf commit c9dcc3b

File tree

17 files changed

+347
-79
lines changed

17 files changed

+347
-79
lines changed

docs/changelog/136951.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 136951
2+
summary: Support different downsampling methods through ILM
3+
area: "ILM+SLM"
4+
type: enhancement
5+
issues: []

docs/reference/elasticsearch/index-lifecycle-actions/ilm-downsample.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@ To use the `downsample` action in the `hot` phase, the `rollover` action **must*
1919

2020
`fixed_interval`
2121
: (Required, string) The [fixed time interval](docs-content://manage-data/lifecycle/rollup/understanding-groups.md#rollup-understanding-group-intervals) into which the data will be downsampled.
22+
2223
`force_merge_index` {applies_to}`stack: ga 9.3`
2324
: (Optional, boolean) When true, the downsampled index will be [force merged](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-forcemerge) to one [segment](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-segments). Defaults to `true`.
2425

26+
`sampling_method` {applies_to}`stack: ga 9.3`
27+
: (Optional, string) The sampling method that will be used to sample metrics; there are two methods available `aggregate` and
28+
the `last_value`. Defaults to `aggregate`.
2529

2630
## Example [ilm-downsample-ex]
2731

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9210000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ilm_searchable_snapshot_opt_out_clone,9209000
1+
add_sample_method_downsample_ilm,9210000

server/src/test/java/org/elasticsearch/action/downsample/DownsampleConfigTests.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,11 @@ public static DownsampleConfig randomConfig() {
5050
}
5151

5252
private static DownsampleConfig.SamplingMethod randomSamplingMethod() {
53-
return switch (between(0, 2)) {
54-
case 0 -> null;
55-
case 1 -> DownsampleConfig.SamplingMethod.AGGREGATE;
56-
case 2 -> DownsampleConfig.SamplingMethod.LAST_VALUE;
57-
default -> throw new AssertionError("Illegal randomisation branch");
58-
};
53+
if (between(0, DownsampleConfig.SamplingMethod.values().length) == 0) {
54+
return null;
55+
} else {
56+
return randomFrom(DownsampleConfig.SamplingMethod.values());
57+
}
5958
}
6059

6160
@Override

x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder)
495495
new Phase(
496496
"hot",
497497
TimeValue.ZERO,
498-
Map.of("downsample", new DownsampleAction(DateHistogramInterval.MINUTE, null, hotForceMergeEnabled))
498+
Map.of("downsample", new DownsampleAction(DateHistogramInterval.MINUTE, null, hotForceMergeEnabled, null))
499499
)
500500
)
501501
),
@@ -506,13 +506,13 @@ private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder)
506506
new Phase(
507507
"warm",
508508
TimeValue.ZERO,
509-
Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null, warmForceMergeEnabled))
509+
Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null, warmForceMergeEnabled, null))
510510
),
511511
"cold",
512512
new Phase(
513513
"cold",
514514
TimeValue.timeValueDays(3),
515-
Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null, coldForceMergeEnabled))
515+
Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null, coldForceMergeEnabled, null))
516516
)
517517
)
518518
),

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DownsampleAction.java

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.Strings;
2020
import org.elasticsearch.common.io.stream.StreamInput;
2121
import org.elasticsearch.common.io.stream.StreamOutput;
22+
import org.elasticsearch.core.Nullable;
2223
import org.elasticsearch.core.TimeValue;
2324
import org.elasticsearch.index.IndexMode;
2425
import org.elasticsearch.index.IndexSettings;
@@ -49,6 +50,7 @@ public class DownsampleAction implements LifecycleAction {
4950

5051
private static final Logger logger = LogManager.getLogger(DownsampleAction.class);
5152
public static final TransportVersion ILM_FORCE_MERGE_IN_DOWNSAMPLING = TransportVersion.fromName("ilm_downsample_force_merge");
53+
public static final TransportVersion ADD_SAMPLE_METHOD_DOWNSAMPLE_ILM = TransportVersion.fromName("add_sample_method_downsample_ilm");
5254

5355
public static final String NAME = "downsample";
5456
public static final String DOWNSAMPLED_INDEX_PREFIX = "downsample-";
@@ -57,6 +59,7 @@ public class DownsampleAction implements LifecycleAction {
5759
public static final TimeValue DEFAULT_WAIT_TIMEOUT = new TimeValue(1, TimeUnit.DAYS);
5860
private static final ParseField FIXED_INTERVAL_FIELD = new ParseField(DownsampleConfig.FIXED_INTERVAL);
5961
private static final ParseField FORCE_MERGE_INDEX = new ParseField("force_merge_index");
62+
private static final ParseField SAMPLING_METHOD_FIELD = new ParseField("sampling_method");
6063
private static final ParseField WAIT_TIMEOUT_FIELD = new ParseField("wait_timeout");
6164
static final String BWC_CLEANUP_TARGET_INDEX_NAME = "cleanup-target-index";
6265
private static final BiFunction<String, LifecycleExecutionState, String> DOWNSAMPLED_INDEX_NAME_SUPPLIER = (
@@ -65,7 +68,7 @@ public class DownsampleAction implements LifecycleAction {
6568

6669
private static final ConstructingObjectParser<DownsampleAction, Void> PARSER = new ConstructingObjectParser<>(
6770
NAME,
68-
a -> new DownsampleAction((DateHistogramInterval) a[0], (TimeValue) a[1], (Boolean) a[2])
71+
a -> new DownsampleAction((DateHistogramInterval) a[0], (TimeValue) a[1], (Boolean) a[2], (DownsampleConfig.SamplingMethod) a[3])
6972
);
7073

7174
static {
@@ -82,23 +85,37 @@ public class DownsampleAction implements LifecycleAction {
8285
ObjectParser.ValueType.STRING
8386
);
8487
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_INDEX);
88+
PARSER.declareField(
89+
optionalConstructorArg(),
90+
p -> DownsampleConfig.SamplingMethod.fromString(p.text()),
91+
SAMPLING_METHOD_FIELD,
92+
ObjectParser.ValueType.STRING
93+
);
8594
}
8695

8796
private final DateHistogramInterval fixedInterval;
97+
@Nullable
98+
private final DownsampleConfig.SamplingMethod samplingMethod;
8899
private final TimeValue waitTimeout;
89100
private final Boolean forceMergeIndex;
90101

91102
public static DownsampleAction parse(XContentParser parser) {
92103
return PARSER.apply(parser, null);
93104
}
94105

95-
public DownsampleAction(final DateHistogramInterval fixedInterval, final TimeValue waitTimeout, Boolean forceMergeIndex) {
106+
public DownsampleAction(
107+
final DateHistogramInterval fixedInterval,
108+
final TimeValue waitTimeout,
109+
final Boolean forceMergeIndex,
110+
final DownsampleConfig.SamplingMethod samplingMethod
111+
) {
96112
if (fixedInterval == null) {
97113
throw new IllegalArgumentException("Parameter [" + FIXED_INTERVAL_FIELD.getPreferredName() + "] is required.");
98114
}
99115
this.fixedInterval = fixedInterval;
100116
this.waitTimeout = waitTimeout == null ? DEFAULT_WAIT_TIMEOUT : waitTimeout;
101117
this.forceMergeIndex = forceMergeIndex;
118+
this.samplingMethod = samplingMethod;
102119
}
103120

104121
public DownsampleAction(StreamInput in) throws IOException {
@@ -107,7 +124,10 @@ public DownsampleAction(StreamInput in) throws IOException {
107124
in.getTransportVersion().onOrAfter(TransportVersions.V_8_10_X)
108125
? TimeValue.parseTimeValue(in.readString(), WAIT_TIMEOUT_FIELD.getPreferredName())
109126
: DEFAULT_WAIT_TIMEOUT,
110-
in.getTransportVersion().supports(ILM_FORCE_MERGE_IN_DOWNSAMPLING) ? in.readOptionalBoolean() : null
127+
in.getTransportVersion().supports(ILM_FORCE_MERGE_IN_DOWNSAMPLING) ? in.readOptionalBoolean() : null,
128+
in.getTransportVersion().supports(ADD_SAMPLE_METHOD_DOWNSAMPLE_ILM)
129+
? in.readOptionalWriteable(DownsampleConfig.SamplingMethod::read)
130+
: null
111131
);
112132
}
113133

@@ -122,6 +142,9 @@ public void writeTo(StreamOutput out) throws IOException {
122142
if (out.getTransportVersion().supports(ILM_FORCE_MERGE_IN_DOWNSAMPLING)) {
123143
out.writeOptionalBoolean(forceMergeIndex);
124144
}
145+
if (out.getTransportVersion().supports(ADD_SAMPLE_METHOD_DOWNSAMPLE_ILM)) {
146+
out.writeOptionalWriteable(samplingMethod);
147+
}
125148
}
126149

127150
@Override
@@ -132,6 +155,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
132155
if (forceMergeIndex != null) {
133156
builder.field(FORCE_MERGE_INDEX.getPreferredName(), forceMergeIndex);
134157
}
158+
if (samplingMethod != null) {
159+
builder.field(SAMPLING_METHOD_FIELD.getPreferredName(), samplingMethod.toString());
160+
}
135161
builder.endObject();
136162
return builder;
137163
}
@@ -153,6 +179,21 @@ public Boolean forceMergeIndex() {
153179
return forceMergeIndex;
154180
}
155181

182+
/**
183+
* @return the sampling method configured in the ILM policy or null
184+
*/
185+
@Nullable
186+
public DownsampleConfig.SamplingMethod samplingMethod() {
187+
return samplingMethod;
188+
}
189+
190+
/**
191+
* @return the sampling method that will be applied when the downsample occurs.
192+
*/
193+
public DownsampleConfig.SamplingMethod samplingMethodOrDefault() {
194+
return DownsampleConfig.SamplingMethod.getOrDefault(samplingMethod);
195+
}
196+
156197
@Override
157198
public boolean isSafeAction() {
158199
return false;
@@ -246,7 +287,14 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
246287
);
247288

248289
// Here is where the actual downsample action takes place
249-
DownsampleStep downsampleStep = new DownsampleStep(downsampleKey, waitForDownsampleIndexKey, client, fixedInterval, waitTimeout);
290+
DownsampleStep downsampleStep = new DownsampleStep(
291+
downsampleKey,
292+
waitForDownsampleIndexKey,
293+
fixedInterval,
294+
waitTimeout,
295+
samplingMethod,
296+
client
297+
);
250298

251299
// Wait until the downsampled index is recovered. We again wait until the configured threshold is breached and
252300
// if the downsampled index has not successfully recovered until then, we rewind to the "cleanup-downsample-index"
@@ -345,12 +393,13 @@ public boolean equals(Object o) {
345393
DownsampleAction that = (DownsampleAction) o;
346394
return Objects.equals(this.fixedInterval, that.fixedInterval)
347395
&& Objects.equals(this.waitTimeout, that.waitTimeout)
348-
&& Objects.equals(this.forceMergeIndex, that.forceMergeIndex);
396+
&& Objects.equals(this.forceMergeIndex, that.forceMergeIndex)
397+
&& Objects.equals(this.samplingMethod, that.samplingMethod);
349398
}
350399

351400
@Override
352401
public int hashCode() {
353-
return Objects.hash(fixedInterval, waitTimeout, forceMergeIndex);
402+
return Objects.hash(fixedInterval, waitTimeout, forceMergeIndex, samplingMethod);
354403
}
355404

356405
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DownsampleStep.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,20 @@ public class DownsampleStep extends AsyncActionStep {
3434

3535
private final DateHistogramInterval fixedInterval;
3636
private final TimeValue waitTimeout;
37+
private final DownsampleConfig.SamplingMethod samplingMethod;
3738

3839
public DownsampleStep(
3940
final StepKey key,
4041
final StepKey nextStepKey,
41-
final Client client,
4242
final DateHistogramInterval fixedInterval,
43-
final TimeValue waitTimeout
43+
final TimeValue waitTimeout,
44+
final DownsampleConfig.SamplingMethod samplingMethod,
45+
final Client client
4446
) {
4547
super(key, nextStepKey, client);
4648
this.fixedInterval = fixedInterval;
4749
this.waitTimeout = waitTimeout;
50+
this.samplingMethod = samplingMethod;
4851
}
4952

5053
@Override
@@ -95,7 +98,7 @@ public void performAction(
9598
}
9699

97100
void performDownsampleIndex(ProjectId projectId, String indexName, String downsampleIndexName, ActionListener<Void> listener) {
98-
DownsampleConfig config = new DownsampleConfig(fixedInterval);
101+
DownsampleConfig config = new DownsampleConfig(fixedInterval, samplingMethod);
99102
DownsampleAction.Request request = new DownsampleAction.Request(
100103
TimeValue.MAX_VALUE,
101104
indexName,
@@ -119,9 +122,13 @@ public TimeValue getWaitTimeout() {
119122
return waitTimeout;
120123
}
121124

125+
public DownsampleConfig.SamplingMethod getSamplingMethod() {
126+
return samplingMethod;
127+
}
128+
122129
@Override
123130
public int hashCode() {
124-
return Objects.hash(super.hashCode(), fixedInterval, waitTimeout);
131+
return Objects.hash(super.hashCode(), fixedInterval, waitTimeout, samplingMethod);
125132
}
126133

127134
@Override
@@ -136,7 +143,10 @@ public boolean equals(Object obj) {
136143
return false;
137144
}
138145
DownsampleStep other = (DownsampleStep) obj;
139-
return super.equals(obj) && Objects.equals(fixedInterval, other.fixedInterval) && Objects.equals(waitTimeout, other.waitTimeout);
146+
return super.equals(obj)
147+
&& Objects.equals(fixedInterval, other.fixedInterval)
148+
&& Objects.equals(waitTimeout, other.waitTimeout)
149+
&& Objects.equals(samplingMethod, other.samplingMethod);
140150
}
141151

142152
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,22 @@ static void validateDownsamplingIntervals(Collection<Phase> phases) {
471471
+ "]"
472472
);
473473
}
474+
var firstSamplingMethod = firstDownsample.v2().samplingMethodOrDefault();
475+
var secondSamplingMethod = secondDownsample.v2().samplingMethodOrDefault();
476+
if (Objects.equals(firstSamplingMethod, secondSamplingMethod) == false) {
477+
// All phases need to use the same downsampling method
478+
throw new IllegalArgumentException(
479+
"Downsampling method ["
480+
+ secondSamplingMethod
481+
+ "] for phase ["
482+
+ secondDownsample.v1()
483+
+ "] must be compatible with the method ["
484+
+ firstSamplingMethod
485+
+ "] for phase ["
486+
+ firstDownsample.v1()
487+
+ "]"
488+
);
489+
}
474490
firstDownsample = secondDownsample;
475491
}
476492
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ private LifecyclePolicy randomPolicy(Map<String, PhaseConfig> phases) {
9494
if (phaseConfig.hasDownsampling) {
9595
actions.put(
9696
DownsampleAction.NAME,
97-
new DownsampleAction(new DateHistogramInterval("1m"), null, phaseConfig.hasDownsamplingForceMerge)
97+
new DownsampleAction(new DateHistogramInterval("1m"), null, phaseConfig.hasDownsamplingForceMerge, null)
9898
);
9999
}
100100
if (phaseConfig.hasForceMerge) {

0 commit comments

Comments
 (0)