Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/133954.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 133954
summary: "ILM: Force merge on zero-replica cloned index before snapshotting for searchable snapshots"
area: ILM+SLM
type: enhancement
issues:
- 75478
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public record LifecycleExecutionState(
String snapshotName,
String shrinkIndexName,
String snapshotIndexName,
String downsampleIndexName
String downsampleIndexName,
String forceMergeIndexName
) {

public static final String ILM_CUSTOM_METADATA_KEY = "ilm";
Expand All @@ -64,6 +65,7 @@ public record LifecycleExecutionState(
private static final String SNAPSHOT_INDEX_NAME = "snapshot_index_name";
private static final String SHRINK_INDEX_NAME = "shrink_index_name";
private static final String DOWNSAMPLE_INDEX_NAME = "rollup_index_name";
private static final String FORCE_MERGE_CLONE_INDEX_NAME = "force_merge_clone_index_name";

public static final LifecycleExecutionState EMPTY_STATE = LifecycleExecutionState.builder().build();

Expand All @@ -89,7 +91,8 @@ public static Builder builder(LifecycleExecutionState state) {
.setShrinkIndexName(state.shrinkIndexName)
.setSnapshotIndexName(state.snapshotIndexName)
.setDownsampleIndexName(state.downsampleIndexName)
.setStepTime(state.stepTime);
.setStepTime(state.stepTime)
.setForceMergeIndexName(state.forceMergeIndexName);
}

public static LifecycleExecutionState fromCustomMetadata(Map<String, String> customData) {
Expand Down Expand Up @@ -202,6 +205,10 @@ public static LifecycleExecutionState fromCustomMetadata(Map<String, String> cus
if (downsampleIndexName != null) {
builder.setDownsampleIndexName(downsampleIndexName);
}
String forceMergeIndexName = customData.get(FORCE_MERGE_CLONE_INDEX_NAME);
if (forceMergeIndexName != null) {
builder.setForceMergeIndexName(forceMergeIndexName);
}
return builder.build();
}

Expand Down Expand Up @@ -274,6 +281,9 @@ public Map<String, String> asMap() {
if (downsampleIndexName != null) {
result.put(DOWNSAMPLE_INDEX_NAME, downsampleIndexName);
}
if (forceMergeIndexName != null) {
result.put(FORCE_MERGE_CLONE_INDEX_NAME, forceMergeIndexName);
}
return Collections.unmodifiableMap(result);
}

Expand Down Expand Up @@ -307,6 +317,7 @@ public static class Builder {
private String shrinkIndexName;
private String snapshotIndexName;
private String downsampleIndexName;
private String forceMergeIndexName;

public Builder setPhase(String phase) {
this.phase = phase;
Expand Down Expand Up @@ -398,6 +409,11 @@ public Builder setDownsampleIndexName(String downsampleIndexName) {
return this;
}

public Builder setForceMergeIndexName(String forceMergeIndexName) {
this.forceMergeIndexName = forceMergeIndexName;
return this;
}

public LifecycleExecutionState build() {
return new LifecycleExecutionState(
phase,
Expand All @@ -417,7 +433,8 @@ public LifecycleExecutionState build() {
snapshotName,
shrinkIndexName,
snapshotIndexName,
downsampleIndexName
downsampleIndexName,
forceMergeIndexName
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2087,6 +2087,10 @@ protected static void awaitIndexExists(String index, TimeValue timeout) throws I
ensureHealth(client(), index, request -> request.addParameter("timeout", timeout.toString()));
}

protected static void awaitIndexDoesNotExist(String index) throws Exception {
awaitIndexDoesNotExist(index, TimeValue.timeValueSeconds(10));
}

protected static void awaitIndexDoesNotExist(String index, TimeValue timeout) throws Exception {
assertBusy(() -> assertFalse(indexExists(index)), timeout.millis(), TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,11 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList
);
return;
}
// If we performed the force merge step on the cloned index, we need to snapshot that index instead of the original.
final String clonedIndexName = lifecycleState.forceMergeIndexName();
final String forceMergedIndexName = clonedIndexName != null ? clonedIndexName : indexName;
CreateSnapshotRequest request = new CreateSnapshotRequest(TimeValue.MAX_VALUE, snapshotRepository, snapshotName);
request.indices(indexName);
request.indices(forceMergedIndexName);
// this is safe as the snapshot creation will still be async, it's just that the listener will be notified when the snapshot is
// complete
request.waitForCompletion(true);
Expand All @@ -112,7 +115,7 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList
logger.debug(
"create snapshot response for policy [{}] and index [{}] is: {}",
policyName,
indexName,
forceMergedIndexName,
Strings.toString(response)
);
final SnapshotInfo snapInfo = response.getSnapshotInfo();
Expand All @@ -128,7 +131,7 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList
snapshotRepository,
snapshotName,
policyName,
indexName,
forceMergedIndexName,
snapInfo.failedShards(),
snapInfo.totalShards()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,56 @@
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;

import java.util.function.BiFunction;

/**
* Deletes a single index.
*/
public class DeleteStep extends AsyncRetryDuringSnapshotActionStep {

public static final String NAME = "delete";
private static final Logger logger = LogManager.getLogger(DeleteStep.class);
private static final BiFunction<String, LifecycleExecutionState, String> DEFAULT_TARGET_INDEX_NAME_SUPPLIER = (
indexName,
lifecycleState) -> indexName;

private final BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier;
private final boolean indexSurvives;

/**
* Use this constructor to delete the index that ILM is currently operating on.
*/
public DeleteStep(StepKey key, StepKey nextStepKey, Client client) {
this(key, nextStepKey, client, DEFAULT_TARGET_INDEX_NAME_SUPPLIER, false);
}

/**
* Use this constructor to delete a specific index, potentially different from the one that ILM is currently operating on. The parameter
* {@code indexSurvives} indicates whether the index that ILM runs on will survive (i.e. not get deleted) this step.
* Look at the callers of {@link AsyncActionStep#indexSurvives()} for more details.
*/
public DeleteStep(
StepKey key,
StepKey nextStepKey,
Client client,
BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier,
boolean indexSurvives
) {
super(key, nextStepKey, client);
this.targetIndexNameSupplier = targetIndexNameSupplier;
this.indexSurvives = indexSurvives;
}

@Override
public void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
String policyName = indexMetadata.getLifecyclePolicyName();
String indexName = indexMetadata.getIndex().getName();
String indexName = targetIndexNameSupplier.apply(indexMetadata.getIndex().getName(), indexMetadata.getLifecycleExecutionState());
IndexAbstraction indexAbstraction = currentProject.getIndicesLookup().get(indexName);
assert indexAbstraction != null : "invalid cluster metadata. index [" + indexName + "] was not found";
DataStream dataStream = indexAbstraction.getParentDataStream();
Expand Down Expand Up @@ -88,7 +118,7 @@ public void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata

@Override
public boolean indexSurvives() {
return false;
return indexSurvives;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.common.Strings;

import java.util.Arrays;
import java.util.Objects;
import java.util.function.BiFunction;

/**
* Invokes a force merge on a single index.
Expand All @@ -28,11 +30,33 @@ public class ForceMergeStep extends AsyncActionStep {

public static final String NAME = "forcemerge";
private static final Logger logger = LogManager.getLogger(ForceMergeStep.class);
private static final BiFunction<String, LifecycleExecutionState, String> DEFAULT_TARGET_INDEX_NAME_SUPPLIER = (
indexName,
lifecycleState) -> indexName;

private final int maxNumSegments;
private final BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier;

/**
* Creates a new {@link ForceMergeStep} that will perform a force merge on the index that ILM is currently operating on.
*/
public ForceMergeStep(StepKey key, StepKey nextStepKey, Client client, int maxNumSegments) {
this(key, nextStepKey, client, maxNumSegments, DEFAULT_TARGET_INDEX_NAME_SUPPLIER);
}

/**
* Creates a new {@link ForceMergeStep} that will perform a force merge on the index name returned by the supplier.
*/
public ForceMergeStep(
StepKey key,
StepKey nextStepKey,
Client client,
int maxNumSegments,
BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier
) {
super(key, nextStepKey, client);
this.maxNumSegments = maxNumSegments;
this.targetIndexNameSupplier = targetIndexNameSupplier;
}

@Override
Expand All @@ -51,7 +75,8 @@ public void performAction(
ClusterStateObserver observer,
ActionListener<Void> listener
) {
String indexName = indexMetadata.getIndex().getName();
String indexName = targetIndexNameSupplier.apply(indexMetadata.getIndex().getName(), indexMetadata.getLifecycleExecutionState());
assert indexName != null : "target index name supplier must not return null";
ForceMergeRequest request = new ForceMergeRequest(indexName);
request.maxNumSegments(maxNumSegments);
getClient(currentState.projectId()).admin().indices().forceMerge(request, listener.delegateFailureAndWrap((l, response) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Locale;
import java.util.Objects;
import java.util.function.BiFunction;

/**
* Generates a snapshot name for the given index and records it in the index metadata along with the provided snapshot repository.
Expand All @@ -35,10 +36,17 @@ public class GenerateSnapshotNameStep extends ClusterStateActionStep {
private static final Logger logger = LogManager.getLogger(GenerateSnapshotNameStep.class);

private final String snapshotRepository;

public GenerateSnapshotNameStep(StepKey key, StepKey nextStepKey, String snapshotRepository) {
private final BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier;

public GenerateSnapshotNameStep(
StepKey key,
StepKey nextStepKey,
String snapshotRepository,
BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier
) {
super(key, nextStepKey);
this.snapshotRepository = snapshotRepository;
this.targetIndexNameSupplier = targetIndexNameSupplier;
}

public String getSnapshotRepository() {
Expand Down Expand Up @@ -72,9 +80,11 @@ public ProjectState performAction(Index index, ProjectState projectState) {
+ "] cannot continue until the repository is created or the policy is changed"
);
}
final String indexName = targetIndexNameSupplier.apply(index.getName(), lifecycleState);
assert indexName != null : "target index name supplier must not return null";

LifecycleExecutionState.Builder newLifecycleState = LifecycleExecutionState.builder(lifecycleState);
newLifecycleState.setSnapshotIndexName(index.getName());
newLifecycleState.setSnapshotIndexName(indexName);
newLifecycleState.setSnapshotRepository(snapshotRepository);
if (lifecycleState.snapshotName() == null) {
// generate and validate the snapshotName
Expand Down
Loading