Skip to content

Commit 0ab3240

Browse files
authored
Allow opting out of force-merging on a cloned index in ILM's searchable snapshot action (#137375)
In #133954, we modified ILM's searchable snapshot action to perform the force-merge on a clone of the index with 0 replicas. This optimization avoids performing the force-merge redundantly on replicas, as the subsequent snapshot operation only looks at primary shards. We've seen some cases where cloning the index resulted in issues; there was a bug in the clone API that caused shards to be initializing permanently under specific circumstances (fixed by #137096), and cloned shards are unable to be assigned if their source lives on a node that is close/past the low watermark disk threshold (will be fixed soon by the Distributed Coordination team). Therefore, we implement an opt-out flag that users can configure in the `searchable_snapshot` action of their ILM policy if they don't want to clone the index with 0 replicas before performing the force-merge. We implement an opt-out instead of an opt-in, as we believe these issues to be rather specific (and soon resolved), and the clone is worth doing by default.
1 parent fa252fa commit 0ab3240

File tree

12 files changed

+214
-66
lines changed

12 files changed

+214
-66
lines changed

docs/changelog/137375.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 137375
2+
summary: Allow opting out of force-merging on a cloned index in ILM's searchable snapshot
3+
action
4+
area: ILM+SLM
5+
type: enhancement
6+
issues: []

