Skip to content

Commit 2b0853b

Browse files
committed
Make ILM AsyncActionStep project-aware
1 parent 21d1c78 commit 2b0853b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+473
-474
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AbstractUnfollowIndexStep.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88

99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.client.internal.Client;
11-
import org.elasticsearch.cluster.ClusterState;
1211
import org.elasticsearch.cluster.ClusterStateObserver;
12+
import org.elasticsearch.cluster.ProjectState;
1313
import org.elasticsearch.cluster.metadata.IndexMetadata;
1414

1515
import java.util.Map;
@@ -25,7 +25,7 @@ abstract class AbstractUnfollowIndexStep extends AsyncActionStep {
2525
@Override
2626
public final void performAction(
2727
IndexMetadata indexMetadata,
28-
ClusterState currentClusterState,
28+
ProjectState currentState,
2929
ClusterStateObserver observer,
3030
ActionListener<Void> listener
3131
) {
@@ -36,8 +36,8 @@ public final void performAction(
3636
return;
3737
}
3838

39-
innerPerformAction(followerIndex, currentClusterState, listener);
39+
innerPerformAction(followerIndex, currentState, listener);
4040
}
4141

42-
abstract void innerPerformAction(String followerIndex, ClusterState currentClusterState, ActionListener<Void> listener);
42+
abstract void innerPerformAction(String followerIndex, ProjectState currentState, ActionListener<Void> listener);
4343
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncActionStep.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88

99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.client.internal.Client;
11-
import org.elasticsearch.cluster.ClusterState;
1211
import org.elasticsearch.cluster.ClusterStateObserver;
12+
import org.elasticsearch.cluster.ProjectState;
1313
import org.elasticsearch.cluster.metadata.IndexMetadata;
1414

1515
/**
@@ -34,7 +34,7 @@ public boolean indexSurvives() {
3434

3535
public abstract void performAction(
3636
IndexMetadata indexMetadata,
37-
ClusterState currentClusterState,
37+
ProjectState currentState,
3838
ClusterStateObserver observer,
3939
ActionListener<Void> listener
4040
);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncBranchingStep.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
import org.apache.lucene.util.SetOnce;
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.client.internal.Client;
13-
import org.elasticsearch.cluster.ClusterState;
1413
import org.elasticsearch.cluster.ClusterStateObserver;
14+
import org.elasticsearch.cluster.ProjectState;
1515
import org.elasticsearch.cluster.metadata.IndexMetadata;
1616

1717
import java.util.Objects;
@@ -52,7 +52,7 @@ public boolean isRetryable() {
5252
@Override
5353
public void performAction(
5454
IndexMetadata indexMetadata,
55-
ClusterState currentClusterState,
55+
ProjectState currentState,
5656
ClusterStateObserver observer,
5757
ActionListener<Void> listener
5858
) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncRetryDuringSnapshotActionStep.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@
1414
import org.elasticsearch.cluster.ClusterState;
1515
import org.elasticsearch.cluster.ClusterStateObserver;
1616
import org.elasticsearch.cluster.NotMasterException;
17+
import org.elasticsearch.cluster.ProjectState;
1718
import org.elasticsearch.cluster.SnapshotsInProgress;
1819
import org.elasticsearch.cluster.metadata.IndexMetadata;
20+
import org.elasticsearch.cluster.metadata.ProjectId;
21+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1922
import org.elasticsearch.cluster.node.DiscoveryNode;
2023
import org.elasticsearch.core.TimeValue;
2124
import org.elasticsearch.index.Index;
@@ -39,44 +42,48 @@ public AsyncRetryDuringSnapshotActionStep(StepKey key, StepKey nextStepKey, Clie
3942
@Override
4043
public final void performAction(
4144
IndexMetadata indexMetadata,
42-
ClusterState currentClusterState,
45+
ProjectState currentState,
4346
ClusterStateObserver observer,
4447
ActionListener<Void> listener
4548
) {
4649
// Wrap the original listener to handle exceptions caused by ongoing snapshots
4750
SnapshotExceptionListener snapshotExceptionListener = new SnapshotExceptionListener(
51+
currentState.projectId(),
4852
indexMetadata.getIndex(),
4953
listener,
5054
observer,
51-
currentClusterState.nodes().getLocalNode()
55+
currentState.cluster().nodes().getLocalNode()
5256
);
53-
performDuringNoSnapshot(indexMetadata, currentClusterState, snapshotExceptionListener);
57+
performDuringNoSnapshot(indexMetadata, currentState.metadata(), snapshotExceptionListener);
5458
}
5559

5660
/**
5761
* Method to be performed during which no snapshots for the index are already underway.
5862
*/
59-
abstract void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener);
63+
abstract void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener);
6064

