Skip to content

Commit 680a5ce

Browse files
committed
Add ForceMergeActionIT
1 parent f82222f commit 680a5ce

File tree

8 files changed

+538
-48
lines changed

8 files changed

+538
-48
lines changed

test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,7 @@ public static void setAllElapsedMillis(ClusterStatePublicationEvent clusterState
239239
clusterStatePublicationEvent.setMasterApplyElapsedMillis(0L);
240240
}
241241

242-
public static void awaitClusterState(Logger logger, Predicate<ClusterState> statePredicate, ClusterService clusterService)
243-
throws Exception {
242+
public static void awaitClusterState(Logger logger, Predicate<ClusterState> statePredicate, ClusterService clusterService) {
244243
final var listener = addTemporaryStateListener(clusterService, statePredicate, ESTestCase.TEST_REQUEST_TIMEOUT);
245244
ESTestCase.safeAwait(listener, ESTestCase.TEST_REQUEST_TIMEOUT);
246245
}

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import org.elasticsearch.cluster.metadata.DataStream;
8686
import org.elasticsearch.cluster.metadata.IndexMetadata;
8787
import org.elasticsearch.cluster.metadata.Metadata;
88+
import org.elasticsearch.cluster.metadata.ProjectId;
8889
import org.elasticsearch.cluster.metadata.ProjectMetadata;
8990
import org.elasticsearch.cluster.node.DiscoveryNode;
9091
import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -1219,8 +1220,8 @@ public static PendingClusterTasksResponse getClusterPendingTasks(Client client)
12191220
}
12201221
}
12211222

1222-
protected void awaitClusterState(Predicate<ClusterState> statePredicate) throws Exception {
1223-
awaitClusterState(logger, internalCluster().getMasterName(), statePredicate);
1223+
protected static void awaitClusterState(Predicate<ClusterState> statePredicate) {
1224+
awaitClusterState(null, internalCluster().getMasterName(), statePredicate);
12241225
}
12251226

12261227
protected void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
@@ -1231,7 +1232,7 @@ public static void awaitClusterState(Logger logger, Predicate<ClusterState> stat
12311232
awaitClusterState(logger, internalCluster().getMasterName(), statePredicate);
12321233
}
12331234

1234-
public static void awaitClusterState(Logger logger, String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
1235+
public static void awaitClusterState(Logger logger, String viaNode, Predicate<ClusterState> statePredicate) {
12351236
ClusterServiceUtils.awaitClusterState(logger, statePredicate, internalCluster().getInstance(ClusterService.class, viaNode));
12361237
}
12371238

@@ -1853,6 +1854,19 @@ public static void awaitIndexExists(String index, Client client, TimeValue timeo
18531854
);
18541855
}
18551856

1857+
/**
1858+
*
1859+
* Waits until the specified index no longer exists in the cluster.
1860+
* This method blocks until the index is deleted or times out.
1861+
* Note that this method waits by listening to cluster state updates <i>on the master node</i>.
1862+
* Meaning that if this method returns, all other nodes are aware that the index is deleted from the cluster state as well.
1863+
*
1864+
* @param index the name of the index to wait for deletion
1865+
*/
1866+
public static void awaitIndexNotExists(String index) {
1867+
awaitClusterState(state -> state.metadata().getProject(ProjectId.DEFAULT).hasIndex(index) == false);
1868+
}
1869+
18561870
/**
18571871
* Syntactic sugar for enabling allocation for <code>indices</code>
18581872
*/

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeAction.java

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,16 @@ public class ForceMergeAction implements LifecycleAction {
4545
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
4646
.build();
4747
private static final Settings CLONE_SETTINGS_WITH_CODEC = Settings.builder()
48-
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
49-
.put(EngineConfig.INDEX_CODEC_SETTING.getKey(), CodecService.BEST_COMPRESSION_CODEC)
48+
.put(CLONE_SETTINGS_WITHOUT_CODEC)
49+
.put(BEST_COMPRESSION_SETTINGS)
5050
.build();
5151

5252
public static final String NAME = "forcemerge";
5353
public static final ParseField MAX_NUM_SEGMENTS_FIELD = new ParseField("max_num_segments");
5454
public static final ParseField CODEC = new ParseField("index_codec");
5555

56-
public static final String FORCE_MERGED_INDEX_PREFIX = "force-merged-";
57-
public static final BiFunction<String, LifecycleExecutionState, String> FORCE_MERGED_INDEX_NAME_SUPPLIER = (indexName, state) -> state
56+
public static final String FORCE_MERGE_INDEX_PREFIX = "force-merge-";
57+
public static final BiFunction<String, LifecycleExecutionState, String> FORCE_MERGE_INDEX_NAME_SUPPLIER = (indexName, state) -> state
5858
.forceMergeIndexName();
5959

6060
public static final String CONDITIONAL_SKIP_FORCE_MERGE_STEP = BranchingStep.NAME + "-forcemerge-check-prerequisites";
@@ -167,6 +167,7 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
167167
StepKey replaceDataStreamIndexKey = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME);
168168
StepKey deleteSourceIndexKey = new StepKey(phase, NAME, DeleteStep.NAME);
169169

170+
// If the index is mounted as a searchable snapshot, skip the whole force-merge action
170171
BranchingStep conditionalSkipForceMergeStep = new BranchingStep(
171172
conditionalSkipForceMergeKey,
172173
checkNotWriteIndexKey,
@@ -189,30 +190,33 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
189190
}
190191
);
191192