docs/reference/elasticsearch/index-lifecycle-actions/ilm-searchable-snapshot.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,11 @@ By default, this snapshot is deleted by the [delete action](/reference/elasticse
4646

4747
This force merging occurs in the phase that the index is in **prior** to the `searchable_snapshot` action. For example, if using a `searchable_snapshot` action in the `hot` phase, the force merge will be performed on the hot nodes. If using a `searchable_snapshot` action in the `cold` phase, the force merge will be performed on whatever tier the index is **prior** to the `cold` phase (either `hot` or `warm`).
4848

49+
`force_merge_on_clone` {applies_to}`stack: ga 9.2.1`
50+
: (Optional, Boolean) By default, if `force_merge_index` is `true`, the index will first be cloned with 0 replicas and the force-merge will be performed on the clone before the searchable snapshot is created. This avoids performing the force-merge redundantly on replica shards, as the snapshot operation only uses primary shards. Setting this option to `false` will skip the clone step and perform the force-merge directly on the managed index. Defaults to `true`.
51+
4952
`total_shards_per_node`
50-
: The maximum number of shards (replicas and primaries) that will be allocated to a single node for the searchable snapshot index. Defaults to unbounded.
53+
: (Optional, Integer) The maximum number of shards (replicas and primaries) that will be allocated to a single node for the searchable snapshot index. Defaults to unbounded.
5154

5255

5356
## Examples [ilm-searchable-snapshot-ex]
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9209000,9185006
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
esql_resolve_fields_response_used,9185005
1+
ilm_searchable_snapshot_opt_out_clone,9185006
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
aggregation_window,9208000
1+
ilm_searchable_snapshot_opt_out_clone,9209000

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
11+
import org.elasticsearch.TransportVersion;
1112
import org.elasticsearch.TransportVersions;
1213
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
1314
import org.elasticsearch.client.internal.Client;
@@ -58,6 +59,12 @@ public class SearchableSnapshotAction implements LifecycleAction {
5859
public static final ParseField FORCE_MERGE_INDEX = new ParseField("force_merge_index");
5960
public static final ParseField TOTAL_SHARDS_PER_NODE = new ParseField("total_shards_per_node");
6061
public static final ParseField REPLICATE_FOR = new ParseField("replicate_for");
62+
public static final ParseField FORCE_MERGE_ON_CLONE = new ParseField("force_merge_on_clone");
63+
64+
private static final TransportVersion FORCE_MERGE_ON_CLONE_TRANSPORT_VERSION = TransportVersion.fromName(
65+
"ilm_searchable_snapshot_opt_out_clone"
66+
);
67+
6168
public static final String CONDITIONAL_SKIP_ACTION_STEP = BranchingStep.NAME + "-check-prerequisites";
6269
public static final String CONDITIONAL_SKIP_GENERATE_AND_CLEAN = BranchingStep.NAME + "-check-existing-snapshot";
6370
public static final String CONDITIONAL_SKIP_CLONE_STEP = BranchingStep.NAME + "-skip-clone-check";
@@ -87,7 +94,7 @@ public class SearchableSnapshotAction implements LifecycleAction {
8794

8895
private static final ConstructingObjectParser<SearchableSnapshotAction, Void> PARSER = new ConstructingObjectParser<>(
8996
NAME,
90-
a -> new SearchableSnapshotAction((String) a[0], a[1] == null || (boolean) a[1], (Integer) a[2], (TimeValue) a[3])
97+
a -> new SearchableSnapshotAction((String) a[0], a[1] == null || (boolean) a[1], (Integer) a[2], (TimeValue) a[3], (Boolean) a[4])
9198
);
9299

93100
static {
@@ -100,6 +107,7 @@ public class SearchableSnapshotAction implements LifecycleAction {
100107
REPLICATE_FOR,
101108
ObjectParser.ValueType.STRING
102109
);
110+
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_ON_CLONE);
103111
}
104112

105113
public static SearchableSnapshotAction parse(XContentParser parser) {
@@ -112,12 +120,16 @@ public static SearchableSnapshotAction parse(XContentParser parser) {
112120
private final Integer totalShardsPerNode;
113121
@Nullable
114122
private final TimeValue replicateFor;
123+
/** Opt-out field for forcing the force-merge step to run on the source index instead of a cloned version with 0 replicas. */
124+
@Nullable
125+
private final Boolean forceMergeOnClone;
115126

116127
public SearchableSnapshotAction(
117128
String snapshotRepository,
118129
boolean forceMergeIndex,
119130
@Nullable Integer totalShardsPerNode,
120-
@Nullable TimeValue replicateFor
131+
@Nullable TimeValue replicateFor,
132+
@Nullable Boolean forceMergeOnClone
121133
) {
122134
if (Strings.hasText(snapshotRepository) == false) {
123135
throw new IllegalArgumentException("the snapshot repository must be specified");
@@ -136,21 +148,35 @@ public SearchableSnapshotAction(
136148
);
137149
}
138150
this.replicateFor = replicateFor;
151+
152+
if (forceMergeIndex == false && forceMergeOnClone != null) {
153+
throw new IllegalArgumentException(
154+
Strings.format(
155+
"[%s] is not allowed when [%s] is [false]",
156+
FORCE_MERGE_ON_CLONE.getPreferredName(),
157+
FORCE_MERGE_INDEX.getPreferredName()
158+
)
159+
);
160+
}
161+
this.forceMergeOnClone = forceMergeOnClone;
139162
}
140163

141164
public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeIndex) {
142-
this(snapshotRepository, forceMergeIndex, null, null);
165+
this(snapshotRepository, forceMergeIndex, null, null, null);
143166
}
144167

145168
public SearchableSnapshotAction(String snapshotRepository) {
146-
this(snapshotRepository, true, null, null);
169+
this(snapshotRepository, true, null, null, null);
147170
}
148171

149172
public SearchableSnapshotAction(StreamInput in) throws IOException {
150173
this.snapshotRepository = in.readString();
151174
this.forceMergeIndex = in.readBoolean();
152175
this.totalShardsPerNode = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readOptionalInt() : null;
153176
this.replicateFor = in.getTransportVersion().supports(TransportVersions.V_8_18_0) ? in.readOptionalTimeValue() : null;
177+
this.forceMergeOnClone = in.getTransportVersion().supports(FORCE_MERGE_ON_CLONE_TRANSPORT_VERSION)
178+
? in.readOptionalBoolean()
179+
: null;
154180
}
155181

156182
public boolean isForceMergeIndex() {
@@ -171,6 +197,11 @@ public TimeValue getReplicateFor() {
171197
return replicateFor;
172198
}
173199

200+
@Nullable
201+
public Boolean isForceMergeOnClone() {
202+
return forceMergeOnClone;
203+
}
204+
174205
@Override
175206
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
176207
assert false;
@@ -299,9 +330,12 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
299330
Instant::now
300331
);
301332

333+
// We force-merge on the clone by default, but allow the user to opt-out of this behavior if there is any reason why they don't want
334+
// to clone the index (e.g. if something is preventing the cloned index shards from being assigned).
335+
StepKey keyForForceMerge = shouldForceMergeOnClone() ? conditionalSkipCloneKey : forceMergeStepKey;
302336
// When generating a snapshot, we either jump to the force merge section, or we skip the
303337
// forcemerge and go straight to steps for creating the snapshot
304-
StepKey keyForSnapshotGeneration = forceMergeIndex ? conditionalSkipCloneKey : generateSnapshotNameKey;
338+
StepKey keyForSnapshotGeneration = forceMergeIndex ? keyForForceMerge : generateSnapshotNameKey;
305339
// Branch, deciding whether there is an existing searchable snapshot that can be used for mounting the index
306340
// (in which case, skip generating a new name and the snapshot cleanup), or if we need to generate a new snapshot
307341
BranchingStep skipGeneratingSnapshotStep = new BranchingStep(
@@ -529,12 +563,14 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
529563
steps.add(waitUntilTimeSeriesEndTimeStep);
530564
steps.add(skipGeneratingSnapshotStep);
531565
if (forceMergeIndex) {
532-
steps.add(conditionalSkipCloneStep);
533-
steps.add(readOnlyStep);
534-
steps.add(cleanupClonedIndexStep);
535-
steps.add(generateCloneIndexNameStep);
536-
steps.add(cloneIndexStep);
537-
steps.add(waitForClonedIndexGreenStep);
566+
if (shouldForceMergeOnClone()) {
567+
steps.add(conditionalSkipCloneStep);
568+
steps.add(readOnlyStep);
569+
steps.add(cleanupClonedIndexStep);
570+
steps.add(generateCloneIndexNameStep);
571+
steps.add(cloneIndexStep);
572+
steps.add(waitForClonedIndexGreenStep);
573+
}
538574
steps.add(forceMergeStep);
539575
steps.add(segmentCountStep);
540576
}
@@ -581,6 +617,15 @@ static MountSearchableSnapshotRequest.Storage getConcreteStorageType(StepKey cur
581617
}
582618
}
583619

620+
/**
621+
* Returns whether we should first clone the index and perform the force-merge on that cloned index (true) or force-merge on the
622+
* original index (false). Defaults to true when {@link #forceMergeOnClone} is null/unspecified. Note that this value is ignored when
623+
* {@link #forceMergeIndex} is false.
624+
*/
625+
private boolean shouldForceMergeOnClone() {
626+
return forceMergeOnClone == null || forceMergeOnClone;
627+
}
628+
584629
@Override
585630
public boolean isSafeAction() {
586631
return true;
@@ -601,6 +646,9 @@ public void writeTo(StreamOutput out) throws IOException {
601646
if (out.getTransportVersion().supports(TransportVersions.V_8_18_0)) {
602647
out.writeOptionalTimeValue(replicateFor);
603648
}
649+
if (out.getTransportVersion().supports(FORCE_MERGE_ON_CLONE_TRANSPORT_VERSION)) {
650+
out.writeOptionalBoolean(forceMergeOnClone);
651+
}
604652
}
605653

606654
@Override
@@ -614,6 +662,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
614662
if (replicateFor != null) {
615663
builder.field(REPLICATE_FOR.getPreferredName(), replicateFor);
616664
}
665+
if (forceMergeOnClone != null) {
666+
builder.field(FORCE_MERGE_ON_CLONE.getPreferredName(), forceMergeOnClone);
667+
}
617668
builder.endObject();
618669
return builder;
619670
}
@@ -630,12 +681,13 @@ public boolean equals(Object o) {
630681
return Objects.equals(snapshotRepository, that.snapshotRepository)
631682
&& Objects.equals(forceMergeIndex, that.forceMergeIndex)
632683
&& Objects.equals(totalShardsPerNode, that.totalShardsPerNode)
633-
&& Objects.equals(replicateFor, that.replicateFor);
684+
&& Objects.equals(replicateFor, that.replicateFor)
685+
&& Objects.equals(forceMergeOnClone, that.forceMergeOnClone);
634686
}
635687

636688
@Override
637689
public int hashCode() {
638-
return Objects.hash(snapshotRepository, forceMergeIndex, totalShardsPerNode, replicateFor);
690+
return Objects.hash(snapshotRepository, forceMergeIndex, totalShardsPerNode, replicateFor, forceMergeOnClone);
639691
}
640692

641693
@Nullable

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ public static LifecyclePolicy randomTimeseriesLifecyclePolicy(@Nullable String l
222222
randomAlphaOfLength(10),
223223
randomBoolean(),
224224
(randomBoolean() ? null : randomIntBetween(1, 100)),
225+
null,
225226
null
226227
)
227228
)

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java

Lines changed: 51 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,12 @@ public void testToSteps() {
3737

3838
List<Step> steps = action.toSteps(null, phase, nextStepKey, null);
3939

40-
List<StepKey> expectedSteps = expectedStepKeys(phase, action.isForceMergeIndex(), action.getReplicateFor() != null);
40+
List<StepKey> expectedSteps = expectedStepKeys(
41+
phase,
42+
action.isForceMergeIndex(),
43+
action.getReplicateFor() != null,
44+
action.isForceMergeOnClone()
45+
);
4146

4247
assertThat(steps.size(), is(expectedSteps.size()));
4348
for (int i = 0; i < expectedSteps.size(); i++) {
@@ -54,7 +59,7 @@ public void testToSteps() {
5459
CreateSnapshotStep createSnapshotStep = (CreateSnapshotStep) steps.get(index);
5560
assertThat(createSnapshotStep.getNextKeyOnIncomplete(), is(expectedSteps.get(index - 1)));
5661
validateWaitForDataTierStep(phase, steps, index + 1, index + 2);
57-
validateForceMergeClone(action.isForceMergeIndex(), steps);
62+
validateForceMergeClone(action.isForceMergeIndex(), action.isForceMergeOnClone(), steps);
5863
}
5964

6065
private void validateWaitForDataTierStep(String phase, List<Step> steps, int waitForDataTierStepIndex, int mountStepIndex) {
@@ -70,8 +75,8 @@ private void validateWaitForDataTierStep(String phase, List<Step> steps, int wai
7075
/**
7176
* Validate that the {@link ResizeIndexStep} used to clone the index for force merging configures the target index with 0 replicas.
7277
*/
73-
private void validateForceMergeClone(boolean isForceMergeIndex, List<Step> steps) {
74-
if (isForceMergeIndex == false) {
78+
private void validateForceMergeClone(boolean isForceMergeIndex, Boolean isForceMergeOnClone, List<Step> steps) {
79+
if (isForceMergeIndex == false || (isForceMergeOnClone != null && isForceMergeOnClone == false)) {
7580
return;
7681
}
7782
ResizeIndexStep cloneStep = (ResizeIndexStep) steps.stream()
@@ -114,24 +119,25 @@ public void testCreateWithInvalidTotalShardsPerNode() {
114119

115120
IllegalArgumentException exception = expectThrows(
116121
IllegalArgumentException.class,
117-
() -> new SearchableSnapshotAction("test", true, invalidTotalShardsPerNode, null)
122+
() -> new SearchableSnapshotAction("test", true, invalidTotalShardsPerNode, null, null)
118123
);
119124
assertEquals("[" + TOTAL_SHARDS_PER_NODE.getPreferredName() + "] must be >= 1", exception.getMessage());
120125
}
121126

122-
private List<StepKey> expectedStepKeys(String phase, boolean forceMergeIndex, boolean hasReplicateFor) {
127+
private List<StepKey> expectedStepKeys(String phase, boolean forceMergeIndex, boolean hasReplicateFor, Boolean forceMergeOnClone) {
128+
final var shouldForceMergeOnClone = forceMergeOnClone != null ? forceMergeOnClone : forceMergeIndex;
123129
return Stream.of(
124130
new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_ACTION_STEP),
125131
new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME),
126132
new StepKey(phase, NAME, WaitForNoFollowersStep.NAME),
127133
new StepKey(phase, NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME),
128134
new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_GENERATE_AND_CLEAN),
129-
forceMergeIndex ? new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_CLONE_STEP) : null,
130-
forceMergeIndex ? new StepKey(phase, NAME, ReadOnlyStep.NAME) : null,
131-
forceMergeIndex ? new StepKey(phase, NAME, CleanupGeneratedIndexStep.NAME) : null,
132-
forceMergeIndex ? new StepKey(phase, NAME, GenerateUniqueIndexNameStep.NAME) : null,
133-
forceMergeIndex ? new StepKey(phase, NAME, ResizeIndexStep.CLONE) : null,
134-
forceMergeIndex ? new StepKey(phase, NAME, SearchableSnapshotAction.WAIT_FOR_CLONED_INDEX_GREEN) : null,
135+
shouldForceMergeOnClone ? new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_CLONE_STEP) : null,
136+
shouldForceMergeOnClone ? new StepKey(phase, NAME, ReadOnlyStep.NAME) : null,
137+
shouldForceMergeOnClone ? new StepKey(phase, NAME, CleanupGeneratedIndexStep.NAME) : null,
138+
shouldForceMergeOnClone ? new StepKey(phase, NAME, GenerateUniqueIndexNameStep.NAME) : null,
139+
shouldForceMergeOnClone ? new StepKey(phase, NAME, ResizeIndexStep.CLONE) : null,
140+
shouldForceMergeOnClone ? new StepKey(phase, NAME, SearchableSnapshotAction.WAIT_FOR_CLONED_INDEX_GREEN) : null,
135141
forceMergeIndex ? new StepKey(phase, NAME, ForceMergeStep.NAME) : null,
136142
forceMergeIndex ? new StepKey(phase, NAME, SegmentCountStep.NAME) : null,
137143
new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME),
@@ -170,43 +176,46 @@ protected Writeable.Reader<SearchableSnapshotAction> instanceReader() {
170176

171177
@Override
172178
protected SearchableSnapshotAction mutateInstance(SearchableSnapshotAction instance) {
173-
return switch (randomIntBetween(0, 3)) {
174-
case 0 -> new SearchableSnapshotAction(
175-
randomAlphaOfLengthBetween(5, 10),
176-
instance.isForceMergeIndex(),
177-
instance.getTotalShardsPerNode(),
178-
instance.getReplicateFor()
179-
);
180-
case 1 -> new SearchableSnapshotAction(
181-
instance.getSnapshotRepository(),
182-
instance.isForceMergeIndex() == false,
183-
instance.getTotalShardsPerNode(),
184-
instance.getReplicateFor()
185-
);
186-
case 2 -> new SearchableSnapshotAction(
187-
instance.getSnapshotRepository(),
188-
instance.isForceMergeIndex(),
189-
instance.getTotalShardsPerNode() == null ? 1 : instance.getTotalShardsPerNode() + randomIntBetween(1, 100),
190-
instance.getReplicateFor()
191-
);
192-
case 3 -> new SearchableSnapshotAction(
193-
instance.getSnapshotRepository(),
194-
instance.isForceMergeIndex(),
195-
instance.getTotalShardsPerNode(),
196-
instance.getReplicateFor() == null
197-
? TimeValue.timeValueDays(1)
198-
: TimeValue.timeValueDays(instance.getReplicateFor().getDays() + randomIntBetween(1, 10))
199-
);
179+
var snapshotRepository = instance.getSnapshotRepository();
180+
var forceMergeIndex = instance.isForceMergeIndex();
181+
var totalShardsPerNode = instance.getTotalShardsPerNode();
182+
var replicateFor = instance.getReplicateFor();
183+
var forceMergeOnClone = instance.isForceMergeOnClone();
184+
switch (randomIntBetween(0, 4)) {
185+
case 0 -> snapshotRepository = randomAlphaOfLengthBetween(5, 10);
186+
case 1 -> {
187+
forceMergeIndex = forceMergeIndex == false;
188+
if (forceMergeIndex == false) {
189+
forceMergeOnClone = null;
190+
}
191+
}
192+
case 2 -> totalShardsPerNode = totalShardsPerNode == null ? 1 : totalShardsPerNode + randomIntBetween(1, 100);
193+
case 3 -> replicateFor = replicateFor == null
194+
? TimeValue.timeValueDays(1)
195+
: TimeValue.timeValueDays(replicateFor.getDays() + randomIntBetween(1, 10));
196+
case 4 -> {
197+
if (forceMergeOnClone == null) {
198+
forceMergeOnClone = randomBoolean();
199+
} else {
200+
forceMergeOnClone = randomBoolean() ? null : forceMergeOnClone == false;
201+
}
202+
if (forceMergeOnClone != null) {
203+
forceMergeIndex = true;
204+
}
205+
}
200206
default -> throw new IllegalArgumentException("Invalid mutation branch");
201-
};
207+
}
208+
return new SearchableSnapshotAction(snapshotRepository, forceMergeIndex, totalShardsPerNode, replicateFor, forceMergeOnClone);
202209
}
203210

204211
static SearchableSnapshotAction randomInstance() {
212+
final var forceMergeIndex = randomBoolean();
205213
return new SearchableSnapshotAction(
206214
randomAlphaOfLengthBetween(5, 10),
207-
randomBoolean(),
215+
forceMergeIndex,
208216
(randomBoolean() ? null : randomIntBetween(1, 100)),
209-
(randomBoolean() ? null : TimeValue.timeValueDays(randomIntBetween(1, 10)))
217+
(randomBoolean() ? null : TimeValue.timeValueDays(randomIntBetween(1, 10))),
218+
forceMergeIndex && randomBoolean() ? randomBoolean() : null
210219
);
211220
}
212221
}

0 commit comments

Comments
 (0)