Commit b0ef2b4

Move force merge from the downsampling request to the ILM action (#135834)
In this PR we move the force-merge operation from the downsampling request to the ILM action. Our goal is to decouple the downsampling operation from the force-merge operation. With this change, the downsampling request is responsible for ensuring that the downsampled index is refreshed and flushed, but not for force merging it. We believe that most of the time the force merge is not necessary, and executing it unnecessarily can increase the load on the cluster.

To preserve backwards compatibility, we move the responsibility for executing the existing force merge to the downsample ILM action and make it configurable. By default it will run, but a user can disable it, just as they can with a searchable snapshot.

```
"downsample": {
  "fixed_interval": "1h",
  "force_merge_index": false
}
```

**Update**

As a follow-up to this PR, we pose the question: is the force merge in the downsample action intentional and useful? To answer this question, we extend the time series telemetry. We define the force merge step in the downsample ILM action as useful if it is the only force merge operation before a searchable snapshot. Effectively, by this definition, we argue that the force merge in downsampling is not an intentional operation the user has requested but only a result of the implementation. We identify the biggest impact of removing it to be on searchable snapshots, but if the searchable snapshot performs its own force merge (and a more performant one, #133954), then we could skip this operation in the downsample action altogether.

Fixes: #135618
1 parent 167a731 commit b0ef2b4

18 files changed (+584, -254 lines)

docs/changelog/135834.yaml

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+pr: 135834
+summary: Move force merge from the downsampling request to the ILM action and allow
+  users to disable it.
+area: Downsampling
+type: enhancement
+issues: []

docs/reference/elasticsearch/index-lifecycle-actions/ilm-downsample.md

Lines changed: 5 additions & 2 deletions
@@ -9,7 +9,7 @@ Phases allowed: hot, warm, cold.
 
 Aggregates a time series (TSDS) index and stores pre-computed statistical summaries (`min`, `max`, `sum`, and `value_count`) for each metric field grouped by a configured time interval. For example, a TSDS index that contains metrics sampled every 10 seconds can be downsampled to an hourly index. All documents within an hour interval are summarized and stored as a single document and stored in the downsample index.
 
-This action corresponds to the [downsample API](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-downsample).
+This action corresponds to the [downsample API](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-downsample).
 
 The name of the resulting downsample index is `downsample-<original-index-name>-<random-uuid>`. If {{ilm-init}} performs the `downsample` action on a backing index for a data stream, the downsample index becomes a backing index for the same stream and the source index is deleted.
 
@@ -19,6 +19,8 @@ To use the `downsample` action in the `hot` phase, the `rollover` action **must*
 
 `fixed_interval`
 : (Required, string) The [fixed time interval](docs-content://manage-data/lifecycle/rollup/understanding-groups.md#rollup-understanding-group-intervals) into which the data will be downsampled.
+`force_merge_index` {applies_to}`stack: ga 9.3`
+: (Optional, boolean) When true, the downsampled index will be [force merged](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-forcemerge) to one [segment](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-segments). Defaults to `true`.
 
 
 ## Example [ilm-downsample-ex]
@@ -34,7 +36,8 @@ PUT _ilm/policy/datastream_policy
             "max_docs": 1
           },
           "downsample": {
-            "fixed_interval": "1h"
+            "fixed_interval": "1h",
+            "force_merge_index": false
           }
         }
       }
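
The `force_merge_index` option documented in this diff is optional and defaults to `true`. As a minimal sketch of that tri-state behaviour (set to true, set to false, or not set at all), a nullable `Boolean` could be resolved as follows. The class and method names here (`ForceMergeDefaultSketch`, `shouldForceMerge`) are hypothetical and only illustrate the documented default; they are not taken from the Elasticsearch code base.

```java
// Hypothetical helper, for illustration only; not the actual DownsampleAction code.
final class ForceMergeDefaultSketch {

    /**
     * Resolves a nullable force_merge_index flag.
     * A missing (null) value falls back to the documented default of true.
     */
    static boolean shouldForceMerge(Boolean forceMergeIndex) {
        return forceMergeIndex == null || forceMergeIndex;
    }

    public static void main(String[] args) {
        System.out.println(shouldForceMerge(null));  // flag omitted from the policy
        System.out.println(shouldForceMerge(true));  // flag explicitly enabled
        System.out.println(shouldForceMerge(false)); // flag explicitly disabled
    }
}
```

Keeping the flag nullable, rather than collapsing it to a primitive early, is what allows the telemetry in this commit to distinguish "explicitly enabled" from "left at the default".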
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+9196000
Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-inference_update_trained_model_deployment_request_source,9195000
+ilm_downsample_force_merge,9196000

x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java

Lines changed: 42 additions & 7 deletions
@@ -57,6 +57,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -130,6 +131,7 @@ public void testAction() throws Exception {
         var ilmRoundsMin = new AtomicInteger(Integer.MAX_VALUE);
         var ilmRoundsMax = new AtomicInteger(Integer.MIN_VALUE);
         Set<String> usedPolicies = new HashSet<>();
+        var forceMergeEnabled = new AtomicReference<IlmForceMergeInPolicies>();
 
         /*
          * We now add a number of simulated data streams to the cluster state. We mix different combinations of:
@@ -139,7 +141,7 @@ public void testAction() throws Exception {
          */
         updateClusterState(clusterState -> {
             Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata());
-            addIlmPolicies(metadataBuilder);
+            forceMergeEnabled.set(addIlmPolicies(metadataBuilder));
 
             Map<String, DataStream> dataStreamMap = new HashMap<>();
             for (int dataStreamCount = 0; dataStreamCount < randomIntBetween(10, 100); dataStreamCount++) {
@@ -313,22 +315,31 @@ public void testAction() throws Exception {
                 ilmRoundsMin.get(),
                 ilmRoundsMax.get()
             );
+            var explicitlyEnabled = new AtomicInteger(0);
+            var explicitlyDisabled = new AtomicInteger(0);
+            var undefined = new AtomicInteger(0);
             Map<String, Object> phasesStats = (Map<String, Object>) ilmStats.get("phases_in_use");
             if (usedPolicies.contains(DOWNSAMPLING_IN_HOT_POLICY)) {
                 assertThat(phasesStats.get("hot"), equalTo(1));
+                updateForceMergeCounters(forceMergeEnabled.get().enabledInHot, explicitlyEnabled, explicitlyDisabled, undefined);
             } else {
                 assertThat(phasesStats.get("hot"), nullValue());
             }
             if (usedPolicies.contains(DOWNSAMPLING_IN_WARM_COLD_POLICY)) {
                 assertThat(phasesStats.get("warm"), equalTo(1));
+                updateForceMergeCounters(forceMergeEnabled.get().enabledInWarm, explicitlyEnabled, explicitlyDisabled, undefined);
                 assertThat(phasesStats.get("cold"), equalTo(1));
+                updateForceMergeCounters(forceMergeEnabled.get().enabledInCold, explicitlyEnabled, explicitlyDisabled, undefined);
             } else {
                 assertThat(phasesStats.get("warm"), nullValue());
                 assertThat(phasesStats.get("cold"), nullValue());
             }
-
+            Map<String, Object> forceMergeStats = (Map<String, Object>) ilmStats.get("force_merge");
+            assertThat((int) forceMergeStats.get("explicitly_enabled_count"), equalTo(explicitlyEnabled.get()));
+            assertThat(forceMergeStats.get("explicitly_disabled_count"), equalTo(explicitlyDisabled.get()));
+            assertThat(forceMergeStats.get("undefined_count"), equalTo(undefined.get()));
+            assertThat(forceMergeStats.get("undefined_force_merge_needed_count"), equalTo(0));
         }
-
     }
 
     @SuppressWarnings("unchecked")
@@ -383,6 +394,16 @@ private void assertDownsamplingStats(
         }
     }
 
+    private void updateForceMergeCounters(Boolean value, AtomicInteger enabled, AtomicInteger disabled, AtomicInteger undefined) {
+        if (value == null) {
+            undefined.incrementAndGet();
+        } else if (value) {
+            enabled.incrementAndGet();
+        } else {
+            disabled.incrementAndGet();
+        }
+    }
+
     private DataStreamLifecycle maybeCreateLifecycle(boolean isDownsampled, boolean hasDlm) {
         if (hasDlm == false) {
             return null;
@@ -462,25 +483,36 @@ private List<DataStreamLifecycle.DownsamplingRound> randomDownsamplingRounds() {
         return rounds;
     }
 
-    private void addIlmPolicies(Metadata.Builder metadataBuilder) {
+    private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder) {
+        Boolean hotForceMergeEnabled = randomBoolean() ? randomBoolean() : null;
+        Boolean warmForceMergeEnabled = randomBoolean() ? randomBoolean() : null;
+        Boolean coldForceMergeEnabled = randomBoolean() ? randomBoolean() : null;
         List<LifecyclePolicy> policies = List.of(
             new LifecyclePolicy(
                 DOWNSAMPLING_IN_HOT_POLICY,
                 Map.of(
                     "hot",
-                    new Phase("hot", TimeValue.ZERO, Map.of("downsample", new DownsampleAction(DateHistogramInterval.MINUTE, null)))
+                    new Phase(
+                        "hot",
+                        TimeValue.ZERO,
+                        Map.of("downsample", new DownsampleAction(DateHistogramInterval.MINUTE, null, hotForceMergeEnabled))
+                    )
                 )
             ),
             new LifecyclePolicy(
                 DOWNSAMPLING_IN_WARM_COLD_POLICY,
                 Map.of(
                     "warm",
-                    new Phase("warm", TimeValue.ZERO, Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null))),
+                    new Phase(
+                        "warm",
+                        TimeValue.ZERO,
+                        Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null, warmForceMergeEnabled))
+                    ),
                     "cold",
                     new Phase(
                         "cold",
                         TimeValue.timeValueDays(3),
-                        Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null))
+                        Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null, coldForceMergeEnabled))
                     )
                 )
             ),
@@ -498,8 +530,11 @@ private void addIlmPolicies(Metadata.Builder metadataBuilder) {
         );
         IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(policyMetadata, OperationMode.RUNNING);
         metadataBuilder.putCustom(IndexLifecycleMetadata.TYPE, newMetadata);
+        return new IlmForceMergeInPolicies(hotForceMergeEnabled, warmForceMergeEnabled, coldForceMergeEnabled);
     }
 
+    private record IlmForceMergeInPolicies(Boolean enabledInHot, Boolean enabledInWarm, Boolean enabledInCold) {}
+
     private static String randomIlmPolicy(DownsampledBy downsampledBy, boolean ovewrittenDlm) {
         if (downsampledBy == DownsampledBy.ILM || (downsampledBy == DownsampledBy.DLM && ovewrittenDlm)) {
             return randomFrom(DOWNSAMPLING_IN_HOT_POLICY, DOWNSAMPLING_IN_WARM_COLD_POLICY);
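
The test in this file buckets each policy's nullable `Boolean` flag into one of three counters (explicitly enabled, explicitly disabled, undefined). The same classification can be sketched in isolation as below. This is a standalone illustration under stated assumptions: `TriStateCounterSketch`, `Counts`, and `count` are hypothetical names, and the sketch uses plain `int` fields rather than the `AtomicInteger` arguments used by the test helper.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch of the tri-state bucketing used by the test helper.
final class TriStateCounterSketch {

    /** Simple holder for the three buckets. */
    record Counts(int enabled, int disabled, int undefined) {}

    /** Classifies each flag: null means undefined, true means enabled, false means disabled. */
    static Counts count(List<Boolean> flags) {
        int enabled = 0;
        int disabled = 0;
        int undefined = 0;
        for (Boolean flag : flags) {
            if (flag == null) {
                undefined++;
            } else if (flag) {
                enabled++;
            } else {
                disabled++;
            }
        }
        return new Counts(enabled, disabled, undefined);
    }

    public static void main(String[] args) {
        // Arrays.asList is used because List.of rejects null elements.
        Counts counts = count(Arrays.asList(true, null, false, null));
        System.out.println(counts);
    }
}
```

The null check has to come first: unboxing a null `Boolean` in the `else if (flag)` branch would throw a `NullPointerException`.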

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportAction.java

Lines changed: 64 additions & 8 deletions
@@ -25,10 +25,14 @@
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.datastreams.TimeSeriesFeatureSetUsage;
 import org.elasticsearch.xpack.core.ilm.DownsampleAction;
+import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
 import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
+import org.elasticsearch.xpack.core.ilm.LifecycleAction;
 import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
 import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
 import org.elasticsearch.xpack.core.ilm.Phase;
+import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
+import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType;
 
 import java.util.HashMap;
 import java.util.LongSummaryStatistics;
@@ -128,7 +132,7 @@ protected void localClusterStateOperation(
                     tsDataStreamCount,
                     tsIndexCount,
                     ilmStats.getDownsamplingStats(),
-                    ilmStats.getIlmPolicyStats(),
+                    ilmStats.calculateIlmPolicyStats(),
                     dlmStats.getDownsamplingStats(),
                     indicesByInterval
                 )
@@ -167,27 +171,79 @@ TimeSeriesFeatureSetUsage.DownsamplingFeatureStats getDownsamplingStats() {
         }
     }
 
+    /**
+     * Tracks the ILM policies currently in use by time series data streams.
+     */
     static class IlmDownsamplingStatsTracker extends DownsamplingStatsTracker {
         private final Map<String, Map<String, Phase>> policies = new HashMap<>();
 
         void trackPolicy(LifecyclePolicy ilmPolicy) {
             policies.putIfAbsent(ilmPolicy.getName(), ilmPolicy.getPhases());
         }
 
-        Map<String, Long> getIlmPolicyStats() {
+        /**
+         * Calculates ILM-policy-specific statistics that help us get a better understanding on the phases that use downsampling and on
+         * how the force merge step in the downsample action is used. More specifically, for downsampling we are tracking:
+         * - if users explicitly enabled or disabled force-merge after downsampling.
+         * - if the force merge could be skipped with minimal impact, when the force merge flag is undefined.
+         * @return a IlmPolicyStats record that contains these counters.
+         */
+        TimeSeriesFeatureSetUsage.IlmPolicyStats calculateIlmPolicyStats() {
             if (policies.isEmpty()) {
-                return Map.of();
+                return TimeSeriesFeatureSetUsage.IlmPolicyStats.EMPTY;
             }
+            long forceMergeExplicitlyEnabledCounter = 0;
+            long forceMergeExplicitlyDisabledCounter = 0;
+            long forceMergeDefaultCounter = 0;
+            long downsampledForceMergeNeededCounter = 0; // Meaning it's followed by a searchable snapshot with force-merge index false
             Map<String, Long> downsamplingPhases = new HashMap<>();
             for (String ilmPolicy : policies.keySet()) {
-                for (Phase phase : policies.get(ilmPolicy).values()) {
-                    if (phase.getActions().containsKey(DownsampleAction.NAME)) {
-                        Long current = downsamplingPhases.computeIfAbsent(phase.getName(), ignored -> 0L);
-                        downsamplingPhases.put(phase.getName(), current + 1);
+                Map<String, Phase> phases = policies.get(ilmPolicy);
+                boolean downsampledForceMergeNeeded = false;
+                for (String phase : TimeseriesLifecycleType.ORDERED_VALID_PHASES) {
+                    if (phases.containsKey(phase) == false) {
+                        continue;
+                    }
+                    Map<String, LifecycleAction> actions = phases.get(phase).getActions();
+                    if (actions.containsKey(DownsampleAction.NAME)) {
+                        // count the phase used
+                        Long current = downsamplingPhases.computeIfAbsent(phase, ignored -> 0L);
+                        downsamplingPhases.put(phase, current + 1);
+                        // Count force merge
+                        DownsampleAction downsampleAction = (DownsampleAction) actions.get(DownsampleAction.NAME);
+                        if (downsampleAction.forceMergeIndex() == null) {
+                            forceMergeDefaultCounter++;
+                            downsampledForceMergeNeeded = true; // this default force merge could be needed depending on the following steps
+                        } else if (downsampleAction.forceMergeIndex()) {
+                            forceMergeExplicitlyEnabledCounter++;
+                        } else {
+                            forceMergeExplicitlyDisabledCounter++;
+                        }
+                    }
+
+                    // If there is an explicit force merge action, we could consider the downsampling force merge redundant.
+                    if (actions.containsKey(ForceMergeAction.NAME)) {
+                        downsampledForceMergeNeeded = false;
+                    }
+                    if (downsampledForceMergeNeeded && actions.containsKey(SearchableSnapshotAction.NAME)) {
+                        SearchableSnapshotAction searchableSnapshotAction = (SearchableSnapshotAction) actions.get(
+                            SearchableSnapshotAction.NAME
+                        );
+                        if (searchableSnapshotAction.isForceMergeIndex() == false) {
+                            // If there was a searchable snapshot with force index false, then the downsample force merge has impact
+                            downsampledForceMergeNeededCounter++;
+                        }
+                        downsampledForceMergeNeeded = false;
                     }
                 }
             }
-            return downsamplingPhases;
+            return new TimeSeriesFeatureSetUsage.IlmPolicyStats(
+                downsamplingPhases,
+                forceMergeExplicitlyEnabledCounter,
+                forceMergeExplicitlyDisabledCounter,
+                forceMergeDefaultCounter,
+                downsampledForceMergeNeededCounter
+            );
         }
     }
 }
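
The "force merge needed" counter computed in `calculateIlmPolicyStats` follows the definition from the commit message: an implicit (undefined) downsample force merge counts as needed only if no explicit force merge action runs before a later searchable snapshot that itself does not force merge. The following is a simplified, standalone sketch of that scan for a single policy. `ForceMergeNeededSketch`, `PhaseSketch`, and `countNeededDefaultForceMerges` are hypothetical stand-ins for the real ILM types, and the phase list is assumed to be already sorted in lifecycle order.

```java
import java.util.List;

// Hypothetical standalone sketch; the real code works on ILM Phase and LifecycleAction objects.
final class ForceMergeNeededSketch {

    /**
     * Simplified view of one ILM phase.
     * downsampleForceMerge is only meaningful when hasDownsample is true; null means the flag was not set.
     * snapshotForceMerge is only meaningful when hasSearchableSnapshot is true.
     */
    record PhaseSketch(
        boolean hasDownsample,
        Boolean downsampleForceMerge,
        boolean hasForceMergeAction,
        boolean hasSearchableSnapshot,
        boolean snapshotForceMerge
    ) {}

    /** Counts how often a default downsample force merge is the only merge before a searchable snapshot. */
    static long countNeededDefaultForceMerges(List<PhaseSketch> orderedPhases) {
        long needed = 0;
        boolean pending = false; // true while an undefined downsample force merge is still unaccounted for
        for (PhaseSketch phase : orderedPhases) {
            if (phase.hasDownsample() && phase.downsampleForceMerge() == null) {
                pending = true;
            }
            // An explicit force merge action makes the downsample force merge redundant.
            if (phase.hasForceMergeAction()) {
                pending = false;
            }
            if (pending && phase.hasSearchableSnapshot()) {
                if (phase.snapshotForceMerge() == false) {
                    needed++; // the snapshot does not merge, so the downsample merge has impact
                }
                pending = false;
            }
        }
        return needed;
    }

    public static void main(String[] args) {
        PhaseSketch hot = new PhaseSketch(true, null, false, false, false);
        PhaseSketch warmWithMerge = new PhaseSketch(false, null, true, false, false);
        PhaseSketch coldSnapshot = new PhaseSketch(false, null, false, true, false);

        System.out.println(countNeededDefaultForceMerges(List.of(hot, coldSnapshot)));                // prints 1
        System.out.println(countNeededDefaultForceMerges(List.of(hot, warmWithMerge, coldSnapshot))); // prints 0
    }
}
```

Iterating in lifecycle order matters here: the question is what happens between the downsample and the next searchable snapshot, which is presumably why the commit replaces iteration over `values()` with iteration over `TimeseriesLifecycleType.ORDERED_VALID_PHASES`.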