192-
// Indices in this step key can skip the no-op step and jump directly to the step with closeIndexKey/forcemergeKey key
193+
// Wait for the index to not be the write index of a data stream
193194
CheckNotDataStreamWriteIndexStep checkNotWriteIndexStep = new CheckNotDataStreamWriteIndexStep(
194195
checkNotWriteIndexKey,
195196
waitUntilTimeSeriesEndTimeKey
196197
);
197198

199+
// If the index is a time series index, wait until its end time has passed
198200
WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep(
199201
waitUntilTimeSeriesEndTimeKey,
200202
conditionalSkipCloneKey,
201203
Instant::now
202204
);
203205

206+
// If the index already has 0 replicas, we can skip the clone steps. If the action is configured to change the codec, we must
207+
// first update the codec of the original index before proceeding to the force-merge step.
204208
BranchingStep conditionalSkipCloneStep = new BranchingStep(
205209
conditionalSkipCloneKey,
206210
cleanupClonedIndexKey,
207211
codecChange ? closeIndexKey : forceMergeKey,
208212
(index, project) -> {
209213
IndexMetadata indexMetadata = project.index(index);
210214
assert indexMetadata != null : "index " + index.getName() + " must exist in the cluster state";
211-
// Returns true if the index already has zero replicas, which means we can avoid the "clone with 0 replicas" dance
212-
return indexMetadata.getSettings().getAsInt(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) == 0;
215+
return indexMetadata.getNumberOfReplicas() == 0;
213216
}
214217
);
215218

219+
// Closing the index is required to change the index codec.
216220
CloseIndexStep closeIndexStep = new CloseIndexStep(closeIndexKey, updateBestCompressionKey, client);
217221
UpdateSettingsStep updateBestCompressionStep = new UpdateSettingsStep(
218222
updateBestCompressionKey,
@@ -227,6 +231,7 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
227231
ClusterHealthStatus.GREEN
228232
);
229233

