Skip to content

Commit c744240

Browse files
authored
Make ILM AsyncWaitStep project-aware (#129397)
We pass a `ProjectState` to the `AsyncAwaitStep` classes and use the project client to make the steps run on a specific project.
1 parent ebd4138 commit c744240

18 files changed

+246
-211
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncWaitStep.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
package org.elasticsearch.xpack.core.ilm;
88

99
import org.elasticsearch.client.internal.Client;
10-
import org.elasticsearch.cluster.metadata.Metadata;
10+
import org.elasticsearch.cluster.ProjectState;
11+
import org.elasticsearch.cluster.metadata.ProjectId;
1112
import org.elasticsearch.core.Nullable;
1213
import org.elasticsearch.core.TimeValue;
1314
import org.elasticsearch.index.Index;
@@ -29,12 +30,17 @@ public AsyncWaitStep(StepKey key, StepKey nextStepKey, Client client) {
2930
this.client = client;
3031
}
3132

33+
// For testing only
3234
@Nullable
33-
protected Client getClient() {
35+
Client getClient() {
3436
return client;
3537
}
3638

37-
public abstract void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout);
39+
protected Client getClient(ProjectId projectId) {
40+
return client.projectClient(projectId);
41+
}
42+
43+
public abstract void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout);
3844

3945
public interface Listener {
4046

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
1515
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
1616
import org.elasticsearch.client.internal.Client;
17-
import org.elasticsearch.cluster.metadata.Metadata;
17+
import org.elasticsearch.cluster.ProjectState;
1818
import org.elasticsearch.cluster.routing.ShardRouting;
1919
import org.elasticsearch.common.Strings;
2020
import org.elasticsearch.core.TimeValue;
@@ -56,42 +56,44 @@ public int getMaxNumSegments() {
5656
}
5757

5858
@Override
59-
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
60-
getClient().admin().indices().segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> {
61-
IndexSegments idxSegments = response.getIndices().get(index.getName());
62-
if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) {
63-
final DefaultShardOperationFailedException[] failures = response.getShardFailures();
64-
logger.info(
65-
"[{}] retrieval of segment counts after force merge did not succeed, there were {} shard failures. failures: {}",
66-
index.getName(),
67-
response.getFailedShards(),
68-
failures == null
69-
? "n/a"
70-
: Strings.collectionToDelimitedString(Arrays.stream(failures).map(Strings::toString).toList(), ",")
71-
);
72-
listener.onResponse(true, new Info(-1));
73-
} else {
74-
List<ShardSegments> unmergedShards = idxSegments.getShards()
75-
.values()
76-
.stream()
77-
.flatMap(iss -> Arrays.stream(iss.shards()))
78-
.filter(shardSegments -> shardSegments.getSegments().size() > maxNumSegments)
79-
.toList();
80-
if (unmergedShards.size() > 0) {
81-
Map<ShardRouting, Integer> unmergedShardCounts = unmergedShards.stream()
82-
.collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size()));
59+
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
60+
getClient(state.projectId()).admin()
61+
.indices()
62+
.segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> {
63+
IndexSegments idxSegments = response.getIndices().get(index.getName());
64+
if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) {
65+
final DefaultShardOperationFailedException[] failures = response.getShardFailures();
8366
logger.info(
84-
"[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}",
67+
"[{}] retrieval of segment counts after force merge did not succeed, there were {} shard failures. failures: {}",
8568
index.getName(),
86-
maxNumSegments,
87-
unmergedShards.size(),
88-
unmergedShardCounts
69+
response.getFailedShards(),
70+
failures == null
71+
? "n/a"
72+
: Strings.collectionToDelimitedString(Arrays.stream(failures).map(Strings::toString).toList(), ",")
8973
);
74+
listener.onResponse(true, new Info(-1));
75+
} else {
76+
List<ShardSegments> unmergedShards = idxSegments.getShards()
77+
.values()
78+
.stream()
79+
.flatMap(iss -> Arrays.stream(iss.shards()))
80+
.filter(shardSegments -> shardSegments.getSegments().size() > maxNumSegments)
81+
.toList();
82+
if (unmergedShards.size() > 0) {
83+
Map<ShardRouting, Integer> unmergedShardCounts = unmergedShards.stream()
84+
.collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size()));
85+
logger.info(
86+
"[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}",
87+
index.getName(),
88+
maxNumSegments,
89+
unmergedShards.size(),
90+
unmergedShardCounts
91+
);
92+
}
93+
// Force merging is best effort, so always return true that the condition has been met.
94+
listener.onResponse(true, new Info(unmergedShards.size()));
9095
}
91-
// Force merging is best effort, so always return true that the condition has been met.
92-
listener.onResponse(true, new Info(unmergedShards.size()));
93-
}
94-
}, listener::onFailure));
96+
}, listener::onFailure));
9597
}
9698

