Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ public ClusterState updatedState(Consumer<ProjectMetadata.Builder> projectBuilde
return ClusterState.builder(cluster).putProjectMetadata(projectBuilder).build();
}

/**
* Build a new {@link ClusterState} with the updated project.
*/
public ClusterState updatedState(ProjectMetadata updatedProject) {
return ClusterState.builder(cluster).putProjectMetadata(updatedProject).build();
}

/**
* Build a new {@link ProjectState} with the updated {@code project}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
nowWayBackInThePastSupplier,
indexLifecycleService.getPolicyRegistry(),
state -> {}
).execute(currentState);
).execute(currentState.projectState());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.Index;
import org.elasticsearch.xcontent.ToXContentObject;
Expand Down Expand Up @@ -84,22 +84,22 @@ Step.StepKey getNextStepKey() {
* @throws IOException if any exceptions occur
*/
@Override
public ClusterState doExecute(final ClusterState currentState) throws IOException {
public ClusterState doExecute(final ProjectState currentState) throws IOException {
Step currentStep = startStep;
IndexMetadata indexMetadata = currentState.metadata().getProject().index(index);
IndexMetadata indexMetadata = currentState.metadata().index(index);
if (indexMetadata == null) {
logger.debug("lifecycle for index [{}] executed but index no longer exists", index.getName());
// This index doesn't exist any more, there's nothing to execute currently
return currentState;
return currentState.cluster();
}
Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy, indexMetadata);
if (currentStep.equals(registeredCurrentStep) == false) {
// either we are no longer the master or the step is now
// not the same as when we submitted the update task. In
// either case we don't want to do anything now
return currentState;
return currentState.cluster();
}
ClusterState state = currentState;
ProjectState state = currentState;
// We can do cluster state steps all together until we
// either get to a step that isn't a cluster state step or a
// cluster state wait step returns not completed
Expand All @@ -114,7 +114,7 @@ public ClusterState doExecute(final ClusterState currentState) throws IOExceptio
return moveToErrorStep(state, currentStep.getKey(), exception);
}
if (nextStepKey == null) {
return state;
return state.cluster();
} else {
state = moveToNextStep(state);
}
Expand All @@ -123,14 +123,14 @@ public ClusterState doExecute(final ClusterState currentState) throws IOExceptio
// loop, if we are about to go into a new phase, return so that
// other processing can occur
if (currentStep.getKey().phase().equals(currentStep.getNextStepKey().phase()) == false) {
return state;
return state.cluster();
}
currentStep = policyStepsRegistry.getStep(indexMetadata, currentStep.getNextStepKey());
}
return state;
return state.cluster();
}