234+
// If a previous force-merge action created a clone index but the action did not complete, we need to clean up the old clone index.
230235
CleanupClonedIndexStep cleanupClonedIndexStep = new CleanupClonedIndexStep(cleanupClonedIndexKey, readOnlyKey, client);
231236
// The readOnlyKey used to exist for BwC reasons (as the step was removed at some point). It has now been reintroduced with the
232237
// changes to make the force-merge action use a cloned index. Therefore, we intentionally put this step before the
@@ -236,81 +241,86 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
236241
GenerateUniqueIndexNameStep generateCloneIndexNameStep = new GenerateUniqueIndexNameStep(
237242
generateCloneIndexNameKey,
238243
cloneIndexKey,
239-
FORCE_MERGED_INDEX_PREFIX,
244+
FORCE_MERGE_INDEX_PREFIX,
240245
(generatedIndexName, lifecycleStateBuilder) -> lifecycleStateBuilder.setForceMergeIndexName(generatedIndexName)
241246
);
242247
// Clone the index with 0 replicas and the best compression codec if configured.
243248
CloneIndexStep cloneIndexStep = new CloneIndexStep(
244249
cloneIndexKey,
245250
waitForClonedIndexGreenKey,
246251
client,
247-
FORCE_MERGED_INDEX_NAME_SUPPLIER,
252+
FORCE_MERGE_INDEX_NAME_SUPPLIER,
248253
codecChange ? CLONE_SETTINGS_WITH_CODEC : CLONE_SETTINGS_WITHOUT_CODEC
249254
);
255+
// Wait for the cloned index to be green before proceeding with the force-merge. We wrap this with a
256+
// ClusterStateWaitUntilThresholdStep to avoid waiting forever if the index cannot be started for some reason. On timeout,
257+
// ILM will move back to the cleanup step, remove the cloned index, and retry the clone.
250258
ClusterStateWaitUntilThresholdStep waitForClonedIndexGreenStep = new ClusterStateWaitUntilThresholdStep(
251259
new WaitForIndexColorStep(
252260
waitForClonedIndexGreenKey,
253261
forceMergeKey,
254262
ClusterHealthStatus.GREEN,
255-
FORCE_MERGED_INDEX_NAME_SUPPLIER
263+
FORCE_MERGE_INDEX_NAME_SUPPLIER
256264
),
257265
cleanupClonedIndexKey
258266
);
259267

268+
// Execute the force merge (either on the original index or the cloned index).
260269
ForceMergeStep forceMergeStep = new ForceMergeStep(forceMergeKey, segmentCountKey, client, maxNumSegments);
270+
// This step simply logs whether the force-merge resulted in the desired number of segments or not.
261271
SegmentCountStep segmentCountStep = new SegmentCountStep(
262272
segmentCountKey,
263273
conditionalConfigureClonedIndexKey,
264274
client,
265275
maxNumSegments
266276
);
267277

278+
// If we cloned the index, we need to complete the setup of the cloned index and swap it with the original index.
279+
// If we did not clone the index, there's nothing else for us to do.
268280
BranchingStep conditionalConfigureClonedIndexStep = new BranchingStep(
269281
conditionalConfigureClonedIndexKey,
270-
nextStepKey, // If we didn't have a clone, there's nothing else for us to do
271-
reconfigureReplicasKey, // Otherwise, we need to finish the setup of the cloned (and force-merged) index
282+
nextStepKey,
283+
reconfigureReplicasKey,
272284
(index, project) -> {
273285
IndexMetadata indexMetadata = project.index(index);
274286
assert indexMetadata != null : "index " + index.getName() + " must exist in the cluster state";
275-
// Returns true if the index has a clone index name set, which means we need to reconfigure the cloned index
276287
String cloneIndexName = indexMetadata.getLifecycleExecutionState().forceMergeIndexName();
277288
if (cloneIndexName == null) {
278289
return false;
279290
}
280-
// If for some reason the cloned index does not exist, we don't want to fail the next steps, so we skip them.
281-
// This should not happen in ordinary circumstances.
291+
// If for some reason the cloned index does not exist in the cluster state, we don't want to fail the next steps,
292+
// so we skip them. This should not happen in ordinary circumstances.
282293
boolean clonedIndexExists = project.index(cloneIndexName) != null;
283294
assert clonedIndexExists
284295
: "index [" + index.getName() + "] has cloned index name [" + cloneIndexName + "] but it does not exist in the cluster";
285296
return clonedIndexExists;
286297
}
287298
);
288299