6165
/**
6266
* SnapshotExceptionListener is an injected listener wrapper that checks to see if a particular
6367
* action failed due to a {@code SnapshotInProgressException}. If it did, then it registers a
6468
* ClusterStateObserver listener waiting for the next time the snapshot is not running,
65-
* re-running the step's {@link #performAction(IndexMetadata, ClusterState, ClusterStateObserver, ActionListener)}
69+
* re-running the step's {@link #performAction(IndexMetadata, ProjectState, ClusterStateObserver, ActionListener)}
6670
* method when the snapshot is no longer running.
6771
*/
6872
class SnapshotExceptionListener implements ActionListener<Void> {
73+
private final ProjectId projectId;
6974
private final Index index;
7075
private final ActionListener<Void> originalListener;
7176
private final ClusterStateObserver observer;
7277
private final DiscoveryNode localNode;
7378

7479
SnapshotExceptionListener(
80+
ProjectId projectId,
7581
Index index,
7682
ActionListener<Void> originalListener,
7783
ClusterStateObserver observer,
7884
DiscoveryNode localNode
7985
) {
86+
this.projectId = projectId;
8087
this.index = index;
8188
this.originalListener = originalListener;
8289
this.observer = observer;
@@ -106,13 +113,14 @@ public void onNewClusterState(ClusterState state) {
106113
}
107114
try {
108115
logger.debug("[{}] retrying ILM step after snapshot has completed", indexName);
109-
IndexMetadata idxMeta = state.metadata().getProject().index(index);
116+
final var projectState = state.projectState(projectId);
117+
IndexMetadata idxMeta = projectState.metadata().index(index);
110118
if (idxMeta == null) {
111119
// The index has since been deleted, mission accomplished!
112120
originalListener.onResponse(null);
113121
} else {
114122
// Re-invoke the performAction method with the new state
115-
performAction(idxMeta, state, observer, originalListener);
123+
performAction(idxMeta, projectState, observer, originalListener);
116124
}
117125
} catch (Exception e) {
118126
originalListener.onFailure(e);
@@ -133,7 +141,7 @@ public void onTimeout(TimeValue timeout) {
133141
// ILM actions should only run on master, lets bail on failover
134142
return true;
135143
}
136-
if (state.metadata().getProject().index(index) == null) {
144+
if (state.metadata().getProject(projectId).index(index) == null) {
137145
// The index has since been deleted, mission accomplished!
138146
return true;
139147
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CleanupShrinkIndexStep.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
1313
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1414
import org.elasticsearch.client.internal.Client;
15-
import org.elasticsearch.cluster.ClusterState;
1615
import org.elasticsearch.cluster.metadata.IndexMetadata;
1716
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
17+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1818
import org.elasticsearch.common.Strings;
1919
import org.elasticsearch.core.TimeValue;
2020
import org.elasticsearch.index.IndexNotFoundException;
@@ -36,11 +36,11 @@ public boolean isRetryable() {
3636
}
3737

3838
@Override
39-
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
39+
void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
4040
final String shrunkenIndexSource = IndexMetadata.INDEX_RESIZE_SOURCE_NAME.get(indexMetadata.getSettings());
4141
if (Strings.isNullOrEmpty(shrunkenIndexSource) == false) {
4242
// the current managed index is a shrunk index
43-
if (currentClusterState.metadata().getProject().index(shrunkenIndexSource) == null) {
43+
if (currentProject.index(shrunkenIndexSource) == null) {
4444
// if the source index does not exist, we'll skip deleting the
4545
// (managed) shrunk index as that will cause data loss
4646
String policyName = indexMetadata.getLifecyclePolicyName();
@@ -64,7 +64,8 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
6464
listener.onResponse(null);
6565
return;
6666
}
67-
getClient().admin()
67+
getClient().projectClient(currentProject.id())
68+
.admin()
6869
.indices()
6970
.delete(new DeleteIndexRequest(shrinkIndexName).masterNodeTimeout(TimeValue.MAX_VALUE), new ActionListener<>() {
7071
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CleanupSnapshotStep.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1111
import org.elasticsearch.client.internal.Client;
12-
import org.elasticsearch.cluster.ClusterState;
1312
import org.elasticsearch.cluster.metadata.IndexMetadata;
1413
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
14+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1515
import org.elasticsearch.common.Strings;
1616
import org.elasticsearch.core.TimeValue;
1717
import org.elasticsearch.repositories.RepositoryMissingException;
@@ -33,7 +33,7 @@ public boolean isRetryable() {
3333
}
3434

3535
@Override
36-
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
36+
void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
3737
final String indexName = indexMetadata.getIndex().getName();
3838

3939
LifecycleExecutionState lifecycleState = indexMetadata.getLifecycleExecutionState();
@@ -48,7 +48,8 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
4848
listener.onResponse(null);
4949
return;
5050
}
51-
getClient().admin()
51+
getClient().projectClient(currentProject.id())
52+
.admin()
5253
.cluster()
5354
.prepareDeleteSnapshot(TimeValue.MAX_VALUE, repositoryName, snapshotName)
5455
.execute(new ActionListener<>() {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CloseFollowerIndexStep.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
1212
import org.elasticsearch.client.internal.Client;
13-
import org.elasticsearch.cluster.ClusterState;
1413
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1515
import org.elasticsearch.core.TimeValue;
1616

1717
import java.util.Map;
@@ -32,7 +32,7 @@ public boolean isRetryable() {
3232
}
3333

3434
@Override
35-
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
35+
void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
3636
String followerIndex = indexMetadata.getIndex().getName();
3737
Map<String, String> customIndexMetadata = indexMetadata.getCustomData(CCR_METADATA_KEY);
3838
if (customIndexMetadata == null) {
@@ -42,12 +42,15 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
4242

4343
if (indexMetadata.getState() == IndexMetadata.State.OPEN) {
4444
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE);
45-
getClient().admin().indices().close(closeIndexRequest, listener.delegateFailureAndWrap((l, r) -> {
46-
if (r.isAcknowledged() == false) {
47-
throw new ElasticsearchException("close index request failed to be acknowledged");
48-
}
49-
l.onResponse(null);
50-
}));
45+
getClient().projectClient(currentProject.id())
46+
.admin()
47+
.indices()
48+
.close(closeIndexRequest, listener.delegateFailureAndWrap((l, r) -> {
49+
if (r.isAcknowledged() == false) {
50+
throw new ElasticsearchException("close index request failed to be acknowledged");
51+
}
52+
l.onResponse(null);
53+
}));
5154
} else {
5255
listener.onResponse(null);
5356
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CloseIndexStep.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
1313
import org.elasticsearch.client.internal.Client;
14-
import org.elasticsearch.cluster.ClusterState;
1514
import org.elasticsearch.cluster.ClusterStateObserver;
15+
import org.elasticsearch.cluster.ProjectState;
1616
import org.elasticsearch.cluster.metadata.IndexMetadata;
1717
import org.elasticsearch.core.TimeValue;
1818

@@ -30,18 +30,21 @@ public class CloseIndexStep extends AsyncActionStep {
3030
@Override
3131
public void performAction(
3232
IndexMetadata indexMetadata,
33-
ClusterState currentClusterState,
33+
ProjectState currentState,
3434
ClusterStateObserver observer,
3535
ActionListener<Void> listener
3636
) {
3737
if (indexMetadata.getState() == IndexMetadata.State.OPEN) {
3838
CloseIndexRequest request = new CloseIndexRequest(indexMetadata.getIndex().getName()).masterNodeTimeout(TimeValue.MAX_VALUE);
39-
getClient().admin().indices().close(request, listener.delegateFailureAndWrap((l, closeIndexResponse) -> {
40-
if (closeIndexResponse.isAcknowledged() == false) {
41-
throw new ElasticsearchException("close index request failed to be acknowledged");
42-
}
43-
l.onResponse(null);
44-
}));
39+
getClient().projectClient(currentState.projectId())
40+
.admin()
41+
.indices()
42+
.close(request, listener.delegateFailureAndWrap((l, closeIndexResponse) -> {
43+
if (closeIndexResponse.isAcknowledged() == false) {
44+
throw new ElasticsearchException("close index request failed to be acknowledged");
45+
}
46+
l.onResponse(null);
47+
}));
4548
} else {
4649
listener.onResponse(null);
4750
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStep.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
1414
import org.elasticsearch.client.internal.Client;
15-
import org.elasticsearch.cluster.ClusterState;
1615
import org.elasticsearch.cluster.metadata.IndexMetadata;
1716
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
17+
import org.elasticsearch.cluster.metadata.ProjectId;
18+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1819
import org.elasticsearch.common.Strings;
1920
import org.elasticsearch.core.TimeValue;
2021
import org.elasticsearch.snapshots.SnapshotInfo;
@@ -49,8 +50,8 @@ public boolean isRetryable() {
4950
}
5051

5152
@Override
52-
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
53-
createSnapshot(indexMetadata, new ActionListener<>() {
53+
void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
54+
createSnapshot(currentProject.id(), indexMetadata, new ActionListener<>() {
5455
@Override
5556
public void onResponse(Boolean complete) {
5657
// based on the result of action we'll decide what the next step will be
@@ -77,7 +78,7 @@ public void onFailure(Exception e) {
7778
});
7879
}
7980

80-
void createSnapshot(IndexMetadata indexMetadata, ActionListener<Boolean> listener) {
81+
void createSnapshot(ProjectId projectId, IndexMetadata indexMetadata, ActionListener<Boolean> listener) {
8182
final String indexName = indexMetadata.getIndex().getName();
8283

8384
final LifecycleExecutionState lifecycleState = indexMetadata.getLifecycleExecutionState();
@@ -107,7 +108,7 @@ void createSnapshot(IndexMetadata indexMetadata, ActionListener<Boolean> listene
107108
request.waitForCompletion(true);
108109
request.includeGlobalState(false);
109110

110-
getClient().admin().cluster().createSnapshot(request, listener.map(response -> {
111+
getClient().projectClient(projectId).admin().cluster().createSnapshot(request, listener.map(response -> {
111112
logger.debug(
112113
"create snapshot response for policy [{}] and index [{}] is: {}",
113114
policyName,

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteStep.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
1414
import org.elasticsearch.action.support.master.MasterNodeRequest;
1515
import org.elasticsearch.client.internal.Client;
16-
import org.elasticsearch.cluster.ClusterState;
1716
import org.elasticsearch.cluster.metadata.DataStream;
1817
import org.elasticsearch.cluster.metadata.IndexAbstraction;
1918
import org.elasticsearch.cluster.metadata.IndexMetadata;
19+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2020
import org.elasticsearch.common.Strings;
2121
import org.elasticsearch.core.TimeValue;
2222
import org.elasticsearch.index.Index;
@@ -33,10 +33,10 @@ public DeleteStep(StepKey key, StepKey nextStepKey, Client client) {
3333
}
3434

3535
@Override
36-
public void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentState, ActionListener<Void> listener) {
36+
public void performDuringNoSnapshot(IndexMetadata indexMetadata, ProjectMetadata currentProject, ActionListener<Void> listener) {
3737
String policyName = indexMetadata.getLifecyclePolicyName();
3838
String indexName = indexMetadata.getIndex().getName();
39-
IndexAbstraction indexAbstraction = currentState.metadata().getProject().getIndicesLookup().get(indexName);
39+
IndexAbstraction indexAbstraction = currentProject.getIndicesLookup().get(indexName);
4040
assert indexAbstraction != null : "invalid cluster metadata. index [" + indexName + "] was not found";
4141
DataStream dataStream = indexAbstraction.getParentDataStream();
4242

@@ -56,11 +56,12 @@ public void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState cu
5656
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
5757
dataStream.getName()
5858
);
59-
getClient().execute(
60-
DeleteDataStreamAction.INSTANCE,
61-
deleteReq,
62-
listener.delegateFailureAndWrap((l, response) -> l.onResponse(null))
63-
);
59+
getClient().projectClient(currentProject.id())
60+
.execute(
61+
DeleteDataStreamAction.INSTANCE,
62+
deleteReq,
63+
listener.delegateFailureAndWrap((l, response) -> l.onResponse(null))
64+
);
6465
return;
6566
} else if (isFailureStoreWriteIndex || dataStream.getWriteIndex().getName().equals(indexName)) {
6667
String errorMessage = Strings.format(
@@ -78,7 +79,8 @@ public void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState cu
7879
}
7980
}
8081

81-
getClient().admin()
82+
getClient().projectClient(currentProject.id())
83+
.admin()
8284
.indices()
8385
.delete(
8486
new DeleteIndexRequest(indexName).masterNodeTimeout(TimeValue.MAX_VALUE),

0 commit comments

Comments
 (0)