9799
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForFollowShardTasksStep.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88

99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.client.internal.Client;
11+
import org.elasticsearch.cluster.ProjectState;
1112
import org.elasticsearch.cluster.metadata.IndexMetadata;
12-
import org.elasticsearch.cluster.metadata.Metadata;
1313
import org.elasticsearch.common.Strings;
1414
import org.elasticsearch.core.TimeValue;
1515
import org.elasticsearch.index.Index;
@@ -39,8 +39,8 @@ public boolean isRetryable() {
3939
}
4040

4141
@Override
42-
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
43-
IndexMetadata indexMetadata = metadata.getProject().index(index);
42+
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
43+
IndexMetadata indexMetadata = state.metadata().index(index);
4444
Map<String, String> customIndexMetadata = indexMetadata.getCustomData(CCR_METADATA_KEY);
4545
if (customIndexMetadata == null) {
4646
listener.onResponse(true, null);
@@ -49,7 +49,7 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
4949

5050
FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
5151
request.setIndices(new String[] { index.getName() });
52-
getClient().execute(
52+
getClient(state.projectId()).execute(
5353
FollowStatsAction.INSTANCE,
5454
request,
5555
ActionListener.wrap(r -> handleResponse(r, listener), listener::onFailure)

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForNoFollowersStep.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
1515
import org.elasticsearch.action.admin.indices.stats.ShardStats;
1616
import org.elasticsearch.client.internal.Client;
17-
import org.elasticsearch.cluster.metadata.Metadata;
17+
import org.elasticsearch.cluster.ProjectState;
18+
import org.elasticsearch.cluster.metadata.ProjectId;
1819
import org.elasticsearch.core.TimeValue;
1920
import org.elasticsearch.index.Index;
2021
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
@@ -52,26 +53,26 @@ public boolean isRetryable() {
5253
}
5354

5455
@Override
55-
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
56+
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
5657
XPackInfoRequest xPackInfoRequest = new XPackInfoRequest();
5758
xPackInfoRequest.setCategories(EnumSet.of(XPackInfoRequest.Category.FEATURES));
58-
getClient().execute(XPackInfoFeatureAction.CCR, xPackInfoRequest, ActionListener.wrap((xPackInfoResponse) -> {
59+
getClient(state.projectId()).execute(XPackInfoFeatureAction.CCR, xPackInfoRequest, ActionListener.wrap((xPackInfoResponse) -> {
5960
XPackInfoResponse.FeatureSetsInfo.FeatureSet featureSet = xPackInfoResponse.getInfo();
6061
if (featureSet != null && featureSet.enabled() == false) {
6162
listener.onResponse(true, null);
6263
return;
6364
}
64-
leaderIndexCheck(metadata, index, listener, masterTimeout);
65+
leaderIndexCheck(state.projectId(), index, listener);
6566
}, listener::onFailure));
6667
}
6768

68-
private void leaderIndexCheck(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
69+
private void leaderIndexCheck(ProjectId projectId, Index index, Listener listener) {
6970
IndicesStatsRequest request = new IndicesStatsRequest();
7071
request.clear();
7172
String indexName = index.getName();
7273
request.indices(indexName);
7374

74-
getClient().admin().indices().stats(request, ActionListener.wrap((response) -> {
75+
getClient(projectId).admin().indices().stats(request, ActionListener.wrap((response) -> {
7576
IndexStats indexStats = response.getIndex(indexName);
7677
if (indexStats == null) {
7778
// Index was probably deleted

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStep.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
1515
import org.elasticsearch.action.support.IndexComponentSelector;
1616
import org.elasticsearch.client.internal.Client;
17+
import org.elasticsearch.cluster.ProjectState;
1718
import org.elasticsearch.cluster.metadata.DataStream;
1819
import org.elasticsearch.cluster.metadata.IndexAbstraction;
1920
import org.elasticsearch.cluster.metadata.IndexMetadata;
2021
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
21-
import org.elasticsearch.cluster.metadata.Metadata;
2222
import org.elasticsearch.common.Strings;
2323
import org.elasticsearch.common.unit.ByteSizeValue;
2424
import org.elasticsearch.core.TimeValue;
@@ -80,8 +80,8 @@ public boolean isRetryable() {
8080
}
8181

8282
@Override
83-
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
84-
IndexAbstraction indexAbstraction = metadata.getProject().getIndicesLookup().get(index.getName());
83+
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
84+
IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(index.getName());
8585
assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found";
8686
final String rolloverTarget;
8787
final boolean targetFailureStore;
@@ -95,14 +95,14 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
9595
index.getName(),
9696
targetFailureStore ? "failure store " : "",
9797
dataStream.getName(),
98-
metadata.getProject().index(index).getLifecyclePolicyName()
98+
state.metadata().index(index).getLifecyclePolicyName()
9999
);
100100
listener.onResponse(true, EmptyInfo.INSTANCE);
101101
return;
102102
}
103103
rolloverTarget = dataStream.getName();
104104
} else {
105-
IndexMetadata indexMetadata = metadata.getProject().index(index);
105+
IndexMetadata indexMetadata = state.metadata().index(index);
106106
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
107107

108108
if (Strings.isNullOrEmpty(rolloverAlias)) {
@@ -200,15 +200,17 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
200200

201201
// if we should only rollover if not empty, *and* if neither an explicit min_docs nor an explicit min_primary_shard_docs
202202
// has been specified on this policy, then inject a default min_docs: 1 condition so that we do not rollover empty indices
203-
boolean rolloverOnlyIfHasDocuments = LifecycleSettings.LIFECYCLE_ROLLOVER_ONLY_IF_HAS_DOCUMENTS_SETTING.get(metadata.settings());
203+
boolean rolloverOnlyIfHasDocuments = LifecycleSettings.LIFECYCLE_ROLLOVER_ONLY_IF_HAS_DOCUMENTS_SETTING.get(
204+
state.cluster().metadata().settings()
205+
);
204206
RolloverRequest rolloverRequest = createRolloverRequest(
205207
rolloverTarget,
206208
masterTimeout,
207209
rolloverOnlyIfHasDocuments,
208210
targetFailureStore
209211
);
210212

211-
getClient().admin().indices().rolloverIndex(rolloverRequest, ActionListener.wrap(response -> {
213+
getClient(state.projectId()).admin().indices().rolloverIndex(rolloverRequest, ActionListener.wrap(response -> {
212214
final var conditionStatus = response.getConditionStatus();
213215
final var conditionsMet = rolloverRequest.getConditions().areConditionsMet(conditionStatus);
214216
if (conditionsMet) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForSnapshotStep.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
1313
import org.elasticsearch.client.internal.Client;
14+
import org.elasticsearch.cluster.ProjectState;
1415
import org.elasticsearch.cluster.metadata.IndexMetadata;
15-
import org.elasticsearch.cluster.metadata.Metadata;
1616
import org.elasticsearch.common.Strings;
1717
import org.elasticsearch.core.TimeValue;
1818
import org.elasticsearch.index.Index;
@@ -54,8 +54,8 @@ public class WaitForSnapshotStep extends AsyncWaitStep {
5454
}
5555

5656
@Override
57-
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
58-
IndexMetadata indexMetadata = metadata.getProject().index(index);
57+
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
58+
IndexMetadata indexMetadata = state.metadata().index(index);
5959
if (indexMetadata == null) {
6060
listener.onFailure(error(NO_INDEX_METADATA_MESSAGE, index.getName()));
6161
return;
@@ -68,7 +68,7 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
6868
return;
6969
}
7070

71-
SnapshotLifecycleMetadata snapMeta = metadata.getProject().custom(SnapshotLifecycleMetadata.TYPE);
71+
SnapshotLifecycleMetadata snapMeta = state.metadata().custom(SnapshotLifecycleMetadata.TYPE);
7272
if (snapMeta == null || snapMeta.getSnapshotConfigurations().containsKey(policy) == false) {
7373
listener.onFailure(error(POLICY_NOT_FOUND_MESSAGE, policy));
7474
return;
@@ -108,7 +108,7 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
108108
.snapshots(new String[] { snapshotName })
109109
.includeIndexNames(true)
110110
.verbose(false);
111-
getClient().admin().cluster().getSnapshots(request, ActionListener.wrap(response -> {
111+
getClient(state.projectId()).admin().cluster().getSnapshots(request, ActionListener.wrap(response -> {
112112
if (response.getSnapshots().size() != 1) {
113113
listener.onFailure(error(UNEXPECTED_SNAPSHOT_STATE_MESSAGE, repositoryName, snapshotName, response.getSnapshots().size()));
114114
} else {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitUntilReplicateForTimePassesStep.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77

88
package org.elasticsearch.xpack.core.ilm;
99

10+
import org.elasticsearch.cluster.ProjectState;
1011
import org.elasticsearch.cluster.metadata.IndexMetadata;
1112
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
12-
import org.elasticsearch.cluster.metadata.Metadata;
1313
import org.elasticsearch.common.Strings;
1414
import org.elasticsearch.core.TimeValue;
1515
import org.elasticsearch.index.Index;
@@ -66,12 +66,12 @@ public boolean equals(Object obj) {
6666
}
6767

6868
@Override
69-
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
70-
IndexMetadata indexMetadata = metadata.getProject().index(index);
69+
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
70+
IndexMetadata indexMetadata = state.metadata().index(index);
7171
assert indexMetadata != null
7272
: "the index metadata for index [" + index.getName() + "] must exist in the cluster state for step [" + NAME + "]";
7373

74-
final LifecycleExecutionState executionState = metadata.getProject().index(index.getName()).getLifecycleExecutionState();
74+
final LifecycleExecutionState executionState = state.metadata().index(index.getName()).getLifecycleExecutionState();
7575
assert executionState != null
7676
: "the lifecycle execution state for index [" + index.getName() + "] must exist in the cluster state for step [" + NAME + "]";
7777

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitUntilTimeSeriesEndTimePassesStep.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
*/
77
package org.elasticsearch.xpack.core.ilm;
88

9+
import org.elasticsearch.cluster.ProjectState;
910
import org.elasticsearch.cluster.metadata.IndexMetadata;
10-
import org.elasticsearch.cluster.metadata.Metadata;
1111
import org.elasticsearch.common.Strings;
1212
import org.elasticsearch.core.TimeValue;
1313
import org.elasticsearch.index.Index;
@@ -43,8 +43,8 @@ public boolean isRetryable() {
4343
}
4444

4545
@Override
46-
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
47-
IndexMetadata indexMetadata = metadata.getProject().index(index);
46+
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
47+
IndexMetadata indexMetadata = state.metadata().index(index);
4848
assert indexMetadata != null
4949
: "the index metadata for index [" + index.getName() + "] must exist in the cluster state for step [" + NAME + "]";
5050

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AbstractStepTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public void setupClient() {
4040
adminClient = Mockito.mock(AdminClient.class);
4141
indicesClient = Mockito.mock(IndicesAdminClient.class);
4242

43+
Mockito.when(client.projectClient(Mockito.any())).thenReturn(client);
4344
Mockito.when(client.admin()).thenReturn(adminClient);
4445
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
4546
}

0 commit comments

Comments
 (0)