289-
// Reset the number of replicas to the value of the original index and remove the write block
300+
// Reset the number of replicas to the value of the original index and remove the write block.
290301
UpdateSettingsStep reconfigureReplicasStep = new UpdateSettingsStep(
291302
reconfigureReplicasKey,
292303
copyMetadataKey,
293304
client,
294-
FORCE_MERGED_INDEX_NAME_SUPPLIER,
305+
FORCE_MERGE_INDEX_NAME_SUPPLIER,
295306
(indexMetadata) -> Settings.builder()
296-
.put(
297-
IndexMetadata.SETTING_NUMBER_OF_REPLICAS,
298-
indexMetadata.getSettings().get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
299-
)
307+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, indexMetadata.getNumberOfReplicas())
300308
.put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), (String) null)
301309
.build()
302310
);
311+
// Copy the lifecycle execution state from the original index to the cloned index. This also removes the skip setting so that
312+
// the cloned index can continue through the ILM phases.
303313
CopyExecutionStateStep copyMetadata = new CopyExecutionStateStep(
304314
copyMetadataKey,
305315
aliasDataStreamBranchingKey,
306-
FORCE_MERGED_INDEX_NAME_SUPPLIER,
316+
FORCE_MERGE_INDEX_NAME_SUPPLIER,
307317
nextStepKey
308318
);
309-
// by the time we get to this step we have 2 indices, the source and the cloned one. we now need to choose an index
310-
// swapping strategy such that the cloned index takes the place of the source index (which is also deleted).
311-
// if the source index is part of a data stream it's a matter of replacing it with the cloned index one in the data stream and
312-
// then deleting the source index; otherwise we'll use the alias management api to atomically transfer the aliases from source to
313-
// the cloned index and delete the source
319+
// By the time we get to this step we have 2 indices, the source and the cloned one. We now need to choose an index
320+
// swapping strategy such that the cloned index takes the place of the source index (which should also be deleted).
321+
// If the source index is part of a data stream it's a matter of replacing it with the cloned index in the data stream and
322+
// then deleting the source index; otherwise we'll use the alias management API to atomically transfer the aliases from
323+
// the source index to the cloned index and delete the source.
314324
BranchingStep aliasDataStreamBranchingStep = new BranchingStep(
315325
aliasDataStreamBranchingKey,
316326
aliasSwapAndDeleteKey,
@@ -325,13 +335,13 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
325335
aliasSwapAndDeleteKey,
326336
nextStepKey,
327337
client,
328-
FORCE_MERGED_INDEX_NAME_SUPPLIER,
338+
FORCE_MERGE_INDEX_NAME_SUPPLIER,
329339
true
330340
);
331341
ReplaceDataStreamBackingIndexStep replaceDataStreamBackingIndexStep = new ReplaceDataStreamBackingIndexStep(
332342
replaceDataStreamIndexKey,
333343
deleteSourceIndexKey,
334-
FORCE_MERGED_INDEX_NAME_SUPPLIER
344+
FORCE_MERGE_INDEX_NAME_SUPPLIER
335345
);
336346
DeleteStep deleteSourceIndexStep = new DeleteStep(deleteSourceIndexKey, nextStepKey, client);
337347

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public void performAction(
5151
ClusterStateObserver observer,
5252
ActionListener<Void> listener
5353
) {
54+
// Use the cloned index name if we have one, otherwise fall back to the original index name.
5455
String clonedIndexName = indexMetadata.getLifecycleExecutionState().forceMergeIndexName();
5556
String indexName = clonedIndexName != null ? clonedIndexName : indexMetadata.getIndex().getName();
5657
ForceMergeRequest request = new ForceMergeRequest(indexName);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public int getMaxNumSegments() {
5757

5858
@Override
5959
public void evaluateCondition(ProjectState state, IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
60+
// Use the cloned index name if we have one, otherwise fall back to the original index name.
6061
String clonedIndexName = indexMetadata.getLifecycleExecutionState().forceMergeIndexName();
6162
String forceMergedIndexName = clonedIndexName != null ? clonedIndexName : indexMetadata.getIndex().getName();
6263
getClient(state.projectId()).admin()

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/PhaseCacheManagementTests.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,6 @@ public void testReadStepKeys() {
260260
String phaseDef = Strings.toString(pei);
261261
logger.info("--> phaseDef: {}", phaseDef);
262262

263-
assumeFalse("", true);
264263
assertThat(
265264
readStepKeys(REGISTRY, client, phaseDef, "phase", null),
266265
contains(
@@ -269,10 +268,25 @@ public void testReadStepKeys() {
269268
new Step.StepKey("phase", "forcemerge", ForceMergeAction.CONDITIONAL_SKIP_FORCE_MERGE_STEP),
270269
new Step.StepKey("phase", "forcemerge", CheckNotDataStreamWriteIndexStep.NAME),
271270
new Step.StepKey("phase", "forcemerge", WaitUntilTimeSeriesEndTimePassesStep.NAME),
272-
// This read-only key is now a noop step but we preserved it for backwards compatibility
273-
new Step.StepKey("phase", "forcemerge", ReadOnlyAction.NAME),
274-
new Step.StepKey("phase", "forcemerge", ForceMergeAction.NAME),
275-
new Step.StepKey("phase", "forcemerge", SegmentCountStep.NAME)
271+
new Step.StepKey("phase", "forcemerge", ForceMergeAction.CONDITIONAL_SKIP_CLONE_STEP),
272+
new Step.StepKey("phase", "forcemerge", CloseIndexStep.NAME),
273+
new Step.StepKey("phase", "forcemerge", ForceMergeAction.UPDATE_COMPRESSION_SETTINGS_STEP),
274+
new Step.StepKey("phase", "forcemerge", OpenIndexStep.NAME),
275+
new Step.StepKey("phase", "forcemerge", ForceMergeAction.WAIT_FOR_COMPRESSION_SETTINGS_GREEN),
276+
new Step.StepKey("phase", "forcemerge", CleanupClonedIndexStep.NAME),
277+
new Step.StepKey("phase", "forcemerge", ReadOnlyStep.NAME),
278+
new Step.StepKey("phase", "forcemerge", GenerateUniqueIndexNameStep.NAME),
279+
new Step.StepKey("phase", "forcemerge", CloneIndexStep.NAME),
280+
new Step.StepKey("phase", "forcemerge", ForceMergeAction.WAIT_FOR_CLONED_INDEX_GREEN),
281+
new Step.StepKey("phase", "forcemerge", ForceMergeStep.NAME),
282+
new Step.StepKey("phase", "forcemerge", SegmentCountStep.NAME),
283+
new Step.StepKey("phase", "forcemerge", ForceMergeAction.CONDITIONAL_CONFIGURE_CLONED_INDEX_STEP),
284+
new Step.StepKey("phase", "forcemerge", ForceMergeAction.UPDATE_CLONED_INDEX_SETTINGS_STEP),
285+
new Step.StepKey("phase", "forcemerge", CopyExecutionStateStep.NAME),
286+
new Step.StepKey("phase", "forcemerge", ForceMergeAction.CONDITIONAL_DATA_STREAM_CHECK_STEP),
287+
new Step.StepKey("phase", "forcemerge", ShrinkSetAliasStep.NAME),
288+
new Step.StepKey("phase", "forcemerge", ReplaceDataStreamBackingIndexStep.NAME),
289+
new Step.StepKey("phase", "forcemerge", DeleteStep.NAME)
276290
)
277291
);
278292
}

0 commit comments

Comments
 (0)