Skip to content

Commit b96e416

Browse files
committed
Add force_merge_max_num_segments option to downsample api and downsample ilm action.
1 parent 74bb0f9 commit b96e416

File tree

14 files changed

+170
-72
lines changed

14 files changed

+170
-72
lines changed

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleFixtures.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private static DataStreamLifecycle.Downsampling randomDownsampling() {
154154
List<DataStreamLifecycle.Downsampling.Round> rounds = new ArrayList<>();
155155
var previous = new DataStreamLifecycle.Downsampling.Round(
156156
TimeValue.timeValueDays(randomIntBetween(1, 365)),
157-
new DownsampleConfig(new DateHistogramInterval(randomIntBetween(1, 24) + "h"))
157+
new DownsampleConfig(new DateHistogramInterval(randomIntBetween(1, 24) + "h"), null)
158158
);
159159
rounds.add(previous);
160160
for (int i = 0; i < count; i++) {
@@ -170,7 +170,7 @@ private static DataStreamLifecycle.Downsampling randomDownsampling() {
170170
private static DataStreamLifecycle.Downsampling.Round nextRound(DataStreamLifecycle.Downsampling.Round previous) {
171171
var after = TimeValue.timeValueDays(previous.after().days() + randomIntBetween(1, 10));
172172
var fixedInterval = new DownsampleConfig(
173-
new DateHistogramInterval((previous.config().getFixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms")
173+
new DateHistogramInterval((previous.config().getFixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms"), null
174174
);
175175
return new DataStreamLifecycle.Downsampling.Round(after, fixedInterval);
176176
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ static TransportVersion def(int id) {
184184
public static final TransportVersion INCLUDE_INDEX_MODE_IN_GET_DATA_STREAM = def(9_023_0_00);
185185
public static final TransportVersion MAX_OPERATION_SIZE_REJECTIONS_ADDED = def(9_024_0_00);
186186
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR = def(9_025_0_00);
187+
public static final TransportVersion DOWNSAMPLE_FORCE_MERGE_MAX_NUM_SEGMENTS_PARAMETER = def(9_026_0_00);
187188

188189
/*
189190
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/downsample/DownsampleConfig.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.action.downsample;
1111

12+
import org.elasticsearch.TransportVersions;
1213
import org.elasticsearch.cluster.metadata.IndexMetadata;
1314
import org.elasticsearch.common.Rounding;
1415
import org.elasticsearch.common.Strings;
@@ -31,6 +32,7 @@
3132
import java.util.Objects;
3233

3334
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
35+
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
3436

3537
/**
3638
* This class holds the configuration details of a DownsampleAction that downsamples time series
@@ -56,20 +58,24 @@ public class DownsampleConfig implements NamedWriteable, ToXContentObject {
5658

5759
private static final String NAME = "downsample/action/config";
5860
public static final String FIXED_INTERVAL = "fixed_interval";
61+
public static final String FORCE_MERGE_MAX_NUM_SEGMENTS = "force_merge_max_num_segments";
5962
public static final String TIME_ZONE = "time_zone";
6063
public static final String DEFAULT_TIMEZONE = ZoneId.of("UTC").getId();
64+
private static final int DEFAULT_MAX_NUM_SEGMENTS = 1;
6165

6266
private static final String timestampField = DataStreamTimestampFieldMapper.DEFAULT_PATH;
6367
private final DateHistogramInterval fixedInterval;
68+
private final int forceMergeMaxNumSegments;
6469
private final String timeZone = DEFAULT_TIMEZONE;
6570
private final String intervalType = FIXED_INTERVAL;
6671

6772
private static final ConstructingObjectParser<DownsampleConfig, Void> PARSER;
6873
static {
6974
PARSER = new ConstructingObjectParser<>(NAME, a -> {
7075
DateHistogramInterval fixedInterval = (DateHistogramInterval) a[0];
76+
Integer forceMergeMaxNumSegments = (Integer) a[1];
7177
if (fixedInterval != null) {
72-
return new DownsampleConfig(fixedInterval);
78+
return new DownsampleConfig(fixedInterval, forceMergeMaxNumSegments);
7379
} else {
7480
throw new IllegalArgumentException("Parameter [" + FIXED_INTERVAL + "] is required.");
7581
}
@@ -81,24 +87,40 @@ public class DownsampleConfig implements NamedWriteable, ToXContentObject {
8187
new ParseField(FIXED_INTERVAL),
8288
ObjectParser.ValueType.STRING
8389
);
90+
PARSER.declareField(
91+
optionalConstructorArg(),
92+
p -> p.intValue(),
93+
new ParseField(FORCE_MERGE_MAX_NUM_SEGMENTS),
94+
ObjectParser.ValueType.INT
95+
);
8496
}
8597

8698
/**
8799
* Create a new {@link DownsampleConfig} using the given configuration parameters.
88100
* @param fixedInterval the fixed interval to use for computing the date histogram for the rolled up documents (required).
89101
*/
90-
public DownsampleConfig(final DateHistogramInterval fixedInterval) {
102+
public DownsampleConfig(final DateHistogramInterval fixedInterval, Integer forceMergeMaxNumSegments) {
91103
if (fixedInterval == null) {
92104
throw new IllegalArgumentException("Parameter [" + FIXED_INTERVAL + "] is required.");
93105
}
94106
this.fixedInterval = fixedInterval;
95107

96108
// validate interval
97109
createRounding(this.fixedInterval.toString(), this.timeZone);
110+
111+
if (forceMergeMaxNumSegments == null) {
112+
forceMergeMaxNumSegments = 1;
113+
}
114+
this.forceMergeMaxNumSegments = forceMergeMaxNumSegments;
98115
}
99116

100117
public DownsampleConfig(final StreamInput in) throws IOException {
101118
fixedInterval = new DateHistogramInterval(in);
119+
if (in.getTransportVersion().onOrAfter(TransportVersions.DOWNSAMPLE_FORCE_MERGE_MAX_NUM_SEGMENTS_PARAMETER)) {
120+
forceMergeMaxNumSegments = in.readInt();
121+
} else {
122+
forceMergeMaxNumSegments = DEFAULT_MAX_NUM_SEGMENTS;
123+
}
102124
}
103125

104126
/**
@@ -135,6 +157,9 @@ public static void validateSourceAndTargetIntervals(DownsampleConfig source, Dow
135157
@Override
136158
public void writeTo(final StreamOutput out) throws IOException {
137159
fixedInterval.writeTo(out);
160+
if (out.getTransportVersion().onOrAfter(TransportVersions.DOWNSAMPLE_FORCE_MERGE_MAX_NUM_SEGMENTS_PARAMETER)) {
161+
out.writeInt(forceMergeMaxNumSegments);
162+
}
138163
}
139164

140165
/**
@@ -180,6 +205,10 @@ public Rounding.Prepared createRounding() {
180205
return createRounding(fixedInterval.toString(), timeZone);
181206
}
182207

208+
public int getForceMergeMaxNumSegments() {
209+
return forceMergeMaxNumSegments;
210+
}
211+
183212
@Override
184213
public String getWriteableName() {
185214
return NAME;
@@ -195,7 +224,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
195224
}
196225

197226
public XContentBuilder toXContentFragment(final XContentBuilder builder) throws IOException {
198-
return builder.field(FIXED_INTERVAL, fixedInterval.toString());
227+
builder.field(FIXED_INTERVAL, fixedInterval.toString());
228+
if (forceMergeMaxNumSegments != DEFAULT_MAX_NUM_SEGMENTS) {
229+
builder.field(FORCE_MERGE_MAX_NUM_SEGMENTS, forceMergeMaxNumSegments);
230+
}
231+
return builder;
199232
}
200233

201234
public static DownsampleConfig fromXContent(final XContentParser parser) throws IOException {
@@ -212,13 +245,14 @@ public boolean equals(final Object other) {
212245
}
213246
final DownsampleConfig that = (DownsampleConfig) other;
214247
return Objects.equals(fixedInterval, that.fixedInterval)
248+
&& Objects.equals(forceMergeMaxNumSegments, that.forceMergeMaxNumSegments)
215249
&& Objects.equals(intervalType, that.intervalType)
216250
&& ZoneId.of(timeZone, ZoneId.SHORT_IDS).getRules().equals(ZoneId.of(that.timeZone, ZoneId.SHORT_IDS).getRules());
217251
}
218252

219253
@Override
220254
public int hashCode() {
221-
return Objects.hash(fixedInterval, intervalType, ZoneId.of(timeZone));
255+
return Objects.hash(fixedInterval, forceMergeMaxNumSegments, intervalType, ZoneId.of(timeZone));
222256
}
223257

224258
@Override

server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ public record Round(TimeValue after, DownsampleConfig config) implements Writeab
488488
private static final ConstructingObjectParser<Round, Void> PARSER = new ConstructingObjectParser<>(
489489
"downsampling_round",
490490
false,
491-
(args, unused) -> new Round((TimeValue) args[0], new DownsampleConfig((DateHistogramInterval) args[1]))
491+
(args, unused) -> new Round((TimeValue) args[0], new DownsampleConfig((DateHistogramInterval) args[1], null))
492492
);
493493

494494
static {

server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1501,7 +1501,7 @@ public void testResolveLifecycle() throws Exception {
15011501
List.of(
15021502
new DataStreamLifecycle.Downsampling.Round(
15031503
TimeValue.timeValueDays(30),
1504-
new DownsampleConfig(new DateHistogramInterval("3h"))
1504+
new DownsampleConfig(new DateHistogramInterval("3h"), null)
15051505
)
15061506
)
15071507
)

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DownsampleAction.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,11 @@ public class DownsampleAction implements LifecycleAction {
5252
public static final TimeValue DEFAULT_WAIT_TIMEOUT = new TimeValue(1, TimeUnit.DAYS);
5353
private static final ParseField FIXED_INTERVAL_FIELD = new ParseField(DownsampleConfig.FIXED_INTERVAL);
5454
private static final ParseField WAIT_TIMEOUT_FIELD = new ParseField("wait_timeout");
55+
private static final ParseField FORCE_MERGE_MAX_NUM_SEGMENTS_FIELD = new ParseField("force_merge_max_num_segments");
5556

5657
private static final ConstructingObjectParser<DownsampleAction, Void> PARSER = new ConstructingObjectParser<>(
5758
NAME,
58-
a -> new DownsampleAction((DateHistogramInterval) a[0], (TimeValue) a[1])
59+
a -> new DownsampleAction((DateHistogramInterval) a[0], (TimeValue) a[1], (Integer) a[2])
5960
);
6061

6162
static {
@@ -71,29 +72,35 @@ public class DownsampleAction implements LifecycleAction {
7172
WAIT_TIMEOUT_FIELD,
7273
ObjectParser.ValueType.STRING
7374
);
75+
PARSER.declareField(optionalConstructorArg(), p -> p.intValue(), FORCE_MERGE_MAX_NUM_SEGMENTS_FIELD, ObjectParser.ValueType.INT);
7476
}
7577

7678
private final DateHistogramInterval fixedInterval;
7779
private final TimeValue waitTimeout;
80+
private final Integer forceMergeMaxNumSegments;
7881

7982
public static DownsampleAction parse(XContentParser parser) {
8083
return PARSER.apply(parser, null);
8184
}
8285

83-
public DownsampleAction(final DateHistogramInterval fixedInterval, final TimeValue waitTimeout) {
86+
public DownsampleAction(final DateHistogramInterval fixedInterval, final TimeValue waitTimeout, Integer forceMergeMaxNumSegments) {
8487
if (fixedInterval == null) {
8588
throw new IllegalArgumentException("Parameter [" + FIXED_INTERVAL_FIELD.getPreferredName() + "] is required.");
8689
}
8790
this.fixedInterval = fixedInterval;
8891
this.waitTimeout = waitTimeout == null ? DEFAULT_WAIT_TIMEOUT : waitTimeout;
92+
this.forceMergeMaxNumSegments = forceMergeMaxNumSegments;
8993
}
9094

9195
public DownsampleAction(StreamInput in) throws IOException {
9296
this(
9397
new DateHistogramInterval(in),
9498
in.getTransportVersion().onOrAfter(TransportVersions.V_8_10_X)
9599
? TimeValue.parseTimeValue(in.readString(), WAIT_TIMEOUT_FIELD.getPreferredName())
96-
: DEFAULT_WAIT_TIMEOUT
100+
: DEFAULT_WAIT_TIMEOUT,
101+
in.getTransportVersion().onOrAfter(TransportVersions.DOWNSAMPLE_FORCE_MERGE_MAX_NUM_SEGMENTS_PARAMETER)
102+
? in.readOptionalInt()
103+
: null
97104
);
98105
}
99106

@@ -105,13 +112,19 @@ public void writeTo(StreamOutput out) throws IOException {
105112
} else {
106113
out.writeString(DEFAULT_WAIT_TIMEOUT.getStringRep());
107114
}
115+
if (out.getTransportVersion().onOrAfter(TransportVersions.DOWNSAMPLE_FORCE_MERGE_MAX_NUM_SEGMENTS_PARAMETER)) {
116+
out.writeOptionalInt(forceMergeMaxNumSegments);
117+
}
108118
}
109119

110120
@Override
111121
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
112122
builder.startObject();
113123
builder.field(FIXED_INTERVAL_FIELD.getPreferredName(), fixedInterval.toString());
114124
builder.field(WAIT_TIMEOUT_FIELD.getPreferredName(), waitTimeout.getStringRep());
125+
if (forceMergeMaxNumSegments != null) {
126+
builder.field(FORCE_MERGE_MAX_NUM_SEGMENTS_FIELD.getPreferredName(), forceMergeMaxNumSegments);
127+
}
115128
builder.endObject();
116129
return builder;
117130
}
@@ -129,6 +142,10 @@ public TimeValue waitTimeout() {
129142
return waitTimeout;
130143
}
131144

145+
public Integer getForceMergeMaxNumSegments() {
146+
return forceMergeMaxNumSegments;
147+
}
148+
132149
@Override
133150
public boolean isSafeAction() {
134151
return false;
@@ -220,7 +237,14 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
220237
);
221238

222239
// Here is where the actual downsample action takes place
223-
DownsampleStep downsampleStep = new DownsampleStep(downsampleKey, waitForDownsampleIndexKey, client, fixedInterval, waitTimeout);
240+
DownsampleStep downsampleStep = new DownsampleStep(
241+
downsampleKey,
242+
waitForDownsampleIndexKey,
243+
client,
244+
fixedInterval,
245+
waitTimeout,
246+
forceMergeMaxNumSegments
247+
);
224248

225249
// Wait until the downsampled index is recovered. We again wait until the configured threshold is breached and
226250
// if the downsampled index has not successfully recovered until then, we rewind to the "cleanup-downsample-index"
@@ -305,12 +329,13 @@ public boolean equals(Object o) {
305329
if (o == null || getClass() != o.getClass()) return false;
306330

307331
DownsampleAction that = (DownsampleAction) o;
308-
return Objects.equals(this.fixedInterval, that.fixedInterval);
332+
return Objects.equals(this.fixedInterval, that.fixedInterval)
333+
&& Objects.equals(this.forceMergeMaxNumSegments, that.forceMergeMaxNumSegments);
309334
}
310335

311336
@Override
312337
public int hashCode() {
313-
return Objects.hash(fixedInterval);
338+
return Objects.hash(fixedInterval, forceMergeMaxNumSegments);
314339
}
315340

316341
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DownsampleStep.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,20 @@ public class DownsampleStep extends AsyncActionStep {
3333

3434
private final DateHistogramInterval fixedInterval;
3535
private final TimeValue waitTimeout;
36+
private final Integer forceMergeMaxNumSegments;
3637

3738
public DownsampleStep(
3839
final StepKey key,
3940
final StepKey nextStepKey,
4041
final Client client,
4142
final DateHistogramInterval fixedInterval,
42-
final TimeValue waitTimeout
43+
final TimeValue waitTimeout,
44+
final Integer forceMergeMaxNumSegments
4345
) {
4446
super(key, nextStepKey, client);
4547
this.fixedInterval = fixedInterval;
4648
this.waitTimeout = waitTimeout;
49+
this.forceMergeMaxNumSegments = forceMergeMaxNumSegments;
4750
}
4851

4952
@Override
@@ -89,7 +92,7 @@ public void performAction(
8992
}
9093

9194
void performDownsampleIndex(String indexName, String downsampleIndexName, ActionListener<Void> listener) {
92-
DownsampleConfig config = new DownsampleConfig(fixedInterval);
95+
DownsampleConfig config = new DownsampleConfig(fixedInterval, forceMergeMaxNumSegments);
9396
DownsampleAction.Request request = new DownsampleAction.Request(
9497
TimeValue.MAX_VALUE,
9598
indexName,
@@ -109,9 +112,13 @@ public TimeValue getWaitTimeout() {
109112
return waitTimeout;
110113
}
111114

115+
public Integer getForceMergeMaxNumSegments() {
116+
return forceMergeMaxNumSegments;
117+
}
118+
112119
@Override
113120
public int hashCode() {
114-
return Objects.hash(super.hashCode(), fixedInterval, waitTimeout);
121+
return Objects.hash(super.hashCode(), fixedInterval, waitTimeout, forceMergeMaxNumSegments);
115122
}
116123

117124
@Override
@@ -126,7 +133,10 @@ public boolean equals(Object obj) {
126133
return false;
127134
}
128135
DownsampleStep other = (DownsampleStep) obj;
129-
return super.equals(obj) && Objects.equals(fixedInterval, other.fixedInterval) && Objects.equals(waitTimeout, other.waitTimeout);
136+
return super.equals(obj)
137+
&& Objects.equals(fixedInterval, other.fixedInterval)
138+
&& Objects.equals(waitTimeout, other.waitTimeout)
139+
&& Objects.equals(forceMergeMaxNumSegments, other.forceMergeMaxNumSegments);
130140
}
131141

132142
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/downsample/DownsampleActionConfigTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
import org.elasticsearch.action.downsample.DownsampleConfig;
1010
import org.elasticsearch.common.io.stream.Writeable;
11-
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
1211
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
1312
import org.elasticsearch.xcontent.XContentParser;
1413
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
@@ -30,7 +29,8 @@ protected DownsampleConfig mutateInstance(DownsampleConfig instance) {
3029
}
3130

3231
public static DownsampleConfig randomConfig() {
33-
return new DownsampleConfig(ConfigTestHelpers.randomInterval());
32+
Integer forceMergeMaxNumSegments = randomBoolean() ? null : randomIntBetween(-1, 128);
33+
return new DownsampleConfig(ConfigTestHelpers.randomInterval(), forceMergeMaxNumSegments);
3434
}
3535

3636
@Override
@@ -44,12 +44,13 @@ protected DownsampleConfig doParseInstance(final XContentParser parser) throws I
4444
}
4545

4646
public void testEmptyFixedInterval() {
47-
Exception e = expectThrows(IllegalArgumentException.class, () -> new DownsampleConfig((DateHistogramInterval) null));
47+
Exception e = expectThrows(IllegalArgumentException.class, () -> new DownsampleConfig(null, null));
4848
assertThat(e.getMessage(), equalTo("Parameter [fixed_interval] is required."));
4949
}
5050

5151
public void testEmptyTimezone() {
52-
DownsampleConfig config = new DownsampleConfig(ConfigTestHelpers.randomInterval());
52+
Integer forceMergeMaxNumSegments = randomBoolean() ? null : randomIntBetween(-1, 128);
53+
DownsampleConfig config = new DownsampleConfig(ConfigTestHelpers.randomInterval(), forceMergeMaxNumSegments);
5354
assertEquals("UTC", config.getTimeZone());
5455
}
5556
}

0 commit comments

Comments
 (0)