Skip to content

Commit ecee8b1

Browse files
authored
Make IndexLifecycleClusterStateUpdateTask project-aware (#129366)
Updates the abstract class and its subclasses to be able to handle multiple projects.
1 parent d384e2f commit ecee8b1

File tree

12 files changed

+227
-233
lines changed

12 files changed

+227
-233
lines changed

server/src/main/java/org/elasticsearch/cluster/ProjectState.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ public ClusterState updatedState(Consumer<ProjectMetadata.Builder> projectBuilde
7676
return ClusterState.builder(cluster).putProjectMetadata(projectBuilder).build();
7777
}
7878

79+
/**
80+
* Build a new {@link ClusterState} with the updated project.
81+
*/
82+
public ClusterState updatedState(ProjectMetadata updatedProject) {
83+
return ClusterState.builder(cluster).putProjectMetadata(updatedProject).build();
84+
}
85+
7986
/**
8087
* Build a new {@link ProjectState} with the updated {@code project}.
8188
*/

x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/ClusterStateWaitThresholdBreachTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
149149
nowWayBackInThePastSupplier,
150150
indexLifecycleService.getPolicyRegistry(),
151151
state -> {}
152-
).execute(currentState);
152+
).execute(currentState.projectState());
153153
}
154154

155155
@Override

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java

Lines changed: 28 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.cluster.ClusterState;
1313
import org.elasticsearch.cluster.ClusterStateUpdateTask;
14+
import org.elasticsearch.cluster.ProjectState;
1415
import org.elasticsearch.cluster.metadata.IndexMetadata;
1516
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
16-
import org.elasticsearch.cluster.metadata.Metadata;
1717
import org.elasticsearch.common.Strings;
1818
import org.elasticsearch.index.Index;
1919
import org.elasticsearch.xcontent.ToXContentObject;
@@ -84,22 +84,22 @@ Step.StepKey getNextStepKey() {
8484
* @throws IOException if any exceptions occur
8585
*/
8686
@Override
87-
public ClusterState doExecute(final ClusterState currentState) throws IOException {
87+
public ClusterState doExecute(final ProjectState currentState) throws IOException {
8888
Step currentStep = startStep;
89-
IndexMetadata indexMetadata = currentState.metadata().getProject().index(index);
89+
IndexMetadata indexMetadata = currentState.metadata().index(index);
9090
if (indexMetadata == null) {
9191
logger.debug("lifecycle for index [{}] executed but index no longer exists", index.getName());
9292
// This index doesn't exist any more, there's nothing to execute currently
93-
return currentState;
93+
return currentState.cluster();
9494
}
9595
Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy, indexMetadata);
9696
if (currentStep.equals(registeredCurrentStep) == false) {
9797
// either we are no longer the master or the step is now
9898
// not the same as when we submitted the update task. In
9999
// either case we don't want to do anything now
100-
return currentState;
100+
return currentState.cluster();
101101
}
102-
ClusterState state = currentState;
102+
ProjectState state = currentState;
103103
// We can do cluster state steps all together until we
104104
// either get to a step that isn't a cluster state step or a
105105
// cluster state wait step returns not completed
@@ -114,7 +114,7 @@ public ClusterState doExecute(final ClusterState currentState) throws IOExceptio
114114
return moveToErrorStep(state, currentStep.getKey(), exception);
115115
}
116116
if (nextStepKey == null) {
117-
return state;
117+
return state.cluster();
118118
} else {
119119
state = moveToNextStep(state);
120120
}
@@ -123,14 +123,14 @@ public ClusterState doExecute(final ClusterState currentState) throws IOExceptio
123123
// loop, if we are about to go into a new phase, return so that
124124
// other processing can occur
125125
if (currentStep.getKey().phase().equals(currentStep.getNextStepKey().phase()) == false) {
126-
return state;
126+
return state.cluster();
127127
}
128128
currentStep = policyStepsRegistry.getStep(indexMetadata, currentStep.getNextStepKey());
129129
}
130-
return state;
130+
return state.cluster();
131131
}
132132

