Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/135834.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 135834
summary: Move force merge from the downsampling request to the ILM action and allow
users to disable it.
area: Downsampling
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Phases allowed: hot, warm, cold.

Aggregates a time series (TSDS) index and stores pre-computed statistical summaries (`min`, `max`, `sum`, and `value_count`) for each metric field grouped by a configured time interval. For example, a TSDS index that contains metrics sampled every 10 seconds can be downsampled to an hourly index. All documents within an hour interval are summarized and stored as a single document in the downsample index.

This action corresponds to the [downsample API](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-downsample).
This action corresponds to the [downsample API](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-downsample).

The name of the resulting downsample index is `downsample-<original-index-name>-<random-uuid>`. If {{ilm-init}} performs the `downsample` action on a backing index for a data stream, the downsample index becomes a backing index for the same stream and the source index is deleted.

Expand All @@ -19,6 +19,8 @@ To use the `downsample` action in the `hot` phase, the `rollover` action **must*

`fixed_interval`
: (Required, string) The [fixed time interval](docs-content://manage-data/lifecycle/rollup/understanding-groups.md#rollup-understanding-group-intervals) into which the data will be downsampled.
`force_merge_index` {applies_to}`stack: ga 9.3`
: (Optional, boolean) When true, the downsampled index will be [force merged](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-forcemerge) to one [segment](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-segments). Defaults to `true`.


## Example [ilm-downsample-ex]
Expand All @@ -34,7 +36,8 @@ PUT _ilm/policy/datastream_policy
"max_docs": 1
},
"downsample": {
"fixed_interval": "1h"
"fixed_interval": "1h",
"force_merge_index": false
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9196000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
inference_update_trained_model_deployment_request_source,9195000
ilm_downsample_force_merge,9196000
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -130,6 +131,7 @@ public void testAction() throws Exception {
var ilmRoundsMin = new AtomicInteger(Integer.MAX_VALUE);
var ilmRoundsMax = new AtomicInteger(Integer.MIN_VALUE);
Set<String> usedPolicies = new HashSet<>();
var forceMergeEnabled = new AtomicReference<IlmForceMergeInPolicies>();

/*
* We now add a number of simulated data streams to the cluster state. We mix different combinations of:
Expand All @@ -139,7 +141,7 @@ public void testAction() throws Exception {
*/
updateClusterState(clusterState -> {
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata());
addIlmPolicies(metadataBuilder);
forceMergeEnabled.set(addIlmPolicies(metadataBuilder));

Map<String, DataStream> dataStreamMap = new HashMap<>();
for (int dataStreamCount = 0; dataStreamCount < randomIntBetween(10, 100); dataStreamCount++) {
Expand Down Expand Up @@ -313,22 +315,31 @@ public void testAction() throws Exception {
ilmRoundsMin.get(),
ilmRoundsMax.get()
);
var explicitlyEnabled = new AtomicInteger(0);
var explicitlyDisabled = new AtomicInteger(0);
var undefined = new AtomicInteger(0);
Map<String, Object> phasesStats = (Map<String, Object>) ilmStats.get("phases_in_use");
if (usedPolicies.contains(DOWNSAMPLING_IN_HOT_POLICY)) {
assertThat(phasesStats.get("hot"), equalTo(1));
updateForceMergeCounters(forceMergeEnabled.get().enabledInHot, explicitlyEnabled, explicitlyDisabled, undefined);
} else {
assertThat(phasesStats.get("hot"), nullValue());
}
if (usedPolicies.contains(DOWNSAMPLING_IN_WARM_COLD_POLICY)) {
assertThat(phasesStats.get("warm"), equalTo(1));
updateForceMergeCounters(forceMergeEnabled.get().enabledInWarm, explicitlyEnabled, explicitlyDisabled, undefined);
assertThat(phasesStats.get("cold"), equalTo(1));
updateForceMergeCounters(forceMergeEnabled.get().enabledInCold, explicitlyEnabled, explicitlyDisabled, undefined);
} else {
assertThat(phasesStats.get("warm"), nullValue());
assertThat(phasesStats.get("cold"), nullValue());
}

Map<String, Object> forceMergeStats = (Map<String, Object>) ilmStats.get("force_merge");
assertThat((int) forceMergeStats.get("explicitly_enabled_count"), equalTo(explicitlyEnabled.get()));
assertThat(forceMergeStats.get("explicitly_disabled_count"), equalTo(explicitlyDisabled.get()));
assertThat(forceMergeStats.get("undefined_count"), equalTo(undefined.get()));
assertThat(forceMergeStats.get("undefined_force_merge_needed_count"), equalTo(0));
}

}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -383,6 +394,16 @@ private void assertDownsamplingStats(
}
}

/**
 * Increments exactly one of the supplied counters, chosen by the tri-state force merge flag.
 *
 * @param value     the force merge flag: {@code null} when it was not set, otherwise the explicit setting
 * @param enabled   counter incremented when {@code value} is {@code true}
 * @param disabled  counter incremented when {@code value} is {@code false}
 * @param undefined counter incremented when {@code value} is {@code null}
 */
private void updateForceMergeCounters(Boolean value, AtomicInteger enabled, AtomicInteger disabled, AtomicInteger undefined) {
    // Pick the counter first, then increment once at the end.
    final AtomicInteger target;
    if (Boolean.TRUE.equals(value)) {
        target = enabled;
    } else if (Boolean.FALSE.equals(value)) {
        target = disabled;
    } else {
        target = undefined;
    }
    target.incrementAndGet();
}

private DataStreamLifecycle maybeCreateLifecycle(boolean isDownsampled, boolean hasDlm) {
if (hasDlm == false) {
return null;
Expand Down Expand Up @@ -462,25 +483,36 @@ private List<DataStreamLifecycle.DownsamplingRound> randomDownsamplingRounds() {
return rounds;
}

private void addIlmPolicies(Metadata.Builder metadataBuilder) {
private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder) {
Boolean hotForceMergeEnabled = randomBoolean() ? randomBoolean() : null;
Boolean warmForceMergeEnabled = randomBoolean() ? randomBoolean() : null;
Boolean coldForceMergeEnabled = randomBoolean() ? randomBoolean() : null;
List<LifecyclePolicy> policies = List.of(
new LifecyclePolicy(
DOWNSAMPLING_IN_HOT_POLICY,
Map.of(
"hot",
new Phase("hot", TimeValue.ZERO, Map.of("downsample", new DownsampleAction(DateHistogramInterval.MINUTE, null)))
new Phase(
"hot",
TimeValue.ZERO,
Map.of("downsample", new DownsampleAction(DateHistogramInterval.MINUTE, null, hotForceMergeEnabled))
)
)
),
new LifecyclePolicy(
DOWNSAMPLING_IN_WARM_COLD_POLICY,
Map.of(
"warm",
new Phase("warm", TimeValue.ZERO, Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null))),
new Phase(
"warm",
TimeValue.ZERO,
Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null, warmForceMergeEnabled))
),
"cold",
new Phase(
"cold",
TimeValue.timeValueDays(3),
Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null))
Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null, coldForceMergeEnabled))
)
)
),
Expand All @@ -498,8 +530,11 @@ private void addIlmPolicies(Metadata.Builder metadataBuilder) {
);
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(policyMetadata, OperationMode.RUNNING);
metadataBuilder.putCustom(IndexLifecycleMetadata.TYPE, newMetadata);
return new IlmForceMergeInPolicies(hotForceMergeEnabled, warmForceMergeEnabled, coldForceMergeEnabled);
}

