Skip to content

Commit 0a54600

Browse files
committed
Make ILM AsyncWaitStep project-aware
1 parent 21d1c78 commit 0a54600

19 files changed

+261
-230
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncWaitStep.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
package org.elasticsearch.xpack.core.ilm;
88

99
import org.elasticsearch.client.internal.Client;
10-
import org.elasticsearch.cluster.metadata.Metadata;
10+
import org.elasticsearch.cluster.ProjectState;
1111
import org.elasticsearch.core.Nullable;
1212
import org.elasticsearch.core.TimeValue;
1313
import org.elasticsearch.index.Index;
@@ -34,7 +34,7 @@ protected Client getClient() {
3434
return client;
3535
}
3636

37-
public abstract void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout);
37+
public abstract void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout);
3838

3939
public interface Listener {
4040

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
1515
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
1616
import org.elasticsearch.client.internal.Client;
17-
import org.elasticsearch.cluster.metadata.Metadata;
17+
import org.elasticsearch.cluster.ProjectState;
1818
import org.elasticsearch.cluster.routing.ShardRouting;
1919
import org.elasticsearch.common.Strings;
2020
import org.elasticsearch.core.TimeValue;
@@ -56,42 +56,45 @@ public int getMaxNumSegments() {
5656
}
5757

5858
@Override
59-
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
60-
getClient().admin().indices().segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> {
61-
IndexSegments idxSegments = response.getIndices().get(index.getName());
62-
if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) {
63-
final DefaultShardOperationFailedException[] failures = response.getShardFailures();
64-
logger.info(
65-
"[{}] retrieval of segment counts after force merge did not succeed, there were {} shard failures. failures: {}",
66-
index.getName(),
67-
response.getFailedShards(),
68-
failures == null
69-
? "n/a"
70-
: Strings.collectionToDelimitedString(Arrays.stream(failures).map(Strings::toString).toList(), ",")
71-
);
72-
listener.onResponse(true, new Info(-1));
73-
} else {
74-
List<ShardSegments> unmergedShards = idxSegments.getShards()
75-
.values()
76-
.stream()
77-
.flatMap(iss -> Arrays.stream(iss.shards()))
78-
.filter(shardSegments -> shardSegments.getSegments().size() > maxNumSegments)
79-
.toList();
80-
if (unmergedShards.size() > 0) {
81-
Map<ShardRouting, Integer> unmergedShardCounts = unmergedShards.stream()
82-
.collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size()));
59+
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
60+
getClient().projectClient(state.projectId())
61+
.admin()
62+
.indices()
63+
.segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> {
64+
IndexSegments idxSegments = response.getIndices().get(index.getName());
65+
if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) {
66+
final DefaultShardOperationFailedException[] failures = response.getShardFailures();
8367
logger.info(
84-
"[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}",
68+
"[{}] retrieval of segment counts after force merge did not succeed, there were {} shard failures. failures: {}",
8569
index.getName(),
86-
maxNumSegments,
87-
unmergedShards.size(),
88-
unmergedShardCounts
70+
response.getFailedShards(),
71+
failures == null
72+
? "n/a"
73+
: Strings.collectionToDelimitedString(Arrays.stream(failures).map(Strings::toString).toList(), ",")
8974
);
75+
listener.onResponse(true, new Info(-1));
76+
} else {
77+
List<ShardSegments> unmergedShards = idxSegments.getShards()
78+
.values()
79+
.stream()
80+
.flatMap(iss -> Arrays.stream(iss.shards()))
81+
.filter(shardSegments -> shardSegments.getSegments().size() > maxNumSegments)
82+
.toList();
83+
if (unmergedShards.size() > 0) {
84+
Map<ShardRouting, Integer> unmergedShardCounts = unmergedShards.stream()
85+
.collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size()));
86+
logger.info(
87+
"[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}",
88+
index.getName(),
89+
maxNumSegments,
90+
unmergedShards.size(),
91+
unmergedShardCounts
92+
);
93+
}
94+
// Force merging is best effort, so always return true that the condition has been met.
95+
listener.onResponse(true, new Info(unmergedShards.size()));
9096
}
91-
// Force merging is best effort, so always return true that the condition has been met.
92-
listener.onResponse(true, new Info(unmergedShards.size()));
93-
}
94-
}, listener::onFailure));
97+
}, listener::onFailure));
9598
}
9699

