diff --git a/docs/changelog/133954.yaml b/docs/changelog/133954.yaml new file mode 100644 index 0000000000000..a6a4105b343d8 --- /dev/null +++ b/docs/changelog/133954.yaml @@ -0,0 +1,6 @@ +pr: 133954 +summary: "ILM: Force merge on zero-replica cloned index before snapshotting for searchable snapshots" +area: ILM+SLM +type: enhancement +issues: + - 75478 diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java b/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java index 77f1135da2505..f1def7aeb0ebf 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java @@ -40,7 +40,8 @@ public record LifecycleExecutionState( String snapshotName, String shrinkIndexName, String snapshotIndexName, - String downsampleIndexName + String downsampleIndexName, + String forceMergeCloneIndexName ) { public static final String ILM_CUSTOM_METADATA_KEY = "ilm"; @@ -64,6 +65,7 @@ public record LifecycleExecutionState( private static final String SNAPSHOT_INDEX_NAME = "snapshot_index_name"; private static final String SHRINK_INDEX_NAME = "shrink_index_name"; private static final String DOWNSAMPLE_INDEX_NAME = "rollup_index_name"; + private static final String FORCE_MERGE_CLONE_INDEX_NAME = "force_merge_clone_index_name"; public static final LifecycleExecutionState EMPTY_STATE = LifecycleExecutionState.builder().build(); @@ -89,7 +91,8 @@ public static Builder builder(LifecycleExecutionState state) { .setShrinkIndexName(state.shrinkIndexName) .setSnapshotIndexName(state.snapshotIndexName) .setDownsampleIndexName(state.downsampleIndexName) - .setStepTime(state.stepTime); + .setStepTime(state.stepTime) + .setForceMergeCloneIndexName(state.forceMergeCloneIndexName); } public static LifecycleExecutionState fromCustomMetadata(Map customData) { @@ -202,6 +205,10 @@ public static 
LifecycleExecutionState fromCustomMetadata(Map cus if (downsampleIndexName != null) { builder.setDownsampleIndexName(downsampleIndexName); } + String forceMergeCloneIndexName = customData.get(FORCE_MERGE_CLONE_INDEX_NAME); + if (forceMergeCloneIndexName != null) { + builder.setForceMergeCloneIndexName(forceMergeCloneIndexName); + } return builder.build(); } @@ -274,6 +281,9 @@ public Map asMap() { if (downsampleIndexName != null) { result.put(DOWNSAMPLE_INDEX_NAME, downsampleIndexName); } + if (forceMergeCloneIndexName != null) { + result.put(FORCE_MERGE_CLONE_INDEX_NAME, forceMergeCloneIndexName); + } return Collections.unmodifiableMap(result); } @@ -307,6 +317,7 @@ public static class Builder { private String shrinkIndexName; private String snapshotIndexName; private String downsampleIndexName; + private String forceMergeCloneIndexName; public Builder setPhase(String phase) { this.phase = phase; @@ -398,6 +409,11 @@ public Builder setDownsampleIndexName(String downsampleIndexName) { return this; } + public Builder setForceMergeCloneIndexName(String forceMergeCloneIndexName) { + this.forceMergeCloneIndexName = forceMergeCloneIndexName; + return this; + } + public LifecycleExecutionState build() { return new LifecycleExecutionState( phase, @@ -417,7 +433,8 @@ public LifecycleExecutionState build() { snapshotName, shrinkIndexName, snapshotIndexName, - downsampleIndexName + downsampleIndexName, + forceMergeCloneIndexName ); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 3b0470968e2f6..100c523b21374 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -2087,6 +2087,10 @@ protected static void awaitIndexExists(String index, TimeValue timeout) throws I ensureHealth(client(), index, request -> 
request.addParameter("timeout", timeout.toString())); } + protected static void awaitIndexDoesNotExist(String index) throws Exception { + awaitIndexDoesNotExist(index, TimeValue.timeValueSeconds(10)); + } + protected static void awaitIndexDoesNotExist(String index, TimeValue timeout) throws Exception { assertBusy(() -> assertFalse(indexExists(index)), timeout.millis(), TimeUnit.MILLISECONDS); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java index 7a85ead2d1472..8abf938c717f8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java @@ -101,8 +101,11 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList ); return; } + // If we performed the force merge step on the cloned index, we need to snapshot that index instead of the original. + final String clonedIndexName = lifecycleState.forceMergeCloneIndexName(); + final String forceMergedIndexName = clonedIndexName != null ? 
clonedIndexName : indexName; CreateSnapshotRequest request = new CreateSnapshotRequest(TimeValue.MAX_VALUE, snapshotRepository, snapshotName); - request.indices(indexName); + request.indices(forceMergedIndexName); // this is safe as the snapshot creation will still be async, it's just that the listener will be notified when the snapshot is // complete request.waitForCompletion(true); @@ -112,7 +115,7 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList logger.debug( "create snapshot response for policy [{}] and index [{}] is: {}", policyName, - indexName, + forceMergedIndexName, Strings.toString(response) ); final SnapshotInfo snapInfo = response.getSnapshotInfo(); @@ -128,7 +131,7 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList snapshotRepository, snapshotName, policyName, - indexName, + forceMergedIndexName, snapInfo.failedShards(), snapInfo.totalShards() ) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteStep.java index a6cedfceb1fbe..176cb013010b6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteStep.java @@ -16,26 +16,56 @@ import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; +import java.util.function.BiFunction; + /** * Deletes a single index. 
*/ public class DeleteStep extends AsyncRetryDuringSnapshotActionStep { + public static final String NAME = "delete"; private static final Logger logger = LogManager.getLogger(DeleteStep.class); + private static final BiFunction DEFAULT_TARGET_INDEX_NAME_SUPPLIER = ( + indexName, + lifecycleState) -> indexName; + + private final BiFunction targetIndexNameSupplier; + private final boolean indexSurvives; + /** + * Use this constructor to delete the index that ILM is currently operating on. + */ public DeleteStep(StepKey key, StepKey nextStepKey, Client client) { + this(key, nextStepKey, client, DEFAULT_TARGET_INDEX_NAME_SUPPLIER, false); + } + + /** + * Use this constructor to delete a specific index, potentially different from the one that ILM is currently operating on. The parameter + * {@code indexSurvives} indicates whether the index that ILM runs on will survive (i.e. not get deleted) this step. + * Look at the callers of {@link AsyncActionStep#indexSurvives()} for more details. + */ + public DeleteStep( + StepKey key, + StepKey nextStepKey, + Client client, + BiFunction targetIndexNameSupplier, + boolean indexSurvives + ) { super(key, nextStepKey, client); + this.targetIndexNameSupplier = targetIndexNameSupplier; + this.indexSurvives = indexSurvives; } @Override public void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener listener) { String policyName = indexMetadata.getLifecyclePolicyName(); - String indexName = indexMetadata.getIndex().getName(); + String indexName = targetIndexNameSupplier.apply(indexMetadata.getIndex().getName(), indexMetadata.getLifecycleExecutionState()); IndexAbstraction indexAbstraction = currentProject.getIndicesLookup().get(indexName); assert indexAbstraction != null : "invalid cluster metadata. 
index [" + indexName + "] was not found"; DataStream dataStream = indexAbstraction.getParentDataStream(); @@ -88,7 +118,7 @@ public void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata @Override public boolean indexSurvives() { - return false; + return indexSurvives; } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java index 0fc0ac1f91f6a..233e39eb4421f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java @@ -16,10 +16,12 @@ import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.common.Strings; import java.util.Arrays; import java.util.Objects; +import java.util.function.BiFunction; /** * Invokes a force merge on a single index. @@ -28,11 +30,33 @@ public class ForceMergeStep extends AsyncActionStep { public static final String NAME = "forcemerge"; private static final Logger logger = LogManager.getLogger(ForceMergeStep.class); + private static final BiFunction DEFAULT_TARGET_INDEX_NAME_SUPPLIER = ( + indexName, + lifecycleState) -> indexName; + private final int maxNumSegments; + private final BiFunction targetIndexNameSupplier; + /** + * Creates a new {@link ForceMergeStep} that will perform a force merge on the index that ILM is currently operating on. + */ public ForceMergeStep(StepKey key, StepKey nextStepKey, Client client, int maxNumSegments) { + this(key, nextStepKey, client, maxNumSegments, DEFAULT_TARGET_INDEX_NAME_SUPPLIER); + } + + /** + * Creates a new {@link ForceMergeStep} that will perform a force merge on the index name returned by the supplier. 
+ */ + public ForceMergeStep( + StepKey key, + StepKey nextStepKey, + Client client, + int maxNumSegments, + BiFunction targetIndexNameSupplier + ) { super(key, nextStepKey, client); this.maxNumSegments = maxNumSegments; + this.targetIndexNameSupplier = targetIndexNameSupplier; } @Override @@ -51,7 +75,8 @@ public void performAction( ClusterStateObserver observer, ActionListener listener ) { - String indexName = indexMetadata.getIndex().getName(); + String indexName = targetIndexNameSupplier.apply(indexMetadata.getIndex().getName(), indexMetadata.getLifecycleExecutionState()); + assert indexName != null : "target index name supplier must not return null"; ForceMergeRequest request = new ForceMergeRequest(indexName); request.maxNumSegments(maxNumSegments); getClient(currentState.projectId()).admin().indices().forceMerge(request, listener.delegateFailureAndWrap((l, response) -> { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java index 2606799fe8a88..88b4abc0871fa 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java @@ -21,6 +21,7 @@ import java.util.Locale; import java.util.Objects; +import java.util.function.BiFunction; /** * Generates a snapshot name for the given index and records it in the index metadata along with the provided snapshot repository. 
@@ -35,10 +36,17 @@ public class GenerateSnapshotNameStep extends ClusterStateActionStep { private static final Logger logger = LogManager.getLogger(GenerateSnapshotNameStep.class); private final String snapshotRepository; - - public GenerateSnapshotNameStep(StepKey key, StepKey nextStepKey, String snapshotRepository) { + private final BiFunction targetIndexNameSupplier; + + public GenerateSnapshotNameStep( + StepKey key, + StepKey nextStepKey, + String snapshotRepository, + BiFunction targetIndexNameSupplier + ) { super(key, nextStepKey); this.snapshotRepository = snapshotRepository; + this.targetIndexNameSupplier = targetIndexNameSupplier; } public String getSnapshotRepository() { @@ -72,9 +80,11 @@ public ProjectState performAction(Index index, ProjectState projectState) { + "] cannot continue until the repository is created or the policy is changed" ); } + final String indexName = targetIndexNameSupplier.apply(index.getName(), lifecycleState); + assert indexName != null : "target index name supplier must not return null"; LifecycleExecutionState.Builder newLifecycleState = LifecycleExecutionState.builder(lifecycleState); - newLifecycleState.setSnapshotIndexName(index.getName()); + newLifecycleState.setSnapshotIndexName(indexName); newLifecycleState.setSnapshotRepository(snapshotRepository); if (lifecycleState.snapshotName() == null) { // generate and validate the snapshotName diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java index 7b1a68325574b..5970045a5d8b4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java @@ -57,6 +57,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl private static final 
ParseField SHRINK_INDEX_NAME = new ParseField("shrink_index_name"); private static final ParseField SNAPSHOT_NAME = new ParseField("snapshot_name"); private static final ParseField SKIP_NAME = new ParseField("skip"); + private static final ParseField FORCE_MERGE_CLONE_INDEX_NAME = new ParseField("force_merge_clone_index_name"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "index_lifecycle_explain_response", @@ -81,7 +82,8 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl (BytesReference) a[11], (BytesReference) a[21], (PhaseExecutionInfo) a[12], - Objects.requireNonNullElse((Boolean) a[22], false) + Objects.requireNonNullElse((Boolean) a[22], false), + (String) a[24] // a[13] == "age" // a[20] == "time_since_index_creation" // a[23] = "age_in_millis" @@ -124,6 +126,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl }, PREVIOUS_STEP_INFO_FIELD); PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SKIP_NAME); PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), AGE_IN_MILLIS_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_CLONE_INDEX_NAME); } private final String index; @@ -147,6 +150,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl private final String snapshotName; private final String shrinkIndexName; private final boolean skip; + private final String forceMergeCloneIndexName; Supplier nowSupplier = System::currentTimeMillis; // Can be changed for testing @@ -170,7 +174,8 @@ public static IndexLifecycleExplainResponse newManagedIndexResponse( BytesReference stepInfo, BytesReference previousStepInfo, PhaseExecutionInfo phaseExecutionInfo, - boolean skip + boolean skip, + String forceMergeCloneIndexName ) { return new IndexLifecycleExplainResponse( index, @@ -193,7 +198,8 @@ public static IndexLifecycleExplainResponse 
newManagedIndexResponse( stepInfo, previousStepInfo, phaseExecutionInfo, - skip + skip, + forceMergeCloneIndexName ); } @@ -219,7 +225,8 @@ public static IndexLifecycleExplainResponse newUnmanagedIndexResponse(String ind null, null, null, - false + false, + null ); } @@ -244,7 +251,8 @@ private IndexLifecycleExplainResponse( BytesReference stepInfo, BytesReference previousStepInfo, PhaseExecutionInfo phaseExecutionInfo, - boolean skip + boolean skip, + String forceMergeCloneIndexName ) { if (managedByILM) { if (policyName == null) { @@ -313,6 +321,7 @@ private IndexLifecycleExplainResponse( this.snapshotName = snapshotName; this.shrinkIndexName = shrinkIndexName; this.skip = skip; + this.forceMergeCloneIndexName = forceMergeCloneIndexName; } public IndexLifecycleExplainResponse(StreamInput in) throws IOException { @@ -351,6 +360,8 @@ public IndexLifecycleExplainResponse(StreamInput in) throws IOException { } else { skip = false; } + // No need for deserialization from this point onwards as this action only runs on the local node. + forceMergeCloneIndexName = null; } else { policyName = null; lifecycleDate = null; @@ -371,6 +382,7 @@ public IndexLifecycleExplainResponse(StreamInput in) throws IOException { shrinkIndexName = null; indexCreationDate = null; skip = false; + forceMergeCloneIndexName = null; } } @@ -405,6 +417,7 @@ public void writeTo(StreamOutput out) throws IOException { || out.getTransportVersion().onOrAfter(TransportVersions.ILM_ADD_SKIP_SETTING)) { out.writeBoolean(skip); } + // No need for serialization from this point onwards as this action only runs on the local node. 
} } @@ -508,6 +521,10 @@ public boolean getSkip() { return skip; } + public String getForceMergeCloneIndexName() { + return forceMergeCloneIndexName; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -595,6 +612,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(PHASE_EXECUTION_INFO.getPreferredName(), phaseExecutionInfo); } builder.field(SKIP_NAME.getPreferredName(), skip); + if (forceMergeCloneIndexName != null) { + builder.field(FORCE_MERGE_CLONE_INDEX_NAME.getPreferredName(), forceMergeCloneIndexName); + } } builder.endObject(); return builder; @@ -623,7 +643,8 @@ public int hashCode() { stepInfo, previousStepInfo, phaseExecutionInfo, - skip + skip, + forceMergeCloneIndexName ); } @@ -656,7 +677,8 @@ public boolean equals(Object obj) { && Objects.equals(stepInfo, other.stepInfo) && Objects.equals(previousStepInfo, other.previousStepInfo) && Objects.equals(phaseExecutionInfo, other.phaseExecutionInfo) - && Objects.equals(skip, other.skip); + && Objects.equals(skip, other.skip) + && Objects.equals(forceMergeCloneIndexName, other.forceMergeCloneIndexName); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java index 58ac74a420b9f..eaa1297008488 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java @@ -141,14 +141,14 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata curren logger.debug( "index [{}] using policy [{}] does not have a stored snapshot index name, " + "using our best effort guess of [{}] for the original snapshotted index name", - indexMetadata.getIndex().getName(), + indexName, policyName, indexName ); } 
else { indexName = searchableSnapshotMetadata.sourceIndex(); } - } else { + } else if (snapshotIndexName.equals(indexName) == false) { // Use the name of the snapshot as specified in the metadata, because the current index // name not might not reflect the name of the index actually in the snapshot logger.debug( @@ -158,6 +158,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata curren policyName, snapshotIndexName ); + // Note: this will update the indexName to the force-merged index name if we performed the force-merge on a cloned index. indexName = snapshotIndexName; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java index b06b3a20bc0d3..50f5d70054282 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.admin.indices.shrink.ResizeType; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexAbstraction; @@ -35,6 +36,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.Function; import static org.elasticsearch.TransportVersions.ILM_ADD_SEARCHABLE_SNAPSHOT_ADD_REPLICATE_FOR; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY; @@ -56,12 +59,28 @@ public class SearchableSnapshotAction implements LifecycleAction { public static final ParseField FORCE_MERGE_INDEX = new 
ParseField("force_merge_index"); public static final ParseField TOTAL_SHARDS_PER_NODE = new ParseField("total_shards_per_node"); public static final ParseField REPLICATE_FOR = new ParseField("replicate_for"); - public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check"; public static final String CONDITIONAL_SKIP_ACTION_STEP = BranchingStep.NAME + "-check-prerequisites"; public static final String CONDITIONAL_SKIP_GENERATE_AND_CLEAN = BranchingStep.NAME + "-check-existing-snapshot"; + public static final String CONDITIONAL_SKIP_CLONE_STEP = BranchingStep.NAME + "-skip-clone-check"; + public static final String WAIT_FOR_CLONED_INDEX_GREEN = WaitForIndexColorStep.NAME + "-cloned-index"; + public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check"; + public static final String CONDITIONAL_DELETE_FORCE_MERGED_INDEX_KEY = BranchingStep.NAME + "-delete-force-merged-index"; + public static final String DELETE_FORCE_MERGED_INDEX_KEY = DeleteStep.NAME + "-force-merged-index"; public static final String FULL_RESTORED_INDEX_PREFIX = "restored-"; public static final String PARTIAL_RESTORED_INDEX_PREFIX = "partial-"; + public static final String FORCE_MERGE_CLONE_INDEX_PREFIX = "fm-clone-"; + /** An index name supplier that always returns the force merge index name (possibly null). */ + public static final BiFunction FORCE_MERGE_CLONE_INDEX_NAME_SUPPLIER = ( + indexName, + state) -> state.forceMergeCloneIndexName(); + /** An index name supplier that returns the force merge index name if it exists, or the original index name if not. */ + public static final BiFunction FORCE_MERGE_CLONE_INDEX_NAME_FALLBACK_SUPPLIER = ( + indexName, + state) -> state.forceMergeCloneIndexName() != null ? 
state.forceMergeCloneIndexName() : indexName; + + private static final Settings CLONE_SETTINGS = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build(); + private static final Function CLONE_SETTINGS_SUPPLIER = indexMetadata -> CLONE_SETTINGS; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( NAME, @@ -163,9 +182,17 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac StepKey checkNoWriteIndex = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME); StepKey waitForNoFollowerStepKey = new StepKey(phase, NAME, WaitForNoFollowersStep.NAME); StepKey waitTimeSeriesEndTimePassesKey = new StepKey(phase, NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME); + StepKey skipGeneratingSnapshotKey = new StepKey(phase, NAME, CONDITIONAL_SKIP_GENERATE_AND_CLEAN); + + StepKey conditionalSkipCloneKey = new StepKey(phase, NAME, CONDITIONAL_SKIP_CLONE_STEP); + StepKey readOnlyKey = new StepKey(phase, NAME, ReadOnlyStep.NAME); + StepKey cleanupClonedIndexKey = new StepKey(phase, NAME, CleanupGeneratedIndexStep.NAME); + StepKey generateCloneIndexNameKey = new StepKey(phase, NAME, GenerateUniqueIndexNameStep.NAME); + StepKey cloneIndexKey = new StepKey(phase, NAME, ResizeIndexStep.CLONE); + StepKey waitForClonedIndexGreenKey = new StepKey(phase, NAME, WAIT_FOR_CLONED_INDEX_GREEN); StepKey forceMergeStepKey = new StepKey(phase, NAME, ForceMergeStep.NAME); StepKey waitForSegmentCountKey = new StepKey(phase, NAME, SegmentCountStep.NAME); - StepKey skipGeneratingSnapshotKey = new StepKey(phase, NAME, CONDITIONAL_SKIP_GENERATE_AND_CLEAN); + StepKey generateSnapshotNameKey = new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME); StepKey cleanSnapshotKey = new StepKey(phase, NAME, CleanupSnapshotStep.NAME); StepKey createSnapshotKey = new StepKey(phase, NAME, CreateSnapshotStep.NAME); @@ -177,7 +204,9 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac StepKey 
copyLifecyclePolicySettingKey = new StepKey(phase, NAME, CopySettingsStep.NAME); StepKey swapAliasesKey = new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME); StepKey replaceDataStreamIndexKey = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME); - StepKey deleteIndexKey = new StepKey(phase, NAME, DeleteStep.NAME); + StepKey deleteSourceIndexKey = new StepKey(phase, NAME, DeleteStep.NAME); + StepKey conditionalDeleteForceMergedIndexKey = new StepKey(phase, NAME, CONDITIONAL_DELETE_FORCE_MERGED_INDEX_KEY); + StepKey deleteForceMergedIndexKey = new StepKey(phase, NAME, DELETE_FORCE_MERGED_INDEX_KEY); StepKey replicateForKey = new StepKey(phase, NAME, WaitUntilReplicateForTimePassesStep.NAME); StepKey dropReplicasKey = new StepKey(phase, NAME, UpdateSettingsStep.NAME); @@ -269,9 +298,9 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac Instant::now ); - // When generating a snapshot, we either jump to the force merge step, or we skip the + // When generating a snapshot, we either jump to the force merge section, or we skip the // forcemerge and go straight to steps for creating the snapshot - StepKey keyForSnapshotGeneration = forceMergeIndex ? forceMergeStepKey : generateSnapshotNameKey; + StepKey keyForSnapshotGeneration = forceMergeIndex ? 
conditionalSkipCloneKey : generateSnapshotNameKey; // Branch, deciding whether there is an existing searchable snapshot that can be used for mounting the index // (in which case, skip generating a new name and the snapshot cleanup), or if we need to generate a new snapshot BranchingStep skipGeneratingSnapshotStep = new BranchingStep( @@ -328,12 +357,74 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac ); // If a new snapshot is needed, these steps are executed - ForceMergeStep forceMergeStep = new ForceMergeStep(forceMergeStepKey, waitForSegmentCountKey, client, 1); - SegmentCountStep segmentCountStep = new SegmentCountStep(waitForSegmentCountKey, generateSnapshotNameKey, client, 1); + // If the index has replicas, we need to clone the index first with 0 replicas and perform the force-merge on that index. + // That avoids us having to force-merge the replica shards too, which is a waste, as the snapshot will only be taken from the + // primary shards. If the index already has 0 replicas, we can skip the clone steps. + BranchingStep conditionalSkipCloneStep = new BranchingStep( + conditionalSkipCloneKey, + readOnlyKey, + forceMergeStepKey, + (index, project) -> { + IndexMetadata indexMetadata = project.index(index); + assert indexMetadata != null : "index " + index.getName() + " must exist in the cluster state"; + return indexMetadata.getNumberOfReplicas() == 0; + } + ); + ReadOnlyStep readOnlyStep = new ReadOnlyStep(readOnlyKey, cleanupClonedIndexKey, client, false); + // If a previous step created a clone index but the action did not complete, we need to clean up the old clone index. 
+ CleanupGeneratedIndexStep cleanupClonedIndexStep = new CleanupGeneratedIndexStep( + cleanupClonedIndexKey, + generateCloneIndexNameKey, + client, + FORCE_MERGE_CLONE_INDEX_NAME_SUPPLIER + ); + GenerateUniqueIndexNameStep generateCloneIndexNameStep = new GenerateUniqueIndexNameStep( + generateCloneIndexNameKey, + cloneIndexKey, + FORCE_MERGE_CLONE_INDEX_PREFIX, + (generatedIndexName, lifecycleStateBuilder) -> lifecycleStateBuilder.setForceMergeCloneIndexName(generatedIndexName) + ); + // Clone the index with 0 replicas. + ResizeIndexStep cloneIndexStep = new ResizeIndexStep( + cloneIndexKey, + waitForClonedIndexGreenKey, + client, + ResizeType.CLONE, + FORCE_MERGE_CLONE_INDEX_NAME_SUPPLIER, + CLONE_SETTINGS_SUPPLIER, + null + ); + // Wait for the cloned index to be green before proceeding with the force-merge. We wrap this with a + // ClusterStateWaitUntilThresholdStep to avoid waiting forever if the index cannot be started for some reason. + // On timeout, ILM will move back to the cleanup step, remove the cloned index, and retry the clone. 
+ ClusterStateWaitUntilThresholdStep waitForClonedIndexGreenStep = new ClusterStateWaitUntilThresholdStep( + new WaitForIndexColorStep( + waitForClonedIndexGreenKey, + forceMergeStepKey, + ClusterHealthStatus.GREEN, + FORCE_MERGE_CLONE_INDEX_NAME_SUPPLIER + ), + cleanupClonedIndexKey + ); + ForceMergeStep forceMergeStep = new ForceMergeStep( + forceMergeStepKey, + waitForSegmentCountKey, + client, + 1, + FORCE_MERGE_CLONE_INDEX_NAME_FALLBACK_SUPPLIER + ); + SegmentCountStep segmentCountStep = new SegmentCountStep( + waitForSegmentCountKey, + generateSnapshotNameKey, + client, + 1, + FORCE_MERGE_CLONE_INDEX_NAME_FALLBACK_SUPPLIER + ); GenerateSnapshotNameStep generateSnapshotNameStep = new GenerateSnapshotNameStep( generateSnapshotNameKey, cleanSnapshotKey, - snapshotRepository + snapshotRepository, + FORCE_MERGE_CLONE_INDEX_NAME_FALLBACK_SUPPLIER ); CleanupSnapshotStep cleanupSnapshotStep = new CleanupSnapshotStep(cleanSnapshotKey, createSnapshotKey, client); CreateSnapshotStep createSnapshotStep = new CreateSnapshotStep(createSnapshotKey, waitForDataTierKey, cleanSnapshotKey, client); @@ -371,10 +462,30 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac ); CopySettingsStep copySettingsStep = new CopySettingsStep( copyLifecyclePolicySettingKey, - dataStreamCheckBranchingKey, + forceMergeIndex ? conditionalDeleteForceMergedIndexKey : dataStreamCheckBranchingKey, (index, lifecycleState) -> getRestoredIndexPrefix(copyLifecyclePolicySettingKey) + index, LifecycleSettings.LIFECYCLE_NAME ); + // If we cloned the index, we need to delete it before we swap the mounted snapshot in place of the original index. + // If we did not clone the index, there's nothing else for us to do. 
+ BranchingStep conditionalDeleteForceMergedIndexStep = new BranchingStep( + conditionalDeleteForceMergedIndexKey, + dataStreamCheckBranchingKey, + deleteForceMergedIndexKey, + (index, project) -> { + IndexMetadata indexMetadata = project.index(index); + assert indexMetadata != null : "index " + index.getName() + " must exist in the cluster state"; + String cloneIndexName = indexMetadata.getLifecycleExecutionState().forceMergeCloneIndexName(); + return cloneIndexName != null && project.index(cloneIndexName) != null; + } + ); + DeleteStep deleteForceMergedIndexStep = new DeleteStep( + deleteForceMergedIndexKey, + dataStreamCheckBranchingKey, + client, + FORCE_MERGE_CLONE_INDEX_NAME_SUPPLIER, + true + ); BranchingStep isDataStreamBranchingStep = new BranchingStep( dataStreamCheckBranchingKey, swapAliasesKey, @@ -387,10 +498,10 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac ); ReplaceDataStreamBackingIndexStep replaceDataStreamBackingIndex = new ReplaceDataStreamBackingIndexStep( replaceDataStreamIndexKey, - deleteIndexKey, + deleteSourceIndexKey, (index, executionState) -> getRestoredIndexPrefix(replaceDataStreamIndexKey) + index ); - DeleteStep deleteSourceIndexStep = new DeleteStep(deleteIndexKey, null, client); + DeleteStep deleteSourceIndexStep = new DeleteStep(deleteSourceIndexKey, null, client); // sending this step to null as the restored index (which will after this step essentially be the source index) was sent to the next // key after we restored the lifecycle execution state SwapAliasesAndDeleteSourceIndexStep swapAliasesAndDeleteSourceIndexStep = new SwapAliasesAndDeleteSourceIndexStep( @@ -417,6 +528,12 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac steps.add(waitUntilTimeSeriesEndTimeStep); steps.add(skipGeneratingSnapshotStep); if (forceMergeIndex) { + steps.add(conditionalSkipCloneStep); + steps.add(readOnlyStep); + steps.add(cleanupClonedIndexStep); + steps.add(generateCloneIndexNameStep); 
+ steps.add(cloneIndexStep); + steps.add(waitForClonedIndexGreenStep); steps.add(forceMergeStep); steps.add(segmentCountStep); } @@ -432,6 +549,10 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac steps.add(replicateForStep); steps.add(dropReplicasStep); } + if (forceMergeIndex) { + steps.add(conditionalDeleteForceMergedIndexStep); + steps.add(deleteForceMergedIndexStep); + } steps.add(isDataStreamBranchingStep); steps.add(replaceDataStreamBackingIndex); steps.add(deleteSourceIndexStep); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java index 5ad7c12cd6e33..7442d143e2436 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java @@ -16,10 +16,10 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.Index; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.BiFunction; import java.util.stream.Collectors; /** @@ -39,12 +40,33 @@ public class SegmentCountStep extends AsyncWaitStep { private static final Logger logger = LogManager.getLogger(SegmentCountStep.class); public static final String NAME = "segment-count"; + private static final BiFunction DEFAULT_TARGET_INDEX_NAME_SUPPLIER = ( + indexName, + lifecycleState) -> 
indexName; private final int maxNumSegments; + private final BiFunction targetIndexNameSupplier; + /** + * Creates a new {@link SegmentCountStep} that will check the segment count on the index that ILM is currently operating on. + */ public SegmentCountStep(StepKey key, StepKey nextStepKey, Client client, int maxNumSegments) { + this(key, nextStepKey, client, maxNumSegments, DEFAULT_TARGET_INDEX_NAME_SUPPLIER); + } + + /** + * Creates a new {@link SegmentCountStep} that will check the segment count on the index name returned by the supplier. + */ + public SegmentCountStep( + StepKey key, + StepKey nextStepKey, + Client client, + int maxNumSegments, + BiFunction targetIndexNameSupplier + ) { super(key, nextStepKey, client); this.maxNumSegments = maxNumSegments; + this.targetIndexNameSupplier = targetIndexNameSupplier; } @Override @@ -58,16 +80,20 @@ public int getMaxNumSegments() { @Override public void evaluateCondition(ProjectState state, IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) { - Index index = indexMetadata.getIndex(); + String targetIndexName = targetIndexNameSupplier.apply( + indexMetadata.getIndex().getName(), + indexMetadata.getLifecycleExecutionState() + ); + assert targetIndexName != null : "target index name supplier must not return null"; getClient(state.projectId()).admin() .indices() - .segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> { - IndexSegments idxSegments = response.getIndices().get(index.getName()); + .segments(new IndicesSegmentsRequest(targetIndexName), ActionListener.wrap(response -> { + IndexSegments idxSegments = response.getIndices().get(targetIndexName); if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) { final DefaultShardOperationFailedException[] failures = response.getShardFailures(); logger.info( "[{}] retrieval of segment counts after force merge did not succeed, there were {} shard failures. 
failures: {}", - index.getName(), + targetIndexName, response.getFailedShards(), failures == null ? "n/a" @@ -86,7 +112,7 @@ public void evaluateCondition(ProjectState state, IndexMetadata indexMetadata, L .collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size())); logger.info( "[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}", - index.getName(), + targetIndexName, maxNumSegments, unmergedShards.size(), unmergedShardCounts diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStepTests.java index 417e605a5dc6e..27018df2bf43b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStepTests.java @@ -35,7 +35,7 @@ public class GenerateSnapshotNameStepTests extends AbstractStepTestCase i); } @Override @@ -50,12 +50,12 @@ protected GenerateSnapshotNameStep mutateInstance(GenerateSnapshotNameStep insta case 2 -> snapshotRepository = randomValueOtherThan(snapshotRepository, () -> randomAlphaOfLengthBetween(5, 10)); default -> throw new AssertionError("Illegal randomisation branch"); } - return new GenerateSnapshotNameStep(key, nextKey, snapshotRepository); + return new GenerateSnapshotNameStep(key, nextKey, snapshotRepository, null); } @Override protected GenerateSnapshotNameStep copyInstance(GenerateSnapshotNameStep instance) { - return new GenerateSnapshotNameStep(instance.getKey(), instance.getNextStepKey(), instance.getSnapshotRepository()); + return new GenerateSnapshotNameStep(instance.getKey(), instance.getNextStepKey(), instance.getSnapshotRepository(), null); } public void testPerformAction() { diff --git 
a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponseTests.java index d001ecfa9e449..03b91e72a6cc6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponseTests.java @@ -79,7 +79,9 @@ private static IndexLifecycleExplainResponse randomManagedIndexExplainResponse() randomBoolean() ? null : new BytesArray(new RandomStepInfo(() -> randomAlphaOfLength(10)).toString()), randomBoolean() ? null : new BytesArray(new RandomStepInfo(() -> randomAlphaOfLength(10)).toString()), randomBoolean() ? null : PhaseExecutionInfoTests.randomPhaseExecutionInfo(""), - randomBoolean() + randomBoolean(), + // We don't mutate any fields from this point onwards as we don't (de)serialize them, as the action is run on the local node + null ); } @@ -107,7 +109,8 @@ public void testInvalidStepDetails() { randomBoolean() ? null : new BytesArray(new RandomStepInfo(() -> randomAlphaOfLength(10)).toString()), randomBoolean() ? null : new BytesArray(new RandomStepInfo(() -> randomAlphaOfLength(10)).toString()), randomBoolean() ? null : PhaseExecutionInfoTests.randomPhaseExecutionInfo(""), - randomBoolean() + randomBoolean(), + randomBoolean() ? 
null : randomAlphaOfLength(10) ) ); assertThat(exception.getMessage(), startsWith("managed index response must have complete step details")); @@ -144,7 +147,8 @@ public void testIndexAges() throws IOException { null, null, null, - false + false, + null ); assertThat(managedExplainResponse.getLifecycleDate(), is(notNullValue())); Long now = 1_000_000L; @@ -331,7 +335,9 @@ protected IndexLifecycleExplainResponse mutateInstance(IndexLifecycleExplainResp stepInfo, previousStepInfo, phaseExecutionInfo, - skip + skip, + // We don't mutate any fields from this point onwards as we don't (de)serialize them, as the action is run on the local node + instance.getForceMergeCloneIndexName() ); } else { return switch (between(0, 1)) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java index c6119b272a8c0..268cf1a57e653 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java @@ -6,9 +6,13 @@ */ package org.elasticsearch.xpack.core.ilm; +import org.elasticsearch.action.admin.indices.shrink.ResizeType; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexVersion; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; @@ -50,6 +54,7 @@ public void testToSteps() { CreateSnapshotStep createSnapshotStep = (CreateSnapshotStep) steps.get(index); 
assertThat(createSnapshotStep.getNextKeyOnIncomplete(), is(expectedSteps.get(index - 1))); validateWaitForDataTierStep(phase, steps, index + 1, index + 2); + validateForceMergeClone(action.isForceMergeIndex(), steps); } private void validateWaitForDataTierStep(String phase, List steps, int waitForDataTierStepIndex, int mountStepIndex) { @@ -62,6 +67,25 @@ private void validateWaitForDataTierStep(String phase, List steps, int wai } } + /** + * Validate that the {@link ResizeIndexStep} used to clone the index for force merging configures the target index with 0 replicas. + */ + private void validateForceMergeClone(boolean isForceMergeIndex, List steps) { + if (isForceMergeIndex == false) { + return; + } + ResizeIndexStep cloneStep = (ResizeIndexStep) steps.stream() + .filter(step -> step instanceof ResizeIndexStep) + .findFirst() + .orElseThrow(); + assertThat(cloneStep.getResizeType(), is(ResizeType.CLONE)); + var indexMetadata = IndexMetadata.builder(randomAlphaOfLength(5)) + .settings(indexSettings(IndexVersion.current(), randomIntBetween(1, 5), randomIntBetween(0, 5))) + .build(); + Settings cloneIndexSettings = cloneStep.getTargetIndexSettingsSupplier().apply(indexMetadata); + assertThat(cloneIndexSettings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, -1), is(0)); + } + public void testPrefixAndStorageTypeDefaults() { StepKey nonFrozenKey = new StepKey(randomFrom("hot", "warm", "cold", "delete"), randomAlphaOfLength(5), randomAlphaOfLength(5)); StepKey frozenKey = new StepKey("frozen", randomAlphaOfLength(5), randomAlphaOfLength(5)); @@ -102,6 +126,12 @@ private List expectedStepKeys(String phase, boolean forceMergeIndex, bo new StepKey(phase, NAME, WaitForNoFollowersStep.NAME), new StepKey(phase, NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME), new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_GENERATE_AND_CLEAN), + forceMergeIndex ? new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_CLONE_STEP) : null, + forceMergeIndex ? 
new StepKey(phase, NAME, ReadOnlyStep.NAME) : null, + forceMergeIndex ? new StepKey(phase, NAME, CleanupGeneratedIndexStep.NAME) : null, + forceMergeIndex ? new StepKey(phase, NAME, GenerateUniqueIndexNameStep.NAME) : null, + forceMergeIndex ? new StepKey(phase, NAME, ResizeIndexStep.CLONE) : null, + forceMergeIndex ? new StepKey(phase, NAME, SearchableSnapshotAction.WAIT_FOR_CLONED_INDEX_GREEN) : null, forceMergeIndex ? new StepKey(phase, NAME, ForceMergeStep.NAME) : null, forceMergeIndex ? new StepKey(phase, NAME, SegmentCountStep.NAME) : null, new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME), @@ -114,6 +144,8 @@ private List expectedStepKeys(String phase, boolean forceMergeIndex, bo new StepKey(phase, NAME, CopySettingsStep.NAME), hasReplicateFor ? new StepKey(phase, NAME, WaitUntilReplicateForTimePassesStep.NAME) : null, hasReplicateFor ? new StepKey(phase, NAME, UpdateSettingsStep.NAME) : null, + forceMergeIndex ? new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DELETE_FORCE_MERGED_INDEX_KEY) : null, + forceMergeIndex ? new StepKey(phase, NAME, SearchableSnapshotAction.DELETE_FORCE_MERGED_INDEX_KEY) : null, new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DATASTREAM_CHECK_KEY), new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME), new StepKey(phase, NAME, DeleteStep.NAME), diff --git a/x-pack/plugin/ilm/qa/multi-node/build.gradle b/x-pack/plugin/ilm/qa/multi-node/build.gradle index 4cd41e58b11ac..f23bab6157339 100644 --- a/x-pack/plugin/ilm/qa/multi-node/build.gradle +++ b/x-pack/plugin/ilm/qa/multi-node/build.gradle @@ -44,6 +44,8 @@ testClusters.configureEach { */ setting 'thread_pool.estimated_time_interval', '0' setting 'time_series.poll_interval', '10m' + // Disable shard balancing to avoid force merges failing due to relocating shards. 
+ setting 'cluster.routing.rebalance.enable', 'none' } if (buildParams.inFipsJvm){ diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java index c90958254fb99..299b9e9313b89 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java @@ -23,6 +23,8 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.ObjectPath; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; @@ -45,9 +47,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Optional; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLengthBetween; @@ -372,35 +372,19 @@ private static void ensureGreen(String index) throws IOException { @SuppressWarnings("unchecked") public static Integer getNumberOfPrimarySegments(RestClient client, String index) throws IOException { Response response = client.performRequest(new Request("GET", index + "/_segments")); - XContentType entityContentType = XContentType.fromMediaType(response.getEntity().getContentType().getValue()); - final Map originalResponseEntity = XContentHelper.convertToMap( - entityContentType.xContent(), - response.getEntity().getContent(), - false - ); - if (logger.isTraceEnabled()) { - logger.trace( - "segments response for {}: {}", - index, - 
originalResponseEntity.keySet() - .stream() - .map(key -> key + "=" + originalResponseEntity.get(key)) - .collect(Collectors.joining(", ", "{", "}")) - ); - } - Map responseEntity = (Map) originalResponseEntity.get("indices"); - responseEntity = (Map) responseEntity.get(index); - responseEntity = (Map) responseEntity.get("shards"); - List> shards = (List>) responseEntity.get("0"); - // We want to mamke sure to get the primary shard because there is a chance the replica doesn't have data yet: - Optional> shardOptional = shards.stream() - .filter(shard -> ((Map) shard.get("routing")).get("primary").equals(true)) - .findAny(); - if (shardOptional.isPresent()) { - return (Integer) shardOptional.get().get("num_search_segments"); - } else { - throw new RuntimeException("No primary shard found for index " + index); - } + final Map originalResponseEntity = ESRestTestCase.entityAsMap(response); + logger.trace("segments response for {}: {}", index, originalResponseEntity); + Map responseEntity = new ObjectPath(originalResponseEntity).evaluateExact("indices", index, "shards"); + return responseEntity.values() + .stream() + .mapToInt( + shardList -> ((List>) shardList).stream() + .filter(shard -> ((Map) shard.get("routing")).get("primary").equals(true)) + .findFirst() + .map(shard -> (Integer) shard.get("num_search_segments")) + .orElseThrow(() -> new IllegalStateException("no primary shard found in " + shardList)) + ) + .sum(); } public static void updatePolicy(RestClient client, String indexName, String policy) throws IOException { @@ -413,6 +397,28 @@ public static void updatePolicy(RestClient client, String indexName, String poli assertOK(client.performRequest(changePolicyRequest)); } + /** + * Moves the specified index from the current ILM step to the next step. 
+ */ + public static void moveIndexToStep(RestClient client, String indexName, Step.StepKey currentStep, Step.StepKey nextStep) + throws IOException { + Request moveToStepRequest = new Request("POST", "_ilm/move/" + indexName); + moveToStepRequest.setJsonEntity(Strings.format(""" + { + "current_step": { + "phase": "%s", + "action": "%s", + "name": "%s" + }, + "next_step": { + "phase": "%s", + "action": "%s", + "name": "%s" + } + }""", currentStep.phase(), currentStep.action(), currentStep.name(), nextStep.phase(), nextStep.action(), nextStep.name())); + ESRestTestCase.assertAcknowledged(client.performRequest(moveToStepRequest)); + } + @SuppressWarnings("unchecked") public static String getSnapshotState(RestClient client, String snapshot) throws IOException { Response response = client.performRequest(new Request("GET", "/_snapshot/repo/" + snapshot)); diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java index b95ea9b2745cb..111de0b6f400a 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java @@ -16,13 +16,16 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.cluster.routing.allocation.DataTier; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.ObjectPath; import 
org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.TimeSeriesRestDriver; import org.elasticsearch.xpack.core.ilm.AllocateAction; import org.elasticsearch.xpack.core.ilm.DeleteAction; import org.elasticsearch.xpack.core.ilm.ForceMergeAction; @@ -61,10 +64,14 @@ import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument; import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition; import static org.elasticsearch.xpack.core.ilm.DeleteAction.WITH_SNAPSHOT_DELETE; +import static org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction.FORCE_MERGE_CLONE_INDEX_PREFIX; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.startsWith; public class SearchableSnapshotActionIT extends ESRestTestCase { @@ -121,65 +128,94 @@ public void testSearchableSnapshotAction() throws Exception { ); } - public void testSearchableSnapshotForceMergesIndexToOneSegment() throws Exception { - createSnapshotRepo(client(), snapshotRepo, randomBoolean()); - createNewSingletonPolicy(client(), policy, "cold", new SearchableSnapshotAction(snapshotRepo, true)); + /** + * Test that when we have a searchable snapshot action with force merge enabled and the source index has _at least one_ replica, + * we perform the force merge on _the cloned index_ with 0 replicas and then snapshot the clone. + * We also sometimes artificially "pause" ILM on the clone step to allow us to assert that the cloned index has the correct settings. + * We do this by disabling allocation in the whole cluster, which means the clone step cannot complete as none of its shards can + * allocate. 
After re-enabling allocation, we check that the remainder of the action completes as expected. + */ + public void testSearchableSnapshotForceMergesClonedIndex() throws Exception { + final int numberOfPrimaries = randomIntBetween(1, 3); + // The test suite runs with 4 nodes, so we can have up to 3 (allocated) replicas. + final int numberOfReplicas = randomIntBetween(1, 3); + final String phase = randomBoolean() ? "cold" : "frozen"; + final String backingIndexName = prepareDataStreamWithDocs(phase, numberOfPrimaries, numberOfReplicas); + + final boolean pauseOnClone = randomBoolean(); + if (pauseOnClone) { + logger.info("--> pausing test on index clone step"); + configureClusterAllocation(false); + } + try { + // Enable/start ILM on the data stream. + updateIndexSettings(dataStream, Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy)); - createComposableTemplate( - client(), - randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT), - dataStream, - new Template(null, null, null) - ); + if (pauseOnClone) { + assertForceMergeCloneIndexSettings(backingIndexName, numberOfPrimaries); + configureClusterAllocation(true); + } - for (int i = 0; i < randomIntBetween(5, 10); i++) { - indexDocument(client(), dataStream, true); + assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries, true); + } catch (Exception | AssertionError e) { + // Make sure we re-enable allocation in case of failure so that the remaining tests in the suite are not affected. 
+ configureClusterAllocation(true); + throw e; } + } - List backingIndices = getDataStreamBackingIndexNames(dataStream); - String backingIndexName = backingIndices.getFirst(); - Integer preLifecycleBackingIndexSegments = getNumberOfPrimarySegments(client(), backingIndexName); - assertThat(preLifecycleBackingIndexSegments, greaterThanOrEqualTo(1)); + /** + * Test that when we have a searchable snapshot action with force merge enabled and the source index has _zero_ replicas, + * we perform the force merge on the _source_ index and snapshot the source index. + */ + public void testSearchableSnapshotForceMergesSourceIndex() throws Exception { + // Data streams have 1 primary shard by default. + // The test suite runs with 4 nodes, so we can have up to 3 (allocated) replicas. + final String phase = randomBoolean() ? "cold" : "frozen"; + final int numberOfPrimaries = 1; + final String backingIndexName = prepareDataStreamWithDocs(phase, numberOfPrimaries, 0); + + // Enable/start ILM on the data stream. 
+ updateIndexSettings(dataStream, Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy)); - // rolling over the data stream so we can apply the searchable snapshot policy to a backing index that's not the write index - rolloverMaxOneDocCondition(client(), dataStream); + assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries, false); + } - updateIndexSettings(dataStream, Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy)); - assertTrue(waitUntil(() -> { - try { - Integer numberOfSegments = getNumberOfPrimarySegments(client(), backingIndexName); - logger.info("index {} has {} segments", backingIndexName, numberOfSegments); - // this is a loose assertion here as forcemerge is best effort - if (preLifecycleBackingIndexSegments > 1) { - return numberOfSegments < preLifecycleBackingIndexSegments; - } else { - // the index had only one segement to start with so nothing to assert - return true; - } - } catch (Exception e) { - try { - // if ILM executed the action already we don't have an index to assert on so we don't fail the test - return indexExists(backingIndexName) == false; - } catch (IOException ex) { - return false; - } - } - }, 60, TimeUnit.SECONDS)); + /** + * Test that when we have a searchable snapshot action with force merge enabled and the source index has _at least one_ replica, + * we perform the force merge on _the cloned index_ with 0 replicas and then snapshot the clone. + * This test simulates a failure during the clone step by pausing ILM on the clone step and then moving the index back to the cleanup + * step. We need to resort to simulation, as triggering the threshold in the ClusterStateWaitUntilThresholdStep is not feasible in a + * Java REST test. After re-enabling allocation, we check that the remainder of the action completes as expected. 
+ */ + public void testSearchableSnapshotForceMergesClonedIndexAfterRetry() throws Exception { + final int numberOfPrimaries = randomIntBetween(1, 3); + final int numberOfReplicas = randomIntBetween(1, 3); + final String phase = randomBoolean() ? "cold" : "frozen"; + final String backingIndexName = prepareDataStreamWithDocs(phase, numberOfPrimaries, numberOfReplicas); + + configureClusterAllocation(false); + try { + // Enable/start ILM on the data stream. + updateIndexSettings(dataStream, Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy)); - String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + backingIndexName; - assertTrue(waitUntil(() -> { - try { - return indexExists(restoredIndexName); - } catch (IOException e) { - return false; - } - }, 60, TimeUnit.SECONDS)); + assertForceMergeCloneIndexSettings(backingIndexName, numberOfPrimaries); - assertBusy( - () -> { assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)); }, - 30, - TimeUnit.SECONDS - ); + TimeSeriesRestDriver.moveIndexToStep( + client(), + backingIndexName, + new Step.StepKey(phase, "searchable_snapshot", "clone"), + new Step.StepKey(phase, "searchable_snapshot", "cleanup-generated-index") + ); + + configureClusterAllocation(true); + + assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries, true); + } catch (Exception | AssertionError e) { + // Make sure we re-enable allocation in case of failure so that the remaining tests in the suite are not affected. 
+ configureClusterAllocation(true); + throw e; + } } @SuppressWarnings("unchecked") @@ -306,8 +342,8 @@ public void testUpdatePolicyToAddPhasesYieldsInvalidActionsToBeSkipped() throws // rolling over the data stream so we can apply the searchable snapshot policy to a backing index that's not the write index indexDocument(client(), dataStream, true); - var backingIndices = getDataStreamBackingIndexNames(dataStream); - String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + backingIndices.get(0); + String backingIndexName = getDataStreamBackingIndexNames(dataStream).getFirst(); + String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + backingIndexName; assertTrue(waitUntil(() -> { try { return indexExists(restoredIndexName); @@ -321,6 +357,8 @@ public void testUpdatePolicyToAddPhasesYieldsInvalidActionsToBeSkipped() throws assertThat(stepKeyForIndex.phase(), is("hot")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); }, 30, TimeUnit.SECONDS); + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(backingIndexName); createPolicy( client(), @@ -398,6 +436,8 @@ public void testRestoredIndexManagedByLocalPolicySkipsIllegalActions() throws Ex assertThat(stepKeyForIndex.phase(), is("hot")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); }, 30, TimeUnit.SECONDS); + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(backingIndexName); // snapshot the data stream String dsSnapshotName = "snapshot_ds_" + dataStream; @@ -502,6 +542,8 @@ public void testIdenticalSearchableSnapshotActionIsNoop() throws Exception { assertThat(stepKeyForIndex.phase(), is("cold")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); }, 30, TimeUnit.SECONDS); + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(index); Request getSnaps = new Request("GET", "/_snapshot/" 
+ snapshotRepo + "/_all"); Response response = client().performRequest(getSnaps); @@ -563,6 +605,8 @@ public void testConvertingSearchableSnapshotFromFullToPartial() throws Exception assertThat(stepKeyForIndex.phase(), is("frozen")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); }, 30, TimeUnit.SECONDS); + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(index); Request getSnaps = new Request("GET", "/_snapshot/" + snapshotRepo + "/_all"); Response response = client().performRequest(getSnaps); @@ -889,8 +933,8 @@ public void testSearchableSnapshotsInHotPhasePinnedToHotNodes() throws Exception // searchable snapshots mounted in the hot phase should be pinned to hot nodes assertThat(hotIndexSettings.get(DataTier.TIER_PREFERENCE), is("data_hot")); - assertOK(client().performRequest(new Request("DELETE", "_data_stream/" + dataStream))); - assertOK(client().performRequest(new Request("DELETE", "_ilm/policy/" + policy))); + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(firstGenIndex); } // See: https://github.com/elastic/elasticsearch/issues/77269 @@ -978,6 +1022,8 @@ public void testSearchableSnapshotTotalShardsPerNode() throws Exception { assertThat(stepKeyForIndex.phase(), is("frozen")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); }, 30, TimeUnit.SECONDS); + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(index); // validate total_shards_per_node setting Map indexSettings = getIndexSettingsAsMap(searchableSnapMountedIndexName); @@ -1053,6 +1099,8 @@ public void testSearchableSnapshotReplicateFor() throws Exception { Integer numberOfReplicas = Integer.valueOf((String) indexSettings.get(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey())); assertThat(numberOfReplicas, is(1)); } + // Wait for the original index to be deleted, to ensure ILM has finished + 
awaitIndexDoesNotExist(backingIndexName); // tweak the policy to replicate_for hardly any time at all createPolicy( @@ -1101,4 +1149,153 @@ private Step.StepKey getKeyForIndex(Response response, String indexName) throws String step = (String) indexResponse.get("step"); return new Step.StepKey(phase, action, step); } + + /** + * Prepares a data stream with the specified number of primary and replica shards, + * creates a snapshot repository and ILM policy, applies a composable template, + * indexes several documents, and performs a rollover. Returns the name of the + * first generation backing index. + */ + private String prepareDataStreamWithDocs(String phase, int numberOfPrimaries, int numberOfReplicas) throws Exception { + logger.info( + "--> running [{}] with [{}] primaries, [{}] replicas, in phase [{}]", + getTestName(), + numberOfPrimaries, + numberOfReplicas, + phase + ); + createSnapshotRepo(client(), snapshotRepo, randomBoolean()); + createNewSingletonPolicy(client(), policy, phase, new SearchableSnapshotAction(snapshotRepo, true)); + + createComposableTemplate( + client(), + randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT), + dataStream, + new Template(indexSettings(numberOfPrimaries, numberOfReplicas).build(), null, null) + ); + for (int i = 0; i < randomIntBetween(5, 10); i++) { + indexDocument(client(), dataStream, true); + } + final var backingIndexName = getDataStreamBackingIndexNames(dataStream).getFirst(); + + // Retrieve the total number of segments across the primary shards of the backing index. + final Integer preLifecycleBackingIndexSegments = getNumberOfPrimarySegments(client(), backingIndexName); + // If we have only one primary shard, we expect at least one segment. We're hoping to get multiple segments, but we can't guarantee + // that, so we have to resort to a "greater than or equal to" check.
+ if (numberOfPrimaries == 1) { + assertThat(preLifecycleBackingIndexSegments, greaterThanOrEqualTo(1)); + } else { + // With multiple primary shards, the segments are more spread out, so it's even less likely that we'll get more than 1 segment + // in one shard, and some shards might even be empty. + assertThat(preLifecycleBackingIndexSegments, greaterThanOrEqualTo(0)); + } + + // Rolling over the data stream so we can apply the searchable snapshot policy to a backing index that's not the write index. + rolloverMaxOneDocCondition(client(), dataStream); + // Wait for all shards to be allocated. + ensureGreen(dataStream); + return backingIndexName; + } + + /** + * Waits for the force-merge clone index to exist and asserts that it has the correct number of primary shards and zero replicas, + * and that its name follows the expected naming convention. + */ + private static void assertForceMergeCloneIndexSettings(String backingIndexName, int numberOfPrimaries) throws Exception { + final String forceMergeIndexPattern = FORCE_MERGE_CLONE_INDEX_PREFIX + "*" + backingIndexName; + assertBusy(() -> { + // The force-merged index is hidden, so we need to expand wildcards. 
+ Request request = new Request("GET", "/" + forceMergeIndexPattern + "/_settings?expand_wildcards=all&flat_settings=true"); + Response response = client().performRequest(request); + final Map<String, Object> indicesSettings = entityAsMap(response); + assertThat( + "expected only settings for index: " + forceMergeIndexPattern + ", but got " + indicesSettings, + indicesSettings.size(), + equalTo(1) + ); + final String forceMergeCloneIndexName = indicesSettings.keySet().iterator().next(); + assertThat(forceMergeCloneIndexName, startsWith(FORCE_MERGE_CLONE_INDEX_PREFIX)); + assertThat(forceMergeCloneIndexName, endsWith(backingIndexName)); + @SuppressWarnings("unchecked") + final var forceMergeIndexResponse = (Map<String, Object>) indicesSettings.get(forceMergeCloneIndexName); + @SuppressWarnings("unchecked") + final Map<String, Object> forceMergeIndexSettings = (Map<String, Object>) forceMergeIndexResponse.get("settings"); + assertThat(forceMergeIndexSettings.get("index.number_of_shards"), equalTo(String.valueOf(numberOfPrimaries))); + assertThat(forceMergeIndexSettings.get("index.number_of_replicas"), equalTo("0")); + }); + } + + /** + * Asserts that the restored searchable snapshot index exists, the original and force-merge clone indices are deleted, + * and the restored index has the expected number of segments. Also verifies the snapshot index naming conventions. + * + * @param phase The phase of the ILM policy that the searchable snapshot action runs in. + * @param backingIndexName The original backing index name. + * @param numberOfPrimaries The number of primaries that the original backing index had, affecting segment count assertions. + * @param withReplicas True if the original backing index had one or more replicas, affecting snapshot index naming assertions. + */ + private void assertForceMergedSnapshotDone(String phase, String backingIndexName, int numberOfPrimaries, boolean withReplicas) + throws Exception { + final String prefix = phase.equals("cold") + ?
SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + : SearchableSnapshotAction.PARTIAL_RESTORED_INDEX_PREFIX; + final String restoredIndexName = prefix + backingIndexName; + awaitIndexExists(restoredIndexName); + + assertBusy(() -> assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME))); + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(backingIndexName); + // Regardless of whether we force merged the backing index or a clone, the cloned index should not exist (anymore). + awaitIndexDoesNotExist(FORCE_MERGE_CLONE_INDEX_PREFIX + "-*-" + backingIndexName); + + // Retrieve the total number of segments across all primary shards of the restored index. + final Integer numberOfPrimarySegments = getNumberOfPrimarySegments(client(), restoredIndexName); + // If the backing index had multiple primaries, some primaries might be empty, but others should have no more than 1 segment. + if (numberOfPrimaries > 1 || phase.equals("frozen")) { + assertThat(numberOfPrimarySegments, lessThanOrEqualTo(numberOfPrimaries)); + } else { + // If the backing index had only one primary, we expect exactly 1 segment after force merging. + assertThat(numberOfPrimarySegments, equalTo(1)); + } + + // We can't assert the replicas of the mounted snapshot as it's created with 0 replicas by default, but we can at least assert + // that the mounted snapshot was taken from the correct source index. 
+ final List<Map<String, Object>> snapshots = getSnapshots(); + assertThat("expected to have only one snapshot, but got: " + snapshots, snapshots.size(), equalTo(1)); + final Map<String, Object> snapshot = snapshots.getFirst(); + @SuppressWarnings("unchecked") + final List<String> indices = (List<String>) snapshot.get("indices"); + assertThat("expected to have only one index, but got: " + indices, indices.size(), equalTo(1)); + final String snapshotIndexName = indices.getFirst(); + // If the backing index had replicas, we force merged a clone, so the snapshot index name should match the clone naming pattern. + if (withReplicas) { + assertThat( + "expected index to start with the force merge prefix", + snapshotIndexName, + startsWith(FORCE_MERGE_CLONE_INDEX_PREFIX) + ); + assertThat("expected index to end with the backing index name", snapshotIndexName, endsWith(backingIndexName)); + } else { + // If the backing index had no replicas, we force merged the backing index itself, so the snapshot index name should be equal + // to the backing index name. + assertThat("expected index to be the backing index name", snapshotIndexName, equalTo(backingIndexName)); + } + } + + private List<Map<String, Object>> getSnapshots() throws IOException { + Request getSnaps = new Request("GET", "/_snapshot/" + snapshotRepo + "/_all"); + Response response = client().performRequest(getSnaps); + ObjectPath objectPath = ObjectPath.createFromResponse(response); + return objectPath.evaluate("snapshots"); + } + + /** + * Updates the cluster settings to enable or disable shard allocation in the whole cluster. + */ + private static void configureClusterAllocation(boolean enable) throws IOException { + final String value = enable ?
null : "none"; + updateClusterSettings( + Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), value).build() + ); + } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportExplainLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportExplainLifecycleAction.java index bac3aa8e6ac38..463f2d868f30b 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportExplainLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportExplainLifecycleAction.java @@ -223,7 +223,8 @@ static IndexLifecycleExplainResponse getIndexLifecycleExplainResponse( stepInfoBytes, previousStepInfoBytes, phaseExecutionInfo, - LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(idxSettings) + LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(idxSettings), + lifecycleState.forceMergeCloneIndexName() ); } else { indexResponse = null;