private ClusterState executeActionStep(ClusterState state, Step currentStep) {
private ProjectState executeActionStep(ProjectState state, Step currentStep) {
// cluster state action step so do the action and
// move the cluster state to the next step
logger.trace(
Expand All @@ -140,7 +140,7 @@ private ClusterState executeActionStep(ClusterState state, Step currentStep) {
currentStep.getKey()
);
ClusterStateActionStep actionStep = (ClusterStateActionStep) currentStep;
state = actionStep.performAction(index, state.projectState()).cluster();
state = actionStep.performAction(index, state);
// If this step (usually a CopyExecutionStateStep step) has brought the
// index to where it needs to have async actions invoked, then add that
// index to the list so that when the new cluster state has been
Expand All @@ -153,7 +153,7 @@ private ClusterState executeActionStep(ClusterState state, Step currentStep) {
return state;
}

private ClusterState executeWaitStep(ClusterState state, Step currentStep) {
private ProjectState executeWaitStep(ProjectState state, Step currentStep) {
// cluster state wait step so evaluate the
// condition, if the condition is met move to the
// next step, if its not met return the current
Expand All @@ -166,7 +166,7 @@ private ClusterState executeWaitStep(ClusterState state, Step currentStep) {
currentStep.getClass().getSimpleName(),
currentStep.getKey()
);
ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state.projectState());
ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state);
// some steps can decide to change the next step to execute after waiting for some time for the condition
// to be met (eg. {@link LifecycleSettings#LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING}, so it's important we
// re-evaluate what the next step is after we evaluate the condition
Expand Down Expand Up @@ -198,35 +198,23 @@ private ClusterState executeWaitStep(ClusterState state, Step currentStep) {
if (stepInfo == null) {
return state;
}
return ClusterState.builder(state)
.putProjectMetadata(IndexLifecycleTransition.addStepInfoToProject(index, state.metadata().getProject(), stepInfo))
.build();
return state.updateProject(IndexLifecycleTransition.addStepInfoToProject(index, state.metadata(), stepInfo));
}
}

private ClusterState moveToNextStep(ClusterState state) {
private ProjectState moveToNextStep(ProjectState state) {
if (nextStepKey == null) {
return state;
}
logger.trace("[{}] moving cluster state to next step [{}]", index.getName(), nextStepKey);
return ClusterState.builder(state)
.putProjectMetadata(
IndexLifecycleTransition.moveIndexToStep(
index,
state.metadata().getProject(),
nextStepKey,
nowSupplier,
policyStepsRegistry,
false
)
)
.build();
return state.updateProject(
IndexLifecycleTransition.moveIndexToStep(index, state.metadata(), nextStepKey, nowSupplier, policyStepsRegistry, false)
);
}

@Override
public void onClusterStateProcessed(ClusterState newState) {
final Metadata metadata = newState.metadata();
final IndexMetadata indexMetadata = metadata.getProject().index(index);
public void onClusterStateProcessed(ProjectState newState) {
final IndexMetadata indexMetadata = newState.metadata().index(index);
if (indexMetadata != null) {

LifecycleExecutionState exState = indexMetadata.getLifecycleExecutionState();
Expand All @@ -246,16 +234,16 @@ public void onClusterStateProcessed(ClusterState newState) {
// After the cluster state has been processed and we have moved
// to a new step, we need to conditionally execute the step iff
// it is an `AsyncAction` so that it is executed exactly once.
lifecycleRunner.maybeRunAsyncAction(newState, indexMetadata, policy, nextStepKey);
lifecycleRunner.maybeRunAsyncAction(newState.cluster(), indexMetadata, policy, nextStepKey);
}
}
assert indexToStepKeysForAsyncActions.size() <= 1 : "we expect a maximum of one single spawned index currently";
for (Map.Entry<String, Step.StepKey> indexAndStepKey : indexToStepKeysForAsyncActions.entrySet()) {
final String indexName = indexAndStepKey.getKey();
final Step.StepKey nextStep = indexAndStepKey.getValue();
final IndexMetadata indexMeta = metadata.getProject().index(indexName);
final IndexMetadata indexMeta = newState.metadata().index(indexName);
if (indexMeta != null) {
if (newState.metadata().getProject().isIndexManagedByILM(indexMeta)) {
if (newState.metadata().isIndexManagedByILM(indexMeta)) {
if (nextStep != null && nextStep != TerminalPolicyStep.KEY) {
logger.trace(
"[{}] index has been spawed from a different index's ({}) "
Expand All @@ -265,7 +253,7 @@ public void onClusterStateProcessed(ClusterState newState) {
nextStep
);
final String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMeta.getSettings());
lifecycleRunner.maybeRunAsyncAction(newState, indexMeta, policyName, nextStep);
lifecycleRunner.maybeRunAsyncAction(newState.cluster(), indexMeta, policyName, nextStep);
}
}
}
Expand All @@ -277,7 +265,7 @@ public void handleFailure(Exception e) {
logger.warn(() -> format("policy [%s] for index [%s] failed on step [%s].", policy, index, startStep.getKey()), e);
}

private ClusterState moveToErrorStep(final ClusterState state, Step.StepKey currentStepKey, Exception cause) {
private ClusterState moveToErrorStep(final ProjectState state, Step.StepKey currentStepKey, Exception cause) {
this.failure = cause;
logger.warn(
() -> format(
Expand All @@ -288,12 +276,9 @@ private ClusterState moveToErrorStep(final ClusterState state, Step.StepKey curr
),
cause
);
final var project = state.metadata().getProject();
return ClusterState.builder(state)
.putProjectMetadata(
IndexLifecycleTransition.moveIndexToErrorStep(index, project, cause, nowSupplier, policyStepsRegistry::getStep)
)
.build();
return state.updatedState(
IndexLifecycleTransition.moveIndexToErrorStep(index, state.metadata(), cause, nowSupplier, policyStepsRegistry::getStep)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.ilm.Step;
Expand Down Expand Up @@ -42,18 +43,22 @@ final Step.StepKey getCurrentStepKey() {

private boolean executed;

public final ClusterState execute(ClusterState currentState) throws Exception {
/**
* Executes the task on the current project state. We return a cluster state instead of a project state because we're only interested
* in the resulting cluster state, so we avoid constructing a project state just to only access the cluster state.
*/
public final ClusterState execute(ProjectState currentState) throws Exception {
assert executed == false;
final ClusterState updatedState = doExecute(currentState);
if (currentState != updatedState) {
if (currentState.cluster() != updatedState) {
executed = true;
}
return updatedState;
}

protected abstract ClusterState doExecute(ClusterState currentState) throws Exception;
protected abstract ClusterState doExecute(ProjectState currentState) throws Exception;

public final void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
public final void clusterStateProcessed(ProjectState newState) {
listener.onResponse(null);
if (executed) {
onClusterStateProcessed(newState);
Expand All @@ -68,7 +73,7 @@ public final void onFailure(Exception e) {

/**
* Add a listener that is resolved once this update has been processed or failed and before either the
* {@link #onClusterStateProcessed(ClusterState)} or the {@link #handleFailure(Exception)} hooks are
* {@link #onClusterStateProcessed(ProjectState)} or the {@link #handleFailure(Exception)} hooks are
* executed.
*/
public final void addListener(ActionListener<Void> actionListener) {
Expand All @@ -78,10 +83,10 @@ public final void addListener(ActionListener<Void> actionListener) {
/**
* This method is functionally the same as {@link ClusterStateUpdateTask#clusterStateProcessed}
* and implementations can override it as they would override {@code ClusterStateUpdateTask#clusterStateProcessed}.
* The only difference to {@link ClusterStateUpdateTask#clusterStateProcessed} is that if the {@link #execute(ClusterState)}
* The only difference to {@link ClusterStateUpdateTask#clusterStateProcessed} is that if the {@link #execute(ProjectState)}
* implementation was a noop and returned the input cluster state, then this method will not be invoked.
*/
protected void onClusterStateProcessed(ClusterState newState) {}
protected void onClusterStateProcessed(ProjectState newState) {}

@Override
public abstract boolean equals(Object other);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.Metadata;
Expand Down Expand Up @@ -67,12 +68,13 @@ public ClusterState execute(BatchExecutionContext<IndexLifecycleClusterStateUpda
for (final var taskContext : batchExecutionContext.taskContexts()) {
try {
final var task = taskContext.getTask();
// Retrieving the project ID and building the ProjectState inside the loop would be quite inefficient if it weren't
// for the fact that ILM will never run in multi-project mode - these methods shortcut in single-project mode.
final var projectId = batchExecutionContext.initialState().metadata().projectFor(task.getIndex()).id();
try (var ignored = taskContext.captureResponseHeaders()) {
state = task.execute(state);
state = task.execute(state.projectState(projectId));
}
taskContext.success(
publishedState -> task.clusterStateProcessed(batchExecutionContext.initialState(), publishedState)
);
taskContext.success(publishedState -> task.clusterStateProcessed(publishedState.projectState(projectId)));
} catch (Exception e) {
taskContext.onFailure(e);
}
Expand Down Expand Up @@ -477,11 +479,11 @@ private void moveToStep(Index index, String policy, Step.StepKey currentStepKey,
currentStepKey,
newStepKey
),
new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, clusterState -> {
IndexMetadata indexMetadata = clusterState.metadata().getProject().index(index);
new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, state -> {
IndexMetadata indexMetadata = state.metadata().index(index);
registerSuccessfulOperation(indexMetadata);
if (newStepKey != null && newStepKey != TerminalPolicyStep.KEY && indexMetadata != null) {
maybeRunAsyncAction(clusterState, indexMetadata, policy, newStepKey);
maybeRunAsyncAction(state.cluster(), indexMetadata, policy, newStepKey);
}
})
);
Expand All @@ -497,8 +499,8 @@ private void moveToErrorStep(Index index, String policy, Step.StepKey currentSte
);
submitUnlessAlreadyQueued(
Strings.format("ilm-move-to-error-step {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(), currentStepKey),
new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, clusterState -> {
IndexMetadata indexMetadata = clusterState.metadata().getProject().index(index);
new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, state -> {
IndexMetadata indexMetadata = state.metadata().index(index);
registerFailedOperation(indexMetadata, e);
})
);
Expand Down Expand Up @@ -656,15 +658,15 @@ private final class MoveToRetryFailedStepUpdateTask extends IndexLifecycleCluste
}

@Override
protected ClusterState doExecute(ClusterState currentState) {
protected ClusterState doExecute(ProjectState currentState) {
final var updatedProject = IndexLifecycleTransition.moveIndexToPreviouslyFailedStep(
currentState.metadata().getProject(),
currentState.metadata(),
index.getName(),
nowSupplier,
stepRegistry,
true
);
return ClusterState.builder(currentState).putProjectMetadata(updatedProject).build();
return currentState.updatedState(updatedProject);
}

@Override
Expand Down Expand Up @@ -693,8 +695,8 @@ protected void handleFailure(Exception e) {
}

@Override
protected void onClusterStateProcessed(ClusterState newState) {
IndexMetadata newIndexMeta = newState.metadata().getProject().index(index);
protected void onClusterStateProcessed(ProjectState newState) {
IndexMetadata newIndexMeta = newState.metadata().index(index);
if (newIndexMeta == null) {
// index was deleted
return;
Expand All @@ -713,7 +715,7 @@ protected void onClusterStateProcessed(ClusterState newState) {
index,
stepKey
);
maybeRunAsyncAction(newState, newIndexMeta, policy, stepKey);
maybeRunAsyncAction(newState.cluster(), newIndexMeta, policy, stepKey);
}

}
Expand Down
Loading