97100
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForFollowShardTasksStep.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88

99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.client.internal.Client;
11+
import org.elasticsearch.cluster.ProjectState;
1112
import org.elasticsearch.cluster.metadata.IndexMetadata;
12-
import org.elasticsearch.cluster.metadata.Metadata;
1313
import org.elasticsearch.common.Strings;
1414
import org.elasticsearch.core.TimeValue;
1515
import org.elasticsearch.index.Index;
@@ -39,8 +39,8 @@ public boolean isRetryable() {
3939
}
4040

4141
@Override
42-
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
43-
IndexMetadata indexMetadata = metadata.getProject().index(index);
42+
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
43+
IndexMetadata indexMetadata = state.metadata().index(index);
4444
Map<String, String> customIndexMetadata = indexMetadata.getCustomData(CCR_METADATA_KEY);
4545
if (customIndexMetadata == null) {
4646
listener.onResponse(true, null);
@@ -49,11 +49,8 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
4949

5050
FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
5151
request.setIndices(new String[] { index.getName() });
52-
getClient().execute(
53-
FollowStatsAction.INSTANCE,
54-
request,
55-
ActionListener.wrap(r -> handleResponse(r, listener), listener::onFailure)
56-
);
52+
getClient().projectClient(state.projectId())
53+
.execute(FollowStatsAction.INSTANCE, request, ActionListener.wrap(r -> handleResponse(r, listener), listener::onFailure));
5754
}
5855

5956
static void handleResponse(FollowStatsAction.StatsResponses responses, Listener listener) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForNoFollowersStep.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
1515
import org.elasticsearch.action.admin.indices.stats.ShardStats;
1616
import org.elasticsearch.client.internal.Client;
17-
import org.elasticsearch.cluster.metadata.Metadata;
17+
import org.elasticsearch.cluster.ProjectState;
18+
import org.elasticsearch.cluster.metadata.ProjectId;
1819
import org.elasticsearch.core.TimeValue;
1920
import org.elasticsearch.index.Index;
2021
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
@@ -52,26 +53,27 @@ public boolean isRetryable() {
5253
}
5354

5455
@Override
55-
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
56+
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
5657
XPackInfoRequest xPackInfoRequest = new XPackInfoRequest();
5758
xPackInfoRequest.setCategories(EnumSet.of(XPackInfoRequest.Category.FEATURES));
58-
getClient().execute(XPackInfoFeatureAction.CCR, xPackInfoRequest, ActionListener.wrap((xPackInfoResponse) -> {
59-
XPackInfoResponse.FeatureSetsInfo.FeatureSet featureSet = xPackInfoResponse.getInfo();
60-
if (featureSet != null && featureSet.enabled() == false) {
61-
listener.onResponse(true, null);
62-
return;
63-
}
64-
leaderIndexCheck(metadata, index, listener, masterTimeout);
65-
}, listener::onFailure));
59+
getClient().projectClient(state.projectId())
60+
.execute(XPackInfoFeatureAction.CCR, xPackInfoRequest, ActionListener.wrap((xPackInfoResponse) -> {
61+
XPackInfoResponse.FeatureSetsInfo.FeatureSet featureSet = xPackInfoResponse.getInfo();
62+
if (featureSet != null && featureSet.enabled() == false) {
63+
listener.onResponse(true, null);
64+
return;
65+
}
66+
leaderIndexCheck(state.projectId(), index, listener);
67+
}, listener::onFailure));
6668
}
6769

