Skip to content

Commit c2bdc4c

Browse files
nielsbaumanmridula-s109
authored andcommitted
Make ILM AsyncActionStep project-aware (elastic#129632)
Updates all ILM async actions to operate on `ProjectState` and use a project client.
1 parent 720ed1e commit c2bdc4c

File tree

48 files changed

+587
-555
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+587
-555
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AbstractUnfollowIndexStep.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88

99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.client.internal.Client;
11-
import org.elasticsearch.cluster.ClusterState;
1211
import org.elasticsearch.cluster.ClusterStateObserver;
12+
import org.elasticsearch.cluster.ProjectState;
1313
import org.elasticsearch.cluster.metadata.IndexMetadata;
1414

1515
import java.util.Map;
@@ -25,7 +25,7 @@ abstract class AbstractUnfollowIndexStep extends AsyncActionStep {
2525
@Override
2626
public final void performAction(
2727
IndexMetadata indexMetadata,
28-
ClusterState currentClusterState,
28+
ProjectState currentState,
2929
ClusterStateObserver observer,
3030
ActionListener<Void> listener
3131
) {
@@ -36,8 +36,8 @@ public final void performAction(
3636
return;
3737
}
3838

39-
innerPerformAction(followerIndex, currentClusterState, listener);
39+
innerPerformAction(followerIndex, currentState, listener);
4040
}
4141

42-
abstract void innerPerformAction(String followerIndex, ClusterState currentClusterState, ActionListener<Void> listener);
42+
abstract void innerPerformAction(String followerIndex, ProjectState currentState, ActionListener<Void> listener);
4343
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncActionStep.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88

99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.client.internal.Client;
11-
import org.elasticsearch.cluster.ClusterState;
1211
import org.elasticsearch.cluster.ClusterStateObserver;
12+
import org.elasticsearch.cluster.ProjectState;
1313
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.cluster.metadata.ProjectId;
15+
import org.elasticsearch.core.Nullable;
1416

1517
/**
1618
* Performs an action which must be performed asynchronously because it may take time to complete.
@@ -24,17 +26,23 @@ public AsyncActionStep(StepKey key, StepKey nextStepKey, Client client) {
2426
this.client = client;
2527
}
2628

27-
protected Client getClient() {
29+
// For testing only
30+
@Nullable
31+
Client getClientWithoutProject() {
2832
return client;
2933
}
3034

35+
protected Client getClient(ProjectId projectId) {
36+
return client.projectClient(projectId);
37+
}
38+
3139
public boolean indexSurvives() {
3240
return true;
3341
}
3442

3543
public abstract void performAction(
3644
IndexMetadata indexMetadata,
37-
ClusterState currentClusterState,
45+
ProjectState currentState,
3846
ClusterStateObserver observer,
3947
ActionListener<Void> listener
4048
);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncBranchingStep.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@
1010
import org.apache.lucene.util.SetOnce;
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.client.internal.Client;
13-
import org.elasticsearch.cluster.ClusterState;
1413
import org.elasticsearch.cluster.ClusterStateObserver;
14+
import org.elasticsearch.cluster.ProjectState;
1515
import org.elasticsearch.cluster.metadata.IndexMetadata;
16+
import org.elasticsearch.cluster.metadata.ProjectId;
17+
import org.elasticsearch.common.TriConsumer;
1618

1719
import java.util.Objects;
18-
import java.util.function.BiConsumer;
1920

2021
/**
2122
* This step changes its {@link #getNextStepKey()} depending on the
@@ -26,14 +27,14 @@ public class AsyncBranchingStep extends AsyncActionStep {
2627

2728
private final StepKey nextStepKeyOnFalse;
2829
private final StepKey nextStepKeyOnTrue;
29-
private final BiConsumer<IndexMetadata, ActionListener<Boolean>> asyncPredicate;
30+
private final TriConsumer<ProjectId, IndexMetadata, ActionListener<Boolean>> asyncPredicate;
3031
private final SetOnce<Boolean> predicateValue;
3132

3233
public AsyncBranchingStep(
3334
StepKey key,
3435
StepKey nextStepKeyOnFalse,
3536
StepKey nextStepKeyOnTrue,
36-
BiConsumer<IndexMetadata, ActionListener<Boolean>> asyncPredicate,
37+
TriConsumer<ProjectId, IndexMetadata, ActionListener<Boolean>> asyncPredicate,
3738
Client client
3839
) {
3940
// super.nextStepKey is set to null since it is not used by this step
@@ -52,11 +53,11 @@ public boolean isRetryable() {
5253
@Override
5354
public void performAction(
5455
IndexMetadata indexMetadata,
55-
ClusterState currentClusterState,
56+
ProjectState currentState,
5657
ClusterStateObserver observer,
5758
ActionListener<Void> listener
5859
) {
59-
asyncPredicate.accept(indexMetadata, listener.safeMap(value -> {
60+
asyncPredicate.apply(currentState.projectId(), indexMetadata, listener.safeMap(value -> {
6061
predicateValue.set(value);
6162
return null;
6263
}));
@@ -87,7 +88,7 @@ final StepKey getNextStepKeyOnTrue() {
8788
/**
8889
* @return the next step if {@code predicate} is true
8990
*/
90-
final BiConsumer<IndexMetadata, ActionListener<Boolean>> getAsyncPredicate() {
91+
final TriConsumer<ProjectId, IndexMetadata, ActionListener<Boolean>> getAsyncPredicate() {
9192
return asyncPredicate;
9293
}
9394

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncRetryDuringSnapshotActionStep.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@
1414
import org.elasticsearch.cluster.ClusterState;
1515
import org.elasticsearch.cluster.ClusterStateObserver;
1616
import org.elasticsearch.cluster.NotMasterException;
17+
import org.elasticsearch.cluster.ProjectState;
1718
import org.elasticsearch.cluster.SnapshotsInProgress;
1819
import org.elasticsearch.cluster.metadata.IndexMetadata;
20+
import org.elasticsearch.cluster.metadata.ProjectId;
21+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1922
import org.elasticsearch.cluster.node.DiscoveryNode;
2023
import org.elasticsearch.core.TimeValue;
2124
import org.elasticsearch.index.Index;
@@ -39,44 +42,48 @@ public AsyncRetryDuringSnapshotActionStep(StepKey key, StepKey nextStepKey, Clie
3942
@Override
4043
public final void performAction(
4144
IndexMetadata indexMetadata,
42-
ClusterState currentClusterState,
45+
ProjectState currentState,
4346
ClusterStateObserver observer,
4447
ActionListener<Void> listener
4548
) {
4649
// Wrap the original listener to handle exceptions caused by ongoing snapshots
4750
SnapshotExceptionListener snapshotExceptionListener = new SnapshotExceptionListener(
51+
currentState.projectId(),
4852
indexMetadata.getIndex(),
4953
listener,
5054
observer,
51-
currentClusterState.nodes().getLocalNode()
55+
currentState.cluster().nodes().getLocalNode()
5256
);
53-
performDuringNoSnapshot(indexMetadata, currentClusterState, snapshotExceptionListener);
57+
performDuringNoSnapshot(indexMetadata, currentState.metadata(), snapshotExceptionListener);
5458
}
5559

5660
/**
5761
* Method to be performed during which no snapshots for the index are already underway.
5862
*/
59-
abstract void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener);
63+
abstract void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener);
6064

6165
/**
6266
* SnapshotExceptionListener is an injected listener wrapper that checks to see if a particular
6367
* action failed due to a {@code SnapshotInProgressException}. If it did, then it registers a
6468
* ClusterStateObserver listener waiting for the next time the snapshot is not running,
65-
* re-running the step's {@link #performAction(IndexMetadata, ClusterState, ClusterStateObserver, ActionListener)}
69+
* re-running the step's {@link #performAction(IndexMetadata, ProjectState, ClusterStateObserver, ActionListener)}
6670
* method when the snapshot is no longer running.
6771
*/
6872
class SnapshotExceptionListener implements ActionListener<Void> {
73+
private final ProjectId projectId;
6974
private final Index index;
7075
private final ActionListener<Void> originalListener;
7176
private final ClusterStateObserver observer;
7277
private final DiscoveryNode localNode;
7378

7479
SnapshotExceptionListener(
80+
ProjectId projectId,
7581
Index index,
7682
ActionListener<Void> originalListener,
7783
ClusterStateObserver observer,
7884
DiscoveryNode localNode
7985
) {
86+
this.projectId = projectId;
8087
this.index = index;
8188
this.originalListener = originalListener;
8289
this.observer = observer;
@@ -106,13 +113,14 @@ public void onNewClusterState(ClusterState state) {
106113
}
107114
try {
108115
logger.debug("[{}] retrying ILM step after snapshot has completed", indexName);
109-
IndexMetadata idxMeta = state.metadata().getProject().index(index);
116+
final var projectState = state.projectState(projectId);
117+
IndexMetadata idxMeta = projectState.metadata().index(index);
110118
if (idxMeta == null) {
111119
// The index has since been deleted, mission accomplished!
112120
originalListener.onResponse(null);
113121
} else {
114122
// Re-invoke the performAction method with the new state
115-
performAction(idxMeta, state, observer, originalListener);
123+
performAction(idxMeta, projectState, observer, originalListener);
116124
}
117125
} catch (Exception e) {
118126
originalListener.onFailure(e);
@@ -133,7 +141,7 @@ public void onTimeout(TimeValue timeout) {
133141
// ILM actions should only run on master, lets bail on failover
134142
return true;
135143
}
136-
if (state.metadata().getProject().index(index) == null) {
144+
if (state.metadata().getProject(projectId).index(index) == null) {
137145
// The index has since been deleted, mission accomplished!
138146
return true;
139147
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CleanupShrinkIndexStep.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
1313
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1414
import org.elasticsearch.client.internal.Client;
15-
import org.elasticsearch.cluster.ClusterState;
1615
import org.elasticsearch.cluster.metadata.IndexMetadata;
1716
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
17+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1818
import org.elasticsearch.common.Strings;
1919
import org.elasticsearch.core.TimeValue;
2020
import org.elasticsearch.index.IndexNotFoundException;
@@ -36,11 +36,11 @@ public boolean isRetryable() {
3636
}
3737

3838
@Override
39-
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
39+
void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
4040
final String shrunkenIndexSource = IndexMetadata.INDEX_RESIZE_SOURCE_NAME.get(indexMetadata.getSettings());
4141
if (Strings.isNullOrEmpty(shrunkenIndexSource) == false) {
4242
// the current managed index is a shrunk index
43-
if (currentClusterState.metadata().getProject().index(shrunkenIndexSource) == null) {
43+
if (currentProject.index(shrunkenIndexSource) == null) {
4444
// if the source index does not exist, we'll skip deleting the
4545
// (managed) shrunk index as that will cause data loss
4646
String policyName = indexMetadata.getLifecyclePolicyName();
@@ -64,7 +64,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
6464
listener.onResponse(null);
6565
return;
6666
}
67-
getClient().admin()
67+
getClient(currentProject.id()).admin()
6868
.indices()
6969
.delete(new DeleteIndexRequest(shrinkIndexName).masterNodeTimeout(TimeValue.MAX_VALUE), new ActionListener<>() {
7070
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CleanupSnapshotStep.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1111
import org.elasticsearch.client.internal.Client;
12-
import org.elasticsearch.cluster.ClusterState;
1312
import org.elasticsearch.cluster.metadata.IndexMetadata;
1413
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
14+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1515
import org.elasticsearch.common.Strings;
1616
import org.elasticsearch.core.TimeValue;
1717
import org.elasticsearch.repositories.RepositoryMissingException;
@@ -33,7 +33,7 @@ public boolean isRetryable() {
3333
}
3434

3535
@Override
36-
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
36+
void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
3737
final String indexName = indexMetadata.getIndex().getName();
3838

3939
LifecycleExecutionState lifecycleState = indexMetadata.getLifecycleExecutionState();
@@ -48,7 +48,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
4848
listener.onResponse(null);
4949
return;
5050
}
51-
getClient().admin()
51+
getClient(currentProject.id()).admin()
5252
.cluster()
5353
.prepareDeleteSnapshot(TimeValue.MAX_VALUE, repositoryName, snapshotName)
5454
.execute(new ActionListener<>() {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CloseFollowerIndexStep.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
1212
import org.elasticsearch.client.internal.Client;
13-
import org.elasticsearch.cluster.ClusterState;
1413
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1515
import org.elasticsearch.core.TimeValue;
1616

1717
import java.util.Map;
@@ -32,7 +32,7 @@ public boolean isRetryable() {
3232
}
3333

3434
@Override
35-
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
35+
void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
3636
String followerIndex = indexMetadata.getIndex().getName();
3737
Map<String, String> customIndexMetadata = indexMetadata.getCustomData(CCR_METADATA_KEY);
3838
if (customIndexMetadata == null) {
@@ -42,7 +42,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
4242

4343
if (indexMetadata.getState() == IndexMetadata.State.OPEN) {
4444
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE);
45-
getClient().admin().indices().close(closeIndexRequest, listener.delegateFailureAndWrap((l, r) -> {
45+
getClient(currentProject.id()).admin().indices().close(closeIndexRequest, listener.delegateFailureAndWrap((l, r) -> {
4646
if (r.isAcknowledged() == false) {
4747
throw new ElasticsearchException("close index request failed to be acknowledged");
4848
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CloseIndexStep.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
1313
import org.elasticsearch.client.internal.Client;
14-
import org.elasticsearch.cluster.ClusterState;
1514
import org.elasticsearch.cluster.ClusterStateObserver;
15+
import org.elasticsearch.cluster.ProjectState;
1616
import org.elasticsearch.cluster.metadata.IndexMetadata;
1717
import org.elasticsearch.core.TimeValue;
1818

@@ -30,18 +30,20 @@ public class CloseIndexStep extends AsyncActionStep {
3030
@Override
3131
public void performAction(
3232
IndexMetadata indexMetadata,
33-
ClusterState currentClusterState,
33+
ProjectState currentState,
3434
ClusterStateObserver observer,
3535
ActionListener<Void> listener
3636
) {
3737
if (indexMetadata.getState() == IndexMetadata.State.OPEN) {
3838
CloseIndexRequest request = new CloseIndexRequest(indexMetadata.getIndex().getName()).masterNodeTimeout(TimeValue.MAX_VALUE);
39-
getClient().admin().indices().close(request, listener.delegateFailureAndWrap((l, closeIndexResponse) -> {
40-
if (closeIndexResponse.isAcknowledged() == false) {
41-
throw new ElasticsearchException("close index request failed to be acknowledged");
42-
}
43-
l.onResponse(null);
44-
}));
39+
getClient(currentState.projectId()).admin()
40+
.indices()
41+
.close(request, listener.delegateFailureAndWrap((l, closeIndexResponse) -> {
42+
if (closeIndexResponse.isAcknowledged() == false) {
43+
throw new ElasticsearchException("close index request failed to be acknowledged");
44+
}
45+
l.onResponse(null);
46+
}));
4547
} else {
4648
listener.onResponse(null);
4749
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
1414
import org.elasticsearch.client.internal.Client;
15-
import org.elasticsearch.cluster.ClusterState;
1615
import org.elasticsearch.cluster.metadata.IndexMetadata;
1716
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
17+
import org.elasticsearch.cluster.metadata.ProjectId;
18+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1819
import org.elasticsearch.common.Strings;
1920
import org.elasticsearch.core.TimeValue;
2021
import org.elasticsearch.snapshots.SnapshotInfo;
@@ -49,8 +50,8 @@ public boolean isRetryable() {
4950
}
5051

5152
@Override
52-
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
53-
createSnapshot(indexMetadata, new ActionListener<>() {
53+
void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
54+
createSnapshot(currentProject.id(), indexMetadata, new ActionListener<>() {
5455
@Override
5556
public void onResponse(Boolean complete) {
5657
// based on the result of action we'll decide what the next step will be
@@ -77,7 +78,7 @@ public void onFailure(Exception e) {
7778
});
7879
}
7980

80-
void createSnapshot(IndexMetadata indexMetadata, ActionListener<Boolean> listener) {
81+
void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionListener<Boolean> listener) {
8182
final String indexName = indexMetadata.getIndex().getName();
8283

8384
final LifecycleExecutionState lifecycleState = indexMetadata.getLifecycleExecutionState();
@@ -107,7 +108,7 @@ void createSnapshot(IndexMetadata indexMetadata, ActionListener<Boolean> listene
107108
request.waitForCompletion(true);
108109
request.includeGlobalState(false);
109110

110-
getClient().admin().cluster().createSnapshot(request, listener.map(response -> {
111+
getClient(projectId).admin().cluster().createSnapshot(request, listener.map(response -> {
111112
logger.debug(
112113
"create snapshot response for policy [{}] and index [{}] is: {}",
113114
policyName,

0 commit comments

Comments
 (0)