Skip to content

Commit cb6cfe3

Browse files
committed
Clone index with 0 replicas before force merging in ILM
1 parent 699f4cf commit cb6cfe3

File tree

13 files changed

+926
-207
lines changed

13 files changed

+926
-207
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public record LifecycleExecutionState(
4040
String snapshotName,
4141
String shrinkIndexName,
4242
String snapshotIndexName,
43-
String downsampleIndexName
43+
String downsampleIndexName,
44+
String forceMergeIndexName
4445
) {
4546

4647
public static final String ILM_CUSTOM_METADATA_KEY = "ilm";
@@ -64,6 +65,7 @@ public record LifecycleExecutionState(
6465
private static final String SNAPSHOT_INDEX_NAME = "snapshot_index_name";
6566
private static final String SHRINK_INDEX_NAME = "shrink_index_name";
6667
private static final String DOWNSAMPLE_INDEX_NAME = "rollup_index_name";
68+
private static final String FORCE_MERGE_INDEX_NAME = "force_merge_index_name";
6769

6870
public static final LifecycleExecutionState EMPTY_STATE = LifecycleExecutionState.builder().build();
6971

@@ -89,7 +91,8 @@ public static Builder builder(LifecycleExecutionState state) {
8991
.setShrinkIndexName(state.shrinkIndexName)
9092
.setSnapshotIndexName(state.snapshotIndexName)
9193
.setDownsampleIndexName(state.downsampleIndexName)
92-
.setStepTime(state.stepTime);
94+
.setStepTime(state.stepTime)
95+
.setForceMergeIndexName(state.forceMergeIndexName);
9396
}
9497

9598
public static LifecycleExecutionState fromCustomMetadata(Map<String, String> customData) {
@@ -202,6 +205,10 @@ public static LifecycleExecutionState fromCustomMetadata(Map<String, String> cus
202205
if (downsampleIndexName != null) {
203206
builder.setDownsampleIndexName(downsampleIndexName);
204207
}
208+
String forceMergeIndexName = customData.get(FORCE_MERGE_INDEX_NAME);
209+
if (forceMergeIndexName != null) {
210+
builder.setForceMergeIndexName(forceMergeIndexName);
211+
}
205212
return builder.build();
206213
}
207214

@@ -274,6 +281,9 @@ public Map<String, String> asMap() {
274281
if (downsampleIndexName != null) {
275282
result.put(DOWNSAMPLE_INDEX_NAME, downsampleIndexName);
276283
}
284+
if (forceMergeIndexName != null) {
285+
result.put(FORCE_MERGE_INDEX_NAME, forceMergeIndexName);
286+
}
277287
return Collections.unmodifiableMap(result);
278288
}
279289

@@ -307,6 +317,7 @@ public static class Builder {
307317
private String shrinkIndexName;
308318
private String snapshotIndexName;
309319
private String downsampleIndexName;
320+
private String forceMergeIndexName;
310321

311322
public Builder setPhase(String phase) {
312323
this.phase = phase;
@@ -398,6 +409,11 @@ public Builder setDownsampleIndexName(String downsampleIndexName) {
398409
return this;
399410
}
400411

412+
public Builder setForceMergeIndexName(String forceMergeIndexName) {
413+
this.forceMergeIndexName = forceMergeIndexName;
414+
return this;
415+
}
416+
401417
public LifecycleExecutionState build() {
402418
return new LifecycleExecutionState(
403419
phase,
@@ -417,7 +433,8 @@ public LifecycleExecutionState build() {
417433
snapshotName,
418434
shrinkIndexName,
419435
snapshotIndexName,
420-
downsampleIndexName
436+
downsampleIndexName,
437+
forceMergeIndexName
421438
);
422439
}
423440
}

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.elasticsearch.cluster.metadata.DataStream;
8585
import org.elasticsearch.cluster.metadata.IndexMetadata;
8686
import org.elasticsearch.cluster.metadata.Metadata;
87+
import org.elasticsearch.cluster.metadata.ProjectId;
8788
import org.elasticsearch.cluster.metadata.ProjectMetadata;
8889
import org.elasticsearch.cluster.node.DiscoveryNode;
8990
import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -1844,6 +1845,19 @@ public static void awaitIndexExists(String index, Client client, TimeValue timeo
18441845
);
18451846
}
18461847

1848+
/**
1849+
*
1850+
* Waits until the specified index no longer exists in the cluster.
1851+
* This method blocks until the index is deleted or times out.
1852+
* Note that this method waits by listening to cluster state updates <i>on the master node</i>.
1853+
* Meaning that if this method returns, all other nodes are aware that that index is deleted from the cluster state as well.
1854+
*
1855+
* @param index the name of the index to wait for deletion
1856+
*/
1857+
public static void awaitIndexNotExists(String index) {
1858+
awaitClusterState(state -> state.metadata().getProject(ProjectId.DEFAULT).hasIndex(index) == false);
1859+
}
1860+
18471861
/**
18481862
* Syntactic sugar for enabling allocation for <code>indices</code>
18491863
*/

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeAction.java

Lines changed: 220 additions & 42 deletions
Large diffs are not rendered by default.

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ public void performAction(
5151
ClusterStateObserver observer,
5252
ActionListener<Void> listener
5353
) {
54-
String indexName = indexMetadata.getIndex().getName();
54+
// Use the cloned index name if we have one, otherwise fall back to the original index name.
55+
String clonedIndexName = indexMetadata.getLifecycleExecutionState().forceMergeIndexName();
56+
String indexName = clonedIndexName != null ? clonedIndexName : indexMetadata.getIndex().getName();
5557
ForceMergeRequest request = new ForceMergeRequest(indexName);
5658
request.maxNumSegments(maxNumSegments);
5759
getClient(currentState.projectId()).admin().indices().forceMerge(request, listener.delegateFailureAndWrap((l, response) -> {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
5757
private static final ParseField SHRINK_INDEX_NAME = new ParseField("shrink_index_name");
5858
private static final ParseField SNAPSHOT_NAME = new ParseField("snapshot_name");
5959
private static final ParseField SKIP_NAME = new ParseField("skip");
60+
private static final ParseField FORCE_MERGE_INDEX_NAME = new ParseField("force_merge_index_name");
6061

6162
public static final ConstructingObjectParser<IndexLifecycleExplainResponse, Void> PARSER = new ConstructingObjectParser<>(
6263
"index_lifecycle_explain_response",
@@ -81,7 +82,8 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
8182
(BytesReference) a[11],
8283
(BytesReference) a[21],
8384
(PhaseExecutionInfo) a[12],
84-
Objects.requireNonNullElse((Boolean) a[22], false)
85+
Objects.requireNonNullElse((Boolean) a[22], false),
86+
(String) a[24]
8587
// a[13] == "age"
8688
// a[20] == "time_since_index_creation"
8789
// a[23] = "age_in_millis"
@@ -124,6 +126,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
124126
}, PREVIOUS_STEP_INFO_FIELD);
125127
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SKIP_NAME);
126128
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), AGE_IN_MILLIS_FIELD);
129+
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_INDEX_NAME);
127130
}
128131

129132
private final String index;
@@ -147,6 +150,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
147150
private final String snapshotName;
148151
private final String shrinkIndexName;
149152
private final boolean skip;
153+
private final String forceMergeIndexName;
150154

151155
Supplier<Long> nowSupplier = System::currentTimeMillis; // Can be changed for testing
152156

@@ -170,7 +174,8 @@ public static IndexLifecycleExplainResponse newManagedIndexResponse(
170174
BytesReference stepInfo,
171175
BytesReference previousStepInfo,
172176
PhaseExecutionInfo phaseExecutionInfo,
173-
boolean skip
177+
boolean skip,
178+
String forceMergeIndexName
174179
) {
175180
return new IndexLifecycleExplainResponse(
176181
index,
@@ -193,7 +198,8 @@ public static IndexLifecycleExplainResponse newManagedIndexResponse(
193198
stepInfo,
194199
previousStepInfo,
195200
phaseExecutionInfo,
196-
skip
201+
skip,
202+
forceMergeIndexName
197203
);
198204
}
199205

@@ -219,7 +225,8 @@ public static IndexLifecycleExplainResponse newUnmanagedIndexResponse(String ind
219225
null,
220226
null,
221227
null,
222-
false
228+
false,
229+
null
223230
);
224231
}
225232

@@ -244,7 +251,8 @@ private IndexLifecycleExplainResponse(
244251
BytesReference stepInfo,
245252
BytesReference previousStepInfo,
246253
PhaseExecutionInfo phaseExecutionInfo,
247-
boolean skip
254+
boolean skip,
255+
String forceMergeIndexName
248256
) {
249257
if (managedByILM) {
250258
if (policyName == null) {
@@ -313,6 +321,7 @@ private IndexLifecycleExplainResponse(
313321
this.snapshotName = snapshotName;
314322
this.shrinkIndexName = shrinkIndexName;
315323
this.skip = skip;
324+
this.forceMergeIndexName = forceMergeIndexName;
316325
}
317326

318327
public IndexLifecycleExplainResponse(StreamInput in) throws IOException {
@@ -351,6 +360,8 @@ public IndexLifecycleExplainResponse(StreamInput in) throws IOException {
351360
} else {
352361
skip = false;
353362
}
363+
// No need for serialization from this point onwards as this action only runs on the local node.
364+
forceMergeIndexName = null;
354365
} else {
355366
policyName = null;
356367
lifecycleDate = null;
@@ -371,6 +382,7 @@ public IndexLifecycleExplainResponse(StreamInput in) throws IOException {
371382
shrinkIndexName = null;
372383
indexCreationDate = null;
373384
skip = false;
385+
forceMergeIndexName = null;
374386
}
375387
}
376388

@@ -405,6 +417,7 @@ public void writeTo(StreamOutput out) throws IOException {
405417
|| out.getTransportVersion().onOrAfter(TransportVersions.ILM_ADD_SKIP_SETTING)) {
406418
out.writeBoolean(skip);
407419
}
420+
// No need for deserialization from this point onwards as this action only runs on the local node.
408421
}
409422
}
410423

@@ -508,6 +521,10 @@ public boolean getSkip() {
508521
return skip;
509522
}
510523

524+
public String getForceMergeIndexName() {
525+
return forceMergeIndexName;
526+
}
527+
511528
@Override
512529
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
513530
builder.startObject();
@@ -595,6 +612,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
595612
builder.field(PHASE_EXECUTION_INFO.getPreferredName(), phaseExecutionInfo);
596613
}
597614
builder.field(SKIP_NAME.getPreferredName(), skip);
615+
if (forceMergeIndexName != null) {
616+
builder.field(FORCE_MERGE_INDEX_NAME.getPreferredName(), forceMergeIndexName);
617+
}
598618
}
599619
builder.endObject();
600620
return builder;
@@ -623,7 +643,8 @@ public int hashCode() {
623643
stepInfo,
624644
previousStepInfo,
625645
phaseExecutionInfo,
626-
skip
646+
skip,
647+
forceMergeIndexName
627648
);
628649
}
629650