68-
private void leaderIndexCheck(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
70+
private void leaderIndexCheck(ProjectId projectId, Index index, Listener listener) {
6971
IndicesStatsRequest request = new IndicesStatsRequest();
7072
request.clear();
7173
String indexName = index.getName();
7274
request.indices(indexName);
7375

74-
getClient().admin().indices().stats(request, ActionListener.wrap((response) -> {
76+
getClient().projectClient(projectId).admin().indices().stats(request, ActionListener.wrap((response) -> {
7577
IndexStats indexStats = response.getIndex(indexName);
7678
if (indexStats == null) {
7779
// Index was probably deleted

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStep.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
1515
import org.elasticsearch.action.support.IndexComponentSelector;
1616
import org.elasticsearch.client.internal.Client;
17+
import org.elasticsearch.cluster.ProjectState;
1718
import org.elasticsearch.cluster.metadata.DataStream;
1819
import org.elasticsearch.cluster.metadata.IndexAbstraction;
1920
import org.elasticsearch.cluster.metadata.IndexMetadata;
2021
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
21-
import org.elasticsearch.cluster.metadata.Metadata;
2222
import org.elasticsearch.common.Strings;
2323
import org.elasticsearch.common.unit.ByteSizeValue;
2424
import org.elasticsearch.core.TimeValue;
@@ -80,8 +80,8 @@ public boolean isRetryable() {
8080
}
8181

8282
@Override
83-
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
84-
IndexAbstraction indexAbstraction = metadata.getProject().getIndicesLookup().get(index.getName());
83+
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
84+
IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(index.getName());
8585
assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found";
8686
final String rolloverTarget;
8787
final boolean targetFailureStore;
@@ -95,14 +95,14 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
9595
index.getName(),
9696
targetFailureStore ? "failure store " : "",
9797
dataStream.getName(),
98-
metadata.getProject().index(index).getLifecyclePolicyName()
98+
state.metadata().index(index).getLifecyclePolicyName()
9999
);
100100
listener.onResponse(true, EmptyInfo.INSTANCE);
101101
return;
102102
}
103103
rolloverTarget = dataStream.getName();
104104
} else {
105-
IndexMetadata indexMetadata = metadata.getProject().index(index);
105+
IndexMetadata indexMetadata = state.metadata().index(index);
106106
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
107107

108108
if (Strings.isNullOrEmpty(rolloverAlias)) {
@@ -200,15 +200,17 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
200200

201201
// if we should only rollover if not empty, *and* if neither an explicit min_docs nor an explicit min_primary_shard_docs
202202
// has been specified on this policy, then inject a default min_docs: 1 condition so that we do not rollover empty indices
203-
boolean rolloverOnlyIfHasDocuments = LifecycleSettings.LIFECYCLE_ROLLOVER_ONLY_IF_HAS_DOCUMENTS_SETTING.get(metadata.settings());
203+
boolean rolloverOnlyIfHasDocuments = LifecycleSettings.LIFECYCLE_ROLLOVER_ONLY_IF_HAS_DOCUMENTS_SETTING.get(
204+
state.cluster().metadata().settings()
205+
);
204206
RolloverRequest rolloverRequest = createRolloverRequest(
205207
rolloverTarget,
206208
masterTimeout,
207209
rolloverOnlyIfHasDocuments,
208210
targetFailureStore
209211
);
210212

211-
getClient().admin().indices().rolloverIndex(rolloverRequest, ActionListener.wrap(response -> {
213+
getClient().projectClient(state.projectId()).admin().indices().rolloverIndex(rolloverRequest, ActionListener.wrap(response -> {
212214
final var conditionStatus = response.getConditionStatus();
213215
final var conditionsMet = rolloverRequest.getConditions().areConditionsMet(conditionStatus);
214216
if (conditionsMet) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForSnapshotStep.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
1313
import org.elasticsearch.client.internal.Client;
14+
import org.elasticsearch.cluster.ProjectState;
1415
import org.elasticsearch.cluster.metadata.IndexMetadata;
15-
import org.elasticsearch.cluster.metadata.Metadata;
1616
import org.elasticsearch.common.Strings;
1717
import org.elasticsearch.core.TimeValue;
1818
import org.elasticsearch.index.Index;
@@ -54,8 +54,8 @@ public class WaitForSnapshotStep extends AsyncWaitStep {
5454
}
5555

5656
@Override
57-
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
58-
IndexMetadata indexMetadata = metadata.getProject().index(index);
57+
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
58+
IndexMetadata indexMetadata = state.metadata().index(index);
5959
if (indexMetadata == null) {
6060
listener.onFailure(error(NO_INDEX_METADATA_MESSAGE, index.getName()));
6161
return;
@@ -68,7 +68,7 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
6868
return;
6969
}
7070

71-
SnapshotLifecycleMetadata snapMeta = metadata.getProject().custom(SnapshotLifecycleMetadata.TYPE);
71+
SnapshotLifecycleMetadata snapMeta = state.metadata().custom(SnapshotLifecycleMetadata.TYPE);
7272
if (snapMeta == null || snapMeta.getSnapshotConfigurations().containsKey(policy) == false) {
7373
listener.onFailure(error(POLICY_NOT_FOUND_MESSAGE, policy));
7474
return;
@@ -108,7 +108,7 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
108108
.snapshots(new String[] { snapshotName })
109109
.includeIndexNames(true)
110110
.verbose(false);
111-
getClient().admin().cluster().getSnapshots(request, ActionListener.wrap(response -> {
111+
getClient().projectClient(state.projectId()).admin().cluster().getSnapshots(request, ActionListener.wrap(response -> {
112112
if (response.getSnapshots().size() != 1) {
113113
listener.onFailure(error(UNEXPECTED_SNAPSHOT_STATE_MESSAGE, repositoryName, snapshotName, response.getSnapshots().size()));
114114
} else {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitUntilReplicateForTimePassesStep.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77

88
package org.elasticsearch.xpack.core.ilm;
99

10+
import org.elasticsearch.cluster.ProjectState;
1011
import org.elasticsearch.cluster.metadata.IndexMetadata;
1112
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
12-
import org.elasticsearch.cluster.metadata.Metadata;
1313
import org.elasticsearch.common.Strings;
1414
import org.elasticsearch.core.TimeValue;
1515
import org.elasticsearch.index.Index;
@@ -66,12 +66,12 @@ public boolean equals(Object obj) {
6666
}
6767

6868
@Override
69-
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
70-
IndexMetadata indexMetadata = metadata.getProject().index(index);
69+
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
70+
IndexMetadata indexMetadata = state.metadata().index(index);
7171
assert indexMetadata != null
7272
: "the index metadata for index [" + index.getName() + "] must exist in the cluster state for step [" + NAME + "]";
7373

74-
final LifecycleExecutionState executionState = metadata.getProject().index(index.getName()).getLifecycleExecutionState();
74+
final LifecycleExecutionState executionState = state.metadata().index(index.getName()).getLifecycleExecutionState();
7575
assert executionState != null
7676
: "the lifecycle execution state for index [" + index.getName() + "] must exist in the cluster state for step [" + NAME + "]";
7777

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitUntilTimeSeriesEndTimePassesStep.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
*/
77
package org.elasticsearch.xpack.core.ilm;
88

9+
import org.elasticsearch.cluster.ProjectState;
910
import org.elasticsearch.cluster.metadata.IndexMetadata;
10-
import org.elasticsearch.cluster.metadata.Metadata;
1111
import org.elasticsearch.common.Strings;
1212
import org.elasticsearch.core.TimeValue;
1313
import org.elasticsearch.index.Index;
@@ -43,8 +43,8 @@ public boolean isRetryable() {
4343
}
4444

4545
@Override
46-
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
47-
IndexMetadata indexMetadata = metadata.getProject().index(index);
46+
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
47+
IndexMetadata indexMetadata = state.metadata().index(index);
4848
assert indexMetadata != null
4949
: "the index metadata for index [" + index.getName() + "] must exist in the cluster state for step [" + NAME + "]";
5050

0 commit comments

Comments
 (0)