Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/119003.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 119003
summary: Add a `replicate_for` option to the ILM `searchable_snapshot` action
area: ILM+SLM
type: enhancement
issues: []
48 changes: 48 additions & 0 deletions docs/reference/ilm/actions/ilm-searchable-snapshot.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ snapshot retention runs off the index lifecycle management (ILM) policies and is
(Required, string)
<<snapshots-register-repository,Repository>> used to store the snapshot.

`replicate_for`::
(Optional, TimeValue)
By default, searchable snapshot indices are mounted without replicas. Using this will
result in a searchable snapshot index being mounted with a single replica for the time period
specified, after which the replica will be removed. This option is only permitted on the
first searchable snapshot action of a policy.

`force_merge_index`::
(Optional, Boolean)
Force merges the managed index to one segment.
Expand Down Expand Up @@ -109,3 +116,44 @@ PUT _ilm/policy/my_policy
}
}
--------------------------------------------------

[[ilm-searchable-snapshot-replicate-for-ex]]
===== Mount a searchable snapshot with replicas for fourteen days

This policy mounts a searchable snapshot in the hot phase with a
single replica and maintains that replica for fourteen days. After
that time has elapsed, the searchable snapshot index will remain (with
no replicas) for another fourteen days, at which point it will proceed
into the delete phase and will be deleted.

[source,console]
--------------------------------------------------
PUT _ilm/policy/my_policy
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover" : {
"max_primary_shard_size": "50gb"
},
"searchable_snapshot" : {
"snapshot_repository" : "backing_repo",
"replicate_for": "14d"
}
}
},
"delete": {
"min_age": "28d",
"actions": {
"delete" : { }
}
}
}
}
}
--------------------------------------------------

[NOTE]
If the `replicate_for` option is specified, its value must be
less than the minimum age of the next phase in the policy.
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ADD_INCLUDE_FAILURE_INDICES_OPTION = def(8_831_00_0);
public static final TransportVersion ESQL_RESPONSE_PARTIAL = def(8_832_00_0);
public static final TransportVersion RANK_DOC_OPTIONAL_METADATA_FOR_EXPLAIN = def(8_833_00_0);
public static final TransportVersion ILM_ADD_SEARCHABLE_SNAPSHOT_ADD_REPLICATE_FOR = def(8_834_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
Expand All @@ -33,6 +36,7 @@
import java.util.List;
import java.util.Objects;

import static org.elasticsearch.TransportVersions.ILM_ADD_SEARCHABLE_SNAPSHOT_ADD_REPLICATE_FOR;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_PARTIAL_SETTING_KEY;
Expand All @@ -51,6 +55,7 @@ public class SearchableSnapshotAction implements LifecycleAction {
public static final ParseField SNAPSHOT_REPOSITORY = new ParseField("snapshot_repository");
public static final ParseField FORCE_MERGE_INDEX = new ParseField("force_merge_index");
public static final ParseField TOTAL_SHARDS_PER_NODE = new ParseField("total_shards_per_node");
public static final ParseField REPLICATE_FOR = new ParseField("replicate_for");
public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check";
public static final String CONDITIONAL_SKIP_ACTION_STEP = BranchingStep.NAME + "-check-prerequisites";
public static final String CONDITIONAL_SKIP_GENERATE_AND_CLEAN = BranchingStep.NAME + "-check-existing-snapshot";
Expand All @@ -60,13 +65,19 @@ public class SearchableSnapshotAction implements LifecycleAction {

private static final ConstructingObjectParser<SearchableSnapshotAction, Void> PARSER = new ConstructingObjectParser<>(
NAME,
a -> new SearchableSnapshotAction((String) a[0], a[1] == null || (boolean) a[1], (Integer) a[2])
a -> new SearchableSnapshotAction((String) a[0], a[1] == null || (boolean) a[1], (Integer) a[2], (TimeValue) a[3])
);

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), SNAPSHOT_REPOSITORY);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_INDEX);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), TOTAL_SHARDS_PER_NODE);
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
p -> TimeValue.parseTimeValue(p.textOrNull(), REPLICATE_FOR.getPreferredName()),
REPLICATE_FOR,
ObjectParser.ValueType.STRING
);
}

public static SearchableSnapshotAction parse(XContentParser parser) {
Expand All @@ -77,8 +88,15 @@ public static SearchableSnapshotAction parse(XContentParser parser) {
private final boolean forceMergeIndex;
@Nullable
private final Integer totalShardsPerNode;

public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeIndex, @Nullable Integer totalShardsPerNode) {
@Nullable
private final TimeValue replicateFor;

public SearchableSnapshotAction(
String snapshotRepository,
boolean forceMergeIndex,
@Nullable Integer totalShardsPerNode,
@Nullable TimeValue replicateFor
) {
if (Strings.hasText(snapshotRepository) == false) {
throw new IllegalArgumentException("the snapshot repository must be specified");
}
Expand All @@ -89,20 +107,30 @@ public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeInd
throw new IllegalArgumentException("[" + TOTAL_SHARDS_PER_NODE.getPreferredName() + "] must be >= 1");
}
this.totalShardsPerNode = totalShardsPerNode;

if (replicateFor != null && replicateFor.millis() <= 0) {
throw new IllegalArgumentException(
"[" + REPLICATE_FOR.getPreferredName() + "] must be positive [" + replicateFor.getStringRep() + "]"
);
}
this.replicateFor = replicateFor;
}

public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeIndex) {
this(snapshotRepository, forceMergeIndex, null);
this(snapshotRepository, forceMergeIndex, null, null);
}

