Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/133954.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 133954
summary: "ILM: Force merge on zero-replica cloned index before snapshot"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps mention that this is for the searchable snapshot step here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to ILM: Force merge on zero-replica cloned index before snapshotting for searchable snapshots. Let me know if that matches what you had in mind.

area: ILM+SLM
type: enhancement
issues:
- 75478
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public record LifecycleExecutionState(
String snapshotName,
String shrinkIndexName,
String snapshotIndexName,
String downsampleIndexName
String downsampleIndexName,
String forceMergeIndexName
) {

public static final String ILM_CUSTOM_METADATA_KEY = "ilm";
Expand All @@ -64,6 +65,7 @@ public record LifecycleExecutionState(
private static final String SNAPSHOT_INDEX_NAME = "snapshot_index_name";
private static final String SHRINK_INDEX_NAME = "shrink_index_name";
private static final String DOWNSAMPLE_INDEX_NAME = "rollup_index_name";
private static final String FORCE_MERGE_INDEX_NAME = "force_merge_index_name";

public static final LifecycleExecutionState EMPTY_STATE = LifecycleExecutionState.builder().build();

Expand All @@ -89,7 +91,8 @@ public static Builder builder(LifecycleExecutionState state) {
.setShrinkIndexName(state.shrinkIndexName)
.setSnapshotIndexName(state.snapshotIndexName)
.setDownsampleIndexName(state.downsampleIndexName)
.setStepTime(state.stepTime);
.setStepTime(state.stepTime)
.setForceMergeIndexName(state.forceMergeIndexName);
}

public static LifecycleExecutionState fromCustomMetadata(Map<String, String> customData) {
Expand Down Expand Up @@ -202,6 +205,10 @@ public static LifecycleExecutionState fromCustomMetadata(Map<String, String> cus
if (downsampleIndexName != null) {
builder.setDownsampleIndexName(downsampleIndexName);
}
String forceMergeIndexName = customData.get(FORCE_MERGE_INDEX_NAME);
if (forceMergeIndexName != null) {
builder.setForceMergeIndexName(forceMergeIndexName);
}
return builder.build();
}

Expand Down Expand Up @@ -274,6 +281,9 @@ public Map<String, String> asMap() {
if (downsampleIndexName != null) {
result.put(DOWNSAMPLE_INDEX_NAME, downsampleIndexName);
}
if (forceMergeIndexName != null) {
result.put(FORCE_MERGE_INDEX_NAME, forceMergeIndexName);
}
return Collections.unmodifiableMap(result);
}

Expand Down Expand Up @@ -307,6 +317,7 @@ public static class Builder {
private String shrinkIndexName;
private String snapshotIndexName;
private String downsampleIndexName;
private String forceMergeIndexName;

public Builder setPhase(String phase) {
this.phase = phase;
Expand Down Expand Up @@ -398,6 +409,11 @@ public Builder setDownsampleIndexName(String downsampleIndexName) {
return this;
}

public Builder setForceMergeIndexName(String forceMergeIndexName) {
this.forceMergeIndexName = forceMergeIndexName;
return this;
}

public LifecycleExecutionState build() {
return new LifecycleExecutionState(
phase,
Expand All @@ -417,7 +433,8 @@ public LifecycleExecutionState build() {
snapshotName,
shrinkIndexName,
snapshotIndexName,
downsampleIndexName
downsampleIndexName,
forceMergeIndexName
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2087,6 +2087,10 @@ protected static void awaitIndexExists(String index, TimeValue timeout) throws I
ensureHealth(client(), index, request -> request.addParameter("timeout", timeout.toString()));
}

protected static void awaitIndexDoesNotExist(String index) throws Exception {
awaitIndexDoesNotExist(index, TimeValue.timeValueSeconds(10));
}

protected static void awaitIndexDoesNotExist(String index, TimeValue timeout) throws Exception {
assertBusy(() -> assertFalse(indexExists(index)), timeout.millis(), TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,11 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList
);
return;
}
// If we performed the force merge step on the cloned index, we need to snapshot that index instead of the original.
final String clonedIndexName = lifecycleState.forceMergeIndexName();
final String forceMergedIndexName = clonedIndexName != null ? clonedIndexName : indexName;
CreateSnapshotRequest request = new CreateSnapshotRequest(TimeValue.MAX_VALUE, snapshotRepository, snapshotName);
request.indices(indexName);
request.indices(forceMergedIndexName);
// this is safe as the snapshot creation will still be async, it's just that the listener will be notified when the snapshot is
// complete
request.waitForCompletion(true);
Expand All @@ -112,7 +115,7 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList
logger.debug(
"create snapshot response for policy [{}] and index [{}] is: {}",
policyName,
indexName,
forceMergedIndexName,
Strings.toString(response)
);
final SnapshotInfo snapInfo = response.getSnapshotInfo();
Expand All @@ -128,7 +131,7 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList
snapshotRepository,
snapshotName,
policyName,
indexName,
forceMergedIndexName,
snapInfo.failedShards(),
snapInfo.totalShards()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,54 @@
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;

import java.util.function.BiFunction;

/**
* Deletes a single index.
*/
public class DeleteStep extends AsyncRetryDuringSnapshotActionStep {

public static final String NAME = "delete";
private static final Logger logger = LogManager.getLogger(DeleteStep.class);
private static final BiFunction<String, LifecycleExecutionState, String> DEFAULT_TARGET_INDEX_NAME_SUPPLIER = (
indexName,
lifecycleState) -> indexName;

private final BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier;
private final boolean indexSurvives;

/**
* Use this constructor to delete the index that ILM is currently operating on.
*/
public DeleteStep(StepKey key, StepKey nextStepKey, Client client) {
this(key, nextStepKey, client, DEFAULT_TARGET_INDEX_NAME_SUPPLIER, false);
}

/**
* Use this constructor to delete a specific index, potentially different from the one that ILM is currently operating on.
*/
public DeleteStep(
StepKey key,
StepKey nextStepKey,
Client client,
BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier,
boolean indexSurvives
) {
super(key, nextStepKey, client);
this.targetIndexNameSupplier = targetIndexNameSupplier;
this.indexSurvives = indexSurvives;
}

@Override
public void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
String policyName = indexMetadata.getLifecyclePolicyName();
String indexName = indexMetadata.getIndex().getName();
String indexName = targetIndexNameSupplier.apply(indexMetadata.getIndex().getName(), indexMetadata.getLifecycleExecutionState());
IndexAbstraction indexAbstraction = currentProject.getIndicesLookup().get(indexName);
assert indexAbstraction != null : "invalid cluster metadata. index [" + indexName + "] was not found";
DataStream dataStream = indexAbstraction.getParentDataStream();
Expand Down Expand Up @@ -88,7 +116,7 @@ public void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata

@Override
public boolean indexSurvives() {
return false;
return indexSurvives;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ public void performAction(
ClusterStateObserver observer,
ActionListener<Void> listener
) {
String indexName = indexMetadata.getIndex().getName();
// Use the cloned index name if we have one, otherwise fall back to the original index name.
String clonedIndexName = indexMetadata.getLifecycleExecutionState().forceMergeIndexName();
String indexName = clonedIndexName != null ? clonedIndexName : indexMetadata.getIndex().getName();
ForceMergeRequest request = new ForceMergeRequest(indexName);
request.maxNumSegments(maxNumSegments);
getClient(currentState.projectId()).admin().indices().forceMerge(request, listener.delegateFailureAndWrap((l, response) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,12 @@ public ProjectState performAction(Index index, ProjectState projectState) {
+ "] cannot continue until the repository is created or the policy is changed"
);
}
// If we performed the force merge step on the cloned index, we perform the snapshot on that index instead of the original.
final String clonedIndexName = lifecycleState.forceMergeIndexName();
final String forceMergedIndexName = clonedIndexName != null ? clonedIndexName : index.getName();

LifecycleExecutionState.Builder newLifecycleState = LifecycleExecutionState.builder(lifecycleState);
newLifecycleState.setSnapshotIndexName(index.getName());
newLifecycleState.setSnapshotIndexName(forceMergedIndexName);
newLifecycleState.setSnapshotRepository(snapshotRepository);
if (lifecycleState.snapshotName() == null) {
// generate and validate the snapshotName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
private static final ParseField SHRINK_INDEX_NAME = new ParseField("shrink_index_name");
private static final ParseField SNAPSHOT_NAME = new ParseField("snapshot_name");
private static final ParseField SKIP_NAME = new ParseField("skip");
private static final ParseField FORCE_MERGE_INDEX_NAME = new ParseField("force_merge_index_name");

public static final ConstructingObjectParser<IndexLifecycleExplainResponse, Void> PARSER = new ConstructingObjectParser<>(
"index_lifecycle_explain_response",
Expand All @@ -81,7 +82,8 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
(BytesReference) a[11],
(BytesReference) a[21],
(PhaseExecutionInfo) a[12],
Objects.requireNonNullElse((Boolean) a[22], false)
Objects.requireNonNullElse((Boolean) a[22], false),
(String) a[24]
// a[13] == "age"
// a[20] == "time_since_index_creation"
// a[23] = "age_in_millis"
Expand Down Expand Up @@ -124,6 +126,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
}, PREVIOUS_STEP_INFO_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SKIP_NAME);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), AGE_IN_MILLIS_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_INDEX_NAME);
}

private final String index;
Expand All @@ -147,6 +150,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
private final String snapshotName;
private final String shrinkIndexName;
private final boolean skip;
private final String forceMergeIndexName;

Supplier<Long> nowSupplier = System::currentTimeMillis; // Can be changed for testing

Expand All @@ -170,7 +174,8 @@ public static IndexLifecycleExplainResponse newManagedIndexResponse(
BytesReference stepInfo,
BytesReference previousStepInfo,
PhaseExecutionInfo phaseExecutionInfo,
boolean skip
boolean skip,
String forceMergeIndexName
) {
return new IndexLifecycleExplainResponse(
index,
Expand All @@ -193,7 +198,8 @@ public static IndexLifecycleExplainResponse newManagedIndexResponse(
stepInfo,
previousStepInfo,
phaseExecutionInfo,
skip
skip,
forceMergeIndexName
);
}

Expand All @@ -219,7 +225,8 @@ public static IndexLifecycleExplainResponse newUnmanagedIndexResponse(String ind
null,
null,
null,
false
false,
null
);
}

Expand All @@ -244,7 +251,8 @@ private IndexLifecycleExplainResponse(
BytesReference stepInfo,
BytesReference previousStepInfo,
PhaseExecutionInfo phaseExecutionInfo,
boolean skip
boolean skip,
String forceMergeIndexName
) {
if (managedByILM) {
if (policyName == null) {
Expand Down Expand Up @@ -313,6 +321,7 @@ private IndexLifecycleExplainResponse(
this.snapshotName = snapshotName;
this.shrinkIndexName = shrinkIndexName;
this.skip = skip;
this.forceMergeIndexName = forceMergeIndexName;
}

public IndexLifecycleExplainResponse(StreamInput in) throws IOException {
Expand Down Expand Up @@ -351,6 +360,8 @@ public IndexLifecycleExplainResponse(StreamInput in) throws IOException {
} else {
skip = false;
}
// No need for deserialization from this point onwards as this action only runs on the local node.
forceMergeIndexName = null;
} else {
policyName = null;
lifecycleDate = null;
Expand All @@ -371,6 +382,7 @@ public IndexLifecycleExplainResponse(StreamInput in) throws IOException {
shrinkIndexName = null;
indexCreationDate = null;
skip = false;
forceMergeIndexName = null;
}
}

Expand Down Expand Up @@ -405,6 +417,7 @@ public void writeTo(StreamOutput out) throws IOException {
|| out.getTransportVersion().onOrAfter(TransportVersions.ILM_ADD_SKIP_SETTING)) {
out.writeBoolean(skip);
}
// No need for serialization from this point onwards as this action only runs on the local node.
}
}

Expand Down Expand Up @@ -508,6 +521,10 @@ public boolean getSkip() {
return skip;
}

public String getForceMergeIndexName() {
return forceMergeIndexName;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -595,6 +612,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(PHASE_EXECUTION_INFO.getPreferredName(), phaseExecutionInfo);
}
builder.field(SKIP_NAME.getPreferredName(), skip);
if (forceMergeIndexName != null) {
builder.field(FORCE_MERGE_INDEX_NAME.getPreferredName(), forceMergeIndexName);
}
}
builder.endObject();
return builder;
Expand Down Expand Up @@ -623,7 +643,8 @@ public int hashCode() {
stepInfo,
previousStepInfo,
phaseExecutionInfo,
skip
skip,
forceMergeIndexName
);
}

Expand Down Expand Up @@ -656,7 +677,8 @@ public boolean equals(Object obj) {
&& Objects.equals(stepInfo, other.stepInfo)
&& Objects.equals(previousStepInfo, other.previousStepInfo)
&& Objects.equals(phaseExecutionInfo, other.phaseExecutionInfo)
&& Objects.equals(skip, other.skip);
&& Objects.equals(skip, other.skip)
&& Objects.equals(forceMergeIndexName, other.forceMergeIndexName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata curren
logger.debug(
"index [{}] using policy [{}] does not have a stored snapshot index name, "
+ "using our best effort guess of [{}] for the original snapshotted index name",
indexMetadata.getIndex().getName(),
indexName,
policyName,
indexName
);
} else {
indexName = searchableSnapshotMetadata.sourceIndex();
}
} else {
} else if (snapshotIndexName.equals(indexName) == false) {
// Use the name of the snapshot as specified in the metadata, because the current index
// name not might not reflect the name of the index actually in the snapshot
logger.debug(
Expand All @@ -158,6 +158,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata curren
policyName,
snapshotIndexName
);
// Note: this will update the indexName to the force-merged index name if we performed the force-merge on a cloned index.
indexName = snapshotIndexName;
}

Expand Down
Loading