@@ -656,7 +677,8 @@ public boolean equals(Object obj) {
656677
&& Objects.equals(stepInfo, other.stepInfo)
657678
&& Objects.equals(previousStepInfo, other.previousStepInfo)
658679
&& Objects.equals(phaseExecutionInfo, other.phaseExecutionInfo)
659-
&& Objects.equals(skip, other.skip);
680+
&& Objects.equals(skip, other.skip)
681+
&& Objects.equals(forceMergeIndexName, other.forceMergeIndexName);
660682
}
661683

662684
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.cluster.routing.ShardRouting;
2020
import org.elasticsearch.common.Strings;
2121
import org.elasticsearch.core.TimeValue;
22-
import org.elasticsearch.index.Index;
2322
import org.elasticsearch.xcontent.ConstructingObjectParser;
2423
import org.elasticsearch.xcontent.ParseField;
2524
import org.elasticsearch.xcontent.ToXContentObject;
@@ -58,16 +57,18 @@ public int getMaxNumSegments() {
5857

5958
@Override
6059
public void evaluateCondition(ProjectState state, IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
61-
Index index = indexMetadata.getIndex();
60+
// Use the cloned index name if we have one, otherwise fall back to the original index name.
61+
String clonedIndexName = indexMetadata.getLifecycleExecutionState().forceMergeIndexName();
62+
String forceMergedIndexName = clonedIndexName != null ? clonedIndexName : indexMetadata.getIndex().getName();
6263
getClient(state.projectId()).admin()
6364
.indices()
64-
.segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> {
65-
IndexSegments idxSegments = response.getIndices().get(index.getName());
65+
.segments(new IndicesSegmentsRequest(forceMergedIndexName), ActionListener.wrap(response -> {
66+
IndexSegments idxSegments = response.getIndices().get(forceMergedIndexName);
6667
if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) {
6768
final DefaultShardOperationFailedException[] failures = response.getShardFailures();
6869
logger.info(
6970
"[{}] retrieval of segment counts after force merge did not succeed, there were {} shard failures. failures: {}",
70-
index.getName(),
71+
forceMergedIndexName,
7172
response.getFailedShards(),
7273
failures == null
7374
? "n/a"
@@ -86,7 +87,7 @@ public void evaluateCondition(ProjectState state, IndexMetadata indexMetadata, L
8687
.collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size()));
8788
logger.info(
8889
"[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}",
89-
index.getName(),
90+
forceMergedIndexName,
9091
maxNumSegments,
9192
unmergedShards.size(),
9293
unmergedShardCounts

0 commit comments

Comments
 (0)