From 43eef91867a8fb9e5ca6897525a5637b1462c1dc Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Mon, 1 Sep 2025 19:26:19 +0200 Subject: [PATCH 01/17] ILM: Force merge on zero-replica cloned index before snapshot When performing a searchable snapshot action with force merge enabled, if the source index has one or more replicas, ILM now clones the index with zero replicas and performs the force merge on the clone. The snapshot is then taken from the force-merged clone instead of the source index, ensuring only primary shards are force-merged. The cloned index is deleted after the snapshot is mounted, and all references and step logic have been updated accordingly. Test coverage was added for the new flow, including handling retries and cleanup of failed clones. Key changes: - Execution state: Track the force-merged clone index in ILM state and propagate through relevant APIs. - SearchableSnapshotAction: Add conditional steps to clone the index with 0 replicas, force-merge, and delete the clone as needed. - Steps: Update ForceMerge, SegmentCount, Snapshot, and Delete steps to operate on the correct index (source or clone). - Tests/QA: Add and enhance tests to verify force-merge and snapshot behavior with and without replicas, including retry/cleanup paths and configuration for stable force-merges. 
Resolves #75478 --- .../metadata/LifecycleExecutionState.java | 23 +- .../test/rest/ESRestTestCase.java | 4 + .../xpack/core/ilm/CreateSnapshotStep.java | 9 +- .../xpack/core/ilm/DeleteStep.java | 32 ++- .../xpack/core/ilm/ForceMergeStep.java | 4 +- .../core/ilm/GenerateSnapshotNameStep.java | 5 +- .../ilm/IndexLifecycleExplainResponse.java | 36 ++- .../xpack/core/ilm/MountSnapshotStep.java | 5 +- .../core/ilm/SearchableSnapshotAction.java | 116 +++++++- .../xpack/core/ilm/SegmentCountStep.java | 13 +- .../IndexLifecycleExplainResponseTests.java | 14 +- .../ilm/SearchableSnapshotActionTests.java | 32 +++ x-pack/plugin/ilm/qa/multi-node/build.gradle | 2 + .../xpack/TimeSeriesRestDriver.java | 23 ++ .../actions/SearchableSnapshotActionIT.java | 268 ++++++++++++++---- .../TransportExplainLifecycleAction.java | 3 +- 16 files changed, 500 insertions(+), 89 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java b/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java index 77f1135da2505..cefac0fb968b1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java @@ -40,7 +40,8 @@ public record LifecycleExecutionState( String snapshotName, String shrinkIndexName, String snapshotIndexName, - String downsampleIndexName + String downsampleIndexName, + String forceMergeIndexName ) { public static final String ILM_CUSTOM_METADATA_KEY = "ilm"; @@ -64,6 +65,7 @@ public record LifecycleExecutionState( private static final String SNAPSHOT_INDEX_NAME = "snapshot_index_name"; private static final String SHRINK_INDEX_NAME = "shrink_index_name"; private static final String DOWNSAMPLE_INDEX_NAME = "rollup_index_name"; + private static final String FORCE_MERGE_INDEX_NAME = "force_merge_index_name"; public static final LifecycleExecutionState EMPTY_STATE = 
LifecycleExecutionState.builder().build(); @@ -89,7 +91,8 @@ public static Builder builder(LifecycleExecutionState state) { .setShrinkIndexName(state.shrinkIndexName) .setSnapshotIndexName(state.snapshotIndexName) .setDownsampleIndexName(state.downsampleIndexName) - .setStepTime(state.stepTime); + .setStepTime(state.stepTime) + .setForceMergeIndexName(state.forceMergeIndexName); } public static LifecycleExecutionState fromCustomMetadata(Map customData) { @@ -202,6 +205,10 @@ public static LifecycleExecutionState fromCustomMetadata(Map cus if (downsampleIndexName != null) { builder.setDownsampleIndexName(downsampleIndexName); } + String forceMergeIndexName = customData.get(FORCE_MERGE_INDEX_NAME); + if (forceMergeIndexName != null) { + builder.setForceMergeIndexName(forceMergeIndexName); + } return builder.build(); } @@ -274,6 +281,9 @@ public Map asMap() { if (downsampleIndexName != null) { result.put(DOWNSAMPLE_INDEX_NAME, downsampleIndexName); } + if (forceMergeIndexName != null) { + result.put(FORCE_MERGE_INDEX_NAME, forceMergeIndexName); + } return Collections.unmodifiableMap(result); } @@ -307,6 +317,7 @@ public static class Builder { private String shrinkIndexName; private String snapshotIndexName; private String downsampleIndexName; + private String forceMergeIndexName; public Builder setPhase(String phase) { this.phase = phase; @@ -398,6 +409,11 @@ public Builder setDownsampleIndexName(String downsampleIndexName) { return this; } + public Builder setForceMergeIndexName(String forceMergeIndexName) { + this.forceMergeIndexName = forceMergeIndexName; + return this; + } + public LifecycleExecutionState build() { return new LifecycleExecutionState( phase, @@ -417,7 +433,8 @@ public LifecycleExecutionState build() { snapshotName, shrinkIndexName, snapshotIndexName, - downsampleIndexName + downsampleIndexName, + forceMergeIndexName ); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java 
b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 3b0470968e2f6..100c523b21374 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -2087,6 +2087,10 @@ protected static void awaitIndexExists(String index, TimeValue timeout) throws I ensureHealth(client(), index, request -> request.addParameter("timeout", timeout.toString())); } + protected static void awaitIndexDoesNotExist(String index) throws Exception { + awaitIndexDoesNotExist(index, TimeValue.timeValueSeconds(10)); + } + protected static void awaitIndexDoesNotExist(String index, TimeValue timeout) throws Exception { assertBusy(() -> assertFalse(indexExists(index)), timeout.millis(), TimeUnit.MILLISECONDS); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java index 7a85ead2d1472..f3a2851d62711 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java @@ -101,8 +101,11 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList ); return; } + // If we performed the force merge step on the cloned index, we need to snapshot that index instead of the original. + final String clonedIndexName = lifecycleState.forceMergeIndexName(); + final String forceMergedIndexName = clonedIndexName != null ? 
clonedIndexName : indexName; CreateSnapshotRequest request = new CreateSnapshotRequest(TimeValue.MAX_VALUE, snapshotRepository, snapshotName); - request.indices(indexName); + request.indices(forceMergedIndexName); // this is safe as the snapshot creation will still be async, it's just that the listener will be notified when the snapshot is // complete request.waitForCompletion(true); @@ -112,7 +115,7 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList logger.debug( "create snapshot response for policy [{}] and index [{}] is: {}", policyName, - indexName, + forceMergedIndexName, Strings.toString(response) ); final SnapshotInfo snapInfo = response.getSnapshotInfo(); @@ -128,7 +131,7 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList snapshotRepository, snapshotName, policyName, - indexName, + forceMergedIndexName, snapInfo.failedShards(), snapInfo.totalShards() ) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteStep.java index a6cedfceb1fbe..23de791ea201c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteStep.java @@ -16,26 +16,54 @@ import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; +import java.util.function.BiFunction; + /** * Deletes a single index. 
*/ public class DeleteStep extends AsyncRetryDuringSnapshotActionStep { + public static final String NAME = "delete"; private static final Logger logger = LogManager.getLogger(DeleteStep.class); + private static final BiFunction DEFAULT_TARGET_INDEX_NAME_SUPPLIER = ( + indexName, + lifecycleState) -> indexName; + + private final BiFunction targetIndexNameSupplier; + private final boolean indexSurvives; + /** + * Use this constructor to delete the index that ILM is currently operating on. + */ public DeleteStep(StepKey key, StepKey nextStepKey, Client client) { + this(key, nextStepKey, client, DEFAULT_TARGET_INDEX_NAME_SUPPLIER, false); + } + + /** + * Use this constructor to delete a specific index, potentially different from the one that ILM is currently operating on. + */ + public DeleteStep( + StepKey key, + StepKey nextStepKey, + Client client, + BiFunction targetIndexNameSupplier, + boolean indexSurvives + ) { super(key, nextStepKey, client); + this.targetIndexNameSupplier = targetIndexNameSupplier; + this.indexSurvives = indexSurvives; } @Override public void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener listener) { String policyName = indexMetadata.getLifecyclePolicyName(); - String indexName = indexMetadata.getIndex().getName(); + String indexName = targetIndexNameSupplier.apply(indexMetadata.getIndex().getName(), indexMetadata.getLifecycleExecutionState()); IndexAbstraction indexAbstraction = currentProject.getIndicesLookup().get(indexName); assert indexAbstraction != null : "invalid cluster metadata. 
index [" + indexName + "] was not found"; DataStream dataStream = indexAbstraction.getParentDataStream(); @@ -88,7 +116,7 @@ public void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata @Override public boolean indexSurvives() { - return false; + return indexSurvives; } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java index 0fc0ac1f91f6a..68b287bbeb5f4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java @@ -51,7 +51,9 @@ public void performAction( ClusterStateObserver observer, ActionListener listener ) { - String indexName = indexMetadata.getIndex().getName(); + // Use the cloned index name if we have one, otherwise fall back to the original index name. + String clonedIndexName = indexMetadata.getLifecycleExecutionState().forceMergeIndexName(); + String indexName = clonedIndexName != null ? 
clonedIndexName : indexMetadata.getIndex().getName(); ForceMergeRequest request = new ForceMergeRequest(indexName); request.maxNumSegments(maxNumSegments); getClient(currentState.projectId()).admin().indices().forceMerge(request, listener.delegateFailureAndWrap((l, response) -> { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java index 2606799fe8a88..b7b47473de757 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java @@ -72,9 +72,12 @@ public ProjectState performAction(Index index, ProjectState projectState) { + "] cannot continue until the repository is created or the policy is changed" ); } + // If we performed the force merge step on the cloned index, we perform the snapshot on that index instead of the original. + final String clonedIndexName = lifecycleState.forceMergeIndexName(); + final String forceMergedIndexName = clonedIndexName != null ? 
clonedIndexName : index.getName(); LifecycleExecutionState.Builder newLifecycleState = LifecycleExecutionState.builder(lifecycleState); - newLifecycleState.setSnapshotIndexName(index.getName()); + newLifecycleState.setSnapshotIndexName(forceMergedIndexName); newLifecycleState.setSnapshotRepository(snapshotRepository); if (lifecycleState.snapshotName() == null) { // generate and validate the snapshotName diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java index 7b1a68325574b..07cd672e8f588 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java @@ -57,6 +57,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl private static final ParseField SHRINK_INDEX_NAME = new ParseField("shrink_index_name"); private static final ParseField SNAPSHOT_NAME = new ParseField("snapshot_name"); private static final ParseField SKIP_NAME = new ParseField("skip"); + private static final ParseField FORCE_MERGE_INDEX_NAME = new ParseField("force_merge_index_name"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "index_lifecycle_explain_response", @@ -81,7 +82,8 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl (BytesReference) a[11], (BytesReference) a[21], (PhaseExecutionInfo) a[12], - Objects.requireNonNullElse((Boolean) a[22], false) + Objects.requireNonNullElse((Boolean) a[22], false), + (String) a[24] // a[13] == "age" // a[20] == "time_since_index_creation" // a[23] = "age_in_millis" @@ -124,6 +126,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl }, PREVIOUS_STEP_INFO_FIELD); 
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SKIP_NAME); PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), AGE_IN_MILLIS_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_INDEX_NAME); } private final String index; @@ -147,6 +150,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl private final String snapshotName; private final String shrinkIndexName; private final boolean skip; + private final String forceMergeIndexName; Supplier nowSupplier = System::currentTimeMillis; // Can be changed for testing @@ -170,7 +174,8 @@ public static IndexLifecycleExplainResponse newManagedIndexResponse( BytesReference stepInfo, BytesReference previousStepInfo, PhaseExecutionInfo phaseExecutionInfo, - boolean skip + boolean skip, + String forceMergeIndexName ) { return new IndexLifecycleExplainResponse( index, @@ -193,7 +198,8 @@ public static IndexLifecycleExplainResponse newManagedIndexResponse( stepInfo, previousStepInfo, phaseExecutionInfo, - skip + skip, + forceMergeIndexName ); } @@ -219,7 +225,8 @@ public static IndexLifecycleExplainResponse newUnmanagedIndexResponse(String ind null, null, null, - false + false, + null ); } @@ -244,7 +251,8 @@ private IndexLifecycleExplainResponse( BytesReference stepInfo, BytesReference previousStepInfo, PhaseExecutionInfo phaseExecutionInfo, - boolean skip + boolean skip, + String forceMergeIndexName ) { if (managedByILM) { if (policyName == null) { @@ -313,6 +321,7 @@ private IndexLifecycleExplainResponse( this.snapshotName = snapshotName; this.shrinkIndexName = shrinkIndexName; this.skip = skip; + this.forceMergeIndexName = forceMergeIndexName; } public IndexLifecycleExplainResponse(StreamInput in) throws IOException { @@ -351,6 +360,8 @@ public IndexLifecycleExplainResponse(StreamInput in) throws IOException { } else { skip = false; } + // No need for deserialization from this point onwards as this action only runs 
on the local node. + forceMergeIndexName = null; } else { policyName = null; lifecycleDate = null; @@ -371,6 +382,7 @@ public IndexLifecycleExplainResponse(StreamInput in) throws IOException { shrinkIndexName = null; indexCreationDate = null; skip = false; + forceMergeIndexName = null; } } @@ -405,6 +417,7 @@ public void writeTo(StreamOutput out) throws IOException { || out.getTransportVersion().onOrAfter(TransportVersions.ILM_ADD_SKIP_SETTING)) { out.writeBoolean(skip); } + // No need for serialization from this point onwards as this action only runs on the local node. } } @@ -508,6 +521,10 @@ public boolean getSkip() { return skip; } + public String getForceMergeIndexName() { + return forceMergeIndexName; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -595,6 +612,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(PHASE_EXECUTION_INFO.getPreferredName(), phaseExecutionInfo); } builder.field(SKIP_NAME.getPreferredName(), skip); + if (forceMergeIndexName != null) { + builder.field(FORCE_MERGE_INDEX_NAME.getPreferredName(), forceMergeIndexName); + } } builder.endObject(); return builder; @@ -623,7 +643,8 @@ public int hashCode() { stepInfo, previousStepInfo, phaseExecutionInfo, - skip + skip, + forceMergeIndexName ); } @@ -656,7 +677,8 @@ public boolean equals(Object obj) { && Objects.equals(stepInfo, other.stepInfo) && Objects.equals(previousStepInfo, other.previousStepInfo) && Objects.equals(phaseExecutionInfo, other.phaseExecutionInfo) - && Objects.equals(skip, other.skip); + && Objects.equals(skip, other.skip) + && Objects.equals(forceMergeIndexName, other.forceMergeIndexName); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java index 58ac74a420b9f..eaa1297008488 100644 --- 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java @@ -141,14 +141,14 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata curren logger.debug( "index [{}] using policy [{}] does not have a stored snapshot index name, " + "using our best effort guess of [{}] for the original snapshotted index name", - indexMetadata.getIndex().getName(), + indexName, policyName, indexName ); } else { indexName = searchableSnapshotMetadata.sourceIndex(); } - } else { + } else if (snapshotIndexName.equals(indexName) == false) { // Use the name of the snapshot as specified in the metadata, because the current index // name not might not reflect the name of the index actually in the snapshot logger.debug( @@ -158,6 +158,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata curren policyName, snapshotIndexName ); + // Note: this will update the indexName to the force-merged index name if we performed the force-merge on a cloned index. 
indexName = snapshotIndexName; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java index b06b3a20bc0d3..525006d806491 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.admin.indices.shrink.ResizeType; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexAbstraction; @@ -35,6 +36,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.Function; import static org.elasticsearch.TransportVersions.ILM_ADD_SEARCHABLE_SNAPSHOT_ADD_REPLICATE_FOR; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY; @@ -56,12 +59,22 @@ public class SearchableSnapshotAction implements LifecycleAction { public static final ParseField FORCE_MERGE_INDEX = new ParseField("force_merge_index"); public static final ParseField TOTAL_SHARDS_PER_NODE = new ParseField("total_shards_per_node"); public static final ParseField REPLICATE_FOR = new ParseField("replicate_for"); - public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check"; public static final String CONDITIONAL_SKIP_ACTION_STEP = BranchingStep.NAME + "-check-prerequisites"; public static final String CONDITIONAL_SKIP_GENERATE_AND_CLEAN = BranchingStep.NAME + "-check-existing-snapshot"; + public static final String CONDITIONAL_SKIP_CLONE_STEP = 
BranchingStep.NAME + "-skip-clone-check"; + public static final String WAIT_FOR_CLONED_INDEX_GREEN = WaitForIndexColorStep.NAME + "-cloned-index"; + public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check"; + public static final String CONDITIONAL_DELETE_FORCE_MERGED_INDEX_KEY = BranchingStep.NAME + "-delete-force-merged-index"; + public static final String DELETE_FORCE_MERGED_INDEX_KEY = DeleteStep.NAME + "-force-merged-index"; public static final String FULL_RESTORED_INDEX_PREFIX = "restored-"; public static final String PARTIAL_RESTORED_INDEX_PREFIX = "partial-"; + public static final String FORCE_MERGE_INDEX_PREFIX = "force-merge-"; + public static final BiFunction FORCE_MERGE_INDEX_NAME_SUPPLIER = (indexName, state) -> state + .forceMergeIndexName(); + + private static final Settings CLONE_SETTINGS = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build(); + private static final Function CLONE_SETTINGS_SUPPLIER = indexMetadata -> CLONE_SETTINGS; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( NAME, @@ -163,9 +176,17 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac StepKey checkNoWriteIndex = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME); StepKey waitForNoFollowerStepKey = new StepKey(phase, NAME, WaitForNoFollowersStep.NAME); StepKey waitTimeSeriesEndTimePassesKey = new StepKey(phase, NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME); + StepKey skipGeneratingSnapshotKey = new StepKey(phase, NAME, CONDITIONAL_SKIP_GENERATE_AND_CLEAN); + + StepKey conditionalSkipCloneKey = new StepKey(phase, NAME, CONDITIONAL_SKIP_CLONE_STEP); + StepKey readOnlyKey = new StepKey(phase, NAME, ReadOnlyStep.NAME); + StepKey cleanupClonedIndexKey = new StepKey(phase, NAME, CleanupGeneratedIndexStep.NAME); + StepKey generateCloneIndexNameKey = new StepKey(phase, NAME, GenerateUniqueIndexNameStep.NAME); + StepKey cloneIndexKey = new 
StepKey(phase, NAME, ResizeIndexStep.CLONE); + StepKey waitForClonedIndexGreenKey = new StepKey(phase, NAME, WAIT_FOR_CLONED_INDEX_GREEN); StepKey forceMergeStepKey = new StepKey(phase, NAME, ForceMergeStep.NAME); StepKey waitForSegmentCountKey = new StepKey(phase, NAME, SegmentCountStep.NAME); - StepKey skipGeneratingSnapshotKey = new StepKey(phase, NAME, CONDITIONAL_SKIP_GENERATE_AND_CLEAN); + StepKey generateSnapshotNameKey = new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME); StepKey cleanSnapshotKey = new StepKey(phase, NAME, CleanupSnapshotStep.NAME); StepKey createSnapshotKey = new StepKey(phase, NAME, CreateSnapshotStep.NAME); @@ -177,7 +198,9 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac StepKey copyLifecyclePolicySettingKey = new StepKey(phase, NAME, CopySettingsStep.NAME); StepKey swapAliasesKey = new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME); StepKey replaceDataStreamIndexKey = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME); - StepKey deleteIndexKey = new StepKey(phase, NAME, DeleteStep.NAME); + StepKey deleteSourceIndexKey = new StepKey(phase, NAME, DeleteStep.NAME); + StepKey conditionalDeleteForceMergedIndexKey = new StepKey(phase, NAME, CONDITIONAL_DELETE_FORCE_MERGED_INDEX_KEY); + StepKey deleteForceMergedIndexKey = new StepKey(phase, NAME, DELETE_FORCE_MERGED_INDEX_KEY); StepKey replicateForKey = new StepKey(phase, NAME, WaitUntilReplicateForTimePassesStep.NAME); StepKey dropReplicasKey = new StepKey(phase, NAME, UpdateSettingsStep.NAME); @@ -269,9 +292,9 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac Instant::now ); - // When generating a snapshot, we either jump to the force merge step, or we skip the + // When generating a snapshot, we either jump to the force merge section, or we skip the // forcemerge and go straight to steps for creating the snapshot - StepKey keyForSnapshotGeneration = forceMergeIndex ? 
forceMergeStepKey : generateSnapshotNameKey; + StepKey keyForSnapshotGeneration = forceMergeIndex ? conditionalSkipCloneKey : generateSnapshotNameKey; // Branch, deciding whether there is an existing searchable snapshot that can be used for mounting the index // (in which case, skip generating a new name and the snapshot cleanup), or if we need to generate a new snapshot BranchingStep skipGeneratingSnapshotStep = new BranchingStep( @@ -328,6 +351,55 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac ); // If a new snapshot is needed, these steps are executed + // If the index has replicas, we need to clone the index first with 0 replicas and perform the force-merge on that index. + // That avoids us having to force-merge the replica shards too, which is a waste, as the snapshot will only be taken from the + // primary shards. If the index already has 0 replicas, we can skip the clone steps. + BranchingStep conditionalSkipCloneStep = new BranchingStep( + conditionalSkipCloneKey, + readOnlyKey, + forceMergeStepKey, + (index, project) -> { + IndexMetadata indexMetadata = project.index(index); + assert indexMetadata != null : "index " + index.getName() + " must exist in the cluster state"; + return indexMetadata.getNumberOfReplicas() == 0; + } + ); + ReadOnlyStep readOnlyStep = new ReadOnlyStep(readOnlyKey, cleanupClonedIndexKey, client, false); + // If a previous step created a clone index but the action did not complete, we need to clean up the old clone index. 
+ CleanupGeneratedIndexStep cleanupClonedIndexStep = new CleanupGeneratedIndexStep( + cleanupClonedIndexKey, + generateCloneIndexNameKey, + client, + FORCE_MERGE_INDEX_NAME_SUPPLIER + ); + GenerateUniqueIndexNameStep generateCloneIndexNameStep = new GenerateUniqueIndexNameStep( + generateCloneIndexNameKey, + cloneIndexKey, + FORCE_MERGE_INDEX_PREFIX, + (generatedIndexName, lifecycleStateBuilder) -> lifecycleStateBuilder.setForceMergeIndexName(generatedIndexName) + ); + // Clone the index with 0 replicas. + ResizeIndexStep cloneIndexStep = new ResizeIndexStep( + cloneIndexKey, + waitForClonedIndexGreenKey, + client, + ResizeType.CLONE, + FORCE_MERGE_INDEX_NAME_SUPPLIER, + CLONE_SETTINGS_SUPPLIER, + null + ); + // Wait for the cloned index to be green before proceeding with the force-merge. We wrap this with a + // ClusterStateWaitUntilThresholdStep to avoid waiting forever if the index cannot be started for some reason. + // On timeout, ILM will move back to the cleanup step, remove the cloned index, and retry the clone. 
+ ClusterStateWaitUntilThresholdStep waitForClonedIndexGreenStep = new ClusterStateWaitUntilThresholdStep( + new WaitForIndexColorStep( + waitForClonedIndexGreenKey, + forceMergeStepKey, + ClusterHealthStatus.GREEN, + FORCE_MERGE_INDEX_NAME_SUPPLIER + ), + cleanupClonedIndexKey + ); ForceMergeStep forceMergeStep = new ForceMergeStep(forceMergeStepKey, waitForSegmentCountKey, client, 1); SegmentCountStep segmentCountStep = new SegmentCountStep(waitForSegmentCountKey, generateSnapshotNameKey, client, 1); GenerateSnapshotNameStep generateSnapshotNameStep = new GenerateSnapshotNameStep( @@ -371,10 +443,30 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac ); CopySettingsStep copySettingsStep = new CopySettingsStep( copyLifecyclePolicySettingKey, - dataStreamCheckBranchingKey, + conditionalDeleteForceMergedIndexKey, (index, lifecycleState) -> getRestoredIndexPrefix(copyLifecyclePolicySettingKey) + index, LifecycleSettings.LIFECYCLE_NAME ); + // If we cloned the index, we need to delete it before we swap the mounted snapshot in place of the original index. + // If we did not clone the index, there's nothing else for us to do. 
+ BranchingStep conditionalDeleteForceMergedIndexStep = new BranchingStep( + conditionalDeleteForceMergedIndexKey, + dataStreamCheckBranchingKey, + deleteForceMergedIndexKey, + (index, project) -> { + IndexMetadata indexMetadata = project.index(index); + assert indexMetadata != null : "index " + index.getName() + " must exist in the cluster state"; + String cloneIndexName = indexMetadata.getLifecycleExecutionState().forceMergeIndexName(); + return cloneIndexName != null && project.index(cloneIndexName) != null; + } + ); + DeleteStep deleteForceMergedIndexStep = new DeleteStep( + deleteForceMergedIndexKey, + dataStreamCheckBranchingKey, + client, + FORCE_MERGE_INDEX_NAME_SUPPLIER, + true + ); BranchingStep isDataStreamBranchingStep = new BranchingStep( dataStreamCheckBranchingKey, swapAliasesKey, @@ -387,10 +479,10 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac ); ReplaceDataStreamBackingIndexStep replaceDataStreamBackingIndex = new ReplaceDataStreamBackingIndexStep( replaceDataStreamIndexKey, - deleteIndexKey, + deleteSourceIndexKey, (index, executionState) -> getRestoredIndexPrefix(replaceDataStreamIndexKey) + index ); - DeleteStep deleteSourceIndexStep = new DeleteStep(deleteIndexKey, null, client); + DeleteStep deleteSourceIndexStep = new DeleteStep(deleteSourceIndexKey, null, client); // sending this step to null as the restored index (which will after this step essentially be the source index) was sent to the next // key after we restored the lifecycle execution state SwapAliasesAndDeleteSourceIndexStep swapAliasesAndDeleteSourceIndexStep = new SwapAliasesAndDeleteSourceIndexStep( @@ -417,6 +509,12 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac steps.add(waitUntilTimeSeriesEndTimeStep); steps.add(skipGeneratingSnapshotStep); if (forceMergeIndex) { + steps.add(conditionalSkipCloneStep); + steps.add(readOnlyStep); + steps.add(cleanupClonedIndexStep); + steps.add(generateCloneIndexNameStep); + 
steps.add(cloneIndexStep); + steps.add(waitForClonedIndexGreenStep); steps.add(forceMergeStep); steps.add(segmentCountStep); } @@ -432,6 +530,8 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac steps.add(replicateForStep); steps.add(dropReplicasStep); } + steps.add(conditionalDeleteForceMergedIndexStep); + steps.add(deleteForceMergedIndexStep); steps.add(isDataStreamBranchingStep); steps.add(replaceDataStreamBackingIndex); steps.add(deleteSourceIndexStep); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java index 5ad7c12cd6e33..d4cf3c53c9000 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java @@ -19,7 +19,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.Index; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; @@ -58,16 +57,18 @@ public int getMaxNumSegments() { @Override public void evaluateCondition(ProjectState state, IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) { - Index index = indexMetadata.getIndex(); + // Use the cloned index name if we have one, otherwise fall back to the original index name. + String clonedIndexName = indexMetadata.getLifecycleExecutionState().forceMergeIndexName(); + String forceMergedIndexName = clonedIndexName != null ? 
clonedIndexName : indexMetadata.getIndex().getName(); getClient(state.projectId()).admin() .indices() - .segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> { - IndexSegments idxSegments = response.getIndices().get(index.getName()); + .segments(new IndicesSegmentsRequest(forceMergedIndexName), ActionListener.wrap(response -> { + IndexSegments idxSegments = response.getIndices().get(forceMergedIndexName); if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) { final DefaultShardOperationFailedException[] failures = response.getShardFailures(); logger.info( "[{}] retrieval of segment counts after force merge did not succeed, there were {} shard failures. failures: {}", - index.getName(), + forceMergedIndexName, response.getFailedShards(), failures == null ? "n/a" @@ -86,7 +87,7 @@ public void evaluateCondition(ProjectState state, IndexMetadata indexMetadata, L .collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size())); logger.info( "[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}", - index.getName(), + forceMergedIndexName, maxNumSegments, unmergedShards.size(), unmergedShardCounts diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponseTests.java index d001ecfa9e449..ee56b740d6611 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponseTests.java @@ -79,7 +79,9 @@ private static IndexLifecycleExplainResponse randomManagedIndexExplainResponse() randomBoolean() ? null : new BytesArray(new RandomStepInfo(() -> randomAlphaOfLength(10)).toString()), randomBoolean() ? 
null : new BytesArray(new RandomStepInfo(() -> randomAlphaOfLength(10)).toString()), randomBoolean() ? null : PhaseExecutionInfoTests.randomPhaseExecutionInfo(""), - randomBoolean() + randomBoolean(), + // We don't mutate any fields from this point onwards as we don't (de)serialize them, as the action is run on the local node + null ); } @@ -107,7 +109,8 @@ public void testInvalidStepDetails() { randomBoolean() ? null : new BytesArray(new RandomStepInfo(() -> randomAlphaOfLength(10)).toString()), randomBoolean() ? null : new BytesArray(new RandomStepInfo(() -> randomAlphaOfLength(10)).toString()), randomBoolean() ? null : PhaseExecutionInfoTests.randomPhaseExecutionInfo(""), - randomBoolean() + randomBoolean(), + randomBoolean() ? null : randomAlphaOfLength(10) ) ); assertThat(exception.getMessage(), startsWith("managed index response must have complete step details")); @@ -144,7 +147,8 @@ public void testIndexAges() throws IOException { null, null, null, - false + false, + null ); assertThat(managedExplainResponse.getLifecycleDate(), is(notNullValue())); Long now = 1_000_000L; @@ -331,7 +335,9 @@ protected IndexLifecycleExplainResponse mutateInstance(IndexLifecycleExplainResp stepInfo, previousStepInfo, phaseExecutionInfo, - skip + skip, + // We don't mutate any fields from this point onwards as we don't (de)serialize them, as the action is run on the local node + instance.getForceMergeIndexName() ); } else { return switch (between(0, 1)) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java index c6119b272a8c0..a55158170dfff 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java @@ -6,9 +6,13 @@ */ package 
org.elasticsearch.xpack.core.ilm; +import org.elasticsearch.action.admin.indices.shrink.ResizeType; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexVersion; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; @@ -50,6 +54,7 @@ public void testToSteps() { CreateSnapshotStep createSnapshotStep = (CreateSnapshotStep) steps.get(index); assertThat(createSnapshotStep.getNextKeyOnIncomplete(), is(expectedSteps.get(index - 1))); validateWaitForDataTierStep(phase, steps, index + 1, index + 2); + validateForceMergeClone(action.isForceMergeIndex(), steps); } private void validateWaitForDataTierStep(String phase, List steps, int waitForDataTierStepIndex, int mountStepIndex) { @@ -62,6 +67,25 @@ private void validateWaitForDataTierStep(String phase, List steps, int wai } } + /** + * Validate that the {@link ResizeIndexStep} used to clone the index for force merging configures the target index with 0 replicas. 
+ */ + private void validateForceMergeClone(boolean isForceMergeIndex, List steps) { + if (isForceMergeIndex == false) { + return; + } + ResizeIndexStep cloneStep = (ResizeIndexStep) steps.stream() + .filter(step -> step instanceof ResizeIndexStep) + .findFirst() + .orElseThrow(); + assertThat(cloneStep.getResizeType(), is(ResizeType.CLONE)); + var indexMetadata = IndexMetadata.builder(randomAlphaOfLength(5)) + .settings(indexSettings(IndexVersion.current(), randomIntBetween(1, 5), randomIntBetween(0, 5))) + .build(); + Settings cloneIndexSettings = cloneStep.getTargetIndexSettingsSupplier().apply(indexMetadata); + assertThat(cloneIndexSettings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, -1), is(0)); + } + public void testPrefixAndStorageTypeDefaults() { StepKey nonFrozenKey = new StepKey(randomFrom("hot", "warm", "cold", "delete"), randomAlphaOfLength(5), randomAlphaOfLength(5)); StepKey frozenKey = new StepKey("frozen", randomAlphaOfLength(5), randomAlphaOfLength(5)); @@ -102,6 +126,12 @@ private List expectedStepKeys(String phase, boolean forceMergeIndex, bo new StepKey(phase, NAME, WaitForNoFollowersStep.NAME), new StepKey(phase, NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME), new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_GENERATE_AND_CLEAN), + forceMergeIndex ? new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_CLONE_STEP) : null, + forceMergeIndex ? new StepKey(phase, NAME, ReadOnlyStep.NAME) : null, + forceMergeIndex ? new StepKey(phase, NAME, CleanupGeneratedIndexStep.NAME) : null, + forceMergeIndex ? new StepKey(phase, NAME, GenerateUniqueIndexNameStep.NAME) : null, + forceMergeIndex ? new StepKey(phase, NAME, ResizeIndexStep.CLONE) : null, + forceMergeIndex ? new StepKey(phase, NAME, SearchableSnapshotAction.WAIT_FOR_CLONED_INDEX_GREEN) : null, forceMergeIndex ? new StepKey(phase, NAME, ForceMergeStep.NAME) : null, forceMergeIndex ? 
new StepKey(phase, NAME, SegmentCountStep.NAME) : null, new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME), @@ -114,6 +144,8 @@ private List expectedStepKeys(String phase, boolean forceMergeIndex, bo new StepKey(phase, NAME, CopySettingsStep.NAME), hasReplicateFor ? new StepKey(phase, NAME, WaitUntilReplicateForTimePassesStep.NAME) : null, hasReplicateFor ? new StepKey(phase, NAME, UpdateSettingsStep.NAME) : null, + new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DELETE_FORCE_MERGED_INDEX_KEY), + new StepKey(phase, NAME, SearchableSnapshotAction.DELETE_FORCE_MERGED_INDEX_KEY), new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DATASTREAM_CHECK_KEY), new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME), new StepKey(phase, NAME, DeleteStep.NAME), diff --git a/x-pack/plugin/ilm/qa/multi-node/build.gradle b/x-pack/plugin/ilm/qa/multi-node/build.gradle index 4cd41e58b11ac..f23bab6157339 100644 --- a/x-pack/plugin/ilm/qa/multi-node/build.gradle +++ b/x-pack/plugin/ilm/qa/multi-node/build.gradle @@ -44,6 +44,8 @@ testClusters.configureEach { */ setting 'thread_pool.estimated_time_interval', '0' setting 'time_series.poll_interval', '10m' + // Disable shard balancing to avoid force merges failing due to relocating shards. 
+ setting 'cluster.routing.rebalance.enable', 'none' } if (buildParams.inFipsJvm){ diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java index c90958254fb99..47a302973883d 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; @@ -413,6 +414,28 @@ public static void updatePolicy(RestClient client, String indexName, String poli assertOK(client.performRequest(changePolicyRequest)); } + /** + * Moves the specified index from the current ILM step to the next step. 
+ */ + public static void moveIndexToStep(RestClient client, String indexName, Step.StepKey currentStep, Step.StepKey nextStep) + throws IOException { + Request moveToStepRequest = new Request("POST", "_ilm/move/" + indexName); + moveToStepRequest.setJsonEntity(Strings.format(""" + { + "current_step": { + "phase": "%s", + "action": "%s", + "name": "%s" + }, + "next_step": { + "phase": "%s", + "action": "%s", + "name": "%s" + } + }""", currentStep.phase(), currentStep.action(), currentStep.name(), nextStep.phase(), nextStep.action(), nextStep.name())); + ESRestTestCase.assertAcknowledged(client.performRequest(moveToStepRequest)); + } + @SuppressWarnings("unchecked") public static String getSnapshotState(RestClient client, String snapshot) throws IOException { Response response = client.performRequest(new Request("GET", "/_snapshot/repo/" + snapshot)); diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java index b95ea9b2745cb..32136d16b33d7 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java @@ -16,13 +16,16 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.cluster.routing.allocation.DataTier; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.ObjectPath; import 
org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.TimeSeriesRestDriver; import org.elasticsearch.xpack.core.ilm.AllocateAction; import org.elasticsearch.xpack.core.ilm.DeleteAction; import org.elasticsearch.xpack.core.ilm.ForceMergeAction; @@ -61,10 +64,15 @@ import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument; import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition; import static org.elasticsearch.xpack.core.ilm.DeleteAction.WITH_SNAPSHOT_DELETE; +import static org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction.FORCE_MERGE_INDEX_PREFIX; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.startsWith; public class SearchableSnapshotActionIT extends ESRestTestCase { @@ -121,65 +129,90 @@ public void testSearchableSnapshotAction() throws Exception { ); } - public void testSearchableSnapshotForceMergesIndexToOneSegment() throws Exception { - createSnapshotRepo(client(), snapshotRepo, randomBoolean()); - createNewSingletonPolicy(client(), policy, "cold", new SearchableSnapshotAction(snapshotRepo, true)); + /** + * Test that when we have a searchable snapshot action with force merge enabled and the source index has _at least one_ replica, + * we perform the force merge on _the cloned index_ with 0 replicas and then snapshot the clone. + * We also sometimes artificially "pause" ILM on the clone step to allow us to assert that the cloned index has the correct settings. + * We do this by disabling allocation in the whole cluster, which means the clone step cannot complete as none of its shards can + * allocate. 
After re-enabling allocation, we check that the remainder of the action completes as expected. + */ + public void testSearchableSnapshotForceMergesClonedIndex() throws Exception { + final int numberOfPrimaries = randomIntBetween(1, 3); + // The test suite runs with 4 nodes, so we can have up to 3 (allocated) replicas. + final int numberOfReplicas = randomIntBetween(1, 3); + final String backingIndexName = prepareDataStreamWithDocs(numberOfPrimaries, numberOfReplicas); + + final boolean pauseOnClone = randomBoolean(); + if (pauseOnClone) { + logger.info("--> pausing test on index clone step"); + configureClusterAllocation(false); + } + try { + // Enable/start ILM on the data stream. + updateIndexSettings(dataStream, Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy)); - createComposableTemplate( - client(), - randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT), - dataStream, - new Template(null, null, null) - ); + if (pauseOnClone) { + assertForceMergeCloneIndexSettings(backingIndexName, numberOfPrimaries); + configureClusterAllocation(true); + } - for (int i = 0; i < randomIntBetween(5, 10); i++) { - indexDocument(client(), dataStream, true); + assertForceMergedSnapshotDone(backingIndexName, numberOfPrimaries > 1, true); + } catch (Exception e) { + // Make sure we re-enable allocation in case of failure so that the remaining tests in the suite are not affected. 
+ configureClusterAllocation(true); + throw e; } + } - List backingIndices = getDataStreamBackingIndexNames(dataStream); - String backingIndexName = backingIndices.getFirst(); - Integer preLifecycleBackingIndexSegments = getNumberOfPrimarySegments(client(), backingIndexName); - assertThat(preLifecycleBackingIndexSegments, greaterThanOrEqualTo(1)); - - // rolling over the data stream so we can apply the searchable snapshot policy to a backing index that's not the write index - rolloverMaxOneDocCondition(client(), dataStream); + /** + * Test that when we have a searchable snapshot action with force merge enabled and the source index has _zero_ replicas, + * we perform the force merge on the _source_ index and snapshot the source index. + */ + public void testSearchableSnapshotForceMergesSourceIndex() throws Exception { + // Data streams have 1 primary shard by default. + // The test suite runs with 4 nodes, so we can have up to 3 (allocated) replicas. + final String backingIndexName = prepareDataStreamWithDocs(1, 0); + // Enable/start ILM on the data stream. 
updateIndexSettings(dataStream, Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy)); - assertTrue(waitUntil(() -> { - try { - Integer numberOfSegments = getNumberOfPrimarySegments(client(), backingIndexName); - logger.info("index {} has {} segments", backingIndexName, numberOfSegments); - // this is a loose assertion here as forcemerge is best effort - if (preLifecycleBackingIndexSegments > 1) { - return numberOfSegments < preLifecycleBackingIndexSegments; - } else { - // the index had only one segement to start with so nothing to assert - return true; - } - } catch (Exception e) { - try { - // if ILM executed the action already we don't have an index to assert on so we don't fail the test - return indexExists(backingIndexName) == false; - } catch (IOException ex) { - return false; - } - } - }, 60, TimeUnit.SECONDS)); - String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + backingIndexName; - assertTrue(waitUntil(() -> { - try { - return indexExists(restoredIndexName); - } catch (IOException e) { - return false; - } - }, 60, TimeUnit.SECONDS)); + assertForceMergedSnapshotDone(backingIndexName, false, false); + } - assertBusy( - () -> { assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)); }, - 30, - TimeUnit.SECONDS - ); + /** + * Test that when we have a searchable snapshot action with force merge enabled and the source index has _at least one_ replica, + * we perform the force merge on _the cloned index_ with 0 replicas and then snapshot the clone. + * This test simulates a failure during the clone step by pausing ILM on the clone step and then moving the index back to the cleanup + * step. We need to resort to simulation, as triggering the threshold in the ClusterStateWaitUntilThresholdStep is not feasible in a + * Java REST test. After re-enabling allocation, we check that the remainder of the action completes as expected. 
+ */ + public void testSearchableSnapshotForceMergesClonedIndexAfterRetry() throws Exception { + final int numberOfPrimaries = randomIntBetween(1, 3); + final int numberOfReplicas = randomIntBetween(1, 3); + final String backingIndexName = prepareDataStreamWithDocs(numberOfPrimaries, numberOfReplicas); + + configureClusterAllocation(false); + try { + // Enable/start ILM on the data stream. + updateIndexSettings(dataStream, Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy)); + + assertForceMergeCloneIndexSettings(backingIndexName, numberOfPrimaries); + + TimeSeriesRestDriver.moveIndexToStep( + client(), + backingIndexName, + new Step.StepKey("cold", "searchable_snapshot", "clone"), + new Step.StepKey("cold", "searchable_snapshot", "cleanup-generated-index") + ); + + configureClusterAllocation(true); + + assertForceMergedSnapshotDone(backingIndexName, numberOfPrimaries > 1, true); + } catch (Exception e) { + // Make sure we re-enable allocation in case of failure so that the remaining tests in the suite are not affected. + configureClusterAllocation(true); + throw e; + } } @SuppressWarnings("unchecked") @@ -1101,4 +1134,137 @@ private Step.StepKey getKeyForIndex(Response response, String indexName) throws String step = (String) indexResponse.get("step"); return new Step.StepKey(phase, action, step); } + + /** + * Prepares a data stream with the specified number of primary and replica shards, + * creates a snapshot repository and ILM policy, applies a composable template, + * indexes several documents, and performs a rollover. Returns the name of the + * first generation backing index. 
+ */ + private String prepareDataStreamWithDocs(int numberOfPrimaries, int numberOfReplicas) throws Exception { + createSnapshotRepo(client(), snapshotRepo, randomBoolean()); + createNewSingletonPolicy(client(), policy, "cold", new SearchableSnapshotAction(snapshotRepo, true)); + + createComposableTemplate( + client(), + randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT), + dataStream, + new Template(indexSettings(numberOfPrimaries, numberOfReplicas).build(), null, null) + ); + for (int i = 0; i < randomIntBetween(5, 10); i++) { + indexDocument(client(), dataStream, true); + } + final var backingIndexName = getDataStreamBackingIndexNames(dataStream).getFirst(); + + // Retrieve the number of segments in the first (random) shard of the backing index. + final Integer preLifecycleBackingIndexSegments = getNumberOfPrimarySegments(client(), backingIndexName); + // If we have only one primary shard, we expect at least one segment. We're hoping to get multiple segments, but we can't guarantee + // that, so we have to resort to a "greater than or equal to" check. + if (numberOfPrimaries == 1) { + assertThat(preLifecycleBackingIndexSegments, greaterThanOrEqualTo(1)); + } else { + // With multiple primary shards, the segments are more spread out, so it's even less likely that we'll get more than 1 segment + // in one shard, and some shards might even be empty. + assertThat(preLifecycleBackingIndexSegments, anyOf(greaterThanOrEqualTo(1), equalTo(0))); + } + + // Rolling over the data stream so we can apply the searchable snapshot policy to a backing index that's not the write index. + rolloverMaxOneDocCondition(client(), dataStream); + // Wait for all shards to be allocated. + ensureGreen(dataStream); + return backingIndexName; + } + + /** + * Waits for the force-merge clone index to exist and asserts that it has the correct number of primary shards and zero replicas, + * and that its name follows the expected naming convention. 
+ */ + private static void assertForceMergeCloneIndexSettings(String backingIndexName, int numberOfPrimaries) throws Exception { + final String forceMergeIndexPattern = FORCE_MERGE_INDEX_PREFIX + "*" + backingIndexName; + assertBusy(() -> { + // The force-merged index is hidden, so we need to expand wildcards. + Request request = new Request("GET", "/" + forceMergeIndexPattern + "/_settings?expand_wildcards=all&flat_settings=true"); + Response response = client().performRequest(request); + final Map indicesSettings = entityAsMap(response); + assertThat( + "expected only settings for index: " + forceMergeIndexPattern + ", but got " + indicesSettings, + indicesSettings.size(), + equalTo(1) + ); + final String forceMergeIndexName = indicesSettings.keySet().iterator().next(); + assertThat(forceMergeIndexName, startsWith(FORCE_MERGE_INDEX_PREFIX)); + assertThat(forceMergeIndexName, endsWith(backingIndexName)); + @SuppressWarnings("unchecked") + final var forceMergeIndexResponse = (Map) indicesSettings.get(forceMergeIndexName); + @SuppressWarnings("unchecked") + final Map forceMergeIndexSettings = (Map) forceMergeIndexResponse.get("settings"); + assertThat(forceMergeIndexSettings.get("index.number_of_shards"), equalTo(String.valueOf(numberOfPrimaries))); + assertThat(forceMergeIndexSettings.get("index.number_of_replicas"), equalTo("0")); + }); + } + + /** + * Asserts that the restored searchable snapshot index exists, the original and force-merge clone indices are deleted, + * and the restored index has the expected number of segments. Also verifies the snapshot index naming conventions. + * + * @param backingIndexName The original backing index name. + * @param multiplePrimaries True if the original backing index had multiple primaries, affecting segment assertions. + * @param withReplicas True if the original backing index had one or more replicas, affecting snapshot index naming assertions. 
+ */ + private void assertForceMergedSnapshotDone(String backingIndexName, boolean multiplePrimaries, boolean withReplicas) throws Exception { + final String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + backingIndexName; + awaitIndexExists(restoredIndexName); + + assertBusy(() -> assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME))); + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(backingIndexName); + // Regardless of whether we force merged the backing index or a clone, the cloned index should not exist (anymore). + awaitIndexDoesNotExist(FORCE_MERGE_INDEX_PREFIX + "-*-" + backingIndexName); + + // Retrieve the number of segments in the first (random) shard of the backing index. + final Integer numberOfPrimarySegments = getNumberOfPrimarySegments(client(), restoredIndexName); + // If the backing index had multiple primaries, some primaries might be empty, but others should have no more than 1 segment + if (multiplePrimaries) { + assertThat(numberOfPrimarySegments, lessThanOrEqualTo(1)); + } else { + // If the backing index had only one primary, we expect exactly 1 segment after force merging. + assertThat(numberOfPrimarySegments, equalTo(1)); + } + + // We can't assert the replicas of the mounted snapshot as it's created with 0 replicas by default, but we can at least assert + // that the mounted snapshot was taken from the correct source index. 
+ final List> snapshots = getSnapshots(); + assertThat("expected to have only one snapshot, but got: " + snapshots, snapshots.size(), equalTo(1)); + final Map snapshot = snapshots.getFirst(); + @SuppressWarnings("unchecked") + final List indices = (List) snapshot.get("indices"); + assertThat("expected to have only one index, but got: " + indices, indices.size(), equalTo(1)); + final String snapshotIndexName = indices.getFirst(); + // If the backing index had replicas, we force merged a clone, so the snapshot index name should match the clone naming pattern. + if (withReplicas) { + assertThat("expected index to start with the force merge prefix", snapshotIndexName, startsWith(FORCE_MERGE_INDEX_PREFIX)); + assertThat("expected index to end with the backing index name", snapshotIndexName, endsWith(backingIndexName)); + } else { + // If the backing index had no replicas, we force merged the backing index itself, so the snapshot index name should be equal + // to the backing index name. + assertThat("expected index to be the backing index name", snapshotIndexName, equalTo(backingIndexName)); + } + } + + private List> getSnapshots() throws IOException { + Request getSnaps = new Request("GET", "/_snapshot/" + snapshotRepo + "/_all"); + Response response = client().performRequest(getSnaps); + ObjectPath objectPath = ObjectPath.createFromResponse(response); + return objectPath.evaluate("snapshots"); + } + + /** + * Updates the cluster settings to enable or disable shard allocation in the whole cluster. + */ + private static void configureClusterAllocation(boolean enable) throws IOException { + final String value = enable ? 
null : "none"; + updateClusterSettings( + Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), value).build() + ); + } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportExplainLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportExplainLifecycleAction.java index bac3aa8e6ac38..d19dcb28cb7d2 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportExplainLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportExplainLifecycleAction.java @@ -223,7 +223,8 @@ static IndexLifecycleExplainResponse getIndexLifecycleExplainResponse( stepInfoBytes, previousStepInfoBytes, phaseExecutionInfo, - LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(idxSettings) + LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(idxSettings), + lifecycleState.forceMergeIndexName() ); } else { indexResponse = null; From 0ea2ce4a02a3a1b5de2e87d0d03aabeab97789cf Mon Sep 17 00:00:00 2001 From: Niels Bauman <33722607+nielsbauman@users.noreply.github.com> Date: Mon, 1 Sep 2025 19:38:04 +0200 Subject: [PATCH 02/17] Small comment fixes Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../xpack/core/ilm/IndexLifecycleExplainResponse.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java index 07cd672e8f588..9937353826bd0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java @@ -360,7 +360,7 @@ public IndexLifecycleExplainResponse(StreamInput in) throws IOException { } else { skip = false; } - // No 
need for serialization from this point onwards as this action only runs on the local node. + // No need for deserialization from this point onwards as this action only runs on the local node. forceMergeIndexName = null; } else { policyName = null; @@ -417,7 +417,7 @@ public void writeTo(StreamOutput out) throws IOException { || out.getTransportVersion().onOrAfter(TransportVersions.ILM_ADD_SKIP_SETTING)) { out.writeBoolean(skip); } - // No need for deserialization from this point onwards as this action only runs on the local node. + // No need for serialization from this point onwards as this action only runs on the local node. } } From 15abc013b26d2db197b89514a388d59a68abce2d Mon Sep 17 00:00:00 2001 From: Niels Bauman <33722607+nielsbauman@users.noreply.github.com> Date: Mon, 1 Sep 2025 19:38:46 +0200 Subject: [PATCH 03/17] Update docs/changelog/133954.yaml --- docs/changelog/133954.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/133954.yaml diff --git a/docs/changelog/133954.yaml b/docs/changelog/133954.yaml new file mode 100644 index 0000000000000..ed233e652bb4b --- /dev/null +++ b/docs/changelog/133954.yaml @@ -0,0 +1,6 @@ +pr: 133954 +summary: "ILM: Force merge on zero-replica cloned index before snapshot" +area: ILM+SLM +type: enhancement +issues: + - 75478 From 7e9d07227ee7cd1e17ee4277722935e9f193417e Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Mon, 1 Sep 2025 22:49:20 +0200 Subject: [PATCH 04/17] Add extra waits to fix tests --- .../ilm/actions/SearchableSnapshotActionIT.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java index 32136d16b33d7..72999bea03942 100644 --- 
a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java @@ -339,8 +339,8 @@ public void testUpdatePolicyToAddPhasesYieldsInvalidActionsToBeSkipped() throws // rolling over the data stream so we can apply the searchable snapshot policy to a backing index that's not the write index indexDocument(client(), dataStream, true); - var backingIndices = getDataStreamBackingIndexNames(dataStream); - String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + backingIndices.get(0); + String backingIndexName = getDataStreamBackingIndexNames(dataStream).getFirst(); + String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + backingIndexName; assertTrue(waitUntil(() -> { try { return indexExists(restoredIndexName); @@ -354,6 +354,8 @@ public void testUpdatePolicyToAddPhasesYieldsInvalidActionsToBeSkipped() throws assertThat(stepKeyForIndex.phase(), is("hot")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); }, 30, TimeUnit.SECONDS); + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(backingIndexName); createPolicy( client(), @@ -431,6 +433,8 @@ public void testRestoredIndexManagedByLocalPolicySkipsIllegalActions() throws Ex assertThat(stepKeyForIndex.phase(), is("hot")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); }, 30, TimeUnit.SECONDS); + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(backingIndexName); // snapshot the data stream String dsSnapshotName = "snapshot_ds_" + dataStream; @@ -535,6 +539,9 @@ public void testIdenticalSearchableSnapshotActionIsNoop() throws Exception { assertThat(stepKeyForIndex.phase(), is("cold")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); }, 30, 
TimeUnit.SECONDS); + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(index); + Request getSnaps = new Request("GET", "/_snapshot/" + snapshotRepo + "/_all"); Response response = client().performRequest(getSnaps); @@ -596,6 +603,8 @@ public void testConvertingSearchableSnapshotFromFullToPartial() throws Exception assertThat(stepKeyForIndex.phase(), is("frozen")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); }, 30, TimeUnit.SECONDS); + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(index); Request getSnaps = new Request("GET", "/_snapshot/" + snapshotRepo + "/_all"); Response response = client().performRequest(getSnaps); @@ -1011,6 +1020,8 @@ public void testSearchableSnapshotTotalShardsPerNode() throws Exception { assertThat(stepKeyForIndex.phase(), is("frozen")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); }, 30, TimeUnit.SECONDS); + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(index); // validate total_shards_per_node setting Map indexSettings = getIndexSettingsAsMap(searchableSnapMountedIndexName); @@ -1086,6 +1097,8 @@ public void testSearchableSnapshotReplicateFor() throws Exception { Integer numberOfReplicas = Integer.valueOf((String) indexSettings.get(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey())); assertThat(numberOfReplicas, is(1)); } + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(backingIndexName); // tweak the policy to replicate_for hardly any time at all createPolicy( From 405fb5b9138dee0b257acb4d1d1a0d1dd74b825e Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 1 Sep 2025 20:55:19 +0000 Subject: [PATCH 05/17] [CI] Auto commit changes from spotless --- .../xpack/ilm/actions/SearchableSnapshotActionIT.java | 1 - 1 file changed, 1 deletion(-) diff --git 
a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java index 72999bea03942..95fb731b2a7a5 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java @@ -542,7 +542,6 @@ public void testIdenticalSearchableSnapshotActionIsNoop() throws Exception { // Wait for the original index to be deleted, to ensure ILM has finished awaitIndexDoesNotExist(index); - Request getSnaps = new Request("GET", "/_snapshot/" + snapshotRepo + "/_all"); Response response = client().performRequest(getSnaps); Map responseMap; From c720e0c316112827337dd59c1caf4a340ef5dc4c Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Mon, 1 Sep 2025 23:14:25 +0200 Subject: [PATCH 06/17] Add another wait --- .../xpack/ilm/actions/SearchableSnapshotActionIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java index 95fb731b2a7a5..54c4a0d78ba5d 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java @@ -930,8 +930,8 @@ public void testSearchableSnapshotsInHotPhasePinnedToHotNodes() throws Exception // searchable snapshots mounted in the hot phase should be pinned to hot nodes assertThat(hotIndexSettings.get(DataTier.TIER_PREFERENCE), is("data_hot")); - 
assertOK(client().performRequest(new Request("DELETE", "_data_stream/" + dataStream))); - assertOK(client().performRequest(new Request("DELETE", "_ilm/policy/" + policy))); + // Wait for the original index to be deleted, to ensure ILM has finished + awaitIndexDoesNotExist(firstGenIndex); } // See: https://github.com/elastic/elasticsearch/issues/77269 From 4f0ae14420cc64a2b2ad71c607198ae2c1175ba7 Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Wed, 3 Sep 2025 10:35:23 -0300 Subject: [PATCH 07/17] Randomize the phase of the tests --- .../actions/SearchableSnapshotActionIT.java | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java index 54c4a0d78ba5d..cc879a773191b 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java @@ -65,7 +65,6 @@ import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition; import static org.elasticsearch.xpack.core.ilm.DeleteAction.WITH_SNAPSHOT_DELETE; import static org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction.FORCE_MERGE_INDEX_PREFIX; -import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; @@ -140,7 +139,8 @@ public void testSearchableSnapshotForceMergesClonedIndex() throws Exception { final int numberOfPrimaries = randomIntBetween(1, 3); // The test suite runs with 4 nodes, so we can have up to 3 (allocated) replicas. 
final int numberOfReplicas = randomIntBetween(1, 3); - final String backingIndexName = prepareDataStreamWithDocs(numberOfPrimaries, numberOfReplicas); + final String phase = randomBoolean() ? "cold" : "frozen"; + final String backingIndexName = prepareDataStreamWithDocs(phase, numberOfPrimaries, numberOfReplicas); final boolean pauseOnClone = randomBoolean(); if (pauseOnClone) { @@ -156,7 +156,7 @@ public void testSearchableSnapshotForceMergesClonedIndex() throws Exception { configureClusterAllocation(true); } - assertForceMergedSnapshotDone(backingIndexName, numberOfPrimaries > 1, true); + assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries > 1, true); } catch (Exception e) { // Make sure we re-enable allocation in case of failure so that the remaining tests in the suite are not affected. configureClusterAllocation(true); @@ -171,12 +171,13 @@ public void testSearchableSnapshotForceMergesClonedIndex() throws Exception { public void testSearchableSnapshotForceMergesSourceIndex() throws Exception { // Data streams have 1 primary shard by default. // The test suite runs with 4 nodes, so we can have up to 3 (allocated) replicas. - final String backingIndexName = prepareDataStreamWithDocs(1, 0); + final String phase = randomBoolean() ? "cold" : "frozen"; + final String backingIndexName = prepareDataStreamWithDocs(phase, 1, 0); // Enable/start ILM on the data stream. 
updateIndexSettings(dataStream, Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy)); - assertForceMergedSnapshotDone(backingIndexName, false, false); + assertForceMergedSnapshotDone(phase, backingIndexName, false, false); } /** @@ -189,7 +190,8 @@ public void testSearchableSnapshotForceMergesSourceIndex() throws Exception { public void testSearchableSnapshotForceMergesClonedIndexAfterRetry() throws Exception { final int numberOfPrimaries = randomIntBetween(1, 3); final int numberOfReplicas = randomIntBetween(1, 3); - final String backingIndexName = prepareDataStreamWithDocs(numberOfPrimaries, numberOfReplicas); + final String phase = randomBoolean() ? "cold" : "frozen"; + final String backingIndexName = prepareDataStreamWithDocs(phase, numberOfPrimaries, numberOfReplicas); configureClusterAllocation(false); try { @@ -201,13 +203,13 @@ public void testSearchableSnapshotForceMergesClonedIndexAfterRetry() throws Exce TimeSeriesRestDriver.moveIndexToStep( client(), backingIndexName, - new Step.StepKey("cold", "searchable_snapshot", "clone"), - new Step.StepKey("cold", "searchable_snapshot", "cleanup-generated-index") + new Step.StepKey(phase, "searchable_snapshot", "clone"), + new Step.StepKey(phase, "searchable_snapshot", "cleanup-generated-index") ); configureClusterAllocation(true); - assertForceMergedSnapshotDone(backingIndexName, numberOfPrimaries > 1, true); + assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries > 1, true); } catch (Exception e) { // Make sure we re-enable allocation in case of failure so that the remaining tests in the suite are not affected. configureClusterAllocation(true); @@ -1153,9 +1155,9 @@ private Step.StepKey getKeyForIndex(Response response, String indexName) throws * indexes several documents, and performs a rollover. Returns the name of the * first generation backing index. 
*/ - private String prepareDataStreamWithDocs(int numberOfPrimaries, int numberOfReplicas) throws Exception { + private String prepareDataStreamWithDocs(String phase, int numberOfPrimaries, int numberOfReplicas) throws Exception { createSnapshotRepo(client(), snapshotRepo, randomBoolean()); - createNewSingletonPolicy(client(), policy, "cold", new SearchableSnapshotAction(snapshotRepo, true)); + createNewSingletonPolicy(client(), policy, phase, new SearchableSnapshotAction(snapshotRepo, true)); createComposableTemplate( client(), @@ -1177,7 +1179,7 @@ private String prepareDataStreamWithDocs(int numberOfPrimaries, int numberOfRepl } else { // With multiple primary shards, the segments are more spread out, so it's even less likely that we'll get more than 1 segment // in one shard, and some shards might even be empty. - assertThat(preLifecycleBackingIndexSegments, anyOf(greaterThanOrEqualTo(1), equalTo(0))); + assertThat(preLifecycleBackingIndexSegments, greaterThanOrEqualTo(0)); } // Rolling over the data stream so we can apply the searchable snapshot policy to a backing index that's not the write index. @@ -1219,12 +1221,17 @@ private static void assertForceMergeCloneIndexSettings(String backingIndexName, * Asserts that the restored searchable snapshot index exists, the original and force-merge clone indices are deleted, * and the restored index has the expected number of segments. Also verifies the snapshot index naming conventions. * - * @param backingIndexName The original backing index name. + * @param phase The phase of the ILM policy that the searchable snapshot action runs in. + * @param backingIndexName The original backing index name. * @param multiplePrimaries True if the original backing index had multiple primaries, affecting segment assertions. - * @param withReplicas True if the original backing index had one or more replicas, affecting snapshot index naming assertions. 
+ * @param withReplicas True if the original backing index had one or more replicas, affecting snapshot index naming assertions. */ - private void assertForceMergedSnapshotDone(String backingIndexName, boolean multiplePrimaries, boolean withReplicas) throws Exception { - final String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + backingIndexName; + private void assertForceMergedSnapshotDone(String phase, String backingIndexName, boolean multiplePrimaries, boolean withReplicas) + throws Exception { + final String prefix = phase.equals("cold") + ? SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + : SearchableSnapshotAction.PARTIAL_RESTORED_INDEX_PREFIX; + final String restoredIndexName = prefix + backingIndexName; awaitIndexExists(restoredIndexName); assertBusy(() -> assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME))); @@ -1236,7 +1243,7 @@ private void assertForceMergedSnapshotDone(String backingIndexName, boolean mult // Retrieve the number of segments in the first (random) shard of the backing index. final Integer numberOfPrimarySegments = getNumberOfPrimarySegments(client(), restoredIndexName); // If the backing index had multiple primaries, some primaries might be empty, but others should have no more than 1 segment - if (multiplePrimaries) { + if (multiplePrimaries || phase.equals("frozen")) { assertThat(numberOfPrimarySegments, lessThanOrEqualTo(1)); } else { // If the backing index had only one primary, we expect exactly 1 segment after force merging. 
From 2f044a33a802dc6fb218d314a4cbba98bdadea47 Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Wed, 3 Sep 2025 10:38:11 -0300 Subject: [PATCH 08/17] Add comment on `indexSurvives` --- .../java/org/elasticsearch/xpack/core/ilm/DeleteStep.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteStep.java index 23de791ea201c..176cb013010b6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteStep.java @@ -46,7 +46,9 @@ public DeleteStep(StepKey key, StepKey nextStepKey, Client client) { } /** - * Use this constructor to delete a specific index, potentially different from the one that ILM is currently operating on. + * Use this constructor to delete a specific index, potentially different from the one that ILM is currently operating on. The parameter + * {@code indexSurvives} indicates whether the index that ILM runs on will survive (i.e. not get deleted) this step. + * Look at the callers of {@link AsyncActionStep#indexSurvives()} for more details. 
*/ public DeleteStep( StepKey key, From 04562692492201f0a3ca0feb4e7662aff50b7a61 Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Wed, 3 Sep 2025 18:01:58 -0300 Subject: [PATCH 09/17] Pass index name supplier --- .../xpack/core/ilm/ForceMergeStep.java | 29 +++++++++++++++-- .../core/ilm/GenerateSnapshotNameStep.java | 19 ++++++++---- .../core/ilm/SearchableSnapshotAction.java | 24 ++++++++++++-- .../xpack/core/ilm/SegmentCountStep.java | 31 +++++++++++++++++-- .../ilm/GenerateSnapshotNameStepTests.java | 6 ++-- 5 files changed, 91 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java index 68b287bbeb5f4..233e39eb4421f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java @@ -16,10 +16,12 @@ import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.common.Strings; import java.util.Arrays; import java.util.Objects; +import java.util.function.BiFunction; /** * Invokes a force merge on a single index. @@ -28,11 +30,33 @@ public class ForceMergeStep extends AsyncActionStep { public static final String NAME = "forcemerge"; private static final Logger logger = LogManager.getLogger(ForceMergeStep.class); + private static final BiFunction DEFAULT_TARGET_INDEX_NAME_SUPPLIER = ( + indexName, + lifecycleState) -> indexName; + private final int maxNumSegments; + private final BiFunction targetIndexNameSupplier; + /** + * Creates a new {@link ForceMergeStep} that will perform a force merge on the index that ILM is currently operating on. 
+ */ public ForceMergeStep(StepKey key, StepKey nextStepKey, Client client, int maxNumSegments) { + this(key, nextStepKey, client, maxNumSegments, DEFAULT_TARGET_INDEX_NAME_SUPPLIER); + } + + /** + * Creates a new {@link ForceMergeStep} that will perform a force merge on the index name returned by the supplier. + */ + public ForceMergeStep( + StepKey key, + StepKey nextStepKey, + Client client, + int maxNumSegments, + BiFunction targetIndexNameSupplier + ) { super(key, nextStepKey, client); this.maxNumSegments = maxNumSegments; + this.targetIndexNameSupplier = targetIndexNameSupplier; } @Override @@ -51,9 +75,8 @@ public void performAction( ClusterStateObserver observer, ActionListener listener ) { - // Use the cloned index name if we have one, otherwise fall back to the original index name. - String clonedIndexName = indexMetadata.getLifecycleExecutionState().forceMergeIndexName(); - String indexName = clonedIndexName != null ? clonedIndexName : indexMetadata.getIndex().getName(); + String indexName = targetIndexNameSupplier.apply(indexMetadata.getIndex().getName(), indexMetadata.getLifecycleExecutionState()); + assert indexName != null : "target index name supplier must not return null"; ForceMergeRequest request = new ForceMergeRequest(indexName); request.maxNumSegments(maxNumSegments); getClient(currentState.projectId()).admin().indices().forceMerge(request, listener.delegateFailureAndWrap((l, response) -> { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java index b7b47473de757..88b4abc0871fa 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java @@ -21,6 +21,7 @@ import java.util.Locale; import java.util.Objects; +import java.util.function.BiFunction; 
/** * Generates a snapshot name for the given index and records it in the index metadata along with the provided snapshot repository. @@ -35,10 +36,17 @@ public class GenerateSnapshotNameStep extends ClusterStateActionStep { private static final Logger logger = LogManager.getLogger(GenerateSnapshotNameStep.class); private final String snapshotRepository; - - public GenerateSnapshotNameStep(StepKey key, StepKey nextStepKey, String snapshotRepository) { + private final BiFunction targetIndexNameSupplier; + + public GenerateSnapshotNameStep( + StepKey key, + StepKey nextStepKey, + String snapshotRepository, + BiFunction targetIndexNameSupplier + ) { super(key, nextStepKey); this.snapshotRepository = snapshotRepository; + this.targetIndexNameSupplier = targetIndexNameSupplier; } public String getSnapshotRepository() { @@ -72,12 +80,11 @@ public ProjectState performAction(Index index, ProjectState projectState) { + "] cannot continue until the repository is created or the policy is changed" ); } - // If we performed the force merge step on the cloned index, we perform the snapshot on that index instead of the original. - final String clonedIndexName = lifecycleState.forceMergeIndexName(); - final String forceMergedIndexName = clonedIndexName != null ? 
clonedIndexName : index.getName(); + final String indexName = targetIndexNameSupplier.apply(index.getName(), lifecycleState); + assert indexName != null : "target index name supplier must not return null"; LifecycleExecutionState.Builder newLifecycleState = LifecycleExecutionState.builder(lifecycleState); - newLifecycleState.setSnapshotIndexName(forceMergedIndexName); + newLifecycleState.setSnapshotIndexName(indexName); newLifecycleState.setSnapshotRepository(snapshotRepository); if (lifecycleState.snapshotName() == null) { // generate and validate the snapshotName diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java index 525006d806491..9d72ec22c3092 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java @@ -70,8 +70,13 @@ public class SearchableSnapshotAction implements LifecycleAction { public static final String FULL_RESTORED_INDEX_PREFIX = "restored-"; public static final String PARTIAL_RESTORED_INDEX_PREFIX = "partial-"; public static final String FORCE_MERGE_INDEX_PREFIX = "force-merge-"; + /** An index name supplier that always returns the force merge index name (possibly null). */ public static final BiFunction FORCE_MERGE_INDEX_NAME_SUPPLIER = (indexName, state) -> state .forceMergeIndexName(); + /** An index name supplier that returns the force merge index name if it exists, or the original index name if not. */ + public static final BiFunction FORCE_MERGE_INDEX_NAME_FALLBACK_SUPPLIER = ( + indexName, + state) -> state.forceMergeIndexName() != null ? 
state.forceMergeIndexName() : indexName; private static final Settings CLONE_SETTINGS = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build(); private static final Function CLONE_SETTINGS_SUPPLIER = indexMetadata -> CLONE_SETTINGS; @@ -400,12 +405,25 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac ), cleanupClonedIndexKey ); - ForceMergeStep forceMergeStep = new ForceMergeStep(forceMergeStepKey, waitForSegmentCountKey, client, 1); - SegmentCountStep segmentCountStep = new SegmentCountStep(waitForSegmentCountKey, generateSnapshotNameKey, client, 1); + ForceMergeStep forceMergeStep = new ForceMergeStep( + forceMergeStepKey, + waitForSegmentCountKey, + client, + 1, + FORCE_MERGE_INDEX_NAME_FALLBACK_SUPPLIER + ); + SegmentCountStep segmentCountStep = new SegmentCountStep( + waitForSegmentCountKey, + generateSnapshotNameKey, + client, + 1, + FORCE_MERGE_INDEX_NAME_FALLBACK_SUPPLIER + ); GenerateSnapshotNameStep generateSnapshotNameStep = new GenerateSnapshotNameStep( generateSnapshotNameKey, cleanSnapshotKey, - snapshotRepository + snapshotRepository, + FORCE_MERGE_INDEX_NAME_FALLBACK_SUPPLIER ); CleanupSnapshotStep cleanupSnapshotStep = new CleanupSnapshotStep(cleanSnapshotKey, createSnapshotKey, client); CreateSnapshotStep createSnapshotStep = new CreateSnapshotStep(createSnapshotKey, waitForDataTierKey, cleanSnapshotKey, client); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java index d4cf3c53c9000..15b8291720980 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java @@ -16,6 +16,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexMetadata; 
+import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.core.TimeValue; @@ -29,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.BiFunction; import java.util.stream.Collectors; /** @@ -38,12 +40,33 @@ public class SegmentCountStep extends AsyncWaitStep { private static final Logger logger = LogManager.getLogger(SegmentCountStep.class); public static final String NAME = "segment-count"; + private static final BiFunction DEFAULT_TARGET_INDEX_NAME_SUPPLIER = ( + indexName, + lifecycleState) -> indexName; private final int maxNumSegments; + private final BiFunction targetIndexNameSupplier; + /** + * Creates a new {@link SegmentCountStep} that will check the segment count on the index that ILM is currently operating on. + */ public SegmentCountStep(StepKey key, StepKey nextStepKey, Client client, int maxNumSegments) { + this(key, nextStepKey, client, maxNumSegments, DEFAULT_TARGET_INDEX_NAME_SUPPLIER); + } + + /** + * Creates a new {@link SegmentCountStep} that will check the segment count on the index name returned by the supplier. + */ + public SegmentCountStep( + StepKey key, + StepKey nextStepKey, + Client client, + int maxNumSegments, + BiFunction targetIndexNameSupplier + ) { super(key, nextStepKey, client); this.maxNumSegments = maxNumSegments; + this.targetIndexNameSupplier = targetIndexNameSupplier; } @Override @@ -57,9 +80,11 @@ public int getMaxNumSegments() { @Override public void evaluateCondition(ProjectState state, IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) { - // Use the cloned index name if we have one, otherwise fall back to the original index name. - String clonedIndexName = indexMetadata.getLifecycleExecutionState().forceMergeIndexName(); - String forceMergedIndexName = clonedIndexName != null ? 
clonedIndexName : indexMetadata.getIndex().getName(); + String forceMergedIndexName = targetIndexNameSupplier.apply( + indexMetadata.getIndex().getName(), + indexMetadata.getLifecycleExecutionState() + ); + assert forceMergedIndexName != null : "target index name supplier must not return null"; getClient(state.projectId()).admin() .indices() .segments(new IndicesSegmentsRequest(forceMergedIndexName), ActionListener.wrap(response -> { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStepTests.java index 417e605a5dc6e..27018df2bf43b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStepTests.java @@ -35,7 +35,7 @@ public class GenerateSnapshotNameStepTests extends AbstractStepTestCase i); } @Override @@ -50,12 +50,12 @@ protected GenerateSnapshotNameStep mutateInstance(GenerateSnapshotNameStep insta case 2 -> snapshotRepository = randomValueOtherThan(snapshotRepository, () -> randomAlphaOfLengthBetween(5, 10)); default -> throw new AssertionError("Illegal randomisation branch"); } - return new GenerateSnapshotNameStep(key, nextKey, snapshotRepository); + return new GenerateSnapshotNameStep(key, nextKey, snapshotRepository, null); } @Override protected GenerateSnapshotNameStep copyInstance(GenerateSnapshotNameStep instance) { - return new GenerateSnapshotNameStep(instance.getKey(), instance.getNextStepKey(), instance.getSnapshotRepository()); + return new GenerateSnapshotNameStep(instance.getKey(), instance.getNextStepKey(), instance.getSnapshotRepository(), null); } public void testPerformAction() { From 9912bc4e2af376eaa3d641bf267be414453566ca Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Thu, 4 Sep 2025 07:49:03 -0300 Subject: [PATCH 10/17] Put new steps 
behind if-statement --- .../xpack/core/ilm/SearchableSnapshotAction.java | 8 +++++--- .../xpack/core/ilm/SearchableSnapshotActionTests.java | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java index 9d72ec22c3092..fe0136853ea04 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java @@ -461,7 +461,7 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac ); CopySettingsStep copySettingsStep = new CopySettingsStep( copyLifecyclePolicySettingKey, - conditionalDeleteForceMergedIndexKey, + forceMergeIndex ? conditionalDeleteForceMergedIndexKey : dataStreamCheckBranchingKey, (index, lifecycleState) -> getRestoredIndexPrefix(copyLifecyclePolicySettingKey) + index, LifecycleSettings.LIFECYCLE_NAME ); @@ -548,8 +548,10 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac steps.add(replicateForStep); steps.add(dropReplicasStep); } - steps.add(conditionalDeleteForceMergedIndexStep); - steps.add(deleteForceMergedIndexStep); + if (forceMergeIndex) { + steps.add(conditionalDeleteForceMergedIndexStep); + steps.add(deleteForceMergedIndexStep); + } steps.add(isDataStreamBranchingStep); steps.add(replaceDataStreamBackingIndex); steps.add(deleteSourceIndexStep); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java index a55158170dfff..268cf1a57e653 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java +++ 
b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java @@ -144,8 +144,8 @@ private List expectedStepKeys(String phase, boolean forceMergeIndex, bo new StepKey(phase, NAME, CopySettingsStep.NAME), hasReplicateFor ? new StepKey(phase, NAME, WaitUntilReplicateForTimePassesStep.NAME) : null, hasReplicateFor ? new StepKey(phase, NAME, UpdateSettingsStep.NAME) : null, - new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DELETE_FORCE_MERGED_INDEX_KEY), - new StepKey(phase, NAME, SearchableSnapshotAction.DELETE_FORCE_MERGED_INDEX_KEY), + forceMergeIndex ? new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DELETE_FORCE_MERGED_INDEX_KEY) : null, + forceMergeIndex ? new StepKey(phase, NAME, SearchableSnapshotAction.DELETE_FORCE_MERGED_INDEX_KEY) : null, new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DATASTREAM_CHECK_KEY), new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME), new StepKey(phase, NAME, DeleteStep.NAME), From cda388bb67586e333d4a4aae0e0509ede10a7188 Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Thu, 4 Sep 2025 07:49:31 -0300 Subject: [PATCH 11/17] Also catch `AssertionError` in tests --- .../xpack/ilm/actions/SearchableSnapshotActionIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java index cc879a773191b..eda5e2af2dc59 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java @@ -157,7 +157,7 @@ public void testSearchableSnapshotForceMergesClonedIndex() throws Exception { } 
assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries > 1, true); - } catch (Exception e) { + } catch (Exception | AssertionError e) { // Make sure we re-enable allocation in case of failure so that the remaining tests in the suite are not affected. configureClusterAllocation(true); throw e; @@ -210,7 +210,7 @@ public void testSearchableSnapshotForceMergesClonedIndexAfterRetry() throws Exce configureClusterAllocation(true); assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries > 1, true); - } catch (Exception e) { + } catch (Exception | AssertionError e) { // Make sure we re-enable allocation in case of failure so that the remaining tests in the suite are not affected. configureClusterAllocation(true); throw e; From 439c05150101cc04210c115fed116b758abc8520 Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Thu, 4 Sep 2025 07:51:21 -0300 Subject: [PATCH 12/17] Update changelog phrasing --- docs/changelog/133954.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/133954.yaml b/docs/changelog/133954.yaml index ed233e652bb4b..a6a4105b343d8 100644 --- a/docs/changelog/133954.yaml +++ b/docs/changelog/133954.yaml @@ -1,5 +1,5 @@ pr: 133954 -summary: "ILM: Force merge on zero-replica cloned index before snapshot" +summary: "ILM: Force merge on zero-replica cloned index before snapshotting for searchable snapshots" area: ILM+SLM type: enhancement issues: From a6b0c507cd2a9f312695caa850f02e1efe3256a5 Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Thu, 4 Sep 2025 09:18:31 -0300 Subject: [PATCH 13/17] Rename index prefix and fields --- .../metadata/LifecycleExecutionState.java | 6 ++--- .../ilm/IndexLifecycleExplainResponse.java | 6 ++--- .../core/ilm/SearchableSnapshotAction.java | 25 ++++++++++--------- .../actions/SearchableSnapshotActionIT.java | 14 +++++++---- 4 files changed, 28 insertions(+), 23 deletions(-) diff --git 
a/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java b/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java index cefac0fb968b1..61347691d6e8d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java @@ -65,7 +65,7 @@ public record LifecycleExecutionState( private static final String SNAPSHOT_INDEX_NAME = "snapshot_index_name"; private static final String SHRINK_INDEX_NAME = "shrink_index_name"; private static final String DOWNSAMPLE_INDEX_NAME = "rollup_index_name"; - private static final String FORCE_MERGE_INDEX_NAME = "force_merge_index_name"; + private static final String FORCE_MERGE_CLONE_INDEX_NAME = "force_merge_clone_index_name"; public static final LifecycleExecutionState EMPTY_STATE = LifecycleExecutionState.builder().build(); @@ -205,7 +205,7 @@ public static LifecycleExecutionState fromCustomMetadata(Map cus if (downsampleIndexName != null) { builder.setDownsampleIndexName(downsampleIndexName); } - String forceMergeIndexName = customData.get(FORCE_MERGE_INDEX_NAME); + String forceMergeIndexName = customData.get(FORCE_MERGE_CLONE_INDEX_NAME); if (forceMergeIndexName != null) { builder.setForceMergeIndexName(forceMergeIndexName); } @@ -282,7 +282,7 @@ public Map asMap() { result.put(DOWNSAMPLE_INDEX_NAME, downsampleIndexName); } if (forceMergeIndexName != null) { - result.put(FORCE_MERGE_INDEX_NAME, forceMergeIndexName); + result.put(FORCE_MERGE_CLONE_INDEX_NAME, forceMergeIndexName); } return Collections.unmodifiableMap(result); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java index 9937353826bd0..a86051c0d4641 100644 --- 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java @@ -57,7 +57,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl private static final ParseField SHRINK_INDEX_NAME = new ParseField("shrink_index_name"); private static final ParseField SNAPSHOT_NAME = new ParseField("snapshot_name"); private static final ParseField SKIP_NAME = new ParseField("skip"); - private static final ParseField FORCE_MERGE_INDEX_NAME = new ParseField("force_merge_index_name"); + private static final ParseField FORCE_MERGE_CLONE_INDEX_NAME = new ParseField("force_merge_clone_index_name"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "index_lifecycle_explain_response", @@ -126,7 +126,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl }, PREVIOUS_STEP_INFO_FIELD); PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SKIP_NAME); PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), AGE_IN_MILLIS_FIELD); - PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_INDEX_NAME); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_CLONE_INDEX_NAME); } private final String index; @@ -613,7 +613,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.field(SKIP_NAME.getPreferredName(), skip); if (forceMergeIndexName != null) { - builder.field(FORCE_MERGE_INDEX_NAME.getPreferredName(), forceMergeIndexName); + builder.field(FORCE_MERGE_CLONE_INDEX_NAME.getPreferredName(), forceMergeIndexName); } } builder.endObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java 
b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java index fe0136853ea04..27c7c2d2eff33 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java @@ -69,12 +69,13 @@ public class SearchableSnapshotAction implements LifecycleAction { public static final String FULL_RESTORED_INDEX_PREFIX = "restored-"; public static final String PARTIAL_RESTORED_INDEX_PREFIX = "partial-"; - public static final String FORCE_MERGE_INDEX_PREFIX = "force-merge-"; + public static final String FORCE_MERGE_CLONE_INDEX_PREFIX = "fm-clone-"; /** An index name supplier that always returns the force merge index name (possibly null). */ - public static final BiFunction FORCE_MERGE_INDEX_NAME_SUPPLIER = (indexName, state) -> state - .forceMergeIndexName(); + public static final BiFunction FORCE_MERGE_CLONE_INDEX_NAME_SUPPLIER = ( + indexName, + state) -> state.forceMergeIndexName(); /** An index name supplier that returns the force merge index name if it exists, or the original index name if not. */ - public static final BiFunction FORCE_MERGE_INDEX_NAME_FALLBACK_SUPPLIER = ( + public static final BiFunction FORCE_MERGE_CLONE_INDEX_NAME_FALLBACK_SUPPLIER = ( indexName, state) -> state.forceMergeIndexName() != null ? 
state.forceMergeIndexName() : indexName; @@ -375,12 +376,12 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac cleanupClonedIndexKey, generateCloneIndexNameKey, client, - FORCE_MERGE_INDEX_NAME_SUPPLIER + FORCE_MERGE_CLONE_INDEX_NAME_SUPPLIER ); GenerateUniqueIndexNameStep generateCloneIndexNameStep = new GenerateUniqueIndexNameStep( generateCloneIndexNameKey, cloneIndexKey, - FORCE_MERGE_INDEX_PREFIX, + FORCE_MERGE_CLONE_INDEX_PREFIX, (generatedIndexName, lifecycleStateBuilder) -> lifecycleStateBuilder.setForceMergeIndexName(generatedIndexName) ); // Clone the index with 0 replicas. @@ -389,7 +390,7 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac waitForClonedIndexGreenKey, client, ResizeType.CLONE, - FORCE_MERGE_INDEX_NAME_SUPPLIER, + FORCE_MERGE_CLONE_INDEX_NAME_SUPPLIER, CLONE_SETTINGS_SUPPLIER, null ); @@ -401,7 +402,7 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac waitForClonedIndexGreenKey, forceMergeStepKey, ClusterHealthStatus.GREEN, - FORCE_MERGE_INDEX_NAME_SUPPLIER + FORCE_MERGE_CLONE_INDEX_NAME_SUPPLIER ), cleanupClonedIndexKey ); @@ -410,20 +411,20 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac waitForSegmentCountKey, client, 1, - FORCE_MERGE_INDEX_NAME_FALLBACK_SUPPLIER + FORCE_MERGE_CLONE_INDEX_NAME_FALLBACK_SUPPLIER ); SegmentCountStep segmentCountStep = new SegmentCountStep( waitForSegmentCountKey, generateSnapshotNameKey, client, 1, - FORCE_MERGE_INDEX_NAME_FALLBACK_SUPPLIER + FORCE_MERGE_CLONE_INDEX_NAME_FALLBACK_SUPPLIER ); GenerateSnapshotNameStep generateSnapshotNameStep = new GenerateSnapshotNameStep( generateSnapshotNameKey, cleanSnapshotKey, snapshotRepository, - FORCE_MERGE_INDEX_NAME_FALLBACK_SUPPLIER + FORCE_MERGE_CLONE_INDEX_NAME_FALLBACK_SUPPLIER ); CleanupSnapshotStep cleanupSnapshotStep = new CleanupSnapshotStep(cleanSnapshotKey, createSnapshotKey, client); CreateSnapshotStep createSnapshotStep = new 
CreateSnapshotStep(createSnapshotKey, waitForDataTierKey, cleanSnapshotKey, client); @@ -482,7 +483,7 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac deleteForceMergedIndexKey, dataStreamCheckBranchingKey, client, - FORCE_MERGE_INDEX_NAME_SUPPLIER, + FORCE_MERGE_CLONE_INDEX_NAME_SUPPLIER, true ); BranchingStep isDataStreamBranchingStep = new BranchingStep( diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java index eda5e2af2dc59..cebf85802cb7f 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java @@ -64,7 +64,7 @@ import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument; import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition; import static org.elasticsearch.xpack.core.ilm.DeleteAction.WITH_SNAPSHOT_DELETE; -import static org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction.FORCE_MERGE_INDEX_PREFIX; +import static org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction.FORCE_MERGE_CLONE_INDEX_PREFIX; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; @@ -1194,7 +1194,7 @@ private String prepareDataStreamWithDocs(String phase, int numberOfPrimaries, in * and that its name follows the expected naming convention. 
*/ private static void assertForceMergeCloneIndexSettings(String backingIndexName, int numberOfPrimaries) throws Exception { - final String forceMergeIndexPattern = FORCE_MERGE_INDEX_PREFIX + "*" + backingIndexName; + final String forceMergeIndexPattern = FORCE_MERGE_CLONE_INDEX_PREFIX + "*" + backingIndexName; assertBusy(() -> { // The force-merged index is hidden, so we need to expand wildcards. Request request = new Request("GET", "/" + forceMergeIndexPattern + "/_settings?expand_wildcards=all&flat_settings=true"); @@ -1206,7 +1206,7 @@ private static void assertForceMergeCloneIndexSettings(String backingIndexName, equalTo(1) ); final String forceMergeIndexName = indicesSettings.keySet().iterator().next(); - assertThat(forceMergeIndexName, startsWith(FORCE_MERGE_INDEX_PREFIX)); + assertThat(forceMergeIndexName, startsWith(FORCE_MERGE_CLONE_INDEX_PREFIX)); assertThat(forceMergeIndexName, endsWith(backingIndexName)); @SuppressWarnings("unchecked") final var forceMergeIndexResponse = (Map) indicesSettings.get(forceMergeIndexName); @@ -1238,7 +1238,7 @@ private void assertForceMergedSnapshotDone(String phase, String backingIndexName // Wait for the original index to be deleted, to ensure ILM has finished awaitIndexDoesNotExist(backingIndexName); // Regardless of whether we force merged the backing index or a clone, the cloned index should not exist (anymore). - awaitIndexDoesNotExist(FORCE_MERGE_INDEX_PREFIX + "-*-" + backingIndexName); + awaitIndexDoesNotExist(FORCE_MERGE_CLONE_INDEX_PREFIX + "-*-" + backingIndexName); // Retrieve the number of segments in the first (random) shard of the backing index. 
final Integer numberOfPrimarySegments = getNumberOfPrimarySegments(client(), restoredIndexName); @@ -1261,7 +1261,11 @@ private void assertForceMergedSnapshotDone(String phase, String backingIndexName final String snapshotIndexName = indices.getFirst(); // If the backing index had replicas, we force merged a clone, so the snapshot index name should match the clone naming pattern. if (withReplicas) { - assertThat("expected index to start with the force merge prefix", snapshotIndexName, startsWith(FORCE_MERGE_INDEX_PREFIX)); + assertThat( + "expected index to start with the force merge prefix", + snapshotIndexName, + startsWith(FORCE_MERGE_CLONE_INDEX_PREFIX) + ); assertThat("expected index to end with the backing index name", snapshotIndexName, endsWith(backingIndexName)); } else { // If the backing index had no replicas, we force merged the backing index itself, so the snapshot index name should be equal From 6e0a04bd7fa9b466287076a33bf75dd6620481cb Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Thu, 4 Sep 2025 19:40:12 -0300 Subject: [PATCH 14/17] Rename some more variables --- .../metadata/LifecycleExecutionState.java | 22 ++++++++-------- .../xpack/core/ilm/CreateSnapshotStep.java | 2 +- .../ilm/IndexLifecycleExplainResponse.java | 26 +++++++++---------- .../core/ilm/SearchableSnapshotAction.java | 8 +++--- .../IndexLifecycleExplainResponseTests.java | 2 +- .../actions/SearchableSnapshotActionIT.java | 8 +++--- .../TransportExplainLifecycleAction.java | 2 +- 7 files changed, 35 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java b/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java index 61347691d6e8d..f1def7aeb0ebf 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java @@ -41,7 +41,7 @@ public record 
LifecycleExecutionState( String shrinkIndexName, String snapshotIndexName, String downsampleIndexName, - String forceMergeIndexName + String forceMergeCloneIndexName ) { public static final String ILM_CUSTOM_METADATA_KEY = "ilm"; @@ -92,7 +92,7 @@ public static Builder builder(LifecycleExecutionState state) { .setSnapshotIndexName(state.snapshotIndexName) .setDownsampleIndexName(state.downsampleIndexName) .setStepTime(state.stepTime) - .setForceMergeIndexName(state.forceMergeIndexName); + .setForceMergeCloneIndexName(state.forceMergeCloneIndexName); } public static LifecycleExecutionState fromCustomMetadata(Map customData) { @@ -205,9 +205,9 @@ public static LifecycleExecutionState fromCustomMetadata(Map cus if (downsampleIndexName != null) { builder.setDownsampleIndexName(downsampleIndexName); } - String forceMergeIndexName = customData.get(FORCE_MERGE_CLONE_INDEX_NAME); - if (forceMergeIndexName != null) { - builder.setForceMergeIndexName(forceMergeIndexName); + String forceMergeCloneIndexName = customData.get(FORCE_MERGE_CLONE_INDEX_NAME); + if (forceMergeCloneIndexName != null) { + builder.setForceMergeCloneIndexName(forceMergeCloneIndexName); } return builder.build(); } @@ -281,8 +281,8 @@ public Map asMap() { if (downsampleIndexName != null) { result.put(DOWNSAMPLE_INDEX_NAME, downsampleIndexName); } - if (forceMergeIndexName != null) { - result.put(FORCE_MERGE_CLONE_INDEX_NAME, forceMergeIndexName); + if (forceMergeCloneIndexName != null) { + result.put(FORCE_MERGE_CLONE_INDEX_NAME, forceMergeCloneIndexName); } return Collections.unmodifiableMap(result); } @@ -317,7 +317,7 @@ public static class Builder { private String shrinkIndexName; private String snapshotIndexName; private String downsampleIndexName; - private String forceMergeIndexName; + private String forceMergeCloneIndexName; public Builder setPhase(String phase) { this.phase = phase; @@ -409,8 +409,8 @@ public Builder setDownsampleIndexName(String downsampleIndexName) { return this; } - public 
Builder setForceMergeIndexName(String forceMergeIndexName) { - this.forceMergeIndexName = forceMergeIndexName; + public Builder setForceMergeCloneIndexName(String forceMergeCloneIndexName) { + this.forceMergeCloneIndexName = forceMergeCloneIndexName; return this; } @@ -434,7 +434,7 @@ public LifecycleExecutionState build() { shrinkIndexName, snapshotIndexName, downsampleIndexName, - forceMergeIndexName + forceMergeCloneIndexName ); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java index f3a2851d62711..8abf938c717f8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java @@ -102,7 +102,7 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList return; } // If we performed the force merge step on the cloned index, we need to snapshot that index instead of the original. - final String clonedIndexName = lifecycleState.forceMergeIndexName(); + final String clonedIndexName = lifecycleState.forceMergeCloneIndexName(); final String forceMergedIndexName = clonedIndexName != null ? 
clonedIndexName : indexName; CreateSnapshotRequest request = new CreateSnapshotRequest(TimeValue.MAX_VALUE, snapshotRepository, snapshotName); request.indices(forceMergedIndexName); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java index a86051c0d4641..5970045a5d8b4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java @@ -150,7 +150,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl private final String snapshotName; private final String shrinkIndexName; private final boolean skip; - private final String forceMergeIndexName; + private final String forceMergeCloneIndexName; Supplier nowSupplier = System::currentTimeMillis; // Can be changed for testing @@ -175,7 +175,7 @@ public static IndexLifecycleExplainResponse newManagedIndexResponse( BytesReference previousStepInfo, PhaseExecutionInfo phaseExecutionInfo, boolean skip, - String forceMergeIndexName + String forceMergeCloneIndexName ) { return new IndexLifecycleExplainResponse( index, @@ -199,7 +199,7 @@ public static IndexLifecycleExplainResponse newManagedIndexResponse( previousStepInfo, phaseExecutionInfo, skip, - forceMergeIndexName + forceMergeCloneIndexName ); } @@ -252,7 +252,7 @@ private IndexLifecycleExplainResponse( BytesReference previousStepInfo, PhaseExecutionInfo phaseExecutionInfo, boolean skip, - String forceMergeIndexName + String forceMergeCloneIndexName ) { if (managedByILM) { if (policyName == null) { @@ -321,7 +321,7 @@ private IndexLifecycleExplainResponse( this.snapshotName = snapshotName; this.shrinkIndexName = shrinkIndexName; this.skip = skip; - this.forceMergeIndexName = forceMergeIndexName; + this.forceMergeCloneIndexName 
= forceMergeCloneIndexName; } public IndexLifecycleExplainResponse(StreamInput in) throws IOException { @@ -361,7 +361,7 @@ public IndexLifecycleExplainResponse(StreamInput in) throws IOException { skip = false; } // No need for deserialization from this point onwards as this action only runs on the local node. - forceMergeIndexName = null; + forceMergeCloneIndexName = null; } else { policyName = null; lifecycleDate = null; @@ -382,7 +382,7 @@ public IndexLifecycleExplainResponse(StreamInput in) throws IOException { shrinkIndexName = null; indexCreationDate = null; skip = false; - forceMergeIndexName = null; + forceMergeCloneIndexName = null; } } @@ -521,8 +521,8 @@ public boolean getSkip() { return skip; } - public String getForceMergeIndexName() { - return forceMergeIndexName; + public String getForceMergeCloneIndexName() { + return forceMergeCloneIndexName; } @Override @@ -612,8 +612,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(PHASE_EXECUTION_INFO.getPreferredName(), phaseExecutionInfo); } builder.field(SKIP_NAME.getPreferredName(), skip); - if (forceMergeIndexName != null) { - builder.field(FORCE_MERGE_CLONE_INDEX_NAME.getPreferredName(), forceMergeIndexName); + if (forceMergeCloneIndexName != null) { + builder.field(FORCE_MERGE_CLONE_INDEX_NAME.getPreferredName(), forceMergeCloneIndexName); } } builder.endObject(); @@ -644,7 +644,7 @@ public int hashCode() { previousStepInfo, phaseExecutionInfo, skip, - forceMergeIndexName + forceMergeCloneIndexName ); } @@ -678,7 +678,7 @@ public boolean equals(Object obj) { && Objects.equals(previousStepInfo, other.previousStepInfo) && Objects.equals(phaseExecutionInfo, other.phaseExecutionInfo) && Objects.equals(skip, other.skip) - && Objects.equals(forceMergeIndexName, other.forceMergeIndexName); + && Objects.equals(forceMergeCloneIndexName, other.forceMergeCloneIndexName); } @Override diff --git 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java index 27c7c2d2eff33..50f5d70054282 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java @@ -73,11 +73,11 @@ public class SearchableSnapshotAction implements LifecycleAction { /** An index name supplier that always returns the force merge index name (possibly null). */ public static final BiFunction FORCE_MERGE_CLONE_INDEX_NAME_SUPPLIER = ( indexName, - state) -> state.forceMergeIndexName(); + state) -> state.forceMergeCloneIndexName(); /** An index name supplier that returns the force merge index name if it exists, or the original index name if not. */ public static final BiFunction FORCE_MERGE_CLONE_INDEX_NAME_FALLBACK_SUPPLIER = ( indexName, - state) -> state.forceMergeIndexName() != null ? state.forceMergeIndexName() : indexName; + state) -> state.forceMergeCloneIndexName() != null ? state.forceMergeCloneIndexName() : indexName; private static final Settings CLONE_SETTINGS = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build(); private static final Function CLONE_SETTINGS_SUPPLIER = indexMetadata -> CLONE_SETTINGS; @@ -382,7 +382,7 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac generateCloneIndexNameKey, cloneIndexKey, FORCE_MERGE_CLONE_INDEX_PREFIX, - (generatedIndexName, lifecycleStateBuilder) -> lifecycleStateBuilder.setForceMergeIndexName(generatedIndexName) + (generatedIndexName, lifecycleStateBuilder) -> lifecycleStateBuilder.setForceMergeCloneIndexName(generatedIndexName) ); // Clone the index with 0 replicas. 
ResizeIndexStep cloneIndexStep = new ResizeIndexStep( @@ -475,7 +475,7 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac (index, project) -> { IndexMetadata indexMetadata = project.index(index); assert indexMetadata != null : "index " + index.getName() + " must exist in the cluster state"; - String cloneIndexName = indexMetadata.getLifecycleExecutionState().forceMergeIndexName(); + String cloneIndexName = indexMetadata.getLifecycleExecutionState().forceMergeCloneIndexName(); return cloneIndexName != null && project.index(cloneIndexName) != null; } ); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponseTests.java index ee56b740d6611..03b91e72a6cc6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponseTests.java @@ -337,7 +337,7 @@ protected IndexLifecycleExplainResponse mutateInstance(IndexLifecycleExplainResp phaseExecutionInfo, skip, // We don't mutate any fields from this point onwards as we don't (de)serialize them, as the action is run on the local node - instance.getForceMergeIndexName() + instance.getForceMergeCloneIndexName() ); } else { return switch (between(0, 1)) { diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java index cebf85802cb7f..39834637d2ba9 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java +++ 
b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java @@ -1205,11 +1205,11 @@ private static void assertForceMergeCloneIndexSettings(String backingIndexName, indicesSettings.size(), equalTo(1) ); - final String forceMergeIndexName = indicesSettings.keySet().iterator().next(); - assertThat(forceMergeIndexName, startsWith(FORCE_MERGE_CLONE_INDEX_PREFIX)); - assertThat(forceMergeIndexName, endsWith(backingIndexName)); + final String forceMergeCloneIndexName = indicesSettings.keySet().iterator().next(); + assertThat(forceMergeCloneIndexName, startsWith(FORCE_MERGE_CLONE_INDEX_PREFIX)); + assertThat(forceMergeCloneIndexName, endsWith(backingIndexName)); @SuppressWarnings("unchecked") - final var forceMergeIndexResponse = (Map) indicesSettings.get(forceMergeIndexName); + final var forceMergeIndexResponse = (Map) indicesSettings.get(forceMergeCloneIndexName); @SuppressWarnings("unchecked") final Map forceMergeIndexSettings = (Map) forceMergeIndexResponse.get("settings"); assertThat(forceMergeIndexSettings.get("index.number_of_shards"), equalTo(String.valueOf(numberOfPrimaries))); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportExplainLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportExplainLifecycleAction.java index d19dcb28cb7d2..463f2d868f30b 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportExplainLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportExplainLifecycleAction.java @@ -224,7 +224,7 @@ static IndexLifecycleExplainResponse getIndexLifecycleExplainResponse( previousStepInfoBytes, phaseExecutionInfo, LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(idxSettings), - lifecycleState.forceMergeIndexName() + lifecycleState.forceMergeCloneIndexName() ); } else { indexResponse = null; From 
cfd368c64ee109c117f6da29b61f8a1476a8c2f9 Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Thu, 4 Sep 2025 19:40:43 -0300 Subject: [PATCH 15/17] Rename a leaking variable --- .../xpack/core/ilm/SegmentCountStep.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java index 15b8291720980..7442d143e2436 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java @@ -80,20 +80,20 @@ public int getMaxNumSegments() { @Override public void evaluateCondition(ProjectState state, IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) { - String forceMergedIndexName = targetIndexNameSupplier.apply( + String targetIndexName = targetIndexNameSupplier.apply( indexMetadata.getIndex().getName(), indexMetadata.getLifecycleExecutionState() ); - assert forceMergedIndexName != null : "target index name supplier must not return null"; + assert targetIndexName != null : "target index name supplier must not return null"; getClient(state.projectId()).admin() .indices() - .segments(new IndicesSegmentsRequest(forceMergedIndexName), ActionListener.wrap(response -> { - IndexSegments idxSegments = response.getIndices().get(forceMergedIndexName); + .segments(new IndicesSegmentsRequest(targetIndexName), ActionListener.wrap(response -> { + IndexSegments idxSegments = response.getIndices().get(targetIndexName); if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) { final DefaultShardOperationFailedException[] failures = response.getShardFailures(); logger.info( "[{}] retrieval of segment counts after force merge did not succeed, there were {} shard failures. 
failures: {}", - forceMergedIndexName, + targetIndexName, response.getFailedShards(), failures == null ? "n/a" @@ -112,7 +112,7 @@ public void evaluateCondition(ProjectState state, IndexMetadata indexMetadata, L .collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size())); logger.info( "[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}", - forceMergedIndexName, + targetIndexName, maxNumSegments, unmergedShards.size(), unmergedShardCounts From 5806e64f2790d7d7e63070d4ffd18da1ae5f9adb Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Thu, 4 Sep 2025 20:39:41 -0300 Subject: [PATCH 16/17] Return sum of all primary shards' number of segments --- .../xpack/TimeSeriesRestDriver.java | 49 +++++++------------ .../actions/SearchableSnapshotActionIT.java | 28 +++++++---- 2 files changed, 36 insertions(+), 41 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java index 47a302973883d..b832ef2678861 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java @@ -24,6 +24,8 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.ObjectPath; +import org.elasticsearch.test.rest.Stash; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; @@ -46,9 +48,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Optional; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static 
org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLengthBetween; @@ -373,35 +373,22 @@ private static void ensureGreen(String index) throws IOException { @SuppressWarnings("unchecked") public static Integer getNumberOfPrimarySegments(RestClient client, String index) throws IOException { Response response = client.performRequest(new Request("GET", index + "/_segments")); - XContentType entityContentType = XContentType.fromMediaType(response.getEntity().getContentType().getValue()); - final Map originalResponseEntity = XContentHelper.convertToMap( - entityContentType.xContent(), - response.getEntity().getContent(), - false - ); - if (logger.isTraceEnabled()) { - logger.trace( - "segments response for {}: {}", - index, - originalResponseEntity.keySet() - .stream() - .map(key -> key + "=" + originalResponseEntity.get(key)) - .collect(Collectors.joining(", ", "{", "}")) - ); - } - Map responseEntity = (Map) originalResponseEntity.get("indices"); - responseEntity = (Map) responseEntity.get(index); - responseEntity = (Map) responseEntity.get("shards"); - List> shards = (List>) responseEntity.get("0"); - // We want to mamke sure to get the primary shard because there is a chance the replica doesn't have data yet: - Optional> shardOptional = shards.stream() - .filter(shard -> ((Map) shard.get("routing")).get("primary").equals(true)) - .findAny(); - if (shardOptional.isPresent()) { - return (Integer) shardOptional.get().get("num_search_segments"); - } else { - throw new RuntimeException("No primary shard found for index " + index); - } + final Map originalResponseEntity = ESRestTestCase.entityAsMap(response); + logger.trace("segments response for {}: {}", index, originalResponseEntity); + // We need to use a stash here because the index name is likely a data stream index with a dot. 
+ Stash stash = new Stash(); + stash.stashValue("index", index); + Map responseEntity = new ObjectPath(originalResponseEntity).evaluate("indices.${index}.shards", stash); + return responseEntity.values() + .stream() + .mapToInt( + shardList -> ((List>) shardList).stream() + .filter(shard -> ((Map) shard.get("routing")).get("primary").equals(true)) + .findFirst() + .map(shard -> (Integer) shard.get("num_search_segments")) + .orElseThrow(() -> new IllegalStateException("no primary shard found in " + shardList)) + ) + .sum(); } public static void updatePolicy(RestClient client, String indexName, String policy) throws IOException { diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java index 39834637d2ba9..111de0b6f400a 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java @@ -156,7 +156,7 @@ public void testSearchableSnapshotForceMergesClonedIndex() throws Exception { configureClusterAllocation(true); } - assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries > 1, true); + assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries, true); } catch (Exception | AssertionError e) { // Make sure we re-enable allocation in case of failure so that the remaining tests in the suite are not affected. configureClusterAllocation(true); @@ -172,12 +172,13 @@ public void testSearchableSnapshotForceMergesSourceIndex() throws Exception { // Data streams have 1 primary shard by default. // The test suite runs with 4 nodes, so we can have up to 3 (allocated) replicas. final String phase = randomBoolean() ? 
"cold" : "frozen"; - final String backingIndexName = prepareDataStreamWithDocs(phase, 1, 0); + final int numberOfPrimaries = 1; + final String backingIndexName = prepareDataStreamWithDocs(phase, numberOfPrimaries, 0); // Enable/start ILM on the data stream. updateIndexSettings(dataStream, Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy)); - assertForceMergedSnapshotDone(phase, backingIndexName, false, false); + assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries, false); } /** @@ -209,7 +210,7 @@ public void testSearchableSnapshotForceMergesClonedIndexAfterRetry() throws Exce configureClusterAllocation(true); - assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries > 1, true); + assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries, true); } catch (Exception | AssertionError e) { // Make sure we re-enable allocation in case of failure so that the remaining tests in the suite are not affected. configureClusterAllocation(true); @@ -1156,6 +1157,13 @@ private Step.StepKey getKeyForIndex(Response response, String indexName) throws * first generation backing index. */ private String prepareDataStreamWithDocs(String phase, int numberOfPrimaries, int numberOfReplicas) throws Exception { + logger.info( + "--> running [{}] with [{}] primaries, [{}] replicas, in phase [{}]", + getTestName(), + numberOfPrimaries, + numberOfReplicas, + phase + ); createSnapshotRepo(client(), snapshotRepo, randomBoolean()); createNewSingletonPolicy(client(), policy, phase, new SearchableSnapshotAction(snapshotRepo, true)); @@ -1223,10 +1231,10 @@ private static void assertForceMergeCloneIndexSettings(String backingIndexName, * * @param phase The phase of the ILM policy that the searchable snapshot action runs in. * @param backingIndexName The original backing index name. - * @param multiplePrimaries True if the original backing index had multiple primaries, affecting segment assertions. 
+ * @param numberOfPrimaries The number of primaries that the original backing index had, affecting segment count assertions. * @param withReplicas True if the original backing index had one or more replicas, affecting snapshot index naming assertions. */ - private void assertForceMergedSnapshotDone(String phase, String backingIndexName, boolean multiplePrimaries, boolean withReplicas) + private void assertForceMergedSnapshotDone(String phase, String backingIndexName, int numberOfPrimaries, boolean withReplicas) throws Exception { final String prefix = phase.equals("cold") ? SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX @@ -1240,11 +1248,11 @@ private void assertForceMergedSnapshotDone(String phase, String backingIndexName // Regardless of whether we force merged the backing index or a clone, the cloned index should not exist (anymore). awaitIndexDoesNotExist(FORCE_MERGE_CLONE_INDEX_PREFIX + "-*-" + backingIndexName); - // Retrieve the number of segments in the first (random) shard of the backing index. + // Retrieve the total number of segments across all primary shards of the restored index. final Integer numberOfPrimarySegments = getNumberOfPrimarySegments(client(), restoredIndexName); - // If the backing index had multiple primaries, some primaries might be empty, but others should have no more than 1 segment - if (multiplePrimaries || phase.equals("frozen")) { - assertThat(numberOfPrimarySegments, lessThanOrEqualTo(1)); + // If the backing index had multiple primaries, some primaries might be empty, but others should have no more than 1 segment. + if (numberOfPrimaries > 1 || phase.equals("frozen")) { + assertThat(numberOfPrimarySegments, lessThanOrEqualTo(numberOfPrimaries)); } else { // If the backing index had only one primary, we expect exactly 1 segment after force merging. 
assertThat(numberOfPrimarySegments, equalTo(1)); From 44adfbbf79374a53a5984ebdd9c48d822e75a157 Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Thu, 4 Sep 2025 21:01:16 -0300 Subject: [PATCH 17/17] Refactor `ObjectPath` evaluation --- .../java/org/elasticsearch/xpack/TimeSeriesRestDriver.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java index b832ef2678861..299b9e9313b89 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java @@ -25,7 +25,6 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.ObjectPath; -import org.elasticsearch.test.rest.Stash; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; @@ -375,10 +374,7 @@ public static Integer getNumberOfPrimarySegments(RestClient client, String index Response response = client.performRequest(new Request("GET", index + "/_segments")); final Map originalResponseEntity = ESRestTestCase.entityAsMap(response); logger.trace("segments response for {}: {}", index, originalResponseEntity); - // We need to use a stash here because the index name is likely a data stream index with a dot. - Stash stash = new Stash(); - stash.stashValue("index", index); - Map responseEntity = new ObjectPath(originalResponseEntity).evaluate("indices.${index}.shards", stash); + Map responseEntity = new ObjectPath(originalResponseEntity).evaluateExact("indices", index, "shards"); return responseEntity.values() .stream() .mapToInt(