Skip to content

Commit f2aa2fd

Browse files
committed
Force merge on clone in snapshot
1 parent d55a91b commit f2aa2fd

File tree

16 files changed

+499
-108
lines changed

16 files changed

+499
-108
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public record LifecycleExecutionState(
4040
String snapshotName,
4141
String shrinkIndexName,
4242
String snapshotIndexName,
43-
String downsampleIndexName
43+
String downsampleIndexName,
44+
String forceMergeIndexName
4445
) {
4546

4647
public static final String ILM_CUSTOM_METADATA_KEY = "ilm";
@@ -64,6 +65,7 @@ public record LifecycleExecutionState(
6465
private static final String SNAPSHOT_INDEX_NAME = "snapshot_index_name";
6566
private static final String SHRINK_INDEX_NAME = "shrink_index_name";
6667
private static final String DOWNSAMPLE_INDEX_NAME = "rollup_index_name";
68+
private static final String FORCE_MERGE_INDEX_NAME = "force_merge_index_name";
6769

6870
public static final LifecycleExecutionState EMPTY_STATE = LifecycleExecutionState.builder().build();
6971

@@ -89,7 +91,8 @@ public static Builder builder(LifecycleExecutionState state) {
8991
.setShrinkIndexName(state.shrinkIndexName)
9092
.setSnapshotIndexName(state.snapshotIndexName)
9193
.setDownsampleIndexName(state.downsampleIndexName)
92-
.setStepTime(state.stepTime);
94+
.setStepTime(state.stepTime)
95+
.setForceMergeIndexName(state.forceMergeIndexName);
9396
}
9497

9598
public static LifecycleExecutionState fromCustomMetadata(Map<String, String> customData) {
@@ -202,6 +205,10 @@ public static LifecycleExecutionState fromCustomMetadata(Map<String, String> cus
202205
if (downsampleIndexName != null) {
203206
builder.setDownsampleIndexName(downsampleIndexName);
204207
}
208+
String forceMergeIndexName = customData.get(FORCE_MERGE_INDEX_NAME);
209+
if (forceMergeIndexName != null) {
210+
builder.setForceMergeIndexName(forceMergeIndexName);
211+
}
205212
return builder.build();
206213
}
207214

@@ -274,6 +281,9 @@ public Map<String, String> asMap() {
274281
if (downsampleIndexName != null) {
275282
result.put(DOWNSAMPLE_INDEX_NAME, downsampleIndexName);
276283
}
284+
if (forceMergeIndexName != null) {
285+
result.put(FORCE_MERGE_INDEX_NAME, forceMergeIndexName);
286+
}
277287
return Collections.unmodifiableMap(result);
278288
}
279289

@@ -307,6 +317,7 @@ public static class Builder {
307317
private String shrinkIndexName;
308318
private String snapshotIndexName;
309319
private String downsampleIndexName;
320+
private String forceMergeIndexName;
310321

311322
public Builder setPhase(String phase) {
312323
this.phase = phase;
@@ -398,6 +409,11 @@ public Builder setDownsampleIndexName(String downsampleIndexName) {
398409
return this;
399410
}
400411

412+
public Builder setForceMergeIndexName(String forceMergeIndexName) {
413+
this.forceMergeIndexName = forceMergeIndexName;
414+
return this;
415+
}
416+
401417
public LifecycleExecutionState build() {
402418
return new LifecycleExecutionState(
403419
phase,
@@ -417,7 +433,8 @@ public LifecycleExecutionState build() {
417433
snapshotName,
418434
shrinkIndexName,
419435
snapshotIndexName,
420-
downsampleIndexName
436+
downsampleIndexName,
437+
forceMergeIndexName
421438
);
422439
}
423440
}

test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2087,6 +2087,10 @@ protected static void awaitIndexExists(String index, TimeValue timeout) throws I
20872087
ensureHealth(client(), index, request -> request.addParameter("timeout", timeout.toString()));
20882088
}
20892089

2090+
protected static void awaitIndexDoesNotExist(String index) throws Exception {
2091+
awaitIndexDoesNotExist(index, TimeValue.timeValueSeconds(10));
2092+
}
2093+
20902094
protected static void awaitIndexDoesNotExist(String index, TimeValue timeout) throws Exception {
20912095
assertBusy(() -> assertFalse(indexExists(index)), timeout.millis(), TimeUnit.MILLISECONDS);
20922096
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,11 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList
101101
);
102102
return;
103103
}
104+
// If we performed the force merge step on the cloned index, we need to snapshot that index instead of the original.
105+
final String clonedIndexName = lifecycleState.forceMergeIndexName();
106+
final String forceMergedIndexName = clonedIndexName != null ? clonedIndexName : indexName;
104107
CreateSnapshotRequest request = new CreateSnapshotRequest(TimeValue.MAX_VALUE, snapshotRepository, snapshotName);
105-
request.indices(indexName);
108+
request.indices(forceMergedIndexName);
106109
// this is safe as the snapshot creation will still be async, it's just that the listener will be notified when the snapshot is
107110
// complete
108111
request.waitForCompletion(true);
@@ -112,7 +115,7 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList
112115
logger.debug(
113116
"create snapshot response for policy [{}] and index [{}] is: {}",
114117
policyName,
115-
indexName,
118+
forceMergedIndexName,
116119
Strings.toString(response)
117120
);
118121
final SnapshotInfo snapInfo = response.getSnapshotInfo();
@@ -128,7 +131,7 @@ void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionList
128131
snapshotRepository,
129132
snapshotName,
130133
policyName,
131-
indexName,
134+
forceMergedIndexName,
132135
snapInfo.failedShards(),
133136
snapInfo.totalShards()
134137
)

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteStep.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,54 @@
1616
import org.elasticsearch.cluster.metadata.DataStream;
1717
import org.elasticsearch.cluster.metadata.IndexAbstraction;
1818
import org.elasticsearch.cluster.metadata.IndexMetadata;
19+
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
1920
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2021
import org.elasticsearch.common.Strings;
2122
import org.elasticsearch.core.TimeValue;
2223
import org.elasticsearch.index.Index;
2324

25+
import java.util.function.BiFunction;
26+
2427
/**
2528
* Deletes a single index.
2629
*/
2730
public class DeleteStep extends AsyncRetryDuringSnapshotActionStep {
31+
2832
public static final String NAME = "delete";
2933
private static final Logger logger = LogManager.getLogger(DeleteStep.class);
34+
private static final BiFunction<String, LifecycleExecutionState, String> DEFAULT_TARGET_INDEX_NAME_SUPPLIER = (
35+
indexName,
36+
lifecycleState) -> indexName;
37+
38+
private final BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier;
39+
private final boolean indexSurvives;
3040

41+
/**
42+
* Use this constructor to delete the index that ILM is currently operating on.
43+
*/
3144
public DeleteStep(StepKey key, StepKey nextStepKey, Client client) {
45+
this(key, nextStepKey, client, DEFAULT_TARGET_INDEX_NAME_SUPPLIER, false);
46+
}
47+
48+
/**
49+
* Use this constructor to delete a specific index, potentially different from the one that ILM is currently operating on.
50+
*/
51+
public DeleteStep(
52+
StepKey key,
53+
StepKey nextStepKey,
54+
Client client,
55+
BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier,
56+
boolean indexSurvives
57+
) {
3258
super(key, nextStepKey, client);
59+
this.targetIndexNameSupplier = targetIndexNameSupplier;
60+
this.indexSurvives = indexSurvives;
3361
}
3462

3563
@Override
3664
public void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
3765
String policyName = indexMetadata.getLifecyclePolicyName();
38-
String indexName = indexMetadata.getIndex().getName();
66+
String indexName = targetIndexNameSupplier.apply(indexMetadata.getIndex().getName(), indexMetadata.getLifecycleExecutionState());
3967
IndexAbstraction indexAbstraction = currentProject.getIndicesLookup().get(indexName);
4068
assert indexAbstraction != null : "invalid cluster metadata. index [" + indexName + "] was not found";
4169
DataStream dataStream = indexAbstraction.getParentDataStream();
@@ -88,7 +116,7 @@ public void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata
88116

89117
@Override
90118
public boolean indexSurvives() {
91-
return false;
119+
return indexSurvives;
92120
}
93121

94122
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeStep.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ public void performAction(
5151
ClusterStateObserver observer,
5252
ActionListener<Void> listener
5353
) {
54-
String indexName = indexMetadata.getIndex().getName();
54+
// Use the cloned index name if we have one, otherwise fall back to the original index name.
55+
String clonedIndexName = indexMetadata.getLifecycleExecutionState().forceMergeIndexName();
56+
String indexName = clonedIndexName != null ? clonedIndexName : indexMetadata.getIndex().getName();
5557
ForceMergeRequest request = new ForceMergeRequest(indexName);
5658
request.maxNumSegments(maxNumSegments);
5759
getClient(currentState.projectId()).admin().indices().forceMerge(request, listener.delegateFailureAndWrap((l, response) -> {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,12 @@ public ProjectState performAction(Index index, ProjectState projectState) {
7272
+ "] cannot continue until the repository is created or the policy is changed"
7373
);
7474
}
75+
// If we performed the force merge step on the cloned index, we perform the snapshot on that index instead of the original.
76+
final String clonedIndexName = lifecycleState.forceMergeIndexName();
77+
final String forceMergedIndexName = clonedIndexName != null ? clonedIndexName : index.getName();
7578

7679
LifecycleExecutionState.Builder newLifecycleState = LifecycleExecutionState.builder(lifecycleState);
77-
newLifecycleState.setSnapshotIndexName(index.getName());
80+
newLifecycleState.setSnapshotIndexName(forceMergedIndexName);
7881
newLifecycleState.setSnapshotRepository(snapshotRepository);
7982
if (lifecycleState.snapshotName() == null) {
8083
// generate and validate the snapshotName

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/IndexLifecycleExplainResponse.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
5757
private static final ParseField SHRINK_INDEX_NAME = new ParseField("shrink_index_name");
5858
private static final ParseField SNAPSHOT_NAME = new ParseField("snapshot_name");
5959
private static final ParseField SKIP_NAME = new ParseField("skip");
60+
private static final ParseField FORCE_MERGE_INDEX_NAME = new ParseField("force_merge_index_name");
6061

6162
public static final ConstructingObjectParser<IndexLifecycleExplainResponse, Void> PARSER = new ConstructingObjectParser<>(
6263
"index_lifecycle_explain_response",
@@ -81,7 +82,8 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
8182
(BytesReference) a[11],
8283
(BytesReference) a[21],
8384
(PhaseExecutionInfo) a[12],
84-
Objects.requireNonNullElse((Boolean) a[22], false)
85+
Objects.requireNonNullElse((Boolean) a[22], false),
86+
(String) a[24]
8587
// a[13] == "age"
8688
// a[20] == "time_since_index_creation"
8789
// a[23] = "age_in_millis"
@@ -124,6 +126,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
124126
}, PREVIOUS_STEP_INFO_FIELD);
125127
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SKIP_NAME);
126128
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), AGE_IN_MILLIS_FIELD);
129+
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_INDEX_NAME);
127130
}
128131

129132
private final String index;
@@ -147,6 +150,7 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
147150
private final String snapshotName;
148151
private final String shrinkIndexName;
149152
private final boolean skip;
153+
private final String forceMergeIndexName;
150154

151155
Supplier<Long> nowSupplier = System::currentTimeMillis; // Can be changed for testing
152156

@@ -170,7 +174,8 @@ public static IndexLifecycleExplainResponse newManagedIndexResponse(
170174
BytesReference stepInfo,
171175
BytesReference previousStepInfo,
172176
PhaseExecutionInfo phaseExecutionInfo,
173-
boolean skip
177+
boolean skip,
178+
String forceMergeIndexName
174179
) {
175180
return new IndexLifecycleExplainResponse(
176181
index,
@@ -193,7 +198,8 @@ public static IndexLifecycleExplainResponse newManagedIndexResponse(
193198
stepInfo,
194199
previousStepInfo,
195200
phaseExecutionInfo,
196-
skip
201+
skip,
202+
forceMergeIndexName
197203
);
198204
}
199205

@@ -219,7 +225,8 @@ public static IndexLifecycleExplainResponse newUnmanagedIndexResponse(String ind
219225
null,
220226
null,
221227
null,
222-
false
228+
false,
229+
null
223230
);
224231
}
225232

@@ -244,7 +251,8 @@ private IndexLifecycleExplainResponse(
244251
BytesReference stepInfo,
245252
BytesReference previousStepInfo,
246253
PhaseExecutionInfo phaseExecutionInfo,
247-
boolean skip
254+
boolean skip,
255+
String forceMergeIndexName
248256
) {
249257
if (managedByILM) {
250258
if (policyName == null) {
@@ -313,6 +321,7 @@ private IndexLifecycleExplainResponse(
313321
this.snapshotName = snapshotName;
314322
this.shrinkIndexName = shrinkIndexName;
315323
this.skip = skip;
324+
this.forceMergeIndexName = forceMergeIndexName;
316325
}
317326

318327
public IndexLifecycleExplainResponse(StreamInput in) throws IOException {
@@ -351,6 +360,8 @@ public IndexLifecycleExplainResponse(StreamInput in) throws IOException {
351360
} else {
352361
skip = false;
353362
}
363+
// No need for serialization from this point onwards as this action only runs on the local node.
364+
forceMergeIndexName = null;
354365
} else {
355366
policyName = null;
356367
lifecycleDate = null;
@@ -371,6 +382,7 @@ public IndexLifecycleExplainResponse(StreamInput in) throws IOException {
371382
shrinkIndexName = null;
372383
indexCreationDate = null;
373384
skip = false;
385+
forceMergeIndexName = null;
374386
}
375387
}
376388

@@ -405,6 +417,7 @@ public void writeTo(StreamOutput out) throws IOException {
405417
|| out.getTransportVersion().onOrAfter(TransportVersions.ILM_ADD_SKIP_SETTING)) {
406418
out.writeBoolean(skip);
407419
}
420+
// No need for deserialization from this point onwards as this action only runs on the local node.
408421
}
409422
}
410423

@@ -508,6 +521,10 @@ public boolean getSkip() {
508521
return skip;
509522
}
510523

524+
public String getForceMergeIndexName() {
525+
return forceMergeIndexName;
526+
}
527+
511528
@Override
512529
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
513530
builder.startObject();
@@ -595,6 +612,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
595612
builder.field(PHASE_EXECUTION_INFO.getPreferredName(), phaseExecutionInfo);
596613
}
597614
builder.field(SKIP_NAME.getPreferredName(), skip);
615+
if (forceMergeIndexName != null) {
616+
builder.field(FORCE_MERGE_INDEX_NAME.getPreferredName(), forceMergeIndexName);
617+
}
598618
}
599619
builder.endObject();
600620
return builder;
@@ -623,7 +643,8 @@ public int hashCode() {
623643
stepInfo,
624644
previousStepInfo,
625645
phaseExecutionInfo,
626-
skip
646+
skip,
647+
forceMergeIndexName
627648
);
628649
}
629650

