Skip to content
Merged
6 changes: 6 additions & 0 deletions docs/changelog/137375.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 137375
summary: Allow opting out of force-merging on a cloned index in ILM's searchable snapshot
action
area: ILM+SLM
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ By default, this snapshot is deleted by the [delete action](/reference/elasticse

This force merging occurs in the phase that the index is in **prior** to the `searchable_snapshot` action. For example, if using a `searchable_snapshot` action in the `hot` phase, the force merge will be performed on the hot nodes. If using a `searchable_snapshot` action in the `cold` phase, the force merge will be performed on whatever tier the index is **prior** to the `cold` phase (either `hot` or `warm`).

`force_merge_on_clone` {applies_to}`stack: ga 9.2.1`
: (Optional, Boolean) By default, if `force_merge_index` is `true`, the index will first be cloned with 0 replicas and the force-merge will be performed on the clone before the searchable snapshot is created. This avoids performing the force-merge redundantly on replica shards, as the snapshot operation only uses primary shards. Setting this option to `false` will skip the clone step and perform the force-merge directly on the managed index. Defaults to `true`.

`total_shards_per_node`
: The maximum number of shards (replicas and primaries) that will be allocated to a single node for the searchable snapshot index. Defaults to unbounded.
: (Optional, Integer) The maximum number of shards (replicas and primaries) that will be allocated to a single node for the searchable snapshot index. Defaults to unbounded.


## Examples [ilm-searchable-snapshot-ex]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9207000,9185006
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.2.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
esql_resolve_fields_response_used,9185005
ilm_searchable_snapshot_opt_out_clone,9185006
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
shard_capacity_unhealthy_thresholds,9206000
ilm_searchable_snapshot_opt_out_clone,9207000
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.client.internal.Client;
Expand Down Expand Up @@ -58,6 +59,12 @@ public class SearchableSnapshotAction implements LifecycleAction {
public static final ParseField FORCE_MERGE_INDEX = new ParseField("force_merge_index");
public static final ParseField TOTAL_SHARDS_PER_NODE = new ParseField("total_shards_per_node");
public static final ParseField REPLICATE_FOR = new ParseField("replicate_for");
public static final ParseField FORCE_MERGE_ON_CLONE = new ParseField("force_merge_on_clone");

private static final TransportVersion FORCE_MERGE_ON_CLONE_TRANSPORT_VERSION = TransportVersion.fromName(
"ilm_searchable_snapshot_opt_out_clone"
);

public static final String CONDITIONAL_SKIP_ACTION_STEP = BranchingStep.NAME + "-check-prerequisites";
public static final String CONDITIONAL_SKIP_GENERATE_AND_CLEAN = BranchingStep.NAME + "-check-existing-snapshot";
public static final String CONDITIONAL_SKIP_CLONE_STEP = BranchingStep.NAME + "-skip-clone-check";
Expand Down Expand Up @@ -87,7 +94,7 @@ public class SearchableSnapshotAction implements LifecycleAction {

private static final ConstructingObjectParser<SearchableSnapshotAction, Void> PARSER = new ConstructingObjectParser<>(
NAME,
a -> new SearchableSnapshotAction((String) a[0], a[1] == null || (boolean) a[1], (Integer) a[2], (TimeValue) a[3])
a -> new SearchableSnapshotAction((String) a[0], a[1] == null || (boolean) a[1], (Integer) a[2], (TimeValue) a[3], (Boolean) a[4])
);

static {
Expand All @@ -100,6 +107,7 @@ public class SearchableSnapshotAction implements LifecycleAction {
REPLICATE_FOR,
ObjectParser.ValueType.STRING
);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_ON_CLONE);
}

public static SearchableSnapshotAction parse(XContentParser parser) {
Expand All @@ -112,12 +120,16 @@ public static SearchableSnapshotAction parse(XContentParser parser) {
private final Integer totalShardsPerNode;
@Nullable
private final TimeValue replicateFor;
/** Opt-out field for forcing the force-merge step to run on the source index instead of a cloned version with 0 replicas. */
@Nullable
private final Boolean forceMergeOnClone;

public SearchableSnapshotAction(
String snapshotRepository,
boolean forceMergeIndex,
@Nullable Integer totalShardsPerNode,
@Nullable TimeValue replicateFor
@Nullable TimeValue replicateFor,
@Nullable Boolean forceMergeOnClone
) {
if (Strings.hasText(snapshotRepository) == false) {
throw new IllegalArgumentException("the snapshot repository must be specified");
Expand All @@ -136,21 +148,35 @@ public SearchableSnapshotAction(
);
}
this.replicateFor = replicateFor;

if (forceMergeIndex == false && forceMergeOnClone != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't ready through the whole PR yet so I might be missing something, but what if forceMergeOnClone is false? We don't want to throw an exception here do we? Is there any reason for forceMergeOnClone to be a Boolean rather than a boolean? Is there any real value in it being able to have 3 states instead of 2?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I was a bit on the fence here. My reason for going with a Boolean was that it better represents what the user actually does (null, false, true). However, we don't currently make a lot of use of that null state, so I don't have super strong feelings on this. If you strongly prefer having a boolean, I'm ok with changing it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense with the toXcontent not returning it when null. But is this a bug here that you can't say force_merge_on_clone: false and force_merge_index: false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry, I forgot to comment on that. I intentionally went with that behavior, as I thought it was cleaner and less confusing if force_merge_on_clone is simply not supported when force_merge_index: false. It wouldn't have any effect if you were able to specify it, but I think it's confusing to allow configuring fields that are ignored. This way, it's clear that the force_merge_on_clone field is not relevant/supported. What are your thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems a bit odd to me, but it sort of makes sense.

throw new IllegalArgumentException(
Strings.format(
"[%s] is not allowed when [%s] is [false]",
FORCE_MERGE_ON_CLONE.getPreferredName(),
FORCE_MERGE_INDEX.getPreferredName()
)
);
}
this.forceMergeOnClone = forceMergeOnClone;
}

public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeIndex) {
this(snapshotRepository, forceMergeIndex, null, null);
this(snapshotRepository, forceMergeIndex, null, null, null);
}

public SearchableSnapshotAction(String snapshotRepository) {
this(snapshotRepository, true, null, null);
this(snapshotRepository, true, null, null, null);
}

public SearchableSnapshotAction(StreamInput in) throws IOException {
this.snapshotRepository = in.readString();
this.forceMergeIndex = in.readBoolean();
this.totalShardsPerNode = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readOptionalInt() : null;
this.replicateFor = in.getTransportVersion().supports(TransportVersions.V_8_18_0) ? in.readOptionalTimeValue() : null;
this.forceMergeOnClone = in.getTransportVersion().supports(FORCE_MERGE_ON_CLONE_TRANSPORT_VERSION)
? in.readOptionalBoolean()
: null;
}

public boolean isForceMergeIndex() {
Expand All @@ -171,6 +197,11 @@ public TimeValue getReplicateFor() {
return replicateFor;
}

@Nullable
public Boolean isForceMergeOnClone() {
return forceMergeOnClone;
}

@Override
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
assert false;
Expand Down Expand Up @@ -299,9 +330,12 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
Instant::now
);

// We force-merge on the clone by default, but allow the user to opt-out of this behavior if there is any reason why they don't want
// to clone the index (e.g. if something is preventing the cloned index shards from being assigned).
StepKey keyForForceMerge = shouldForceMergeOnClone() ? conditionalSkipCloneKey : forceMergeStepKey;
// When generating a snapshot, we either jump to the force merge section, or we skip the
// forcemerge and go straight to steps for creating the snapshot
StepKey keyForSnapshotGeneration = forceMergeIndex ? conditionalSkipCloneKey : generateSnapshotNameKey;
StepKey keyForSnapshotGeneration = forceMergeIndex ? keyForForceMerge : generateSnapshotNameKey;
// Branch, deciding whether there is an existing searchable snapshot that can be used for mounting the index
// (in which case, skip generating a new name and the snapshot cleanup), or if we need to generate a new snapshot
BranchingStep skipGeneratingSnapshotStep = new BranchingStep(
Expand Down Expand Up @@ -529,12 +563,14 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
steps.add(waitUntilTimeSeriesEndTimeStep);
steps.add(skipGeneratingSnapshotStep);
if (forceMergeIndex) {
steps.add(conditionalSkipCloneStep);
steps.add(readOnlyStep);
steps.add(cleanupClonedIndexStep);
steps.add(generateCloneIndexNameStep);
steps.add(cloneIndexStep);
steps.add(waitForClonedIndexGreenStep);
if (shouldForceMergeOnClone()) {
steps.add(conditionalSkipCloneStep);
steps.add(readOnlyStep);
steps.add(cleanupClonedIndexStep);
steps.add(generateCloneIndexNameStep);
steps.add(cloneIndexStep);
steps.add(waitForClonedIndexGreenStep);
}
steps.add(forceMergeStep);
steps.add(segmentCountStep);
}
Expand Down Expand Up @@ -581,6 +617,15 @@ static MountSearchableSnapshotRequest.Storage getConcreteStorageType(StepKey cur
}
}

/**
* Returns whether we should first clone the index and perform the force-merge on that cloned index (true) or force-merge on the
* original index (false). Defaults to true when {@link #forceMergeOnClone} is null/unspecified. Note that this value is ignored when
* {@link #forceMergeIndex} is false.
*/
private boolean shouldForceMergeOnClone() {
return forceMergeOnClone == null || forceMergeOnClone;
}

@Override
public boolean isSafeAction() {
return true;
Expand All @@ -601,6 +646,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().supports(TransportVersions.V_8_18_0)) {
out.writeOptionalTimeValue(replicateFor);
}
if (out.getTransportVersion().supports(FORCE_MERGE_ON_CLONE_TRANSPORT_VERSION)) {
out.writeOptionalBoolean(forceMergeOnClone);
}
}

@Override
Expand All @@ -614,6 +662,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (replicateFor != null) {
builder.field(REPLICATE_FOR.getPreferredName(), replicateFor);
}
if (forceMergeOnClone != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this answers my question from above -- we don't want it in the xcontent if it has not been set explicitly.

builder.field(FORCE_MERGE_ON_CLONE.getPreferredName(), forceMergeOnClone);
}
builder.endObject();
return builder;
}
Expand All @@ -630,12 +681,13 @@ public boolean equals(Object o) {
return Objects.equals(snapshotRepository, that.snapshotRepository)
&& Objects.equals(forceMergeIndex, that.forceMergeIndex)
&& Objects.equals(totalShardsPerNode, that.totalShardsPerNode)
&& Objects.equals(replicateFor, that.replicateFor);
&& Objects.equals(replicateFor, that.replicateFor)
&& Objects.equals(forceMergeOnClone, that.forceMergeOnClone);
}

@Override
public int hashCode() {
return Objects.hash(snapshotRepository, forceMergeIndex, totalShardsPerNode, replicateFor);
return Objects.hash(snapshotRepository, forceMergeIndex, totalShardsPerNode, replicateFor, forceMergeOnClone);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ public static LifecyclePolicy randomTimeseriesLifecyclePolicy(@Nullable String l
randomAlphaOfLength(10),
randomBoolean(),
(randomBoolean() ? null : randomIntBetween(1, 100)),
null,
null
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ public void testToSteps() {

List<Step> steps = action.toSteps(null, phase, nextStepKey, null);

List<StepKey> expectedSteps = expectedStepKeys(phase, action.isForceMergeIndex(), action.getReplicateFor() != null);
List<StepKey> expectedSteps = expectedStepKeys(
phase,
action.isForceMergeIndex(),
action.getReplicateFor() != null,
action.isForceMergeOnClone()
);

assertThat(steps.size(), is(expectedSteps.size()));
for (int i = 0; i < expectedSteps.size(); i++) {
Expand All @@ -54,7 +59,7 @@ public void testToSteps() {
CreateSnapshotStep createSnapshotStep = (CreateSnapshotStep) steps.get(index);
assertThat(createSnapshotStep.getNextKeyOnIncomplete(), is(expectedSteps.get(index - 1)));
validateWaitForDataTierStep(phase, steps, index + 1, index + 2);
validateForceMergeClone(action.isForceMergeIndex(), steps);
validateForceMergeClone(action.isForceMergeIndex(), action.isForceMergeOnClone(), steps);
}

private void validateWaitForDataTierStep(String phase, List<Step> steps, int waitForDataTierStepIndex, int mountStepIndex) {
Expand All @@ -70,8 +75,8 @@ private void validateWaitForDataTierStep(String phase, List<Step> steps, int wai
/**
* Validate that the {@link ResizeIndexStep} used to clone the index for force merging configures the target index with 0 replicas.
*/
private void validateForceMergeClone(boolean isForceMergeIndex, List<Step> steps) {
if (isForceMergeIndex == false) {
private void validateForceMergeClone(boolean isForceMergeIndex, Boolean isForceMergeOnClone, List<Step> steps) {
if (isForceMergeIndex == false || (isForceMergeOnClone != null && isForceMergeOnClone == false)) {
return;
}
ResizeIndexStep cloneStep = (ResizeIndexStep) steps.stream()
Expand Down Expand Up @@ -114,24 +119,25 @@ public void testCreateWithInvalidTotalShardsPerNode() {

IllegalArgumentException exception = expectThrows(
IllegalArgumentException.class,
() -> new SearchableSnapshotAction("test", true, invalidTotalShardsPerNode, null)
() -> new SearchableSnapshotAction("test", true, invalidTotalShardsPerNode, null, null)
);
assertEquals("[" + TOTAL_SHARDS_PER_NODE.getPreferredName() + "] must be >= 1", exception.getMessage());
}

private List<StepKey> expectedStepKeys(String phase, boolean forceMergeIndex, boolean hasReplicateFor) {
private List<StepKey> expectedStepKeys(String phase, boolean forceMergeIndex, boolean hasReplicateFor, Boolean forceMergeOnClone) {
final var shouldForceMergeOnClone = forceMergeOnClone != null ? forceMergeOnClone : forceMergeIndex;
return Stream.of(
new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_ACTION_STEP),
new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME),
new StepKey(phase, NAME, WaitForNoFollowersStep.NAME),
new StepKey(phase, NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME),
new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_GENERATE_AND_CLEAN),
forceMergeIndex ? new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_CLONE_STEP) : null,
forceMergeIndex ? new StepKey(phase, NAME, ReadOnlyStep.NAME) : null,
forceMergeIndex ? new StepKey(phase, NAME, CleanupGeneratedIndexStep.NAME) : null,
forceMergeIndex ? new StepKey(phase, NAME, GenerateUniqueIndexNameStep.NAME) : null,
forceMergeIndex ? new StepKey(phase, NAME, ResizeIndexStep.CLONE) : null,
forceMergeIndex ? new StepKey(phase, NAME, SearchableSnapshotAction.WAIT_FOR_CLONED_INDEX_GREEN) : null,
shouldForceMergeOnClone ? new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_CLONE_STEP) : null,
shouldForceMergeOnClone ? new StepKey(phase, NAME, ReadOnlyStep.NAME) : null,
shouldForceMergeOnClone ? new StepKey(phase, NAME, CleanupGeneratedIndexStep.NAME) : null,
shouldForceMergeOnClone ? new StepKey(phase, NAME, GenerateUniqueIndexNameStep.NAME) : null,
shouldForceMergeOnClone ? new StepKey(phase, NAME, ResizeIndexStep.CLONE) : null,
shouldForceMergeOnClone ? new StepKey(phase, NAME, SearchableSnapshotAction.WAIT_FOR_CLONED_INDEX_GREEN) : null,
forceMergeIndex ? new StepKey(phase, NAME, ForceMergeStep.NAME) : null,
forceMergeIndex ? new StepKey(phase, NAME, SegmentCountStep.NAME) : null,
new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME),
Expand Down Expand Up @@ -170,43 +176,46 @@ protected Writeable.Reader<SearchableSnapshotAction> instanceReader() {

@Override
protected SearchableSnapshotAction mutateInstance(SearchableSnapshotAction instance) {
return switch (randomIntBetween(0, 3)) {
case 0 -> new SearchableSnapshotAction(
randomAlphaOfLengthBetween(5, 10),
instance.isForceMergeIndex(),
instance.getTotalShardsPerNode(),
instance.getReplicateFor()
);
case 1 -> new SearchableSnapshotAction(
instance.getSnapshotRepository(),
instance.isForceMergeIndex() == false,
instance.getTotalShardsPerNode(),
instance.getReplicateFor()
);
case 2 -> new SearchableSnapshotAction(
instance.getSnapshotRepository(),
instance.isForceMergeIndex(),
instance.getTotalShardsPerNode() == null ? 1 : instance.getTotalShardsPerNode() + randomIntBetween(1, 100),
instance.getReplicateFor()
);
case 3 -> new SearchableSnapshotAction(
instance.getSnapshotRepository(),
instance.isForceMergeIndex(),
instance.getTotalShardsPerNode(),
instance.getReplicateFor() == null
? TimeValue.timeValueDays(1)
: TimeValue.timeValueDays(instance.getReplicateFor().getDays() + randomIntBetween(1, 10))
);
var snapshotRepository = instance.getSnapshotRepository();
var forceMergeIndex = instance.isForceMergeIndex();
var totalShardsPerNode = instance.getTotalShardsPerNode();
var replicateFor = instance.getReplicateFor();
var forceMergeOnClone = instance.isForceMergeOnClone();
switch (randomIntBetween(0, 4)) {
case 0 -> snapshotRepository = randomAlphaOfLengthBetween(5, 10);
case 1 -> {
forceMergeIndex = forceMergeIndex == false;
if (forceMergeIndex == false) {
forceMergeOnClone = null;
}
}
case 2 -> totalShardsPerNode = totalShardsPerNode == null ? 1 : totalShardsPerNode + randomIntBetween(1, 100);
case 3 -> replicateFor = replicateFor == null
? TimeValue.timeValueDays(1)
: TimeValue.timeValueDays(replicateFor.getDays() + randomIntBetween(1, 10));
case 4 -> {
if (forceMergeOnClone == null) {
forceMergeOnClone = randomBoolean();
} else {
forceMergeOnClone = randomBoolean() ? null : forceMergeOnClone == false;
}
if (forceMergeOnClone != null) {
forceMergeIndex = true;
}
}
default -> throw new IllegalArgumentException("Invalid mutation branch");
};
}
return new SearchableSnapshotAction(snapshotRepository, forceMergeIndex, totalShardsPerNode, replicateFor, forceMergeOnClone);
}

static SearchableSnapshotAction randomInstance() {
final var forceMergeIndex = randomBoolean();
return new SearchableSnapshotAction(
randomAlphaOfLengthBetween(5, 10),
randomBoolean(),
forceMergeIndex,
(randomBoolean() ? null : randomIntBetween(1, 100)),
(randomBoolean() ? null : TimeValue.timeValueDays(randomIntBetween(1, 10)))
(randomBoolean() ? null : TimeValue.timeValueDays(randomIntBetween(1, 10))),
forceMergeIndex && randomBoolean() ? randomBoolean() : null
);
}
}
Loading