/**
 * Holds the force merge flag that {@code addIlmPolicies} passed to the downsample action of each phase, so the test can
 * later compare the reported usage counters against what was actually configured. Each component may be {@code null},
 * which means the flag was left unset for that phase.
 *
 * @param enabledInHot  flag used for the downsample action in the hot phase
 * @param enabledInWarm flag used for the downsample action in the warm phase
 * @param enabledInCold flag used for the downsample action in the cold phase
 */
private record IlmForceMergeInPolicies(Boolean enabledInHot, Boolean enabledInWarm, Boolean enabledInCold) {}

private static String randomIlmPolicy(DownsampledBy downsampledBy, boolean ovewrittenDlm) {
if (downsampledBy == DownsampledBy.ILM || (downsampledBy == DownsampledBy.DLM && ovewrittenDlm)) {
return randomFrom(DOWNSAMPLING_IN_HOT_POLICY, DOWNSAMPLING_IN_WARM_COLD_POLICY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.datastreams.TimeSeriesFeatureSetUsage;
import org.elasticsearch.xpack.core.ilm.DownsampleAction;
import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.ilm.LifecycleAction;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType;

import java.util.HashMap;
import java.util.LongSummaryStatistics;
Expand Down Expand Up @@ -128,7 +132,7 @@ protected void localClusterStateOperation(
tsDataStreamCount,
tsIndexCount,
ilmStats.getDownsamplingStats(),
ilmStats.getIlmPolicyStats(),
ilmStats.calculateIlmPolicyStats(),
dlmStats.getDownsamplingStats(),
indicesByInterval
)
Expand Down Expand Up @@ -167,27 +171,79 @@ TimeSeriesFeatureSetUsage.DownsamplingFeatureStats getDownsamplingStats() {
}
}

/**
* Tracks the ILM policies currently in use by time series data streams.
*/
static class IlmDownsamplingStatsTracker extends DownsamplingStatsTracker {
private final Map<String, Map<String, Phase>> policies = new HashMap<>();

/**
 * Records the phases of the given policy under the policy's name. A policy name that is already tracked keeps its
 * existing entry.
 *
 * @param ilmPolicy the ILM policy whose name and phases are recorded
 */
void trackPolicy(LifecyclePolicy ilmPolicy) {
    final String policyName = ilmPolicy.getName();
    final Map<String, Phase> policyPhases = ilmPolicy.getPhases();
    policies.putIfAbsent(policyName, policyPhases);
}

Map<String, Long> getIlmPolicyStats() {
/**
* Calculates ILM-policy-specific statistics that help us get a better understanding of the phases that use downsampling and of
* how the force merge step in the downsample action is used. More specifically, for this force merge we are tracking:
* - if users explicitly enabled or disabled it.
* - if the force merge could be skipped with minimal impact, when the force merge flag is undefined.
* @return an IlmPolicyStats record that contains these counters.
*/
TimeSeriesFeatureSetUsage.IlmPolicyStats calculateIlmPolicyStats() {
if (policies.isEmpty()) {
return Map.of();
return TimeSeriesFeatureSetUsage.IlmPolicyStats.EMPTY;
}
long forceMergeExplicitlyEnabledCounter = 0;
long forceMergeExplicitlyDisabledCounter = 0;
long forceMergeDefaultCounter = 0;
long downsampledForceMergeNeededCounter = 0; // Meaning it's followed by a searchable snapshot with force-merge index false
Map<String, Long> downsamplingPhases = new HashMap<>();
for (String ilmPolicy : policies.keySet()) {
for (Phase phase : policies.get(ilmPolicy).values()) {
if (phase.getActions().containsKey(DownsampleAction.NAME)) {
Long current = downsamplingPhases.computeIfAbsent(phase.getName(), ignored -> 0L);
downsamplingPhases.put(phase.getName(), current + 1);
Map<String, Phase> phases = policies.get(ilmPolicy);
boolean downsampledForceMergeNeeded = false;
for (String phase : TimeseriesLifecycleType.ORDERED_VALID_PHASES) {
if (phases.containsKey(phase) == false) {
continue;
}
Map<String, LifecycleAction> actions = phases.get(phase).getActions();
if (actions.containsKey(DownsampleAction.NAME)) {
// count the phase used
Long current = downsamplingPhases.computeIfAbsent(phase, ignored -> 0L);
downsamplingPhases.put(phase, current + 1);
// Count force merge
DownsampleAction downsampleAction = (DownsampleAction) actions.get(DownsampleAction.NAME);
if (downsampleAction.forceMergeIndex() == null) {
forceMergeDefaultCounter++;
downsampledForceMergeNeeded = true; // this default force merge could be needed depending on the following steps
} else if (downsampleAction.forceMergeIndex()) {
forceMergeExplicitlyEnabledCounter++;
} else {
forceMergeExplicitlyDisabledCounter++;
}
}

// If there is an explicit force merge action, we could consider the downsampling force merge redundant.
if (actions.containsKey(ForceMergeAction.NAME)) {
downsampledForceMergeNeeded = false;
}
if (downsampledForceMergeNeeded && actions.containsKey(SearchableSnapshotAction.NAME)) {
SearchableSnapshotAction searchableSnapshotAction = (SearchableSnapshotAction) actions.get(
SearchableSnapshotAction.NAME
);
if (searchableSnapshotAction.isForceMergeIndex() == false) {
// If there was a searchable snapshot with force index false, then the downsample force merge has impact
downsampledForceMergeNeededCounter++;
}
downsampledForceMergeNeeded = false;
}
}
}
return downsamplingPhases;
return new TimeSeriesFeatureSetUsage.IlmPolicyStats(
downsamplingPhases,
forceMergeExplicitlyEnabledCounter,
forceMergeExplicitlyDisabledCounter,
forceMergeDefaultCounter,
downsampledForceMergeNeededCounter
);
}
}
}
Loading
Loading