133-
private ClusterState executeActionStep(ClusterState state, Step currentStep) {
133+
private ProjectState executeActionStep(ProjectState state, Step currentStep) {
134134
// cluster state action step so do the action and
135135
// move the cluster state to the next step
136136
logger.trace(
@@ -140,7 +140,7 @@ private ClusterState executeActionStep(ClusterState state, Step currentStep) {
140140
currentStep.getKey()
141141
);
142142
ClusterStateActionStep actionStep = (ClusterStateActionStep) currentStep;
143-
state = actionStep.performAction(index, state.projectState()).cluster();
143+
state = actionStep.performAction(index, state);
144144
// If this step (usually a CopyExecutionStateStep step) has brought the
145145
// index to where it needs to have async actions invoked, then add that
146146
// index to the list so that when the new cluster state has been
@@ -153,7 +153,7 @@ private ClusterState executeActionStep(ClusterState state, Step currentStep) {
153153
return state;
154154
}
155155

156-
private ClusterState executeWaitStep(ClusterState state, Step currentStep) {
156+
private ProjectState executeWaitStep(ProjectState state, Step currentStep) {
157157
// cluster state wait step so evaluate the
158158
// condition, if the condition is met move to the
159159
// next step, if its not met return the current
@@ -166,7 +166,7 @@ private ClusterState executeWaitStep(ClusterState state, Step currentStep) {
166166
currentStep.getClass().getSimpleName(),
167167
currentStep.getKey()
168168
);
169-
ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state.projectState());
169+
ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state);
170170
// some steps can decide to change the next step to execute after waiting for some time for the condition
171171
// to be met (eg. {@link LifecycleSettings#LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING}, so it's important we
172172
// re-evaluate what the next step is after we evaluate the condition
@@ -198,35 +198,23 @@ private ClusterState executeWaitStep(ClusterState state, Step currentStep) {
198198
if (stepInfo == null) {
199199
return state;
200200
}
201-
return ClusterState.builder(state)
202-
.putProjectMetadata(IndexLifecycleTransition.addStepInfoToProject(index, state.metadata().getProject(), stepInfo))
203-
.build();
201+
return state.updateProject(IndexLifecycleTransition.addStepInfoToProject(index, state.metadata(), stepInfo));
204202
}
205203
}
206204

207-
private ClusterState moveToNextStep(ClusterState state) {
205+
private ProjectState moveToNextStep(ProjectState state) {
208206
if (nextStepKey == null) {
209207
return state;
210208
}
211209
logger.trace("[{}] moving cluster state to next step [{}]", index.getName(), nextStepKey);
212-
return ClusterState.builder(state)
213-
.putProjectMetadata(
214-
IndexLifecycleTransition.moveIndexToStep(
215-
index,
216-
state.metadata().getProject(),
217-
nextStepKey,
218-
nowSupplier,
219-
policyStepsRegistry,
220-
false
221-
)
222-
)
223-
.build();
210+
return state.updateProject(
211+
IndexLifecycleTransition.moveIndexToStep(index, state.metadata(), nextStepKey, nowSupplier, policyStepsRegistry, false)
212+
);
224213
}
225214

226215
@Override
227-
public void onClusterStateProcessed(ClusterState newState) {
228-
final Metadata metadata = newState.metadata();
229-
final IndexMetadata indexMetadata = metadata.getProject().index(index);
216+
public void onClusterStateProcessed(ProjectState newState) {
217+
final IndexMetadata indexMetadata = newState.metadata().index(index);
230218
if (indexMetadata != null) {
231219

232220
LifecycleExecutionState exState = indexMetadata.getLifecycleExecutionState();
@@ -246,16 +234,16 @@ public void onClusterStateProcessed(ClusterState newState) {
246234
// After the cluster state has been processed and we have moved
247235
// to a new step, we need to conditionally execute the step iff
248236
// it is an `AsyncAction` so that it is executed exactly once.
249-
lifecycleRunner.maybeRunAsyncAction(newState, indexMetadata, policy, nextStepKey);
237+
lifecycleRunner.maybeRunAsyncAction(newState.cluster(), indexMetadata, policy, nextStepKey);
250238
}
251239
}
252240
assert indexToStepKeysForAsyncActions.size() <= 1 : "we expect a maximum of one single spawned index currently";
253241
for (Map.Entry<String, Step.StepKey> indexAndStepKey : indexToStepKeysForAsyncActions.entrySet()) {
254242
final String indexName = indexAndStepKey.getKey();
255243
final Step.StepKey nextStep = indexAndStepKey.getValue();
256-
final IndexMetadata indexMeta = metadata.getProject().index(indexName);
244+
final IndexMetadata indexMeta = newState.metadata().index(indexName);
257245
if (indexMeta != null) {
258-
if (newState.metadata().getProject().isIndexManagedByILM(indexMeta)) {
246+
if (newState.metadata().isIndexManagedByILM(indexMeta)) {
259247
if (nextStep != null && nextStep != TerminalPolicyStep.KEY) {
260248
logger.trace(
261249
"[{}] index has been spawed from a different index's ({}) "
@@ -265,7 +253,7 @@ public void onClusterStateProcessed(ClusterState newState) {
265253
nextStep
266254
);
267255
final String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMeta.getSettings());
268-
lifecycleRunner.maybeRunAsyncAction(newState, indexMeta, policyName, nextStep);
256+
lifecycleRunner.maybeRunAsyncAction(newState.cluster(), indexMeta, policyName, nextStep);
269257
}
270258
}
271259
}
@@ -277,7 +265,7 @@ public void handleFailure(Exception e) {
277265
logger.warn(() -> format("policy [%s] for index [%s] failed on step [%s].", policy, index, startStep.getKey()), e);
278266
}
279267

280-
private ClusterState moveToErrorStep(final ClusterState state, Step.StepKey currentStepKey, Exception cause) {
268+
private ClusterState moveToErrorStep(final ProjectState state, Step.StepKey currentStepKey, Exception cause) {
281269
this.failure = cause;
282270
logger.warn(
283271
() -> format(
@@ -288,12 +276,9 @@ private ClusterState moveToErrorStep(final ClusterState state, Step.StepKey curr
288276
),
289277
cause
290278
);
291-
final var project = state.metadata().getProject();
292-
return ClusterState.builder(state)
293-
.putProjectMetadata(
294-
IndexLifecycleTransition.moveIndexToErrorStep(index, project, cause, nowSupplier, policyStepsRegistry::getStep)
295-
)
296-
.build();
279+
return state.updatedState(
280+
IndexLifecycleTransition.moveIndexToErrorStep(index, state.metadata(), cause, nowSupplier, policyStepsRegistry::getStep)
281+
);
297282
}
298283

299284
@Override

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleClusterStateUpdateTask.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.cluster.ClusterState;
1212
import org.elasticsearch.cluster.ClusterStateTaskListener;
1313
import org.elasticsearch.cluster.ClusterStateUpdateTask;
14+
import org.elasticsearch.cluster.ProjectState;
1415
import org.elasticsearch.common.util.concurrent.ListenableFuture;
1516
import org.elasticsearch.index.Index;
1617
import org.elasticsearch.xpack.core.ilm.Step;
@@ -42,18 +43,22 @@ final Step.StepKey getCurrentStepKey() {
4243

4344
private boolean executed;
4445

45-
public final ClusterState execute(ClusterState currentState) throws Exception {
46+
/**
47+
* Executes the task on the current project state. We return a cluster state instead of a project state because we're only interested
48+
* in the resulting cluster state, so we avoid constructing a project state just to only access the cluster state.
49+
*/
50+
public final ClusterState execute(ProjectState currentState) throws Exception {
4651
assert executed == false;
4752
final ClusterState updatedState = doExecute(currentState);
48-
if (currentState != updatedState) {
53+
if (currentState.cluster() != updatedState) {
4954
executed = true;
5055
}
5156
return updatedState;
5257
}
5358

54-
protected abstract ClusterState doExecute(ClusterState currentState) throws Exception;
59+
protected abstract ClusterState doExecute(ProjectState currentState) throws Exception;
5560

56-
public final void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
61+
public final void clusterStateProcessed(ProjectState newState) {
5762
listener.onResponse(null);
5863
if (executed) {
5964
onClusterStateProcessed(newState);
@@ -68,7 +73,7 @@ public final void onFailure(Exception e) {
6873

6974
/**
7075
* Add a listener that is resolved once this update has been processed or failed and before either the
71-
* {@link #onClusterStateProcessed(ClusterState)} or the {@link #handleFailure(Exception)} hooks are
76+
* {@link #onClusterStateProcessed(ProjectState)} or the {@link #handleFailure(Exception)} hooks are
7277
* executed.
7378
*/
7479
public final void addListener(ActionListener<Void> actionListener) {
@@ -78,10 +83,10 @@ public final void addListener(ActionListener<Void> actionListener) {
7883
/**
7984
* This method is functionally the same as {@link ClusterStateUpdateTask#clusterStateProcessed}
8085
* and implementations can override it as they would override {@code ClusterStateUpdateTask#clusterStateProcessed}.
81-
* The only difference to {@link ClusterStateUpdateTask#clusterStateProcessed} is that if the {@link #execute(ClusterState)}
86+
* The only difference to {@link ClusterStateUpdateTask#clusterStateProcessed} is that if the {@link #execute(ProjectState)}
8287
* implementation was a noop and returned the input cluster state, then this method will not be invoked.
8388
*/
84-
protected void onClusterStateProcessed(ClusterState newState) {}
89+
protected void onClusterStateProcessed(ProjectState newState) {}
8590

8691
@Override
8792
public abstract boolean equals(Object other);

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.cluster.ClusterState;
1313
import org.elasticsearch.cluster.ClusterStateObserver;
1414
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
15+
import org.elasticsearch.cluster.ProjectState;
1516
import org.elasticsearch.cluster.metadata.IndexMetadata;
1617
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
1718
import org.elasticsearch.cluster.metadata.Metadata;
@@ -67,12 +68,13 @@ public ClusterState execute(BatchExecutionContext<IndexLifecycleClusterStateUpda
6768
for (final var taskContext : batchExecutionContext.taskContexts()) {
6869
try {
6970
final var task = taskContext.getTask();
71+
// Retrieving the project ID and building the ProjectState inside the loop would be quite inefficient if it weren't
72+
// for the fact that ILM will never run in multi-project mode - these methods shortcut in single-project mode.
73+
final var projectId = batchExecutionContext.initialState().metadata().projectFor(task.getIndex()).id();
7074
try (var ignored = taskContext.captureResponseHeaders()) {
71-
state = task.execute(state);
75+
state = task.execute(state.projectState(projectId));
7276
}
73-
taskContext.success(
74-
publishedState -> task.clusterStateProcessed(batchExecutionContext.initialState(), publishedState)
75-
);
77+
taskContext.success(publishedState -> task.clusterStateProcessed(publishedState.projectState(projectId)));
7678
} catch (Exception e) {
7779
taskContext.onFailure(e);
7880
}
@@ -477,11 +479,11 @@ private void moveToStep(Index index, String policy, Step.StepKey currentStepKey,
477479
currentStepKey,
478480
newStepKey
479481
),
480-
new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, clusterState -> {
481-
IndexMetadata indexMetadata = clusterState.metadata().getProject().index(index);
482+
new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, state -> {
483+
IndexMetadata indexMetadata = state.metadata().index(index);
482484
registerSuccessfulOperation(indexMetadata);
483485
if (newStepKey != null && newStepKey != TerminalPolicyStep.KEY && indexMetadata != null) {
484-
maybeRunAsyncAction(clusterState, indexMetadata, policy, newStepKey);
486+
maybeRunAsyncAction(state.cluster(), indexMetadata, policy, newStepKey);
485487
}
486488
})
487489
);
@@ -497,8 +499,8 @@ private void moveToErrorStep(Index index, String policy, Step.StepKey currentSte
497499
);
498500
submitUnlessAlreadyQueued(
499501
Strings.format("ilm-move-to-error-step {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(), currentStepKey),
500-
new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, clusterState -> {
501-
IndexMetadata indexMetadata = clusterState.metadata().getProject().index(index);
502+
new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, state -> {
503+
IndexMetadata indexMetadata = state.metadata().index(index);
502504
registerFailedOperation(indexMetadata, e);
503505
})
504506
);
@@ -656,15 +658,15 @@ private final class MoveToRetryFailedStepUpdateTask extends IndexLifecycleCluste
656658
}
657659

658660
@Override
659-
protected ClusterState doExecute(ClusterState currentState) {
661+
protected ClusterState doExecute(ProjectState currentState) {
660662
final var updatedProject = IndexLifecycleTransition.moveIndexToPreviouslyFailedStep(
661-
currentState.metadata().getProject(),
663+
currentState.metadata(),
662664
index.getName(),
663665
nowSupplier,
664666
stepRegistry,
665667
true
666668
);
667-
return ClusterState.builder(currentState).putProjectMetadata(updatedProject).build();
669+
return currentState.updatedState(updatedProject);
668670
}
669671

670672
@Override
@@ -693,8 +695,8 @@ protected void handleFailure(Exception e) {
693695
}
694696

695697
@Override
696-
protected void onClusterStateProcessed(ClusterState newState) {
697-
IndexMetadata newIndexMeta = newState.metadata().getProject().index(index);
698+
protected void onClusterStateProcessed(ProjectState newState) {
699+
IndexMetadata newIndexMeta = newState.metadata().index(index);
698700
if (newIndexMeta == null) {
699701
// index was deleted
700702
return;
@@ -713,7 +715,7 @@ protected void onClusterStateProcessed(ClusterState newState) {
713715
index,
714716
stepKey
715717
);
716-
maybeRunAsyncAction(newState, newIndexMeta, policy, stepKey);
718+
maybeRunAsyncAction(newState.cluster(), newIndexMeta, policy, stepKey);
717719
}
718720

719721
}

0 commit comments

Comments
 (0)