public SearchableSnapshotAction(String snapshotRepository) {
this(snapshotRepository, true, null);
this(snapshotRepository, true, null, null);
}

public SearchableSnapshotAction(StreamInput in) throws IOException {
this.snapshotRepository = in.readString();
this.forceMergeIndex = in.readBoolean();
this.totalShardsPerNode = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readOptionalInt() : null;
this.replicateFor = in.getTransportVersion().onOrAfter(ILM_ADD_SEARCHABLE_SNAPSHOT_ADD_REPLICATE_FOR)
? in.readOptionalTimeValue()
: null;
}

boolean isForceMergeIndex() {
Expand All @@ -118,6 +146,11 @@ public Integer getTotalShardsPerNode() {
return totalShardsPerNode;
}

@Nullable
public TimeValue getReplicateFor() {
return replicateFor;
}

@Override
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
assert false;
Expand Down Expand Up @@ -145,6 +178,8 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
StepKey swapAliasesKey = new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME);
StepKey replaceDataStreamIndexKey = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME);
StepKey deleteIndexKey = new StepKey(phase, NAME, DeleteStep.NAME);
StepKey replicateForKey = new StepKey(phase, NAME, WaitUntilReplicateForTimePassesStep.NAME);
StepKey dropReplicasKey = new StepKey(phase, NAME, UpdateSettingsStep.NAME);

// Before going through all these steps, first check if we need to do them at all. For example, the index could already be
// a searchable snapshot of the same type and repository, in which case we don't need to do anything. If that is detected,
Expand Down Expand Up @@ -319,19 +354,20 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
getRestoredIndexPrefix(mountSnapshotKey),
storageType,
totalShardsPerNode,
0
replicateFor != null ? 1 : 0 // if the 'replicate_for' option is set, then have a replica, otherwise don't
);
WaitForIndexColorStep waitForGreenIndexHealthStep = new WaitForIndexColorStep(
waitForGreenRestoredIndexKey,
copyMetadataKey,
ClusterHealthStatus.GREEN,
getRestoredIndexPrefix(waitForGreenRestoredIndexKey)
);
StepKey keyForReplicateForOrContinue = replicateFor != null ? replicateForKey : nextStepKey;
CopyExecutionStateStep copyMetadataStep = new CopyExecutionStateStep(
copyMetadataKey,
copyLifecyclePolicySettingKey,
(index, executionState) -> getRestoredIndexPrefix(copyMetadataKey) + index,
nextStepKey
keyForReplicateForOrContinue
);
CopySettingsStep copySettingsStep = new CopySettingsStep(
copyLifecyclePolicySettingKey,
Expand Down Expand Up @@ -364,6 +400,16 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
getRestoredIndexPrefix(swapAliasesKey)
);

// note that the replicateForStep and dropReplicasStep will only be used if replicateFor != null, see the construction of
// the list of steps below
Step replicateForStep = new WaitUntilReplicateForTimePassesStep(replicateForKey, dropReplicasKey, replicateFor);
UpdateSettingsStep dropReplicasStep = new UpdateSettingsStep(
dropReplicasKey,
nextStepKey,
client,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);

List<Step> steps = new ArrayList<>();
steps.add(conditionalSkipActionStep);
steps.add(checkNoWriteIndexStep);
Expand All @@ -382,6 +428,10 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
steps.add(waitForGreenIndexHealthStep);
steps.add(copyMetadataStep);
steps.add(copySettingsStep);
if (replicateFor != null) {
steps.add(replicateForStep);
steps.add(dropReplicasStep);
}
steps.add(isDataStreamBranchingStep);
steps.add(replaceDataStreamBackingIndex);
steps.add(deleteSourceIndexStep);
Expand Down Expand Up @@ -426,6 +476,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
out.writeOptionalInt(totalShardsPerNode);
}
if (out.getTransportVersion().onOrAfter(ILM_ADD_SEARCHABLE_SNAPSHOT_ADD_REPLICATE_FOR)) {
out.writeOptionalTimeValue(replicateFor);
}
}

@Override
Expand All @@ -436,6 +489,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (totalShardsPerNode != null) {
builder.field(TOTAL_SHARDS_PER_NODE.getPreferredName(), totalShardsPerNode);
}
if (replicateFor != null) {
builder.field(REPLICATE_FOR.getPreferredName(), replicateFor);
}
builder.endObject();
return builder;
}
Expand All @@ -451,12 +507,13 @@ public boolean equals(Object o) {
SearchableSnapshotAction that = (SearchableSnapshotAction) o;
return Objects.equals(snapshotRepository, that.snapshotRepository)
&& Objects.equals(forceMergeIndex, that.forceMergeIndex)
&& Objects.equals(totalShardsPerNode, that.totalShardsPerNode);
&& Objects.equals(totalShardsPerNode, that.totalShardsPerNode)
&& Objects.equals(replicateFor, that.replicateFor);
}

@Override
public int hashCode() {
return Objects.hash(snapshotRepository, forceMergeIndex, totalShardsPerNode);
return Objects.hash(snapshotRepository, forceMergeIndex, totalShardsPerNode, replicateFor);
}

@Nullable
Expand Down
Loading