Skip to content

Commit f82222f

Browse files
committed
Force merge clone WIP
1 parent c76bd8e commit f82222f

File tree

13 files changed

+651
-203
lines changed

13 files changed

+651
-203
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/LifecycleExecutionState.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public record LifecycleExecutionState(
4040
String snapshotName,
4141
String shrinkIndexName,
4242
String snapshotIndexName,
43-
String downsampleIndexName
43+
String downsampleIndexName,
44+
String forceMergeIndexName
4445
) {
4546

4647
public static final String ILM_CUSTOM_METADATA_KEY = "ilm";
@@ -64,6 +65,7 @@ public record LifecycleExecutionState(
6465
private static final String SNAPSHOT_INDEX_NAME = "snapshot_index_name";
6566
private static final String SHRINK_INDEX_NAME = "shrink_index_name";
6667
private static final String DOWNSAMPLE_INDEX_NAME = "rollup_index_name";
68+
private static final String FORCE_MERGE_INDEX_NAME = "force_merge_index_name";
6769

6870
public static final LifecycleExecutionState EMPTY_STATE = LifecycleExecutionState.builder().build();
6971

@@ -89,7 +91,8 @@ public static Builder builder(LifecycleExecutionState state) {
8991
.setShrinkIndexName(state.shrinkIndexName)
9092
.setSnapshotIndexName(state.snapshotIndexName)
9193
.setDownsampleIndexName(state.downsampleIndexName)
92-
.setStepTime(state.stepTime);
94+
.setStepTime(state.stepTime)
95+
.setForceMergeIndexName(state.forceMergeIndexName);
9396
}
9497

9598
public static LifecycleExecutionState fromCustomMetadata(Map<String, String> customData) {
@@ -202,6 +205,10 @@ public static LifecycleExecutionState fromCustomMetadata(Map<String, String> cus
202205
if (downsampleIndexName != null) {
203206
builder.setDownsampleIndexName(downsampleIndexName);
204207
}
208+
String forceMergeIndexName = customData.get(FORCE_MERGE_INDEX_NAME);
209+
if (forceMergeIndexName != null) {
210+
builder.setForceMergeIndexName(forceMergeIndexName);
211+
}
205212
return builder.build();
206213
}
207214

@@ -274,6 +281,9 @@ public Map<String, String> asMap() {
274281
if (downsampleIndexName != null) {
275282
result.put(DOWNSAMPLE_INDEX_NAME, downsampleIndexName);
276283
}
284+
if (forceMergeIndexName != null) {
285+
result.put(FORCE_MERGE_INDEX_NAME, forceMergeIndexName);
286+
}
277287
return Collections.unmodifiableMap(result);
278288
}
279289

@@ -307,6 +317,7 @@ public static class Builder {
307317
private String shrinkIndexName;
308318
private String snapshotIndexName;
309319
private String downsampleIndexName;
320+
private String forceMergeIndexName;
310321

311322
public Builder setPhase(String phase) {
312323
this.phase = phase;
@@ -398,6 +409,11 @@ public Builder setDownsampleIndexName(String downsampleIndexName) {
398409
return this;
399410
}
400411

412+
public Builder setForceMergeIndexName(String forceMergeIndexName) {
413+
this.forceMergeIndexName = forceMergeIndexName;
414+
return this;
415+
}
416+
401417
public LifecycleExecutionState build() {
402418
return new LifecycleExecutionState(
403419
phase,
@@ -417,7 +433,8 @@ public LifecycleExecutionState build() {
417433
snapshotName,
418434
shrinkIndexName,
419435
snapshotIndexName,
420-
downsampleIndexName
436+
downsampleIndexName,
437+
forceMergeIndexName
421438
);
422439
}
423440
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.core.ilm;
8+
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
13+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
14+
import org.elasticsearch.client.internal.Client;
15+
import org.elasticsearch.cluster.metadata.IndexMetadata;
16+
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
17+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
18+
import org.elasticsearch.common.Strings;
19+
import org.elasticsearch.core.TimeValue;
20+
import org.elasticsearch.index.IndexNotFoundException;
21+
22+
/**
23+
* Deletes the index identified by the clone index name stored in the lifecycle state of the managed index (if any was generated)
24+
* TODO: should we just generalize CleanupShrinkIndexStep to CleanupIndexStep and use it for both shrink and clone?
25+
*/
26+
public class CleanupClonedIndexStep extends AsyncRetryDuringSnapshotActionStep {
27+
public static final String NAME = "cleanup-cloned-index";
28+
private static final Logger logger = LogManager.getLogger(CleanupClonedIndexStep.class);
29+
30+
public CleanupClonedIndexStep(StepKey key, StepKey nextStepKey, Client client) {
31+
super(key, nextStepKey, client);
32+
}
33+
34+
@Override
35+
public boolean isRetryable() {
36+
return true;
37+
}
38+
39+
@Override
40+
void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
41+
final String clonedIndexSource = IndexMetadata.INDEX_RESIZE_SOURCE_NAME.get(indexMetadata.getSettings());
42+
if (Strings.isNullOrEmpty(clonedIndexSource) == false) {
43+
// the current managed index is a cloned index
44+
if (currentProject.index(clonedIndexSource) == null) {
45+
// if the source index does not exist, we'll skip deleting the
46+
// (managed) cloned index as that will cause data loss
47+
String policyName = indexMetadata.getLifecyclePolicyName();
48+
logger.warn(
49+
"managed index [{}] as part of policy [{}] is a cloned index and the source index [{}] does not exist "
50+
+ "anymore. will skip the [{}] step",
51+
indexMetadata.getIndex().getName(),
52+
policyName,
53+
clonedIndexSource,
54+
NAME
55+
);
56+
listener.onResponse(null);
57+
return;
58+
}
59+
}
60+
61+
LifecycleExecutionState lifecycleState = indexMetadata.getLifecycleExecutionState();
62+
final String cloneIndexName = lifecycleState.forceMergeIndexName();
63+
// if the clone index was not generated there is nothing to delete so we move on
64+
if (Strings.hasText(cloneIndexName) == false) {
65+
listener.onResponse(null);
66+
return;
67+
}
68+
getClient(currentProject.id()).admin()
69+
.indices()
70+
.delete(new DeleteIndexRequest(cloneIndexName).masterNodeTimeout(TimeValue.MAX_VALUE), new ActionListener<>() {
71+
@Override
72+
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
73+
// even if not all nodes acked the delete request yet we can consider this operation as successful as
74+
// we'll generate a new index name and attempt to clone into the newly generated name
75+
listener.onResponse(null);
76+
}
77+
78+
@Override
79+
public void onFailure(Exception e) {
80+
if (e instanceof IndexNotFoundException) {
81+
// we can move on if the index was deleted in the meantime
82+
listener.onResponse(null);
83+
} else {
84+
listener.onFailure(e);
85+
}
86+
}
87+
});
88+
}
89+
90+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.core.ilm;
8+
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
13+
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
14+
import org.elasticsearch.client.internal.Client;
15+
import org.elasticsearch.cluster.ClusterStateObserver;
16+
import org.elasticsearch.cluster.ProjectState;
17+
import org.elasticsearch.cluster.metadata.IndexMetadata;
18+
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
19+
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.core.TimeValue;
21+
22+
import java.util.Objects;
23+
import java.util.function.BiFunction;
24+
25+
/**
26+
* Clones the index with the specified settings, using the name that was generated in a previous {@link GenerateUniqueIndexNameStep} step.
27+
*/
28+
public class CloneIndexStep extends AsyncActionStep {
29+
30+
public static final String NAME = "clone-index";
31+
private static final Logger logger = LogManager.getLogger(CloneIndexStep.class);
32+
33+
private final BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier;
34+
private final Settings targetIndexSettings;
35+
36+
public CloneIndexStep(
37+
StepKey key,
38+
StepKey nextStepKey,
39+
Client client,
40+
BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier,
41+
Settings targetIndexSettings
42+
) {
43+
super(key, nextStepKey, client);
44+
this.targetIndexNameSupplier = targetIndexNameSupplier;
45+
this.targetIndexSettings = targetIndexSettings;
46+
}
47+
48+
@Override
49+
public boolean isRetryable() {
50+
return true;
51+
}
52+
53+
@Override
54+
public void performAction(
55+
IndexMetadata indexMetadata,
56+
ProjectState currentState,
57+
ClusterStateObserver observer,
58+
ActionListener<Void> listener
59+
) {
60+
final String targetIndexName = targetIndexNameSupplier.apply(
61+
indexMetadata.getIndex().getName(),
62+
indexMetadata.getLifecycleExecutionState()
63+
);
64+
if (currentState.metadata().index(targetIndexName) != null) {
65+
logger.warn(
66+
"skipping [{}] step for index [{}] as part of policy [{}] as the target index [{}] already exists",
67+
CloneIndexStep.NAME,
68+
indexMetadata.getIndex().getName(),
69+
indexMetadata.getLifecyclePolicyName(),
70+
targetIndexName
71+
);
72+
listener.onResponse(null);
73+
return;
74+
}
75+
76+
Settings relevantTargetSettings = Settings.builder()
77+
.put(targetIndexSettings)
78+
// We add the skip setting to prevent ILM from processing the cloned index before the execution state has been copied - which
79+
// could happen if the shards of the cloned index take a long time to allocate.
80+
.put(LifecycleSettings.LIFECYCLE_SKIP, true)
81+
.build();
82+
ResizeRequest resizeRequest = new ResizeRequest(targetIndexName, indexMetadata.getIndex().getName()).masterNodeTimeout(
83+
TimeValue.MAX_VALUE
84+
);
85+
resizeRequest.setResizeType(ResizeType.CLONE);
86+
resizeRequest.getTargetIndexRequest().settings(relevantTargetSettings);
87+
88+
getClient(currentState.projectId()).admin()
89+
.indices()
90+
.resizeIndex(resizeRequest, listener.delegateFailureAndWrap((l, response) -> l.onResponse(null)));
91+
}
92+
93+
@Override
94+
public int hashCode() {
95+
return Objects.hash(super.hashCode(), targetIndexSettings);
96+
}
97+
98+
@Override
99+
public boolean equals(Object obj) {
100+
if (obj == null) {
101+
return false;
102+
}
103+
if (getClass() != obj.getClass()) {
104+
return false;
105+
}
106+
CloneIndexStep other = (CloneIndexStep) obj;
107+
return super.equals(obj) && Objects.equals(targetIndexSettings, other.targetIndexSettings);
108+
}
109+
110+
}

0 commit comments

Comments
 (0)