@@ -9,7 +9,7 @@ Phases allowed: hot, warm, cold.

Aggregates a time series (TSDS) index and stores pre-computed statistical summaries (`min`, `max`, `sum`, `value_count`, and `avg`) for each metric field, grouped by a configured time interval. For example, a TSDS index that contains metrics sampled every 10 seconds can be downsampled to an hourly index. All documents within an hour interval are summarized and stored as a single document in the downsample index.

This action corresponds to the [downsample API](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-downsample).

The name of the resulting downsample index is `downsample-<original-index-name>-<random-uuid>`. If {{ilm-init}} performs the `downsample` action on a backing index for a data stream, the downsample index becomes a backing index for the same stream and the source index is deleted.

@@ -19,6 +19,8 @@ To use the `downsample` action in the `hot` phase, the `rollover` action **must**

`fixed_interval`
: (Required, string) The [fixed time interval](docs-content://manage-data/lifecycle/rollup/understanding-groups.md#rollup-understanding-group-intervals) into which the data will be downsampled.
+`force_merge_index`
+: (Optional, boolean) When `true`, the downsampled index will be [force merged](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-forcemerge) to one [segment](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-segments). Defaults to `true`.


## Example [ilm-downsample-ex]
@@ -34,7 +36,8 @@ PUT _ilm/policy/datastream_policy
            "max_docs": 1
          },
          "downsample": {
-            "fixed_interval": "1h"
+            "fixed_interval": "1h",
+            "force_merge_index": false
          }
        }
      }
@@ -0,0 +1 @@
+9187000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
@@ -1 +1 @@
-esql_plan_with_no_columns,9186000
+ilm_downsample_force_merge,9187000
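
These two CSV edits register the named transport version `ilm_downsample_force_merge` (id 9187000) and raise the 9.3 upper bound to it. The Java change below resolves that name at class-load time and uses it to gate the new wire field. A minimal read-side sketch of the fallback behavior (the helper method is illustrative, not part of this change):

```java
// Resolved once from the name registered in the CSV above.
private static final TransportVersion ILM_FORCE_MERGE_IN_DOWNSAMPLING =
    TransportVersion.fromName("ilm_downsample_force_merge");

// Older peers never send the flag, so reads fall back to true,
// matching the behavior before this change.
static boolean readForceMergeIndex(StreamInput in) throws IOException {
    return in.getTransportVersion().onOrAfter(ILM_FORCE_MERGE_IN_DOWNSAMPLING)
        ? in.readBoolean()
        : true;
}
```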
@@ -468,19 +468,27 @@ private void addIlmPolicies(Metadata.Builder metadataBuilder) {
            DOWNSAMPLING_IN_HOT_POLICY,
            Map.of(
                "hot",
-                new Phase("hot", TimeValue.ZERO, Map.of("downsample", new DownsampleAction(DateHistogramInterval.MINUTE, null)))
+                new Phase(
+                    "hot",
+                    TimeValue.ZERO,
+                    Map.of("downsample", new DownsampleAction(DateHistogramInterval.MINUTE, null, randomBoolean()))
+                )
            )
        ),
        new LifecyclePolicy(
            DOWNSAMPLING_IN_WARM_COLD_POLICY,
            Map.of(
                "warm",
-                new Phase("warm", TimeValue.ZERO, Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null))),
+                new Phase(
+                    "warm",
+                    TimeValue.ZERO,
+                    Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null, randomBoolean()))
+                ),
                "cold",
                new Phase(
                    "cold",
                    TimeValue.timeValueDays(3),
-                    Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null))
+                    Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null, randomBoolean()))
                )
            )
        ),
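
The test policies pass `randomBoolean()` as the new third constructor argument, so both force-merge settings get coverage. For reference, a minimal sketch of a non-test call site (import paths assume the usual x-pack locations; the variable name is illustrative):

```java
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.xpack.core.ilm.DownsampleAction;

// Downsample to hourly buckets, keep the default wait timeout (null),
// and opt out of force merging the downsampled index.
DownsampleAction hourlyNoForceMerge = new DownsampleAction(DateHistogramInterval.HOUR, null, false);
```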
@@ -8,12 +8,14 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -30,9 +32,11 @@

import java.io.IOException;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;

import static org.elasticsearch.action.downsample.DownsampleConfig.generateDownsampleIndexName;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
@@ -44,19 +48,24 @@
public class DownsampleAction implements LifecycleAction {

    private static final Logger logger = LogManager.getLogger(DownsampleAction.class);
+    private static final TransportVersion ILM_FORCE_MERGE_IN_DOWNSAMPLING = TransportVersion.fromName("ilm_downsample_force_merge");

    public static final String NAME = "downsample";
    public static final String DOWNSAMPLED_INDEX_PREFIX = "downsample-";
    public static final String CONDITIONAL_TIME_SERIES_CHECK_KEY = BranchingStep.NAME + "-on-timeseries-check";
    public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check";
    public static final TimeValue DEFAULT_WAIT_TIMEOUT = new TimeValue(1, TimeUnit.DAYS);
    private static final ParseField FIXED_INTERVAL_FIELD = new ParseField(DownsampleConfig.FIXED_INTERVAL);
+    private static final ParseField FORCE_MERGE_INDEX = new ParseField("force_merge_index");
    private static final ParseField WAIT_TIMEOUT_FIELD = new ParseField("wait_timeout");
    static final String BWC_CLEANUP_TARGET_INDEX_NAME = "cleanup-target-index";
+    private static final BiFunction<String, LifecycleExecutionState, String> DOWNSAMPLED_INDEX_NAME_SUPPLIER = (
+        indexName,
+        lifecycleState) -> lifecycleState.downsampleIndexName();

    private static final ConstructingObjectParser<DownsampleAction, Void> PARSER = new ConstructingObjectParser<>(
        NAME,
-        a -> new DownsampleAction((DateHistogramInterval) a[0], (TimeValue) a[1])
+        a -> new DownsampleAction((DateHistogramInterval) a[0], (TimeValue) a[1], a[2] == null || (boolean) a[2])
    );

    static {
@@ -72,29 +81,33 @@ public class DownsampleAction implements LifecycleAction {
            WAIT_TIMEOUT_FIELD,
            ObjectParser.ValueType.STRING
        );
+        PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_INDEX);
    }

    private final DateHistogramInterval fixedInterval;
    private final TimeValue waitTimeout;
+    private final boolean forceMergeIndex;

    public static DownsampleAction parse(XContentParser parser) {
        return PARSER.apply(parser, null);
    }

-    public DownsampleAction(final DateHistogramInterval fixedInterval, final TimeValue waitTimeout) {
+    public DownsampleAction(final DateHistogramInterval fixedInterval, final TimeValue waitTimeout, boolean forceMergeIndex) {
        if (fixedInterval == null) {
            throw new IllegalArgumentException("Parameter [" + FIXED_INTERVAL_FIELD.getPreferredName() + "] is required.");
        }
        this.fixedInterval = fixedInterval;
        this.waitTimeout = waitTimeout == null ? DEFAULT_WAIT_TIMEOUT : waitTimeout;
+        this.forceMergeIndex = forceMergeIndex;
    }

    public DownsampleAction(StreamInput in) throws IOException {
        this(
            new DateHistogramInterval(in),
            in.getTransportVersion().onOrAfter(TransportVersions.V_8_10_X)
                ? TimeValue.parseTimeValue(in.readString(), WAIT_TIMEOUT_FIELD.getPreferredName())
-                : DEFAULT_WAIT_TIMEOUT
+                : DEFAULT_WAIT_TIMEOUT,
+            in.getTransportVersion().onOrAfter(ILM_FORCE_MERGE_IN_DOWNSAMPLING) ? in.readBoolean() : true
        );
    }
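
Because the flag is declared with `optionalConstructorArg()` and the constructor reference maps it through `a[2] == null || (boolean) a[2]`, a policy that omits `force_merge_index` parses to `true`. A hedged round-trip sketch (the parser construction here is an assumption, not part of this diff):

```java
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.json.JsonXContent;

// Absent flag: defaults to true.
try (XContentParser p = JsonXContent.jsonXContent
        .createParser(XContentParserConfiguration.EMPTY, "{\"fixed_interval\":\"1h\"}")) {
    assert DownsampleAction.parse(p).forceMergeIndex();
}
// Explicit false is preserved.
try (XContentParser p = JsonXContent.jsonXContent
        .createParser(XContentParserConfiguration.EMPTY, "{\"fixed_interval\":\"1h\",\"force_merge_index\":false}")) {
    assert DownsampleAction.parse(p).forceMergeIndex() == false;
}
```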

@@ -106,13 +119,17 @@ public void writeTo(StreamOutput out) throws IOException {
        } else {
            out.writeString(DEFAULT_WAIT_TIMEOUT.getStringRep());
        }
+        if (out.getTransportVersion().onOrAfter(ILM_FORCE_MERGE_IN_DOWNSAMPLING)) {
+            out.writeBoolean(forceMergeIndex);
+        }
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field(FIXED_INTERVAL_FIELD.getPreferredName(), fixedInterval.toString());
        builder.field(WAIT_TIMEOUT_FIELD.getPreferredName(), waitTimeout.getStringRep());
+        builder.field(FORCE_MERGE_INDEX.getPreferredName(), forceMergeIndex);
        builder.endObject();
        return builder;
    }
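
Since `toXContent` now always emits the flag, a serialized action renders all three fields. Expected output for the defaults (a sketch, not taken from this diff):

```java
// new DownsampleAction(new DateHistogramInterval("1h"), null, true) renders as:
// {"fixed_interval":"1h","wait_timeout":"1d","force_merge_index":true}
```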
@@ -130,6 +147,10 @@ public TimeValue waitTimeout() {
        return waitTimeout;
    }

+    public boolean forceMergeIndex() {
+        return forceMergeIndex;
+    }
+
    @Override
    public boolean isSafeAction() {
        return false;
@@ -146,6 +167,8 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
        StepKey generateDownsampleIndexNameKey = new StepKey(phase, NAME, DownsamplePrepareLifeCycleStateStep.NAME);
        StepKey downsampleKey = new StepKey(phase, NAME, DownsampleStep.NAME);
        StepKey waitForDownsampleIndexKey = new StepKey(phase, NAME, WaitForIndexColorStep.NAME);
+        StepKey forceMergeKey = new StepKey(phase, NAME, ForceMergeStep.NAME);
+        StepKey waitForSegmentCountKey = new StepKey(phase, NAME, SegmentCountStep.NAME);
        StepKey copyMetadataKey = new StepKey(phase, NAME, CopyExecutionStateStep.NAME);
        StepKey copyIndexLifecycleKey = new StepKey(phase, NAME, CopySettingsStep.NAME);
        StepKey dataStreamCheckBranchingKey = new StepKey(phase, NAME, CONDITIONAL_DATASTREAM_CHECK_KEY);
@@ -230,24 +253,30 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
        ClusterStateWaitUntilThresholdStep downsampleAllocatedStep = new ClusterStateWaitUntilThresholdStep(
            new WaitForIndexColorStep(
                waitForDownsampleIndexKey,
-                copyMetadataKey,
+                forceMergeIndex ? forceMergeKey : copyMetadataKey,
                ClusterHealthStatus.YELLOW,
-                (indexName, lifecycleState) -> lifecycleState.downsampleIndexName()
+                DOWNSAMPLED_INDEX_NAME_SUPPLIER
            ),
            cleanupDownsampleIndexKey
        );
+        ForceMergeStep forceMergeStep = forceMergeIndex
+            ? new ForceMergeStep(forceMergeKey, waitForSegmentCountKey, client, 1, DOWNSAMPLED_INDEX_NAME_SUPPLIER)
+            : null;
+        SegmentCountStep segmentCountStep = forceMergeIndex
+            ? new SegmentCountStep(waitForSegmentCountKey, copyMetadataKey, client, 1, DOWNSAMPLED_INDEX_NAME_SUPPLIER)
+            : null;

        CopyExecutionStateStep copyExecutionStateStep = new CopyExecutionStateStep(
            copyMetadataKey,
            copyIndexLifecycleKey,
-            (indexName, lifecycleState) -> lifecycleState.downsampleIndexName(),
+            DOWNSAMPLED_INDEX_NAME_SUPPLIER,
            nextStepKey
        );

        CopySettingsStep copyLifecycleSettingsStep = new CopySettingsStep(
            copyIndexLifecycleKey,
            dataStreamCheckBranchingKey,
-            (indexName, lifecycleState) -> lifecycleState.downsampleIndexName(),
+            DOWNSAMPLED_INDEX_NAME_SUPPLIER,
            LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey()
        );

Expand All @@ -269,35 +298,39 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
ReplaceDataStreamBackingIndexStep replaceDataStreamBackingIndex = new ReplaceDataStreamBackingIndexStep(
replaceDataStreamIndexKey,
deleteIndexKey,
(sourceIndexName, lifecycleState) -> lifecycleState.downsampleIndexName()
DOWNSAMPLED_INDEX_NAME_SUPPLIER
);
DeleteStep deleteSourceIndexStep = new DeleteStep(deleteIndexKey, nextStepKey, client);

SwapAliasesAndDeleteSourceIndexStep swapAliasesAndDeleteSourceIndexStep = new SwapAliasesAndDeleteSourceIndexStep(
swapAliasesKey,
nextStepKey,
client,
(indexName, lifecycleState) -> lifecycleState.downsampleIndexName(),
DOWNSAMPLED_INDEX_NAME_SUPPLIER,
false
);

return List.of(
isTimeSeriesIndexBranchingStep,
checkNotWriteIndexStep,
waitForNoFollowersStep,
waitUntilTimeSeriesEndTimeStep,
readOnlyStep,
cleanupDownsampleIndexStep,
generateDownsampleIndexNameStep,
downsampleStep,
downsampleAllocatedStep,
copyExecutionStateStep,
copyLifecycleSettingsStep,
isDataStreamBranchingStep,
replaceDataStreamBackingIndex,
deleteSourceIndexStep,
swapAliasesAndDeleteSourceIndexStep
);
List<Step> steps = new ArrayList<>(17);
steps.add(isTimeSeriesIndexBranchingStep);
steps.add(checkNotWriteIndexStep);
steps.add(waitForNoFollowersStep);
steps.add(waitUntilTimeSeriesEndTimeStep);
steps.add(readOnlyStep);
steps.add(cleanupDownsampleIndexStep);
steps.add(generateDownsampleIndexNameStep);
steps.add(downsampleStep);
steps.add(downsampleAllocatedStep);
if (forceMergeIndex) {
steps.add(forceMergeStep);
steps.add(segmentCountStep);
}
steps.add(copyExecutionStateStep);
steps.add(copyLifecycleSettingsStep);
steps.add(isDataStreamBranchingStep);
steps.add(replaceDataStreamBackingIndex);
steps.add(deleteSourceIndexStep);
steps.add(swapAliasesAndDeleteSourceIndexStep);
return steps;
}
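
With the switch from `List.of(...)` to a mutable list, the two new steps slot in between the allocation wait and the execution-state copy only when the flag is set. The effective ordering, in the variable names used above:

```java
// isTimeSeriesIndexBranchingStep -> checkNotWriteIndexStep -> waitForNoFollowersStep
// -> waitUntilTimeSeriesEndTimeStep -> readOnlyStep -> cleanupDownsampleIndexStep
// -> generateDownsampleIndexNameStep -> downsampleStep -> downsampleAllocatedStep
// -> [forceMergeStep -> segmentCountStep]        // only when forceMergeIndex == true
// -> copyExecutionStateStep -> copyLifecycleSettingsStep -> isDataStreamBranchingStep
// -> replaceDataStreamBackingIndex -> deleteSourceIndexStep, or swapAliasesAndDeleteSourceIndexStep
```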

@Override
Expand All @@ -306,12 +339,14 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;

DownsampleAction that = (DownsampleAction) o;
return Objects.equals(this.fixedInterval, that.fixedInterval);
return Objects.equals(this.fixedInterval, that.fixedInterval)
&& Objects.equals(this.waitTimeout, that.waitTimeout)
&& this.forceMergeIndex == that.forceMergeIndex;
}

@Override
public int hashCode() {
return Objects.hash(fixedInterval);
return Objects.hash(fixedInterval, waitTimeout, forceMergeIndex);
}

@Override