@@ -656,7 +677,8 @@ public boolean equals(Object obj) {
656677
&& Objects.equals(stepInfo, other.stepInfo)
657678
&& Objects.equals(previousStepInfo, other.previousStepInfo)
658679
&& Objects.equals(phaseExecutionInfo, other.phaseExecutionInfo)
659-
&& Objects.equals(skip, other.skip);
680+
&& Objects.equals(skip, other.skip)
681+
&& Objects.equals(forceMergeIndexName, other.forceMergeIndexName);
660682
}
661683

662684
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,14 +141,14 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata curren
141141
logger.debug(
142142
"index [{}] using policy [{}] does not have a stored snapshot index name, "
143143
+ "using our best effort guess of [{}] for the original snapshotted index name",
144-
indexMetadata.getIndex().getName(),
144+
indexName,
145145
policyName,
146146
indexName
147147
);
148148
} else {
149149
indexName = searchableSnapshotMetadata.sourceIndex();
150150
}
151-
} else {
151+
} else if (snapshotIndexName.equals(indexName) == false) {
152152
// Use the name of the snapshot as specified in the metadata, because the current index
153153
// name not might not reflect the name of the index actually in the snapshot
154154
logger.debug(
@@ -158,6 +158,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata curren
158158
policyName,
159159
snapshotIndexName
160160
);
161+
// Note: this will update the indexName to the force-merged index name if we performed the force-merge on a cloned index.
161162
indexName = snapshotIndexName;
162163
}
163164

0 commit comments

Comments
 (0)