Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/112972.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 112972
summary: "ILM: Add `total_shards_per_node` setting to searchable snapshot"
area: ILM+SLM
type: enhancement
issues:
- 112261
8 changes: 6 additions & 2 deletions docs/reference/ilm/actions/ilm-searchable-snapshot.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ index>> prefixed with `partial-` to the frozen tier. In other phases, the action

In the frozen tier, the action will ignore the setting
<<total-shards-per-node,`index.routing.allocation.total_shards_per_node`>>, if it was present in the original index,
to account for the difference in the number of nodes between the frozen and the other tiers.
to account for the difference in the number of nodes between the frozen and the other tiers. To set <<total-shards-per-node,`index.routing.allocation.total_shards_per_node`>> for searchable snapshots, set the `total_shards_per_node` option in ILM policy.


WARNING: Don't include the `searchable_snapshot` action in both the hot and cold
Expand Down Expand Up @@ -74,6 +74,9 @@ will be performed on the hot nodes. If using a `searchable_snapshot` action in t
force merge will be performed on whatever tier the index is *prior* to the `cold` phase (either
`hot` or `warm`).

`total_shards_per_node`::
The maximum number of shards (replicas and primaries) that will be allocated to a single node for the searchable snapshot index. Defaults to unbounded.

[[ilm-searchable-snapshot-ex]]
==== Examples
////
Expand All @@ -98,7 +101,8 @@ PUT _ilm/policy/my_policy
"cold": {
"actions": {
"searchable_snapshot" : {
"snapshot_repository" : "backing_repo"
"snapshot_repository" : "backing_repo",
"total_shards_per_node" : 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a pretty power-user sort of feature, so I think it'd be best not to have it in the example (I worry about unthinking copy-and-paste)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ static TransportVersion def(int id) {
public static final TransportVersion GLOBAL_RETENTION_TELEMETRY = def(8_740_00_0);
public static final TransportVersion ROUTING_TABLE_VERSION_REMOVED = def(8_741_00_0);
public static final TransportVersion ML_SCHEDULED_EVENT_TIME_SHIFT_CONFIGURATION = def(8_742_00_0);
public static final TransportVersion ILM_ADD_SEARCHABLE_SNAPSHOT_TOTAL_SHARDS_PER_NODE = def(8_743_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;

import java.util.ArrayList;
import java.util.Objects;
import java.util.Optional;

Expand All @@ -37,17 +39,34 @@ public class MountSnapshotStep extends AsyncRetryDuringSnapshotActionStep {

private final String restoredIndexPrefix;
private final MountSearchableSnapshotRequest.Storage storageType;
@Nullable
private final Integer totalShardsPerNode;

public MountSnapshotStep(
StepKey key,
StepKey nextStepKey,
Client client,
String restoredIndexPrefix,
MountSearchableSnapshotRequest.Storage storageType
MountSearchableSnapshotRequest.Storage storageType,
@Nullable Integer totalShardsPerNode
) {
super(key, nextStepKey, client);
this.restoredIndexPrefix = restoredIndexPrefix;
this.storageType = Objects.requireNonNull(storageType, "a storage type must be specified");
if (totalShardsPerNode != null && totalShardsPerNode < -1) {
throw new IllegalArgumentException("[totalShardsPerNode] must be >= -1");
}
this.totalShardsPerNode = totalShardsPerNode;
}

public MountSnapshotStep(
StepKey key,
StepKey nextStepKey,
Client client,
String restoredIndexPrefix,
MountSearchableSnapshotRequest.Storage storageType
) {
this(key, nextStepKey, client, restoredIndexPrefix, storageType, null);
}

@Override
Expand All @@ -63,6 +82,11 @@ public MountSearchableSnapshotRequest.Storage getStorage() {
return storageType;
}

@Nullable
public Integer getTotalShardsPerNode() {
return totalShardsPerNode;
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
String indexName = indexMetadata.getIndex().getName();
Expand Down Expand Up @@ -140,6 +164,9 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
final Settings.Builder settingsBuilder = Settings.builder();

overrideTierPreference(this.getKey().phase()).ifPresent(override -> settingsBuilder.put(DataTier.TIER_PREFERENCE, override));
if (totalShardsPerNode != null) {
settingsBuilder.put(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), totalShardsPerNode);
}

final MountSearchableSnapshotRequest mountSearchableSnapshotRequest = new MountSearchableSnapshotRequest(
TimeValue.MAX_VALUE,
Expand All @@ -148,9 +175,9 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
snapshotName,
indexName,
settingsBuilder.build(),
ignoredIndexSettings(this.getKey().phase()),
ignoredIndexSettings(),
// we'll not wait for the snapshot to complete in this step as the async steps are executed from threads that shouldn't
// perform expensive operations (ie. clusterStateProcessed)
// perform expensive operations (i.e. clusterStateProcessed)
false,
storageType
);
Expand Down Expand Up @@ -198,23 +225,27 @@ static Optional<String> overrideTierPreference(String phase) {
* setting, the restored index would be captured by the ILM runner and, depending on what ILM execution state was captured at snapshot
* time, make it's way forward from _that_ step forward in the ILM policy. We'll re-set this setting on the restored index at a later
* step once we restored a deterministic execution state
* - index.routing.allocation.total_shards_per_node: It is likely that frozen tier has fewer nodes than the hot tier.
* Keeping this setting runs the risk that we will not have enough nodes to allocate all the shards in the
* frozen tier and the user does not have any way of fixing this. For this reason, we ignore this setting when moving to frozen.
* - index.routing.allocation.total_shards_per_node: It is likely that frozen tier has fewer nodes than the hot tier. If this setting
* is not specifically set in the frozen tier, keeping this setting runs the risk that we will not have enough nodes to
* allocate all the shards in the frozen tier and the user does not have any way of fixing this. For this reason, we ignore this
* setting when moving to frozen. We do not ignore this setting if it is specifically set in the mount searchable snapshot step
* of frozen tier.
*/
static String[] ignoredIndexSettings(String phase) {
String[] ignoredIndexSettings() {
ArrayList<String> ignoredSettings = new ArrayList<>();
ignoredSettings.add(LifecycleSettings.LIFECYCLE_NAME);
// if we are mounting a searchable snapshot in the hot phase, then we should not change the total_shards_per_node setting
if (TimeseriesLifecycleType.FROZEN_PHASE.equals(phase)) {
return new String[] {
LifecycleSettings.LIFECYCLE_NAME,
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey() };
// if total_shards_per_node setting is specifically set for the frozen phase and not propagated from previous phase,
// then it should not be ignored
if (TimeseriesLifecycleType.FROZEN_PHASE.equals(this.getKey().phase()) && this.totalShardsPerNode == null) {
ignoredSettings.add(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey());
}
return new String[] { LifecycleSettings.LIFECYCLE_NAME };
return ignoredSettings.toArray(new String[0]);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), restoredIndexPrefix, storageType);
return Objects.hash(super.hashCode(), restoredIndexPrefix, storageType, totalShardsPerNode);
}

@Override
Expand All @@ -228,6 +259,7 @@ public boolean equals(Object obj) {
MountSnapshotStep other = (MountSnapshotStep) obj;
return super.equals(obj)
&& Objects.equals(restoredIndexPrefix, other.restoredIndexPrefix)
&& Objects.equals(storageType, other.storageType);
&& Objects.equals(storageType, other.storageType)
&& Objects.equals(totalShardsPerNode, other.totalShardsPerNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.List;
import java.util.Objects;

import static org.elasticsearch.TransportVersions.ILM_ADD_SEARCHABLE_SNAPSHOT_TOTAL_SHARDS_PER_NODE;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_PARTIAL_SETTING_KEY;
Expand All @@ -49,6 +50,7 @@ public class SearchableSnapshotAction implements LifecycleAction {

public static final ParseField SNAPSHOT_REPOSITORY = new ParseField("snapshot_repository");
public static final ParseField FORCE_MERGE_INDEX = new ParseField("force_merge_index");
public static final ParseField TOTAL_SHARDS_PER_NODE = new ParseField("total_shards_per_node");
public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check";
public static final String CONDITIONAL_SKIP_ACTION_STEP = BranchingStep.NAME + "-check-prerequisites";
public static final String CONDITIONAL_SKIP_GENERATE_AND_CLEAN = BranchingStep.NAME + "-check-existing-snapshot";
Expand All @@ -58,12 +60,13 @@ public class SearchableSnapshotAction implements LifecycleAction {

private static final ConstructingObjectParser<SearchableSnapshotAction, Void> PARSER = new ConstructingObjectParser<>(
NAME,
a -> new SearchableSnapshotAction((String) a[0], a[1] == null || (boolean) a[1])
a -> new SearchableSnapshotAction((String) a[0], a[1] == null || (boolean) a[1], (Integer) a[2])
);

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), SNAPSHOT_REPOSITORY);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_INDEX);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), TOTAL_SHARDS_PER_NODE);
}

public static SearchableSnapshotAction parse(XContentParser parser) {
Expand All @@ -72,22 +75,36 @@ public static SearchableSnapshotAction parse(XContentParser parser) {

private final String snapshotRepository;
private final boolean forceMergeIndex;
@Nullable
private final Integer totalShardsPerNode;

public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeIndex) {
public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeIndex, @Nullable Integer totalShardsPerNode) {
if (Strings.hasText(snapshotRepository) == false) {
throw new IllegalArgumentException("the snapshot repository must be specified");
}
this.snapshotRepository = snapshotRepository;
this.forceMergeIndex = forceMergeIndex;

if (totalShardsPerNode != null && totalShardsPerNode < -1) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (totalShardsPerNode != null && totalShardsPerNode < -1) {
if (totalShardsPerNode != null && totalShardsPerNode < 1) {

(since we want to disallow -1 and 0)

throw new IllegalArgumentException("[" + TOTAL_SHARDS_PER_NODE.getPreferredName() + "] must be >= -1");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new IllegalArgumentException("[" + TOTAL_SHARDS_PER_NODE.getPreferredName() + "] must be >= -1");
throw new IllegalArgumentException("[" + TOTAL_SHARDS_PER_NODE.getPreferredName() + "] must be >= 1");

}
this.totalShardsPerNode = totalShardsPerNode;
}

public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeIndex) {
this(snapshotRepository, forceMergeIndex, null);
}

public SearchableSnapshotAction(String snapshotRepository) {
this(snapshotRepository, true);
this(snapshotRepository, true, null);
}

public SearchableSnapshotAction(StreamInput in) throws IOException {
this.snapshotRepository = in.readString();
this.forceMergeIndex = in.readBoolean();
this.totalShardsPerNode = in.getTransportVersion().onOrAfter(ILM_ADD_SEARCHABLE_SNAPSHOT_TOTAL_SHARDS_PER_NODE)
? in.readOptionalInt()
: null;
}

boolean isForceMergeIndex() {
Expand All @@ -98,6 +115,10 @@ public String getSnapshotRepository() {
return snapshotRepository;
}

public Integer getTotalShardsPerNode() {
return totalShardsPerNode;
}

@Override
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
assert false;
Expand Down Expand Up @@ -298,7 +319,8 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
waitForGreenRestoredIndexKey,
client,
getRestoredIndexPrefix(mountSnapshotKey),
storageType
storageType,
totalShardsPerNode
);
WaitForIndexColorStep waitForGreenIndexHealthStep = new WaitForIndexColorStep(
waitForGreenRestoredIndexKey,
Expand Down Expand Up @@ -402,13 +424,19 @@ public String getWriteableName() {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(snapshotRepository);
out.writeBoolean(forceMergeIndex);
if (out.getTransportVersion().onOrAfter(ILM_ADD_SEARCHABLE_SNAPSHOT_TOTAL_SHARDS_PER_NODE)) {
out.writeOptionalInt(totalShardsPerNode);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SNAPSHOT_REPOSITORY.getPreferredName(), snapshotRepository);
builder.field(FORCE_MERGE_INDEX.getPreferredName(), forceMergeIndex);
if (totalShardsPerNode != null) {
builder.field(TOTAL_SHARDS_PER_NODE.getPreferredName(), totalShardsPerNode);
}
builder.endObject();
return builder;
}
Expand All @@ -422,12 +450,14 @@ public boolean equals(Object o) {
return false;
}
SearchableSnapshotAction that = (SearchableSnapshotAction) o;
return Objects.equals(snapshotRepository, that.snapshotRepository) && Objects.equals(forceMergeIndex, that.forceMergeIndex);
return Objects.equals(snapshotRepository, that.snapshotRepository)
&& Objects.equals(forceMergeIndex, that.forceMergeIndex)
&& Objects.equals(totalShardsPerNode, that.totalShardsPerNode);
}

@Override
public int hashCode() {
return Objects.hash(snapshotRepository, forceMergeIndex);
return Objects.hash(snapshotRepository, forceMergeIndex, totalShardsPerNode);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public static LifecyclePolicy randomTimeseriesLifecyclePolicy(@Nullable String l
frozenTime,
Collections.singletonMap(
SearchableSnapshotAction.NAME,
new SearchableSnapshotAction(randomAlphaOfLength(10), randomBoolean())
new SearchableSnapshotAction(randomAlphaOfLength(10), randomBoolean(), randomIntBetween(-1, 100))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 is not a valid setting (it would be like saying "you can allocate zero copies of the data to the node"), so I think this would be better as something like:

Suggested change
new SearchableSnapshotAction(randomAlphaOfLength(10), randomBoolean(), randomIntBetween(-1, 100))
new SearchableSnapshotAction(randomAlphaOfLength(10), randomBoolean(), (randomBoolean() ? null : randomIntBetween(1, 100)))

)
)
);
Expand Down
Loading