diff --git a/docs/changelog/135834.yaml b/docs/changelog/135834.yaml new file mode 100644 index 0000000000000..235453865303f --- /dev/null +++ b/docs/changelog/135834.yaml @@ -0,0 +1,6 @@ +pr: 135834 +summary: Move force merge from the downsampling request to the ILM action and allow + users to disable it. +area: Downsampling +type: enhancement +issues: [] diff --git a/docs/reference/elasticsearch/index-lifecycle-actions/ilm-downsample.md b/docs/reference/elasticsearch/index-lifecycle-actions/ilm-downsample.md index 0d9f48dea72b4..bf9e121250603 100644 --- a/docs/reference/elasticsearch/index-lifecycle-actions/ilm-downsample.md +++ b/docs/reference/elasticsearch/index-lifecycle-actions/ilm-downsample.md @@ -9,7 +9,7 @@ Phases allowed: hot, warm, cold. Aggregates a time series (TSDS) index and stores pre-computed statistical summaries (`min`, `max`, `sum`, and `value_count`) for each metric field grouped by a configured time interval. For example, a TSDS index that contains metrics sampled every 10 seconds can be downsampled to an hourly index. All documents within an hour interval are summarized and stored as a single document and stored in the downsample index. -This action corresponds to the [downsample API](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-downsample). +This action corresponds to the [downsample API](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-downsample). The name of the resulting downsample index is `downsample--`. If {{ilm-init}} performs the `downsample` action on a backing index for a data stream, the downsample index becomes a backing index for the same stream and the source index is deleted. @@ -19,6 +19,8 @@ To use the `downsample` action in the `hot` phase, the `rollover` action **must* `fixed_interval` : (Required, string) The [fixed time interval](docs-content://manage-data/lifecycle/rollup/understanding-groups.md#rollup-understanding-group-intervals) into which the data will be downsampled. +`force_merge_index` {applies_to}`stack: ga 9.3` +: (Optional, boolean) When true, the downsampled index will be [force merged](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-forcemerge) to one [segment](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-segments). Defaults to `true`. ## Example [ilm-downsample-ex] @@ -34,7 +36,8 @@ PUT _ilm/policy/datastream_policy "max_docs": 1 }, "downsample": { - "fixed_interval": "1h" + "fixed_interval": "1h", + "force_merge_index": false } } } diff --git a/server/src/main/resources/transport/definitions/referable/ilm_downsample_force_merge.csv b/server/src/main/resources/transport/definitions/referable/ilm_downsample_force_merge.csv new file mode 100644 index 0000000000000..7cff1963a0863 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/ilm_downsample_force_merge.csv @@ -0,0 +1 @@ +9196000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index 28cfb724b2bf0..af0ee6ebf047e 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -inference_update_trained_model_deployment_request_source,9195000 +ilm_downsample_force_merge,9196000 diff --git a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java index 466740398233b..8630accb5c3a7 100644 --- a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java +++ b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java @@ -57,6 +57,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -130,6 +131,7 @@ public void testAction() throws Exception { var ilmRoundsMin = new AtomicInteger(Integer.MAX_VALUE); var ilmRoundsMax = new AtomicInteger(Integer.MIN_VALUE); Set usedPolicies = new HashSet<>(); + var forceMergeEnabled = new AtomicReference(); /* * We now add a number of simulated data streams to the cluster state. We mix different combinations of: @@ -139,7 +141,7 @@ public void testAction() throws Exception { */ updateClusterState(clusterState -> { Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); - addIlmPolicies(metadataBuilder); + forceMergeEnabled.set(addIlmPolicies(metadataBuilder)); Map dataStreamMap = new HashMap<>(); for (int dataStreamCount = 0; dataStreamCount < randomIntBetween(10, 100); dataStreamCount++) { @@ -313,22 +315,31 @@ public void testAction() throws Exception { ilmRoundsMin.get(), ilmRoundsMax.get() ); + var explicitlyEnabled = new AtomicInteger(0); + var explicitlyDisabled = new AtomicInteger(0); + var undefined = new AtomicInteger(0); Map phasesStats = (Map) ilmStats.get("phases_in_use"); if (usedPolicies.contains(DOWNSAMPLING_IN_HOT_POLICY)) { assertThat(phasesStats.get("hot"), equalTo(1)); + updateForceMergeCounters(forceMergeEnabled.get().enabledInHot, explicitlyEnabled, explicitlyDisabled, undefined); } else { assertThat(phasesStats.get("hot"), nullValue()); } if (usedPolicies.contains(DOWNSAMPLING_IN_WARM_COLD_POLICY)) { assertThat(phasesStats.get("warm"), equalTo(1)); + updateForceMergeCounters(forceMergeEnabled.get().enabledInWarm, explicitlyEnabled, explicitlyDisabled, undefined); assertThat(phasesStats.get("cold"), equalTo(1)); + updateForceMergeCounters(forceMergeEnabled.get().enabledInCold, explicitlyEnabled, explicitlyDisabled, undefined); } else { assertThat(phasesStats.get("warm"), nullValue()); assertThat(phasesStats.get("cold"), nullValue()); } - + Map forceMergeStats = (Map) ilmStats.get("force_merge"); + assertThat((int) forceMergeStats.get("explicitly_enabled_count"), equalTo(explicitlyEnabled.get())); + assertThat(forceMergeStats.get("explicitly_disabled_count"), equalTo(explicitlyDisabled.get())); + assertThat(forceMergeStats.get("undefined_count"), equalTo(undefined.get())); + assertThat(forceMergeStats.get("undefined_force_merge_needed_count"), equalTo(0)); } - } @SuppressWarnings("unchecked") @@ -383,6 +394,16 @@ private void assertDownsamplingStats( } } + private void updateForceMergeCounters(Boolean value, AtomicInteger enabled, AtomicInteger disabled, AtomicInteger undefined) { + if (value == null) { + undefined.incrementAndGet(); + } else if (value) { + enabled.incrementAndGet(); + } else { + disabled.incrementAndGet(); + } + } + private DataStreamLifecycle maybeCreateLifecycle(boolean isDownsampled, boolean hasDlm) { if (hasDlm == false) { return null; @@ -462,25 +483,36 @@ private List randomDownsamplingRounds() { return rounds; } - private void addIlmPolicies(Metadata.Builder metadataBuilder) { + private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder) { + Boolean hotForceMergeEnabled = randomBoolean() ? randomBoolean() : null; + Boolean warmForceMergeEnabled = randomBoolean() ? randomBoolean() : null; + Boolean coldForceMergeEnabled = randomBoolean() ? randomBoolean() : null; List policies = List.of( new LifecyclePolicy( DOWNSAMPLING_IN_HOT_POLICY, Map.of( "hot", - new Phase("hot", TimeValue.ZERO, Map.of("downsample", new DownsampleAction(DateHistogramInterval.MINUTE, null))) + new Phase( + "hot", + TimeValue.ZERO, + Map.of("downsample", new DownsampleAction(DateHistogramInterval.MINUTE, null, hotForceMergeEnabled)) + ) ) ), new LifecyclePolicy( DOWNSAMPLING_IN_WARM_COLD_POLICY, Map.of( "warm", - new Phase("warm", TimeValue.ZERO, Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null))), + new Phase( + "warm", + TimeValue.ZERO, + Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null, warmForceMergeEnabled)) + ), "cold", new Phase( "cold", TimeValue.timeValueDays(3), - Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null)) + Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null, coldForceMergeEnabled)) ) ) ), @@ -498,8 +530,11 @@ private void addIlmPolicies(Metadata.Builder metadataBuilder) { ); IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(policyMetadata, OperationMode.RUNNING); metadataBuilder.putCustom(IndexLifecycleMetadata.TYPE, newMetadata); + return new IlmForceMergeInPolicies(hotForceMergeEnabled, warmForceMergeEnabled, coldForceMergeEnabled); } + private record IlmForceMergeInPolicies(Boolean enabledInHot, Boolean enabledInWarm, Boolean enabledInCold) {} + private static String randomIlmPolicy(DownsampledBy downsampledBy, boolean ovewrittenDlm) { if (downsampledBy == DownsampledBy.ILM || (downsampledBy == DownsampledBy.DLM && ovewrittenDlm)) { return randomFrom(DOWNSAMPLING_IN_HOT_POLICY, DOWNSAMPLING_IN_WARM_COLD_POLICY); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportAction.java index ec812f30bc6a7..77245a396e5db 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportAction.java @@ -25,10 +25,14 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.datastreams.TimeSeriesFeatureSetUsage; import org.elasticsearch.xpack.core.ilm.DownsampleAction; +import org.elasticsearch.xpack.core.ilm.ForceMergeAction; import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; +import org.elasticsearch.xpack.core.ilm.LifecycleAction; import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata; import org.elasticsearch.xpack.core.ilm.Phase; +import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction; +import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType; import java.util.HashMap; import java.util.LongSummaryStatistics; @@ -128,7 +132,7 @@ protected void localClusterStateOperation( tsDataStreamCount, tsIndexCount, ilmStats.getDownsamplingStats(), - ilmStats.getIlmPolicyStats(), + ilmStats.calculateIlmPolicyStats(), dlmStats.getDownsamplingStats(), indicesByInterval ) @@ -167,6 +171,9 @@ TimeSeriesFeatureSetUsage.DownsamplingFeatureStats getDownsamplingStats() { } } + /** + * Tracks the ILM policies currently in use by time series data streams. + */ static class IlmDownsamplingStatsTracker extends DownsamplingStatsTracker { private final Map> policies = new HashMap<>(); @@ -174,20 +181,69 @@ void trackPolicy(LifecyclePolicy ilmPolicy) { policies.putIfAbsent(ilmPolicy.getName(), ilmPolicy.getPhases()); } - Map getIlmPolicyStats() { + /** + * Calculates ILM-policy-specific statistics that help us get a better understanding on the phases that use downsampling and on + * how the force merge step in the downsample action is used. More specifically, for downsampling we are tracking: + * - if users explicitly enabled or disabled force-merge after downsampling. + * - if the force merge could be skipped with minimal impact, when the force merge flag is undefined. + * @return a IlmPolicyStats record that contains these counters. + */ + TimeSeriesFeatureSetUsage.IlmPolicyStats calculateIlmPolicyStats() { if (policies.isEmpty()) { - return Map.of(); + return TimeSeriesFeatureSetUsage.IlmPolicyStats.EMPTY; } + long forceMergeExplicitlyEnabledCounter = 0; + long forceMergeExplicitlyDisabledCounter = 0; + long forceMergeDefaultCounter = 0; + long downsampledForceMergeNeededCounter = 0; // Meaning it's followed by a searchable snapshot with force-merge index false Map downsamplingPhases = new HashMap<>(); for (String ilmPolicy : policies.keySet()) { - for (Phase phase : policies.get(ilmPolicy).values()) { - if (phase.getActions().containsKey(DownsampleAction.NAME)) { - Long current = downsamplingPhases.computeIfAbsent(phase.getName(), ignored -> 0L); - downsamplingPhases.put(phase.getName(), current + 1); + Map phases = policies.get(ilmPolicy); + boolean downsampledForceMergeNeeded = false; + for (String phase : TimeseriesLifecycleType.ORDERED_VALID_PHASES) { + if (phases.containsKey(phase) == false) { + continue; + } + Map actions = phases.get(phase).getActions(); + if (actions.containsKey(DownsampleAction.NAME)) { + // count the phase used + Long current = downsamplingPhases.computeIfAbsent(phase, ignored -> 0L); + downsamplingPhases.put(phase, current + 1); + // Count force merge + DownsampleAction downsampleAction = (DownsampleAction) actions.get(DownsampleAction.NAME); + if (downsampleAction.forceMergeIndex() == null) { + forceMergeDefaultCounter++; + downsampledForceMergeNeeded = true; // this default force merge could be needed depending on the following steps + } else if (downsampleAction.forceMergeIndex()) { + forceMergeExplicitlyEnabledCounter++; + } else { + forceMergeExplicitlyDisabledCounter++; + } + } + + // If there is an explicit force merge action, we could consider the downsampling force merge redundant. + if (actions.containsKey(ForceMergeAction.NAME)) { + downsampledForceMergeNeeded = false; + } + if (downsampledForceMergeNeeded && actions.containsKey(SearchableSnapshotAction.NAME)) { + SearchableSnapshotAction searchableSnapshotAction = (SearchableSnapshotAction) actions.get( + SearchableSnapshotAction.NAME + ); + if (searchableSnapshotAction.isForceMergeIndex() == false) { + // If there was a searchable snapshot with force index false, then the downsample force merge has impact + downsampledForceMergeNeededCounter++; + } + downsampledForceMergeNeeded = false; } } } - return downsamplingPhases; + return new TimeSeriesFeatureSetUsage.IlmPolicyStats( + downsamplingPhases, + forceMergeExplicitlyEnabledCounter, + forceMergeExplicitlyDisabledCounter, + forceMergeDefaultCounter, + downsampledForceMergeNeededCounter + ); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsage.java index 2b0f75f742c92..df56dcf66610f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsage.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsage.java @@ -17,6 +17,7 @@ import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.XPackFeatureUsage; import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.ilm.DownsampleAction; import java.io.IOException; import java.util.Map; @@ -105,7 +106,7 @@ public TimeSeriesFeatureSetUsage( long timeSeriesDataStreamCount, long timeSeriesIndexCount, DownsamplingFeatureStats ilmDownsamplingStats, - Map phasesUsedInDownsampling, + IlmPolicyStats ilmPolicyStats, DownsamplingFeatureStats dlmDownsamplingStats, Map indexCountPerInterval ) { @@ -118,7 +119,7 @@ public TimeSeriesFeatureSetUsage( this.timeSeriesIndexCount = timeSeriesIndexCount; this.downsamplingUsage = new DownsamplingUsage( ilmDownsamplingStats, - phasesUsedInDownsampling, + ilmPolicyStats, dlmDownsamplingStats, indexCountPerInterval ); @@ -192,26 +193,24 @@ public boolean equals(Object obj) { public record DownsamplingUsage( DownsamplingFeatureStats ilmDownsamplingStats, - Map phasesUsedInDownsampling, + IlmPolicyStats ilmPolicyStats, DownsamplingFeatureStats dlmDownsamplingStats, Map indexCountPerInterval ) implements Writeable, ToXContentObject { public static DownsamplingUsage read(StreamInput in) throws IOException { DownsamplingFeatureStats ilmDownsamplingStats = in.readOptionalWriteable(DownsamplingFeatureStats::read); - Map phasesUsedInDownsampling = ilmDownsamplingStats != null - ? in.readImmutableMap(StreamInput::readString, StreamInput::readVLong) - : null; + IlmPolicyStats ilmPolicyStats = ilmDownsamplingStats != null ? IlmPolicyStats.read(in) : null; DownsamplingFeatureStats dlmDownsamplingStats = DownsamplingFeatureStats.read(in); Map indexCountPerInterval = in.readImmutableMap(StreamInput::readString, StreamInput::readVLong); - return new DownsamplingUsage(ilmDownsamplingStats, phasesUsedInDownsampling, dlmDownsamplingStats, indexCountPerInterval); + return new DownsamplingUsage(ilmDownsamplingStats, ilmPolicyStats, dlmDownsamplingStats, indexCountPerInterval); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(ilmDownsamplingStats); if (ilmDownsamplingStats != null) { - out.writeMap(phasesUsedInDownsampling, StreamOutput::writeString, StreamOutput::writeVLong); + ilmPolicyStats.writeTo(out); } dlmDownsamplingStats.writeTo(out); out.writeMap(indexCountPerInterval, StreamOutput::writeString, StreamOutput::writeVLong); @@ -230,7 +229,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (ilmDownsamplingStats != null) { builder.startObject("ilm"); ilmDownsamplingStats.toXContent(builder, params); - builder.field("phases_in_use", phasesUsedInDownsampling); + ilmPolicyStats.toXContent(builder, params); builder.endObject(); } if (dlmDownsamplingStats != null) { @@ -283,4 +282,66 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } } + + /** + * Calculates statistics specific to the ILM policies in use + * @param downsamplingPhases the phases used for downsampling + * @param forceMergeExplicitlyEnabledCounter the policies that have force merge explicitly enabled + * @param forceMergeExplicitlyDisabledCounter the policies that have force merge implicitly enabled + * @param forceMergeDefaultCounter the policies that have not specified force merge + * @param downsampledForceMergeNeededCounter the policies that could potentially skip the force merge in downsampling + */ + public record IlmPolicyStats( + Map downsamplingPhases, + long forceMergeExplicitlyEnabledCounter, + long forceMergeExplicitlyDisabledCounter, + long forceMergeDefaultCounter, + long downsampledForceMergeNeededCounter + ) implements Writeable, ToXContentFragment { + public static final IlmPolicyStats EMPTY = new IlmPolicyStats(Map.of(), 0L, 0L, 0L, 0L); + + static IlmPolicyStats read(StreamInput in) throws IOException { + Map downsamplingPhases = in.readImmutableMap(StreamInput::readString, StreamInput::readVLong); + long forceMergeExplicitlyEnabledCounter = 0; + long forceMergeExplicitlyDisabledCounter = 0; + long forceMergeDefaultCounter = 0; + long downsampledForceMergeNeededCounter = 0; + if (in.getTransportVersion().supports(DownsampleAction.ILM_FORCE_MERGE_IN_DOWNSAMPLING)) { + forceMergeExplicitlyEnabledCounter = in.readVLong(); + forceMergeExplicitlyDisabledCounter = in.readVLong(); + forceMergeDefaultCounter = in.readVLong(); + downsampledForceMergeNeededCounter = in.readVLong(); + } + return new IlmPolicyStats( + downsamplingPhases, + forceMergeExplicitlyEnabledCounter, + forceMergeExplicitlyDisabledCounter, + forceMergeDefaultCounter, + downsampledForceMergeNeededCounter + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(downsamplingPhases, StreamOutput::writeString, StreamOutput::writeVLong); + if (out.getTransportVersion().supports(DownsampleAction.ILM_FORCE_MERGE_IN_DOWNSAMPLING)) { + out.writeVLong(forceMergeExplicitlyEnabledCounter); + out.writeVLong(forceMergeExplicitlyDisabledCounter); + out.writeVLong(forceMergeDefaultCounter); + out.writeVLong(downsampledForceMergeNeededCounter); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("phases_in_use", downsamplingPhases); + builder.startObject("force_merge"); + builder.field("explicitly_enabled_count", forceMergeExplicitlyEnabledCounter); + builder.field("explicitly_disabled_count", forceMergeExplicitlyDisabledCounter); + builder.field("undefined_count", forceMergeDefaultCounter); + builder.field("undefined_force_merge_needed_count", downsampledForceMergeNeededCounter); + builder.endObject(); + return null; + } + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DownsampleAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DownsampleAction.java index 1cde2b3101f10..5ee56b0a44c83 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DownsampleAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DownsampleAction.java @@ -8,12 +8,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.downsample.DownsampleConfig; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -33,6 +35,8 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.stream.Stream; import static org.elasticsearch.action.downsample.DownsampleConfig.generateDownsampleIndexName; import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; @@ -44,6 +48,7 @@ public class DownsampleAction implements LifecycleAction { private static final Logger logger = LogManager.getLogger(DownsampleAction.class); + public static final TransportVersion ILM_FORCE_MERGE_IN_DOWNSAMPLING = TransportVersion.fromName("ilm_downsample_force_merge"); public static final String NAME = "downsample"; public static final String DOWNSAMPLED_INDEX_PREFIX = "downsample-"; @@ -51,12 +56,16 @@ public class DownsampleAction implements LifecycleAction { public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check"; public static final TimeValue DEFAULT_WAIT_TIMEOUT = new TimeValue(1, TimeUnit.DAYS); private static final ParseField FIXED_INTERVAL_FIELD = new ParseField(DownsampleConfig.FIXED_INTERVAL); + private static final ParseField FORCE_MERGE_INDEX = new ParseField("force_merge_index"); private static final ParseField WAIT_TIMEOUT_FIELD = new ParseField("wait_timeout"); static final String BWC_CLEANUP_TARGET_INDEX_NAME = "cleanup-target-index"; + private static final BiFunction DOWNSAMPLED_INDEX_NAME_SUPPLIER = ( + indexName, + lifecycleState) -> lifecycleState.downsampleIndexName(); private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( NAME, - a -> new DownsampleAction((DateHistogramInterval) a[0], (TimeValue) a[1]) + a -> new DownsampleAction((DateHistogramInterval) a[0], (TimeValue) a[1], (Boolean) a[2]) ); static { @@ -72,21 +81,24 @@ public class DownsampleAction implements LifecycleAction { WAIT_TIMEOUT_FIELD, ObjectParser.ValueType.STRING ); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_INDEX); } private final DateHistogramInterval fixedInterval; private final TimeValue waitTimeout; + private final Boolean forceMergeIndex; public static DownsampleAction parse(XContentParser parser) { return PARSER.apply(parser, null); } - public DownsampleAction(final DateHistogramInterval fixedInterval, final TimeValue waitTimeout) { + public DownsampleAction(final DateHistogramInterval fixedInterval, final TimeValue waitTimeout, Boolean forceMergeIndex) { if (fixedInterval == null) { throw new IllegalArgumentException("Parameter [" + FIXED_INTERVAL_FIELD.getPreferredName() + "] is required."); } this.fixedInterval = fixedInterval; this.waitTimeout = waitTimeout == null ? DEFAULT_WAIT_TIMEOUT : waitTimeout; + this.forceMergeIndex = forceMergeIndex; } public DownsampleAction(StreamInput in) throws IOException { @@ -94,7 +106,8 @@ public DownsampleAction(StreamInput in) throws IOException { new DateHistogramInterval(in), in.getTransportVersion().onOrAfter(TransportVersions.V_8_10_X) ? TimeValue.parseTimeValue(in.readString(), WAIT_TIMEOUT_FIELD.getPreferredName()) - : DEFAULT_WAIT_TIMEOUT + : DEFAULT_WAIT_TIMEOUT, + in.getTransportVersion().supports(ILM_FORCE_MERGE_IN_DOWNSAMPLING) ? in.readOptionalBoolean() : null ); } @@ -106,6 +119,9 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeString(DEFAULT_WAIT_TIMEOUT.getStringRep()); } + if (out.getTransportVersion().supports(ILM_FORCE_MERGE_IN_DOWNSAMPLING)) { + out.writeOptionalBoolean(forceMergeIndex); + } } @Override @@ -113,6 +129,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(FIXED_INTERVAL_FIELD.getPreferredName(), fixedInterval.toString()); builder.field(WAIT_TIMEOUT_FIELD.getPreferredName(), waitTimeout.getStringRep()); + if (forceMergeIndex != null) { + builder.field(FORCE_MERGE_INDEX.getPreferredName(), forceMergeIndex); + } builder.endObject(); return builder; } @@ -130,6 +149,10 @@ public TimeValue waitTimeout() { return waitTimeout; } + public Boolean forceMergeIndex() { + return forceMergeIndex; + } + @Override public boolean isSafeAction() { return false; @@ -146,6 +169,8 @@ public List toSteps(Client client, String phase, StepKey nextStepKey) { StepKey generateDownsampleIndexNameKey = new StepKey(phase, NAME, DownsamplePrepareLifeCycleStateStep.NAME); StepKey downsampleKey = new StepKey(phase, NAME, DownsampleStep.NAME); StepKey waitForDownsampleIndexKey = new StepKey(phase, NAME, WaitForIndexColorStep.NAME); + StepKey forceMergeKey = new StepKey(phase, NAME, ForceMergeStep.NAME); + StepKey waitForSegmentCountKey = new StepKey(phase, NAME, SegmentCountStep.NAME); StepKey copyMetadataKey = new StepKey(phase, NAME, CopyExecutionStateStep.NAME); StepKey copyIndexLifecycleKey = new StepKey(phase, NAME, CopySettingsStep.NAME); StepKey dataStreamCheckBranchingKey = new StepKey(phase, NAME, CONDITIONAL_DATASTREAM_CHECK_KEY); @@ -230,24 +255,30 @@ public List toSteps(Client client, String phase, StepKey nextStepKey) { ClusterStateWaitUntilThresholdStep downsampleAllocatedStep = new ClusterStateWaitUntilThresholdStep( new WaitForIndexColorStep( waitForDownsampleIndexKey, - copyMetadataKey, + isForceMergeEnabled() ? forceMergeKey : copyMetadataKey, ClusterHealthStatus.YELLOW, - (indexName, lifecycleState) -> lifecycleState.downsampleIndexName() + DOWNSAMPLED_INDEX_NAME_SUPPLIER ), cleanupDownsampleIndexKey ); + ForceMergeStep forceMergeStep = null; + SegmentCountStep segmentCountStep = null; + if (isForceMergeEnabled()) { + forceMergeStep = new ForceMergeStep(forceMergeKey, waitForSegmentCountKey, client, 1, DOWNSAMPLED_INDEX_NAME_SUPPLIER); + segmentCountStep = new SegmentCountStep(waitForSegmentCountKey, copyMetadataKey, client, 1, DOWNSAMPLED_INDEX_NAME_SUPPLIER); + } CopyExecutionStateStep copyExecutionStateStep = new CopyExecutionStateStep( copyMetadataKey, copyIndexLifecycleKey, - (indexName, lifecycleState) -> lifecycleState.downsampleIndexName(), + DOWNSAMPLED_INDEX_NAME_SUPPLIER, nextStepKey ); CopySettingsStep copyLifecycleSettingsStep = new CopySettingsStep( copyIndexLifecycleKey, dataStreamCheckBranchingKey, - (indexName, lifecycleState) -> lifecycleState.downsampleIndexName(), + DOWNSAMPLED_INDEX_NAME_SUPPLIER, LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey() ); @@ -269,7 +300,7 @@ public List toSteps(Client client, String phase, StepKey nextStepKey) { ReplaceDataStreamBackingIndexStep replaceDataStreamBackingIndex = new ReplaceDataStreamBackingIndexStep( replaceDataStreamIndexKey, deleteIndexKey, - (sourceIndexName, lifecycleState) -> lifecycleState.downsampleIndexName() + DOWNSAMPLED_INDEX_NAME_SUPPLIER ); DeleteStep deleteSourceIndexStep = new DeleteStep(deleteIndexKey, nextStepKey, client); @@ -277,11 +308,11 @@ public List toSteps(Client client, String phase, StepKey nextStepKey) { swapAliasesKey, nextStepKey, client, - (indexName, lifecycleState) -> lifecycleState.downsampleIndexName(), + DOWNSAMPLED_INDEX_NAME_SUPPLIER, false ); - return List.of( + return Stream.of( isTimeSeriesIndexBranchingStep, checkNotWriteIndexStep, waitForNoFollowersStep, @@ -291,13 +322,19 @@ public List toSteps(Client client, String phase, StepKey nextStepKey) { generateDownsampleIndexNameStep, downsampleStep, downsampleAllocatedStep, + forceMergeStep, + segmentCountStep, copyExecutionStateStep, copyLifecycleSettingsStep, isDataStreamBranchingStep, replaceDataStreamBackingIndex, deleteSourceIndexStep, swapAliasesAndDeleteSourceIndexStep - ); + ).filter(Objects::nonNull).toList(); + } + + private boolean isForceMergeEnabled() { + return forceMergeIndex == null || forceMergeIndex; } @Override @@ -306,12 +343,14 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; DownsampleAction that = (DownsampleAction) o; - return Objects.equals(this.fixedInterval, that.fixedInterval); + return Objects.equals(this.fixedInterval, that.fixedInterval) + && Objects.equals(this.waitTimeout, that.waitTimeout) + && Objects.equals(this.forceMergeIndex, that.forceMergeIndex); } @Override public int hashCode() { - return Objects.hash(fixedInterval); + return Objects.hash(fixedInterval, waitTimeout, forceMergeIndex); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java index 4593ad32d2031..38f2195e2695b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java @@ -149,7 +149,7 @@ public SearchableSnapshotAction(StreamInput in) throws IOException { this.replicateFor = in.readOptionalTimeValue(); } - boolean isForceMergeIndex() { + public boolean isForceMergeIndex() { return forceMergeIndex; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionTests.java new file mode 100644 index 0000000000000..125f73b297494 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionTests.java @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.action; + +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.datastreams.TimeSeriesFeatureSetUsage; +import org.elasticsearch.xpack.core.ilm.DownsampleAction; +import org.elasticsearch.xpack.core.ilm.ForceMergeAction; +import org.elasticsearch.xpack.core.ilm.LifecycleAction; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; +import org.elasticsearch.xpack.core.ilm.Phase; +import org.elasticsearch.xpack.core.ilm.RolloverAction; +import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.equalTo; + +public class TimeSeriesUsageTransportActionTests extends ESTestCase { + + public void testForceMergeCountsSinglePhase() { + var policies = Stream.of( + // Explicitly enabled + randomPolicy(Map.of(randomFrom("hot", "warm", "cold"), new PhaseConfig(true, true, false, false, null))), + // ... but not needed + randomPolicy(Map.of(randomFrom("hot", "cold", "frozen"), new PhaseConfig(true, true, false, true, false))), + // Explicitly disabled but not needed + randomPolicy(Map.of(randomFrom("hot", "warm"), new PhaseConfig(true, false, true, false, false))), + // Unspecified and needed + randomPolicy(Map.of(randomFrom("hot", "warm"), new PhaseConfig(true, null, false, true, false))), + // Unspecified and not needed + randomPolicy(Map.of(randomFrom("hot", "warm"), new PhaseConfig(true, null, false, true, true))), + randomPolicy(Map.of(randomFrom("hot", "warm"), new PhaseConfig(true, null, true, true, false))), + randomPolicy(Map.of(randomFrom("hot", "warm"), new PhaseConfig(true, null, false, true, true))), + // No downsampling + randomPolicy(Map.of(randomFrom("hot", "warm"), new PhaseConfig(false, null, false, true, true))) + ); + var tracker = new TimeSeriesUsageTransportAction.IlmDownsamplingStatsTracker(); + policies.forEach(tracker::trackPolicy); + TimeSeriesFeatureSetUsage.IlmPolicyStats ilmPolicyStats = tracker.calculateIlmPolicyStats(); + assertThat(ilmPolicyStats.forceMergeExplicitlyEnabledCounter(), equalTo(2L)); + assertThat(ilmPolicyStats.forceMergeExplicitlyDisabledCounter(), equalTo(1L)); + assertThat(ilmPolicyStats.forceMergeDefaultCounter(), equalTo(4L)); + assertThat(ilmPolicyStats.downsampledForceMergeNeededCounter(), equalTo(1L)); + } + + public void testForceMergeCountsOverMultiplePhases() { + var tracker = new TimeSeriesUsageTransportAction.IlmDownsamplingStatsTracker(); + tracker.trackPolicy( + randomPolicy( + Map.of("warm", new PhaseConfig(true, null, false, false, false), "cold", new PhaseConfig(true, null, false, true, false)) + ) + ); + TimeSeriesFeatureSetUsage.IlmPolicyStats ilmPolicyStats = tracker.calculateIlmPolicyStats(); + assertThat(ilmPolicyStats.forceMergeExplicitlyEnabledCounter(), equalTo(0L)); + assertThat(ilmPolicyStats.forceMergeExplicitlyDisabledCounter(), equalTo(0L)); + assertThat(ilmPolicyStats.forceMergeDefaultCounter(), equalTo(2L)); + assertThat(ilmPolicyStats.downsampledForceMergeNeededCounter(), equalTo(1L)); + + // Because the force merge is happening due to another downsample action, we still count one. + tracker = new TimeSeriesUsageTransportAction.IlmDownsamplingStatsTracker(); + tracker.trackPolicy( + randomPolicy( + Map.of("warm", new PhaseConfig(true, null, false, false, false), "cold", new PhaseConfig(true, true, false, true, false)) + ) + ); + ilmPolicyStats = tracker.calculateIlmPolicyStats(); + assertThat(ilmPolicyStats.forceMergeExplicitlyEnabledCounter(), equalTo(1L)); + assertThat(ilmPolicyStats.forceMergeExplicitlyDisabledCounter(), equalTo(0L)); + assertThat(ilmPolicyStats.forceMergeDefaultCounter(), equalTo(1L)); + assertThat(ilmPolicyStats.downsampledForceMergeNeededCounter(), equalTo(1L)); + } + + private LifecyclePolicy randomPolicy(Map phases) { + Map phasesMap = new HashMap<>(); + for (var entry : phases.entrySet()) { + Map actions = new HashMap<>(); + PhaseConfig phaseConfig = entry.getValue(); + if (entry.getKey().equals("hot")) { + actions.put( + RolloverAction.NAME, + new RolloverAction(null, null, TimeValue.ONE_HOUR, null, null, null, null, null, null, null) + ); + } + if (phaseConfig.hasDownsampling) { + actions.put( + DownsampleAction.NAME, + new DownsampleAction(new DateHistogramInterval("1m"), null, phaseConfig.hasDownsamplingForceMerge) + ); + } + if (phaseConfig.hasForceMerge) { + actions.put(ForceMergeAction.NAME, new ForceMergeAction(1, null)); + } + if (phaseConfig.hasSearchableSnapshot) { + actions.put( + SearchableSnapshotAction.NAME, + new SearchableSnapshotAction("my-repo", phaseConfig.searchableSnapshotForceMerge) + ); + } + phasesMap.put(entry.getKey(), new Phase(entry.getKey(), null, actions)); + } + return new LifecyclePolicy(randomAlphaOfLength(10), phasesMap); + } + + private record PhaseConfig( + boolean hasDownsampling, + Boolean hasDownsamplingForceMerge, + Boolean hasForceMerge, + Boolean hasSearchableSnapshot, + Boolean searchableSnapshotForceMerge + ) {} +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsageTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsageTests.java index de9eb029c228d..82bfac046cd25 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsageTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsageTests.java @@ -38,7 +38,7 @@ protected TimeSeriesFeatureSetUsage createTestInstance() { randomIntBetween(0, 100), randomIntBetween(100, 100000), randomDownsamplingFeatureStats(), - randomPhaseMap(), + randomIlmPolicyStats(), randomDownsamplingFeatureStats(), randomIntervalMap() ); @@ -65,7 +65,7 @@ protected TimeSeriesFeatureSetUsage mutateInstance(TimeSeriesFeatureSetUsage ins } var indexCount = instance.getTimeSeriesIndexCount(); var ilm = instance.getDownsamplingUsage().ilmDownsamplingStats(); - var ilmPhases = instance.getDownsamplingUsage().phasesUsedInDownsampling(); + var ilmPhases = instance.getDownsamplingUsage().ilmPolicyStats(); var dlm = instance.getDownsamplingUsage().dlmDownsamplingStats(); var indexPerInterval = instance.getDownsamplingUsage().indexCountPerInterval(); int randomisationBranch = between(0, 5); @@ -73,7 +73,7 @@ protected TimeSeriesFeatureSetUsage mutateInstance(TimeSeriesFeatureSetUsage ins case 0 -> dataStreamCount += randomIntBetween(1, 100); case 1 -> indexCount += randomIntBetween(1, 100); case 2 -> ilm = randomValueOtherThan(ilm, this::randomDownsamplingFeatureStats); - case 3 -> ilmPhases = randomValueOtherThan(ilmPhases, this::randomPhaseMap); + case 3 -> ilmPhases = randomValueOtherThan(ilmPhases, this::randomIlmPolicyStats); case 4 -> dlm = randomValueOtherThan(dlm, this::randomDownsamplingFeatureStats); case 5 -> indexPerInterval = randomValueOtherThan(indexPerInterval, this::randomIntervalMap); default -> throw new AssertionError("Illegal randomisation branch: " + randomisationBranch); @@ -91,9 +91,15 @@ private TimeSeriesFeatureSetUsage.DownsamplingFeatureStats randomDownsamplingFea ); } - private Map randomPhaseMap() { - return randomNonEmptySubsetOf(Set.of("hot", "warm", "cold")).stream() - .collect(Collectors.toMap(k -> k, ignored -> randomNonNegativeLong())); + private TimeSeriesFeatureSetUsage.IlmPolicyStats randomIlmPolicyStats() { + return new TimeSeriesFeatureSetUsage.IlmPolicyStats( + randomNonEmptySubsetOf(Set.of("hot", "warm", "cold")).stream() + .collect(Collectors.toMap(k -> k, ignored -> randomNonNegativeLong())), + randomLongBetween(0, 100), + randomLongBetween(0, 100), + randomLongBetween(0, 100), + randomLongBetween(0, 100) + ); } private Map randomIntervalMap() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DownsampleActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DownsampleActionTests.java index b8d5c0025d48a..7d4db96870209 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DownsampleActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DownsampleActionTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; -import org.elasticsearch.test.EqualsHashCodeTestUtils; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; @@ -35,7 +34,7 @@ public class DownsampleActionTests extends AbstractActionTestCase interval = randomValueOtherThan(interval, ConfigTestHelpers::randomInterval); + case 1 -> waitTimeout = TimeValue.timeValueMillis( + randomValueOtherThan(waitTimeout.millis(), () -> randomLongBetween(1, 10000)) + ); + case 2 -> forceMerge = forceMerge == null ? randomBoolean() : forceMerge == false; + } + return new DownsampleAction(interval, waitTimeout, forceMerge); } @Override @@ -65,7 +74,92 @@ public boolean isSafeAction() { @Override public void testToSteps() { - DownsampleAction action = new DownsampleAction(ConfigTestHelpers.randomInterval(), WAIT_TIMEOUT); + DownsampleAction action = new DownsampleAction(ConfigTestHelpers.randomInterval(), WAIT_TIMEOUT, randomBoolean() ? null : true); + String phase = randomAlphaOfLengthBetween(1, 10); + StepKey nextStepKey = new StepKey( + randomAlphaOfLengthBetween(1, 10), + randomAlphaOfLengthBetween(1, 10), + randomAlphaOfLengthBetween(1, 10) + ); + List steps = action.toSteps(null, phase, nextStepKey); + assertNotNull(steps); + assertEquals(17, steps.size()); + + assertTrue(steps.get(0) instanceof BranchingStep); + assertThat(steps.get(0).getKey().name(), equalTo(CONDITIONAL_TIME_SERIES_CHECK_KEY)); + expectThrows(IllegalStateException.class, () -> steps.get(0).getNextStepKey()); + assertThat(((BranchingStep) steps.get(0)).getNextStepKeyOnFalse(), equalTo(nextStepKey)); + assertThat(((BranchingStep) steps.get(0)).getNextStepKeyOnTrue().name(), equalTo(CheckNotDataStreamWriteIndexStep.NAME)); + + assertTrue(steps.get(1) instanceof CheckNotDataStreamWriteIndexStep); + assertThat(steps.get(1).getKey().name(), equalTo(CheckNotDataStreamWriteIndexStep.NAME)); + assertThat(steps.get(1).getNextStepKey().name(), equalTo(WaitForNoFollowersStep.NAME)); + + assertTrue(steps.get(2) instanceof WaitForNoFollowersStep); + assertThat(steps.get(2).getKey().name(), equalTo(WaitForNoFollowersStep.NAME)); + assertThat(steps.get(2).getNextStepKey().name(), equalTo(WaitUntilTimeSeriesEndTimePassesStep.NAME)); + + assertTrue(steps.get(3) instanceof WaitUntilTimeSeriesEndTimePassesStep); + assertThat(steps.get(3).getKey().name(), equalTo(WaitUntilTimeSeriesEndTimePassesStep.NAME)); + assertThat(steps.get(3).getNextStepKey().name(), equalTo(ReadOnlyStep.NAME)); + + assertTrue(steps.get(4) instanceof ReadOnlyStep); + assertThat(steps.get(4).getKey().name(), equalTo(ReadOnlyStep.NAME)); + assertThat(steps.get(4).getNextStepKey().name(), equalTo(DownsamplePrepareLifeCycleStateStep.NAME)); + + assertTrue(steps.get(5) instanceof NoopStep); + assertThat(steps.get(5).getKey().name(), equalTo(DownsampleAction.BWC_CLEANUP_TARGET_INDEX_NAME)); + assertThat(steps.get(5).getNextStepKey().name(), equalTo(DownsampleStep.NAME)); + + assertTrue(steps.get(6) instanceof DownsamplePrepareLifeCycleStateStep); + assertThat(steps.get(6).getKey().name(), equalTo(DownsamplePrepareLifeCycleStateStep.NAME)); + assertThat(steps.get(6).getNextStepKey().name(), equalTo(DownsampleStep.NAME)); + + assertTrue(steps.get(7) instanceof DownsampleStep); + assertThat(steps.get(7).getKey().name(), equalTo(DownsampleStep.NAME)); + assertThat(steps.get(7).getNextStepKey().name(), equalTo(WaitForIndexColorStep.NAME)); + + assertTrue(steps.get(8) instanceof ClusterStateWaitUntilThresholdStep); + assertThat(steps.get(8).getKey().name(), equalTo(WaitForIndexColorStep.NAME)); + assertThat(steps.get(8).getNextStepKey().name(), equalTo(ForceMergeStep.NAME)); + + assertTrue(steps.get(9) instanceof ForceMergeStep); + assertThat(steps.get(9).getKey().name(), equalTo(ForceMergeStep.NAME)); + assertThat(steps.get(9).getNextStepKey().name(), equalTo(SegmentCountStep.NAME)); + + assertTrue(steps.get(10) instanceof SegmentCountStep); + assertThat(steps.get(10).getKey().name(), equalTo(SegmentCountStep.NAME)); + assertThat(steps.get(10).getNextStepKey().name(), equalTo(CopyExecutionStateStep.NAME)); + + assertTrue(steps.get(11) instanceof CopyExecutionStateStep); + assertThat(steps.get(11).getKey().name(), equalTo(CopyExecutionStateStep.NAME)); + assertThat(steps.get(11).getNextStepKey().name(), equalTo(CopySettingsStep.NAME)); + + assertTrue(steps.get(12) instanceof CopySettingsStep); + assertThat(steps.get(12).getKey().name(), equalTo(CopySettingsStep.NAME)); + assertThat(steps.get(12).getNextStepKey().name(), equalTo(CONDITIONAL_DATASTREAM_CHECK_KEY)); + + assertTrue(steps.get(13) instanceof BranchingStep); + assertThat(steps.get(13).getKey().name(), equalTo(CONDITIONAL_DATASTREAM_CHECK_KEY)); + expectThrows(IllegalStateException.class, () -> steps.get(13).getNextStepKey()); + assertThat(((BranchingStep) steps.get(13)).getNextStepKeyOnFalse().name(), equalTo(SwapAliasesAndDeleteSourceIndexStep.NAME)); + assertThat(((BranchingStep) steps.get(13)).getNextStepKeyOnTrue().name(), equalTo(ReplaceDataStreamBackingIndexStep.NAME)); + + assertTrue(steps.get(14) instanceof ReplaceDataStreamBackingIndexStep); + assertThat(steps.get(14).getKey().name(), equalTo(ReplaceDataStreamBackingIndexStep.NAME)); + assertThat(steps.get(14).getNextStepKey().name(), equalTo(DeleteStep.NAME)); + + assertTrue(steps.get(15) instanceof DeleteStep); + assertThat(steps.get(15).getKey().name(), equalTo(DeleteStep.NAME)); + assertThat(steps.get(15).getNextStepKey(), equalTo(nextStepKey)); + + assertTrue(steps.get(16) instanceof SwapAliasesAndDeleteSourceIndexStep); + assertThat(steps.get(16).getKey().name(), equalTo(SwapAliasesAndDeleteSourceIndexStep.NAME)); + assertThat(steps.get(16).getNextStepKey(), equalTo(nextStepKey)); + } + + public void testToStepsWithoutForceMerge() { + DownsampleAction action = new DownsampleAction(ConfigTestHelpers.randomInterval(), WAIT_TIMEOUT, false); String phase = randomAlphaOfLengthBetween(1, 10); StepKey nextStepKey = new StepKey( randomAlphaOfLengthBetween(1, 10), @@ -143,7 +237,8 @@ public void testToSteps() { public void testDownsamplingPrerequisitesStep() { DateHistogramInterval fixedInterval = ConfigTestHelpers.randomInterval(); - DownsampleAction action = new DownsampleAction(fixedInterval, WAIT_TIMEOUT); + boolean withForceMerge = randomBoolean(); + DownsampleAction action = new DownsampleAction(fixedInterval, WAIT_TIMEOUT, withForceMerge); String phase = randomAlphaOfLengthBetween(1, 10); StepKey nextStepKey = new StepKey( randomAlphaOfLengthBetween(1, 10), @@ -152,7 +247,7 @@ public void testDownsamplingPrerequisitesStep() { ); { // non time series indices skip the action - BranchingStep branchingStep = getFirstBranchingStep(action, phase, nextStepKey); + BranchingStep branchingStep = getFirstBranchingStep(action, phase, nextStepKey, withForceMerge); IndexMetadata indexMetadata = newIndexMeta("test", Settings.EMPTY); ProjectState state = projectStateFromProject(ProjectMetadata.builder(randomProjectIdOrDefault()).put(indexMetadata, true)); @@ -162,7 +257,7 @@ public void testDownsamplingPrerequisitesStep() { } { // time series indices execute the action - BranchingStep branchingStep = getFirstBranchingStep(action, phase, nextStepKey); + BranchingStep branchingStep = getFirstBranchingStep(action, phase, nextStepKey, withForceMerge); Settings settings = Settings.builder() .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES) .put("index.routing_path", "uid") @@ -176,7 +271,7 @@ public void testDownsamplingPrerequisitesStep() { } { // already downsampled indices for the interval skip the action - BranchingStep branchingStep = getFirstBranchingStep(action, phase, nextStepKey); + BranchingStep branchingStep = getFirstBranchingStep(action, phase, nextStepKey, withForceMerge); Settings settings = Settings.builder() .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES) .put("index.routing_path", "uid") @@ -193,7 +288,7 @@ public void testDownsamplingPrerequisitesStep() { } { // indices with the same name as the target downsample index that are NOT downsample indices skip the action - BranchingStep branchingStep = getFirstBranchingStep(action, phase, nextStepKey); + BranchingStep branchingStep = getFirstBranchingStep(action, phase, nextStepKey, withForceMerge); String indexName = DOWNSAMPLED_INDEX_PREFIX + fixedInterval + "-test"; IndexMetadata indexMetadata = newIndexMeta(indexName, Settings.EMPTY); @@ -204,10 +299,10 @@ public void testDownsamplingPrerequisitesStep() { } } - private static BranchingStep getFirstBranchingStep(DownsampleAction action, String phase, StepKey nextStepKey) { + private static BranchingStep getFirstBranchingStep(DownsampleAction action, String phase, StepKey nextStepKey, boolean withForceMerge) { List steps = action.toSteps(null, phase, nextStepKey); assertNotNull(steps); - assertEquals(15, steps.size()); + assertEquals(withForceMerge ? 17 : 15, steps.size()); assertTrue(steps.get(0) instanceof BranchingStep); assertThat(steps.get(0).getKey().name(), equalTo(CONDITIONAL_TIME_SERIES_CHECK_KEY)); @@ -218,17 +313,4 @@ private static BranchingStep getFirstBranchingStep(DownsampleAction action, Stri public static IndexMetadata newIndexMeta(String name, Settings indexSettings) { return IndexMetadata.builder(name).settings(indexSettings(IndexVersion.current(), 1, 1).put(indexSettings)).build(); } - - public void testEqualsAndHashCode() { - EqualsHashCodeTestUtils.checkEqualsAndHashCode(createTestInstance(), this::copy, this::notCopy); - } - - DownsampleAction copy(DownsampleAction downsampleAction) { - return new DownsampleAction(downsampleAction.fixedInterval(), downsampleAction.waitTimeout()); - } - - DownsampleAction notCopy(DownsampleAction downsampleAction) { - DateHistogramInterval fixedInterval = randomValueOtherThan(downsampleAction.fixedInterval(), ConfigTestHelpers::randomInterval); - return new DownsampleAction(fixedInterval, WAIT_TIMEOUT); - } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java index 99fc3a80d8b8b..bac42a430cce8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java @@ -76,7 +76,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase { // same phase private static final MigrateAction TEST_MIGRATE_ACTION = MigrateAction.DISABLED; public static final TimeValue TIMEOUT = new TimeValue(1, TimeUnit.MINUTES); - private static final DownsampleAction TEST_DOWNSAMPLE_ACTION = new DownsampleAction(DateHistogramInterval.DAY, TIMEOUT); + private static final DownsampleAction TEST_DOWNSAMPLE_ACTION = new DownsampleAction(DateHistogramInterval.DAY, TIMEOUT, true); public void testValidatePhases() { boolean invalid = randomBoolean(); @@ -360,12 +360,12 @@ public void testValidateDownsamplingAction() { Phase warmPhase = new Phase( "warm", TimeValue.ZERO, - Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(1), TIMEOUT)) + Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(1), TIMEOUT, randomBoolean())) ); Phase coldPhase = new Phase( "cold", TimeValue.ZERO, - Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(1), TIMEOUT)) + Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(1), TIMEOUT, randomBoolean())) ); IllegalArgumentException e = expectThrows( @@ -382,12 +382,12 @@ public void testValidateDownsamplingAction() { Phase warmPhase = new Phase( "warm", TimeValue.ZERO, - Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(1), TIMEOUT)) + Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(1), TIMEOUT, randomBoolean())) ); Phase coldPhase = new Phase( "cold", TimeValue.ZERO, - Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.minutes(30), TIMEOUT)) + Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.minutes(30), TIMEOUT, randomBoolean())) ); IllegalArgumentException e = expectThrows( @@ -404,12 +404,12 @@ public void testValidateDownsamplingAction() { Phase warmPhase = new Phase( "warm", TimeValue.ZERO, - Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(1), TIMEOUT)) + Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(1), TIMEOUT, randomBoolean())) ); Phase coldPhase = new Phase( "cold", TimeValue.ZERO, - Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.minutes(130), TIMEOUT)) + Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.minutes(130), TIMEOUT, randomBoolean())) ); IllegalArgumentException e = expectThrows( @@ -430,18 +430,18 @@ public void testValidateDownsamplingAction() { RolloverAction.NAME, TEST_ROLLOVER_ACTION, DownsampleAction.NAME, - new DownsampleAction(DateHistogramInterval.minutes(10), TIMEOUT) + new DownsampleAction(DateHistogramInterval.minutes(10), TIMEOUT, randomBoolean()) ) ); Phase warmPhase = new Phase( "warm", TimeValue.ZERO, - Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.minutes(30), TIMEOUT)) + Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.minutes(30), TIMEOUT, randomBoolean())) ); Phase coldPhase = new Phase( "cold", TimeValue.ZERO, - Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(2), TIMEOUT)) + Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(2), TIMEOUT, randomBoolean())) ); // This is a valid interval combination diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java index 1568f7e84ec09..6824c14f83797 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.time.Instant; import java.util.List; -import java.util.Locale; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -45,9 +44,8 @@ public class DownsampleIT extends DownsamplingIntegTestCase { public void testDownsamplingPassthroughDimensions() throws Exception { String dataStreamName = "metrics-foo"; - String mapping = String.format(Locale.ROOT, """ + String mapping = """ { - %s "properties": { "attributes": { "type": "passthrough", @@ -66,7 +64,7 @@ public void testDownsamplingPassthroughDimensions() throws Exception { } } } - """, generateForceMergeMetadata()); + """; // Create data stream by indexing documents final Instant now = Instant.now(); @@ -173,16 +171,6 @@ private void downsampleAndAssert(String dataStreamName, String mapping, Supplier assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig); } - private String generateForceMergeMetadata() { - return switch (randomIntBetween(0, 4)) { - case 0 -> "\"_meta\": { \"downsample.forcemerge.enabled\": false},"; - case 1 -> "\"_meta\": { \"downsample.forcemerge.enabled\": true},"; - case 2 -> "\"_meta\": { \"downsample.forcemerge.enabled\": 4},"; - case 3 -> "\"_meta\": { \"downsample.forcemerge.enabled\": null},"; - default -> ""; - }; - } - public void testAggMetricInEsqlTSAfterDownsampling() throws Exception { String dataStreamName = "metrics-foo"; Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host", "cluster")).build(); diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java index 6c84d6083c47b..ae3d679ecd06e 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java @@ -118,7 +118,10 @@ public void setup(final String sourceIndex, int numOfShards, int numOfReplicas, new Phase( "warm", TimeValue.ZERO, - Map.of("downsample", new org.elasticsearch.xpack.core.ilm.DownsampleAction(DateHistogramInterval.HOUR, null)) + Map.of( + "downsample", + new org.elasticsearch.xpack.core.ilm.DownsampleAction(DateHistogramInterval.HOUR, null, randomBoolean()) + ) ) ); LifecyclePolicy policy = new LifecyclePolicy(POLICY_NAME, phases); diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index e5eac5a793c69..f68fb5025bf3d 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -14,7 +14,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.cluster.stats.MappingVisitor; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; -import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; @@ -301,7 +301,7 @@ protected void masterOperation( final TaskId parentTask = new TaskId(clusterService.localNode().getId(), task.getId()); // Short circuit if target index has been downsampled: final String downsampleIndexName = request.getTargetIndex(); - if (canShortCircuit(downsampleIndexName, parentTask, request.getWaitTimeout(), startTime, true, projectMetadata, listener)) { + if (canShortCircuit(downsampleIndexName, parentTask, request.getWaitTimeout(), startTime, projectMetadata, listener)) { logger.info("Skipping downsampling, because a previous execution already completed downsampling"); return; } @@ -319,7 +319,7 @@ protected void masterOperation( // 5. Make downsample index read-only and set replicas // 6. Refresh downsample index // 7. Mark downsample index as "completed successfully" - // 8. Force-merge the downsample index to a single segment + // 8. Flush the downsample index to disk // At any point if there is an issue, delete the downsample index // 1. Extract source index mappings @@ -338,7 +338,6 @@ protected void masterOperation( final MapperService mapperService = indicesService.createIndexMapperServiceForValidation(sourceIndexMetadata); final CompressedXContent sourceIndexCompressedXContent = new CompressedXContent(sourceIndexMappings); mapperService.merge(MapperService.SINGLE_MAPPING_NAME, sourceIndexCompressedXContent, MapperService.MergeReason.INDEX_TEMPLATE); - final boolean forceMergeEnabled = isForceMergeEnabled(sourceIndexMappings); // Validate downsampling interval validateDownsamplingInterval(mapperService, request.getDownsampleConfig()); @@ -414,8 +413,7 @@ protected void masterOperation( startTime, metricFields, labelFields, - dimensionFields, - forceMergeEnabled + dimensionFields ); } else { recordFailureMetrics(startTime); @@ -428,7 +426,6 @@ protected void masterOperation( parentTask, request.getWaitTimeout(), startTime, - forceMergeEnabled, clusterService.state().metadata().getProject(projectMetadata.id()), listener )) { @@ -446,8 +443,7 @@ protected void masterOperation( startTime, metricFields, labelFields, - dimensionFields, - forceMergeEnabled + dimensionFields ); } else { recordFailureMetrics(startTime); @@ -458,18 +454,6 @@ protected void masterOperation( })); } - private boolean isForceMergeEnabled(Map sourceIndexMappings) { - if (sourceIndexMappings.containsKey("_meta")) { - if (sourceIndexMappings.get("_meta") instanceof Map metadataMap) { - var enabledForceMergeValue = metadataMap.get("downsample.forcemerge.enabled"); - if (enabledForceMergeValue instanceof Boolean enabledForceMerge) { - return enabledForceMerge; - } - } - } - return true; - } - /** * Shortcircuit when another downsample api invocation already completed successfully. */ @@ -478,7 +462,6 @@ private boolean canShortCircuit( TaskId parentTask, TimeValue waitTimeout, long startTime, - boolean forceMergeEnabled, ProjectMetadata projectMetadata, ActionListener listener ) { @@ -512,8 +495,7 @@ private boolean canShortCircuit( parentTask, targetIndexMetadata.getIndex().getName(), waitTimeout, - startTime, - forceMergeEnabled + startTime ) ); return true; @@ -533,8 +515,7 @@ private void performShardDownsampling( long startTime, List metricFields, List labelFields, - List dimensionFields, - boolean forceMergeEnabled + List dimensionFields ) { final int numberOfShards = sourceIndexMetadata.getNumberOfShards(); final Index sourceIndex = sourceIndexMetadata.getIndex(); @@ -594,8 +575,7 @@ public void onResponse(PersistentTasksCustomMetadata.PersistentTask nextStepListener = forceMergeEnabled - ? new ForceMergeActionListener(parentTask, downsampleIndexName, startTime, actionListener) - : new MeasurementActionListener(startTime, actionListener); + ActionListener nextStepListener = new FlushActionListener( + parentTask, + downsampleIndexName, + startTime, + actionListener + ); // Mark downsample index as "completed successfully" ("index.downsample.status": "success") taskQueue.submitTask( "update-downsample-metadata [" + downsampleIndexName + "]", @@ -1189,15 +1156,15 @@ public void onFailure(Exception e) { } /** - * Triggers a force merge operation on the downsample target index + * Triggers a flush operation on the downsample target index */ - class ForceMergeActionListener implements ActionListener { + class FlushActionListener implements ActionListener { final ActionListener actionListener; private final TaskId parentTask; private final String downsampleIndexName; - ForceMergeActionListener( + FlushActionListener( final TaskId parentTask, final String downsampleIndexName, final long startTime, @@ -1210,18 +1177,17 @@ class ForceMergeActionListener implements ActionListener { @Override public void onResponse(final AcknowledgedResponse response) { - ForceMergeRequest request = new ForceMergeRequest(downsampleIndexName); - request.maxNumSegments(1); + FlushRequest request = new FlushRequest(downsampleIndexName); request.setParentTask(parentTask); client.admin() .indices() - .forceMerge(request, ActionListener.wrap(mergeIndexResp -> { actionListener.onResponse(AcknowledgedResponse.TRUE); }, t -> { + .flush(request, ActionListener.wrap(flushIndexResp -> actionListener.onResponse(AcknowledgedResponse.TRUE), t -> { /* * At this point downsample index has been created - * successfully even if force merge failed. + * successfully even if flush failed. * So, we should not fail the downsample operation. */ - logger.error("Failed to force-merge downsample index [" + downsampleIndexName + "]", t); + logger.error("Failed to flush downsample index [" + downsampleIndexName + "]", t); actionListener.onResponse(AcknowledgedResponse.TRUE); })); } diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java index a968198098b39..b4f5236fc2255 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java @@ -65,7 +65,6 @@ import java.io.IOException; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -103,10 +102,9 @@ public class TransportDownsampleActionTests extends ESTestCase { @Mock private MapperService mapperService; - private static final String TEMPLATE_MAPPING = """ + private static final String MAPPING = """ { "_doc": { - %s "properties": { "attributes.host": { "type": "keyword", @@ -120,23 +118,6 @@ public class TransportDownsampleActionTests extends ESTestCase { } }"""; - private static final String NO_METADATA_MAPPING = String.format(Locale.ROOT, TEMPLATE_MAPPING, ""); - private static final String OTHER_METADATA_MAPPING = String.format( - Locale.ROOT, - TEMPLATE_MAPPING, - "\"_meta\":{\"downsample.forcemerge.enabled\":100}," - ); - private static final String FORCE_MERGE_ENABLED_MAPPING = String.format( - Locale.ROOT, - TEMPLATE_MAPPING, - "\"_meta\":{\"downsample.forcemerge.enabled\":true}," - ); - private static final String FORCE_MERGE_DISABLED_MAPPING = String.format( - Locale.ROOT, - TEMPLATE_MAPPING, - "\"_meta\":{\"downsample.forcemerge.enabled\":false}," - ); - private TransportDownsampleAction action; private AutoCloseable mocks; @@ -175,11 +156,13 @@ public void setUp() throws Exception { projectId = randomProjectIdOrDefault(); task = new Task(1, "type", "action", "description", null, null); + // Initialise mocks for thread pool and cluster service var threadContext = new ThreadContext(Settings.EMPTY); when(threadPool.getThreadContext()).thenReturn(threadContext); when(clusterService.localNode()).thenReturn(DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), "node_name")); when(clusterService.getSettings()).thenReturn(Settings.EMPTY); + // Mock refresh & flush requests Answer mockBroadcastResponse = invocation -> { @SuppressWarnings("unchecked") var listener = (ActionListener) invocation.getArgument(1, ActionListener.class); @@ -187,17 +170,34 @@ public void setUp() throws Exception { return null; }; doAnswer(mockBroadcastResponse).when(indicesAdminClient).refresh(any(), any()); - doAnswer(mockBroadcastResponse).when(indicesAdminClient).forceMerge(any(), any()); + doAnswer(mockBroadcastResponse).when(indicesAdminClient).flush(any(), any()); + + // Mocks for updating downsampling metadata doAnswer(invocation -> { var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); updateTask.listener.onResponse(randomBoolean() ? AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE); return null; }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + + // Mocks for mapping retrieval & merging when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService); MappedFieldType timestampFieldMock = mock(MappedFieldType.class); when(timestampFieldMock.meta()).thenReturn(Map.of()); when(mapperService.fieldType(any())).thenReturn(timestampFieldMock); when(mapperService.mappingLookup()).thenReturn(MappingLookup.EMPTY); + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + var listener = (ActionListener) invocation.getArgument(1, ActionListener.class); + listener.onResponse( + new GetMappingsResponse( + Map.of(sourceIndex, new MappingMetadata("_doc", XContentHelper.convertToMap(JsonXContent.jsonXContent, MAPPING, true))) + ) + ); + return null; + }).when(indicesAdminClient).getMappings(any(), any()); + DocumentMapper documentMapper = mock(DocumentMapper.class); + when(documentMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON(MAPPING)); + when(mapperService.merge(anyString(), any(CompressedXContent.class), any())).thenReturn(documentMapper); } @After @@ -206,25 +206,7 @@ public void tearDown() throws Exception { mocks.close(); } - public void testDownsamplingWithForceMerge() throws IOException { - String mapping = switch (randomIntBetween(0, 2)) { - case 0 -> NO_METADATA_MAPPING; - case 1 -> OTHER_METADATA_MAPPING; - default -> FORCE_MERGE_ENABLED_MAPPING; - }; - downsample(mapping); - verify(indicesAdminClient).forceMerge(any(), any()); - } - - public void testDownsamplingSkipsForceMerge() throws IOException { - downsample(FORCE_MERGE_DISABLED_MAPPING); - verify(indicesAdminClient, never()).forceMerge(any(), any()); - } - - private void downsample(String mapping) throws IOException { - mockGetMapping(mapping); - mockMergedMapping(mapping); - + public void testDownsampling() { var projectMetadata = ProjectMetadata.builder(projectId) .put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards)) .build(); @@ -243,14 +225,14 @@ private void downsample(String mapping) throws IOException { }).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any()); Answer mockPersistentTask = invocation -> { ActionListener> listener = invocation.getArgument(4); - PersistentTasksCustomMetadata.PersistentTask task = mock(PersistentTasksCustomMetadata.PersistentTask.class); - when(task.getId()).thenReturn(randomAlphaOfLength(10)); + PersistentTasksCustomMetadata.PersistentTask task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class); + when(task1.getId()).thenReturn(randomAlphaOfLength(10)); DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState( DownsampleShardIndexerStatus.COMPLETED, null ); - when(task.getState()).thenReturn(runningTaskState); - listener.onResponse(task); + when(task1.getState()).thenReturn(runningTaskState); + listener.onResponse(task1); return null; }; doAnswer(mockPersistentTask).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any()); @@ -275,18 +257,10 @@ private void downsample(String mapping) throws IOException { listener ); safeGet(listener); - verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); + verifyIndexFinalisation(); } public void testDownsamplingForceMergeWithShortCircuitAfterCreation() { - String mapping = switch (randomIntBetween(0, 3)) { - case 0 -> NO_METADATA_MAPPING; - case 1 -> OTHER_METADATA_MAPPING; - case 2 -> FORCE_MERGE_ENABLED_MAPPING; - default -> FORCE_MERGE_DISABLED_MAPPING; - }; - mockGetMapping(mapping); - var projectMetadata = ProjectMetadata.builder(projectId) .put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards)) .put(createTargetIndexMetadata(targetIndex, primaryShards, replicaShards)) @@ -313,29 +287,10 @@ public void testDownsamplingForceMergeWithShortCircuitAfterCreation() { listener ); safeGet(listener); - verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); - verify(indicesAdminClient).forceMerge(any(), any()); - } - - public void testDownsamplingForceMergeWithShortCircuitDuringCreation() throws IOException { - String mapping = switch (randomIntBetween(0, 2)) { - case 0 -> NO_METADATA_MAPPING; - case 1 -> OTHER_METADATA_MAPPING; - default -> FORCE_MERGE_ENABLED_MAPPING; - }; - downsampleWithShortCircuitDuringCreation(mapping); - verify(indicesAdminClient).forceMerge(any(), any()); - } - - public void testDownsamplingSkipsForceMergeWithShortCircuitDuringCreation() throws IOException { - downsampleWithShortCircuitDuringCreation(FORCE_MERGE_DISABLED_MAPPING); - verify(indicesAdminClient, never()).forceMerge(any(), any()); + verifyIndexFinalisation(); } - public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOException { - mockGetMapping(mapping); - mockMergedMapping(mapping); - + public void testDownsamplingWithShortCircuitDuringCreation() throws IOException { var projectMetadata = ProjectMetadata.builder(projectId) .put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards)) .build(); @@ -374,26 +329,14 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx listener ); safeGet(listener); - verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); - } - - private void mockGetMapping(String mapping) { - doAnswer(invocation -> { - @SuppressWarnings("unchecked") - var listener = (ActionListener) invocation.getArgument(1, ActionListener.class); - listener.onResponse( - new GetMappingsResponse( - Map.of(sourceIndex, new MappingMetadata("_doc", XContentHelper.convertToMap(JsonXContent.jsonXContent, mapping, true))) - ) - ); - return null; - }).when(indicesAdminClient).getMappings(any(), any()); + verifyIndexFinalisation(); } - private void mockMergedMapping(String mapping) throws IOException { - DocumentMapper documentMapper = mock(DocumentMapper.class); - when(documentMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON(mapping)); - when(mapperService.merge(anyString(), any(CompressedXContent.class), any())).thenReturn(documentMapper); + private void verifyIndexFinalisation() { + verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); + verify(indicesAdminClient).refresh(any(), any()); + verify(indicesAdminClient).flush(any(), any()); + verify(indicesAdminClient, never()).forceMerge(any(), any()); } private IndexMetadata.Builder createSourceIndexMetadata(String sourceIndex, int primaryShards, int replicaShards) { diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/DownsampleActionIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/DownsampleActionIT.java index 843a8ff8f4a42..f552720650cc7 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/DownsampleActionIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/DownsampleActionIT.java @@ -194,7 +194,12 @@ public void testRollupIndex() throws Exception { // Create the ILM policy String phaseName = randomFrom("warm", "cold"); DateHistogramInterval fixedInterval = ConfigTestHelpers.randomInterval(); - createNewSingletonPolicy(client(), policy, phaseName, new DownsampleAction(fixedInterval, DownsampleAction.DEFAULT_WAIT_TIMEOUT)); + createNewSingletonPolicy( + client(), + policy, + phaseName, + new DownsampleAction(fixedInterval, DownsampleAction.DEFAULT_WAIT_TIMEOUT, randomBoolean()) + ); // Create a time series index managed by the policy createIndex(index, alias, policy, true); @@ -228,7 +233,7 @@ public void testRollupIndexInTheHotPhaseWithoutRollover() { client(), policy, "hot", - new DownsampleAction(ConfigTestHelpers.randomInterval(), DownsampleAction.DEFAULT_WAIT_TIMEOUT) + new DownsampleAction(ConfigTestHelpers.randomInterval(), DownsampleAction.DEFAULT_WAIT_TIMEOUT, randomBoolean()) ) ); assertTrue( @@ -246,7 +251,7 @@ public void testRollupIndexInTheHotPhaseAfterRollover() throws Exception { RolloverAction.NAME, new RolloverAction(null, null, null, 1L, null, null, null, null, null, null), DownsampleAction.NAME, - new DownsampleAction(fixedInterval, DownsampleAction.DEFAULT_WAIT_TIMEOUT) + new DownsampleAction(fixedInterval, DownsampleAction.DEFAULT_WAIT_TIMEOUT, randomBoolean()) ); Map phases = Map.of("hot", new Phase("hot", TimeValue.ZERO, hotActions)); LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, phases); @@ -305,7 +310,12 @@ public void testRollupIndexInTheHotPhaseAfterRollover() throws Exception { public void testTsdbDataStreams() throws Exception { // Create the ILM policy DateHistogramInterval fixedInterval = ConfigTestHelpers.randomInterval(); - createNewSingletonPolicy(client(), policy, "warm", new DownsampleAction(fixedInterval, DownsampleAction.DEFAULT_WAIT_TIMEOUT)); + createNewSingletonPolicy( + client(), + policy, + "warm", + new DownsampleAction(fixedInterval, DownsampleAction.DEFAULT_WAIT_TIMEOUT, randomBoolean()) + ); // Create a template Request createIndexTemplateRequest = new Request("POST", "/_index_template/" + dataStream); @@ -353,7 +363,12 @@ public void testTsdbDataStreams() throws Exception { public void testILMWaitsForTimeSeriesEndTimeToLapse() throws Exception { // Create the ILM policy DateHistogramInterval fixedInterval = ConfigTestHelpers.randomInterval(); - createNewSingletonPolicy(client(), policy, "warm", new DownsampleAction(fixedInterval, DownsampleAction.DEFAULT_WAIT_TIMEOUT)); + createNewSingletonPolicy( + client(), + policy, + "warm", + new DownsampleAction(fixedInterval, DownsampleAction.DEFAULT_WAIT_TIMEOUT, randomBoolean()) + ); // Create a template Request createIndexTemplateRequest = new Request("POST", "/_index_template/" + dataStream); @@ -396,7 +411,12 @@ public void testRollupNonTSIndex() throws Exception { // Create the ILM policy String phaseName = randomFrom("warm", "cold"); DateHistogramInterval fixedInterval = ConfigTestHelpers.randomInterval(); - createNewSingletonPolicy(client(), policy, phaseName, new DownsampleAction(fixedInterval, DownsampleAction.DEFAULT_WAIT_TIMEOUT)); + createNewSingletonPolicy( + client(), + policy, + phaseName, + new DownsampleAction(fixedInterval, DownsampleAction.DEFAULT_WAIT_TIMEOUT, randomBoolean()) + ); // Create a non TSDB managed index createIndex(index, alias, policy, false);