diff --git a/muted-tests.yml b/muted-tests.yml index bb31605b3fcc5..078c207f45757 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -547,21 +547,9 @@ tests: - class: org.elasticsearch.xpack.profiling.action.GetStatusActionIT method: testWaitsUntilResourcesAreCreated issue: https://github.com/elastic/elasticsearch/issues/129486 -- class: org.elasticsearch.xpack.ilm.TimeseriesMoveToStepIT - method: testMoveToRolloverStep - issue: https://github.com/elastic/elasticsearch/issues/129489 -- class: org.elasticsearch.xpack.ilm.TimeseriesMoveToStepIT - method: testMoveToAllocateStep - issue: https://github.com/elastic/elasticsearch/issues/129490 -- class: org.elasticsearch.xpack.ilm.actions.ShrinkActionIT - method: testShrinkDuringSnapshot - issue: https://github.com/elastic/elasticsearch/issues/129491 - class: org.elasticsearch.xpack.security.PermissionsIT method: testWhenUserLimitedByOnlyAliasOfIndexCanWriteToIndexWhichWasRolledoverByILMPolicy issue: https://github.com/elastic/elasticsearch/issues/129481 -- class: org.elasticsearch.xpack.ilm.TimeSeriesLifecycleActionsIT - method: testFullPolicy - issue: https://github.com/elastic/elasticsearch/issues/129510 - class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT method: test {knn-function.KnnSearchWithKOption SYNC} issue: https://github.com/elastic/elasticsearch/issues/129512 diff --git a/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/ClusterStateWaitThresholdBreachTests.java b/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/ClusterStateWaitThresholdBreachTests.java index fc3bfd437e5c3..82879e28249ba 100644 --- a/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/ClusterStateWaitThresholdBreachTests.java +++ b/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/ClusterStateWaitThresholdBreachTests.java @@ -141,7 +141,9 @@ public void testWaitInShrunkShardsAllocatedExceedsThreshold() throws Exception { clusterService.submitUnbatchedStateUpdateTask("testing-move-to-step-to-manipulate-step-time", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { + final var projectState = currentState.projectState(); return new MoveToNextStepUpdateTask( + projectState.projectId(), managedIndexMetadata.getIndex(), policy, currentStepKey, @@ -149,7 +151,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { nowWayBackInThePastSupplier, indexLifecycleService.getPolicyRegistry(), state -> {} - ).execute(currentState.projectState()); + ).execute(projectState); } @Override diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java index fc40d352483f0..1fa870286b148 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.LifecycleExecutionState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.Strings; import org.elasticsearch.index.Index; import org.elasticsearch.xcontent.ToXContentObject; @@ -45,6 +46,7 @@ public class ExecuteStepsUpdateTask extends IndexLifecycleClusterStateUpdateTask private Exception failure = null; public ExecuteStepsUpdateTask( + ProjectId projectId, String policy, Index index, Step startStep, @@ -52,7 +54,7 @@ public ExecuteStepsUpdateTask( IndexLifecycleRunner lifecycleRunner, LongSupplier nowSupplier ) { - super(index, startStep.getKey()); + super(projectId, index, startStep.getKey()); this.policy = policy; this.startStep = startStep; this.policyStepsRegistry = policyStepsRegistry; @@ -234,7 +236,7 @@ public void onClusterStateProcessed(ProjectState newState) { // After the cluster state has been processed and we have moved // to a new step, we need to conditionally execute the step iff // it is an `AsyncAction` so that it is executed exactly once. - lifecycleRunner.maybeRunAsyncAction(newState.cluster(), indexMetadata, policy, nextStepKey); + lifecycleRunner.maybeRunAsyncAction(newState, indexMetadata, policy, nextStepKey); } } assert indexToStepKeysForAsyncActions.size() <= 1 : "we expect a maximum of one single spawned index currently"; @@ -253,7 +255,7 @@ public void onClusterStateProcessed(ProjectState newState) { nextStep ); final String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMeta.getSettings()); - lifecycleRunner.maybeRunAsyncAction(newState.cluster(), indexMeta, policyName, nextStep); + lifecycleRunner.maybeRunAsyncAction(newState, indexMeta, policyName, nextStep); } } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleClusterStateUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleClusterStateUpdateTask.java index 3a024027f6aaa..a2f531997a92a 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleClusterStateUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleClusterStateUpdateTask.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ProjectState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.index.Index; import org.elasticsearch.xpack.core.ilm.Step; @@ -24,11 +25,13 @@ public abstract class IndexLifecycleClusterStateUpdateTask implements ClusterSta private final ListenableFuture listener = new ListenableFuture<>(); + /** We need to store the project ID along with the index because an index might get deleted, but we still want to run the step */ + protected final ProjectId projectId; protected final Index index; - protected final Step.StepKey currentStepKey; - protected IndexLifecycleClusterStateUpdateTask(Index index, Step.StepKey currentStepKey) { + protected IndexLifecycleClusterStateUpdateTask(ProjectId projectId, Index index, Step.StepKey currentStepKey) { + this.projectId = projectId; this.index = index; this.currentStepKey = currentStepKey; } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java index 6e7b19ecf32dc..e6b9fb86a4f70 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java @@ -15,7 +15,7 @@ import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.LifecycleExecutionState; -import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; @@ -68,13 +68,10 @@ public ClusterState execute(BatchExecutionContext task.clusterStateProcessed(publishedState.projectState(projectId))); + taskContext.success(publishedState -> task.clusterStateProcessed(publishedState.projectState(task.projectId))); } catch (Exception e) { taskContext.onFailure(e); } @@ -177,7 +174,7 @@ boolean isReadyToTransitionToThisPhase(final String policy, final IndexMetadata * Run the current step, only if it is an asynchronous wait step. These * wait criteria are checked periodically from the ILM scheduler */ - void runPeriodicStep(String policy, Metadata metadata, IndexMetadata indexMetadata) { + void runPeriodicStep(ProjectState state, String policy, IndexMetadata indexMetadata) { String index = indexMetadata.getIndex().getName(); if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexMetadata.getSettings())) { logger.info("[{}] skipping policy [{}] because [{}] is true", index, policy, LifecycleSettings.LIFECYCLE_SKIP); @@ -188,13 +185,13 @@ void runPeriodicStep(String policy, Metadata metadata, IndexMetadata indexMetada try { currentStep = getCurrentStep(stepRegistry, policy, indexMetadata, lifecycleState); } catch (Exception e) { - markPolicyRetrievalError(policy, indexMetadata.getIndex(), lifecycleState, e); + markPolicyRetrievalError(state.projectId(), policy, indexMetadata.getIndex(), lifecycleState, e); return; } if (currentStep == null) { if (stepRegistry.policyExists(policy) == false) { - markPolicyDoesNotExist(policy, indexMetadata.getIndex(), lifecycleState); + markPolicyDoesNotExist(state.projectId(), policy, indexMetadata.getIndex(), lifecycleState); return; } else { Step.StepKey currentStepKey = Step.getCurrentStepKey(lifecycleState); @@ -213,7 +210,7 @@ void runPeriodicStep(String policy, Metadata metadata, IndexMetadata indexMetada logger.debug("policy [{}] for index [{}] complete, skipping execution", policy, index); return; } else if (currentStep instanceof ErrorStep) { - onErrorMaybeRetryFailedStep(policy, currentStep.getKey(), indexMetadata); + onErrorMaybeRetryFailedStep(state.projectId(), policy, currentStep.getKey(), indexMetadata); return; } @@ -235,25 +232,26 @@ void runPeriodicStep(String policy, Metadata metadata, IndexMetadata indexMetada } // Only proceed to the next step if enough time has elapsed to go into the next phase if (isReadyToTransitionToThisPhase(policy, indexMetadata, currentStep.getNextStepKey().phase())) { - moveToStep(indexMetadata.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + moveToStep(state.projectId(), indexMetadata.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); } } else if (currentStep instanceof AsyncWaitStep) { logger.debug("[{}] running periodic policy with current-step [{}]", index, currentStep.getKey()); + final var metadata = state.cluster().metadata(); ((AsyncWaitStep) currentStep).evaluateCondition(metadata, indexMetadata.getIndex(), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject stepInfo) { logger.trace("cs-change-async-wait-callback, [{}] current-step: {}", index, currentStep.getKey()); if (conditionMet) { - moveToStep(indexMetadata.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + moveToStep(state.projectId(), indexMetadata.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); } else if (stepInfo != null) { - setStepInfo(indexMetadata.getIndex(), policy, currentStep.getKey(), stepInfo); + setStepInfo(state.projectId(), indexMetadata.getIndex(), policy, currentStep.getKey(), stepInfo); } } @Override public void onFailure(Exception e) { - moveToErrorStep(indexMetadata.getIndex(), policy, currentStep.getKey(), e); + moveToErrorStep(state.projectId(), indexMetadata.getIndex(), policy, currentStep.getKey(), e); } }, TimeValue.MAX_VALUE); } else { @@ -266,7 +264,7 @@ public void onFailure(Exception e) { * execution state to the previously failed step, incrementing the retry * counter. */ - void onErrorMaybeRetryFailedStep(String policy, StepKey currentStep, IndexMetadata indexMetadata) { + void onErrorMaybeRetryFailedStep(ProjectId projectId, String policy, StepKey currentStep, IndexMetadata indexMetadata) { String index = indexMetadata.getIndex().getName(); LifecycleExecutionState lifecycleState = indexMetadata.getLifecycleExecutionState(); Step failedStep = stepRegistry.getStep( @@ -298,7 +296,7 @@ void onErrorMaybeRetryFailedStep(String policy, StepKey currentStep, IndexMetada // to move it back into the failed step, so we'll try again submitUnlessAlreadyQueued( Strings.format("ilm-retry-failed-step {policy [%s], index [%s], failedStep [%s]}", policy, index, failedStep.getKey()), - new MoveToRetryFailedStepUpdateTask(indexMetadata.getIndex(), policy, currentStep, failedStep) + new MoveToRetryFailedStepUpdateTask(projectId, indexMetadata.getIndex(), policy, currentStep, failedStep) ); } else { logger.debug("policy [{}] for index [{}] on an error step after a terminal error, skipping execution", policy, index); @@ -308,7 +306,8 @@ void onErrorMaybeRetryFailedStep(String policy, StepKey currentStep, IndexMetada /** * If the current step (matching the expected step key) is an asynchronous action step, run it */ - void maybeRunAsyncAction(ClusterState currentState, IndexMetadata indexMetadata, String policy, StepKey expectedStepKey) { + void maybeRunAsyncAction(ProjectState state, IndexMetadata indexMetadata, String policy, StepKey expectedStepKey) { + final var projectId = state.projectId(); String index = indexMetadata.getIndex().getName(); if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexMetadata.getSettings())) { logger.info("[{}] skipping policy [{}] because [{}] is true", index, policy, LifecycleSettings.LIFECYCLE_SKIP); @@ -319,7 +318,7 @@ void maybeRunAsyncAction(ClusterState currentState, IndexMetadata indexMetadata, try { currentStep = getCurrentStep(stepRegistry, policy, indexMetadata, lifecycleState); } catch (Exception e) { - markPolicyRetrievalError(policy, indexMetadata.getIndex(), lifecycleState, e); + markPolicyRetrievalError(projectId, policy, indexMetadata.getIndex(), lifecycleState, e); return; } if (currentStep == null) { @@ -360,7 +359,7 @@ void maybeRunAsyncAction(ClusterState currentState, IndexMetadata indexMetadata, logger.debug("[{}] running policy with async action step [{}]", index, currentStep.getKey()); ((AsyncActionStep) currentStep).performAction( indexMetadata, - currentState, + state.cluster(), new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()), new ActionListener<>() { @@ -368,7 +367,7 @@ void maybeRunAsyncAction(ClusterState currentState, IndexMetadata indexMetadata, public void onResponse(Void unused) { logger.trace("cs-change-async-action-callback, [{}], current-step: {}", index, currentStep.getKey()); if (((AsyncActionStep) currentStep).indexSurvives()) { - moveToStep(indexMetadata.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + moveToStep(projectId, indexMetadata.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); } else { // Delete needs special handling, because after this step we // will no longer have access to any information about the @@ -379,7 +378,7 @@ public void onResponse(Void unused) { @Override public void onFailure(Exception e) { - moveToErrorStep(indexMetadata.getIndex(), policy, currentStep.getKey(), e); + moveToErrorStep(projectId, indexMetadata.getIndex(), policy, currentStep.getKey(), e); } } ); @@ -392,7 +391,7 @@ public void onFailure(Exception e) { * Run the current step that either waits for index age, or updates/waits-on cluster state. * Invoked after the cluster state has been changed */ - void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) { + void runPolicyAfterStateChange(ProjectId projectId, String policy, IndexMetadata indexMetadata) { String index = indexMetadata.getIndex().getName(); if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexMetadata.getSettings())) { logger.info("[{}] skipping policy [{}] because [{}] is true", index, policy, LifecycleSettings.LIFECYCLE_SKIP); @@ -408,12 +407,12 @@ void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) { try { currentStep = getCurrentStep(stepRegistry, policy, indexMetadata, lifecycleState); } catch (Exception e) { - markPolicyRetrievalError(policy, indexMetadata.getIndex(), lifecycleState, e); + markPolicyRetrievalError(projectId, policy, indexMetadata.getIndex(), lifecycleState, e); return; } if (currentStep == null) { if (stepRegistry.policyExists(policy) == false) { - markPolicyDoesNotExist(policy, indexMetadata.getIndex(), lifecycleState); + markPolicyDoesNotExist(projectId, policy, indexMetadata.getIndex(), lifecycleState); return; } else { if (TerminalPolicyStep.KEY.equals(currentStepKey)) { @@ -452,13 +451,13 @@ void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) { } // Only proceed to the next step if enough time has elapsed to go into the next phase if (isReadyToTransitionToThisPhase(policy, indexMetadata, currentStep.getNextStepKey().phase())) { - moveToStep(indexMetadata.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + moveToStep(projectId, indexMetadata.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); } } else if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { logger.debug("[{}] running policy with current-step [{}]", indexMetadata.getIndex().getName(), currentStep.getKey()); submitUnlessAlreadyQueued( Strings.format("ilm-execute-cluster-state-steps [%s]", currentStep), - new ExecuteStepsUpdateTask(policy, indexMetadata.getIndex(), currentStep, stepRegistry, this, nowSupplier) + new ExecuteStepsUpdateTask(projectId, policy, indexMetadata.getIndex(), currentStep, stepRegistry, this, nowSupplier) ); } else { logger.trace("[{}] ignoring step execution from cluster state change event [{}]", index, currentStep.getKey()); @@ -469,7 +468,7 @@ void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) { * Move the index to the given {@code newStepKey}, always checks to ensure that the index's * current step matches the {@code currentStepKey} prior to changing the state. */ - private void moveToStep(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey newStepKey) { + private void moveToStep(ProjectId projectId, Index index, String policy, StepKey currentStepKey, StepKey newStepKey) { logger.debug("[{}] moving to step [{}] {} -> {}", index.getName(), policy, currentStepKey, newStepKey); submitUnlessAlreadyQueued( Strings.format( @@ -479,11 +478,11 @@ private void moveToStep(Index index, String policy, Step.StepKey currentStepKey, currentStepKey, newStepKey ), - new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, state -> { + new MoveToNextStepUpdateTask(projectId, index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, state -> { IndexMetadata indexMetadata = state.metadata().index(index); registerSuccessfulOperation(indexMetadata); if (newStepKey != null && newStepKey != TerminalPolicyStep.KEY && indexMetadata != null) { - maybeRunAsyncAction(state.cluster(), indexMetadata, policy, newStepKey); + maybeRunAsyncAction(state, indexMetadata, policy, newStepKey); } }) ); @@ -492,14 +491,14 @@ private void moveToStep(Index index, String policy, Step.StepKey currentStepKey, /** * Move the index to the ERROR step. */ - private void moveToErrorStep(Index index, String policy, Step.StepKey currentStepKey, Exception e) { + private void moveToErrorStep(ProjectId projectId, Index index, String policy, StepKey currentStepKey, Exception e) { logger.error( () -> format("policy [%s] for index [%s] failed on step [%s]. Moving to ERROR step", policy, index.getName(), currentStepKey), e ); submitUnlessAlreadyQueued( Strings.format("ilm-move-to-error-step {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(), currentStepKey), - new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, state -> { + new MoveToErrorStepUpdateTask(projectId, index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, state -> { IndexMetadata indexMetadata = state.metadata().index(index); registerFailedOperation(indexMetadata, e); }) @@ -510,18 +509,19 @@ private void moveToErrorStep(Index index, String policy, Step.StepKey currentSte * Set step info for the given index inside of its {@link LifecycleExecutionState} without * changing other execution state. */ - private void setStepInfo(Index index, String policy, @Nullable Step.StepKey currentStepKey, ToXContentObject stepInfo) { + private void setStepInfo(ProjectId projectId, Index index, String policy, @Nullable StepKey currentStepKey, ToXContentObject stepInfo) { submitUnlessAlreadyQueued( Strings.format("ilm-set-step-info {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(), currentStepKey), - new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo) + new SetStepInfoUpdateTask(projectId, index, policy, currentStepKey, stepInfo) ); } /** * Mark the index with step info explaining that the policy doesn't exist. */ - private void markPolicyDoesNotExist(String policyName, Index index, LifecycleExecutionState executionState) { + private void markPolicyDoesNotExist(ProjectId projectId, String policyName, Index index, LifecycleExecutionState executionState) { markPolicyRetrievalError( + projectId, policyName, index, executionState, @@ -535,7 +535,13 @@ private void markPolicyDoesNotExist(String policyName, Index index, LifecycleExe * the ERROR step, however, the policy may be unparseable in which case there is no way to move * to the ERROR step, so this is the best effort at capturing the error retrieving the policy. */ - private void markPolicyRetrievalError(String policyName, Index index, LifecycleExecutionState executionState, Exception e) { + private void markPolicyRetrievalError( + ProjectId projectId, + String policyName, + Index index, + LifecycleExecutionState executionState, + Exception e + ) { logger.debug( () -> format( "unable to retrieve policy [%s] for index [%s], recording this in step_info for this index", @@ -544,7 +550,7 @@ private void markPolicyRetrievalError(String policyName, Index index, LifecycleE ), e ); - setStepInfo(index, policyName, Step.getCurrentStepKey(executionState), new SetStepInfoUpdateTask.ExceptionWrapper(e)); + setStepInfo(projectId, index, policyName, Step.getCurrentStepKey(executionState), new SetStepInfoUpdateTask.ExceptionWrapper(e)); } /** @@ -651,8 +657,8 @@ private final class MoveToRetryFailedStepUpdateTask extends IndexLifecycleCluste private final String policy; private final Step failedStep; - MoveToRetryFailedStepUpdateTask(Index index, String policy, StepKey currentStep, Step failedStep) { - super(index, currentStep); + MoveToRetryFailedStepUpdateTask(ProjectId projectId, Index index, String policy, StepKey currentStep, Step failedStep) { + super(projectId, index, currentStep); this.policy = policy; this.failedStep = failedStep; } @@ -715,7 +721,7 @@ protected void onClusterStateProcessed(ProjectState newState) { index, stepKey ); - maybeRunAsyncAction(newState.cluster(), newIndexMeta, policy, stepKey); + maybeRunAsyncAction(newState, newIndexMeta, policy, stepKey); } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java index 7742eb70ca918..e023a58034ecf 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.metadata.Metadata; @@ -131,7 +132,8 @@ public IndexLifecycleService( } public void maybeRunAsyncAction(ClusterState clusterState, IndexMetadata indexMetadata, StepKey nextStepKey) { - lifecycleRunner.maybeRunAsyncAction(clusterState, indexMetadata, indexMetadata.getLifecyclePolicyName(), nextStepKey); + final var state = clusterState.projectState(); + lifecycleRunner.maybeRunAsyncAction(state, indexMetadata, indexMetadata.getLifecyclePolicyName(), nextStepKey); } /** @@ -184,7 +186,8 @@ void onMaster(ClusterState clusterState) { // TODO multi-project: this probably needs a per-project iteration @FixForMultiProject - final ProjectMetadata projectMetadata = clusterState.metadata().getProject(Metadata.DEFAULT_PROJECT_ID); + final ProjectState state = clusterState.projectState(Metadata.DEFAULT_PROJECT_ID); + final ProjectMetadata projectMetadata = state.metadata(); final IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE); if (currentMetadata != null) { OperationMode currentMode = currentILMMode(projectMetadata); @@ -211,7 +214,7 @@ void onMaster(ClusterState clusterState) { policyName, stepKey.name() ); - lifecycleRunner.maybeRunAsyncAction(clusterState, idxMeta, policyName, stepKey); + lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey); // ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop safeToStop = false; } else { @@ -223,7 +226,7 @@ void onMaster(ClusterState clusterState) { ); } } else { - lifecycleRunner.maybeRunAsyncAction(clusterState, idxMeta, policyName, stepKey); + lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey); } } catch (Exception e) { if (logger.isTraceEnabled()) { @@ -462,7 +465,8 @@ public boolean policyExists(String policyId) { @FixForMultiProject void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) { @FixForMultiProject - final var projectMetadata = clusterState.metadata().getProject(Metadata.DEFAULT_PROJECT_ID); + final var state = clusterState.projectState(Metadata.DEFAULT_PROJECT_ID); + final var projectMetadata = state.metadata(); IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE); OperationMode currentMode = currentILMMode(projectMetadata); @@ -499,9 +503,9 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) stepKey.name() ); if (fromClusterStateChange) { - lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta); + lifecycleRunner.runPolicyAfterStateChange(state.projectId(), policyName, idxMeta); } else { - lifecycleRunner.runPeriodicStep(policyName, clusterState.metadata(), idxMeta); + lifecycleRunner.runPeriodicStep(state, policyName, idxMeta); } // ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop safeToStop = false; @@ -515,9 +519,9 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) } } else { if (fromClusterStateChange) { - lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta); + lifecycleRunner.runPolicyAfterStateChange(state.projectId(), policyName, idxMeta); } else { - lifecycleRunner.runPeriodicStep(policyName, clusterState.metadata(), idxMeta); + lifecycleRunner.runPeriodicStep(state, policyName, idxMeta); } } } catch (Exception e) { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java index 26e5d5e918798..9013cacdd01e7 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.LifecycleExecutionState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.Strings; import org.elasticsearch.index.Index; import org.elasticsearch.xpack.core.ilm.Step; @@ -38,6 +39,7 @@ public class MoveToErrorStepUpdateTask extends IndexLifecycleClusterStateUpdateT private final Exception cause; public MoveToErrorStepUpdateTask( + ProjectId projectId, Index index, String policy, Step.StepKey currentStepKey, @@ -46,7 +48,7 @@ public MoveToErrorStepUpdateTask( BiFunction stepLookupFunction, Consumer stateChangeConsumer ) { - super(index, currentStepKey); + super(projectId, index, currentStepKey); this.index = index; this.policy = policy; this.currentStepKey = currentStepKey; diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToNextStepUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToNextStepUpdateTask.java index b0e03902864b2..af58a4d55a8ce 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToNextStepUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToNextStepUpdateTask.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.LifecycleExecutionState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.index.Index; import org.elasticsearch.xpack.core.ilm.Step; @@ -32,6 +33,7 @@ public class MoveToNextStepUpdateTask extends IndexLifecycleClusterStateUpdateTa private final Consumer stateChangeConsumer; public MoveToNextStepUpdateTask( + ProjectId projectId, Index index, String policy, Step.StepKey currentStepKey, @@ -40,7 +42,7 @@ public MoveToNextStepUpdateTask( PolicyStepsRegistry stepRegistry, Consumer stateChangeConsumer ) { - super(index, currentStepKey); + super(projectId, index, currentStepKey); this.policy = policy; this.nextStepKey = nextStepKey; this.nowSupplier = nowSupplier; diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/SetStepInfoUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/SetStepInfoUpdateTask.java index 3f5e794461172..4e371fa100a27 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/SetStepInfoUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/SetStepInfoUpdateTask.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.LifecycleExecutionState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.index.Index; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; @@ -31,8 +32,8 @@ public class SetStepInfoUpdateTask extends IndexLifecycleClusterStateUpdateTask private final String policy; private final ToXContentObject stepInfo; - public SetStepInfoUpdateTask(Index index, String policy, Step.StepKey currentStepKey, ToXContentObject stepInfo) { - super(index, currentStepKey); + public SetStepInfoUpdateTask(ProjectId projectId, Index index, String policy, Step.StepKey currentStepKey, ToXContentObject stepInfo) { + super(projectId, index, currentStepKey); this.policy = policy; this.stepInfo = stepInfo; } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/package-info.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/package-info.java index caad8a325c50c..f48d40252bd5f 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/package-info.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/package-info.java @@ -45,7 +45,7 @@ *
    *
  • * {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner#maybeRunAsyncAction( - * org.elasticsearch.cluster.ClusterState, + * org.elasticsearch.cluster.ProjectState, * org.elasticsearch.cluster.metadata.IndexMetadata, * java.lang.String, org.elasticsearch.xpack.core.ilm.Step.StepKey * )} @@ -53,6 +53,7 @@ *
  • *
  • * {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner#runPolicyAfterStateChange( + * org.elasticsearch.cluster.metadata.ProjectId, * java.lang.String, * org.elasticsearch.cluster.metadata.IndexMetadata * )} @@ -61,8 +62,8 @@ *
  • *
  • * {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner#runPeriodicStep( + * org.elasticsearch.cluster.ProjectState, * java.lang.String, - * org.elasticsearch.cluster.metadata.Metadata, * org.elasticsearch.cluster.metadata.IndexMetadata * )} * handles the execution of async {@link org.elasticsearch.xpack.core.ilm.AsyncWaitStep} diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTaskTests.java index 11aa8816e55ee..a9136f7d69bf6 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTaskTests.java @@ -160,7 +160,15 @@ public void testNeverExecuteNonClusterStateStep() throws Exception { setStateToKey(thirdStepKey); Step startStep = policyStepsRegistry.getStep(indexMetadata, thirdStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask( + state.projectId(), + mixedPolicyName, + index, + startStep, + policyStepsRegistry, + null, + () -> now + ); assertThat(task.execute(state), sameInstance(state.cluster())); } @@ -169,7 +177,15 @@ public void testSuccessThenFailureUnsetNextKey() throws Exception { setStateToKey(firstStepKey); Step startStep = policyStepsRegistry.getStep(indexMetadata, firstStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask( + state.projectId(), + mixedPolicyName, + index, + startStep, + policyStepsRegistry, + null, + () -> now + ); ClusterState newState = task.execute(state); LifecycleExecutionState lifecycleState = getLifecycleExecutionState(newState); StepKey currentStepKey = Step.getCurrentStepKey(lifecycleState); @@ -186,7 +202,15 @@ public void testExecuteUntilFirstNonClusterStateStep() throws Exception { setStateToKey(secondStepKey); Step startStep = policyStepsRegistry.getStep(indexMetadata, secondStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask( + state.projectId(), + mixedPolicyName, + index, + startStep, + policyStepsRegistry, + null, + () -> now + ); ClusterState newState = task.execute(state); LifecycleExecutionState lifecycleState = getLifecycleExecutionState(newState); StepKey currentStepKey = Step.getCurrentStepKey(lifecycleState); @@ -217,6 +241,7 @@ public void testExecuteInvalidStartStep() throws Exception { Step invalidStep = new MockClusterStateActionStep(firstStepKey, secondStepKey); long now = randomNonNegativeLong(); ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask( + state.projectId(), invalidPolicyName, index, invalidStep, @@ -233,7 +258,15 @@ public void testExecuteIncompleteWaitStepNoInfo() throws Exception { setStateToKey(secondStepKey); Step startStep = policyStepsRegistry.getStep(indexMetadata, secondStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask( + state.projectId(), + mixedPolicyName, + index, + startStep, + policyStepsRegistry, + null, + () -> now + ); ClusterState newState = task.execute(state); LifecycleExecutionState lifecycleState = getLifecycleExecutionState(newState); StepKey currentStepKey = Step.getCurrentStepKey(lifecycleState); @@ -252,7 +285,15 @@ public void testExecuteIncompleteWaitStepWithInfo() throws Exception { setStateToKey(secondStepKey); Step startStep = policyStepsRegistry.getStep(indexMetadata, secondStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask( + state.projectId(), + mixedPolicyName, + index, + startStep, + policyStepsRegistry, + null, + () -> now + ); ClusterState newState = task.execute(state); LifecycleExecutionState lifecycleState = getLifecycleExecutionState(newState); StepKey currentStepKey = Step.getCurrentStepKey(lifecycleState); @@ -268,7 +309,15 @@ public void testOnFailure() throws IOException { setStateToKey(secondStepKey); Step startStep = policyStepsRegistry.getStep(indexMetadata, secondStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask( + state.projectId(), + mixedPolicyName, + index, + startStep, + policyStepsRegistry, + null, + () -> now + ); Exception expectedException = new RuntimeException(); task.onFailure(expectedException); } @@ -279,7 +328,15 @@ public void testClusterActionStepThrowsException() throws Exception { setStateToKey(firstStepKey); Step startStep = policyStepsRegistry.getStep(indexMetadata, firstStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask( + state.projectId(), + mixedPolicyName, + index, + startStep, + policyStepsRegistry, + null, + () -> now + ); ClusterState newState = task.execute(state); LifecycleExecutionState lifecycleState = getLifecycleExecutionState(newState); StepKey currentStepKey = Step.getCurrentStepKey(lifecycleState); @@ -299,7 +356,15 @@ public void testClusterWaitStepThrowsException() throws Exception { setStateToKey(firstStepKey); Step startStep = policyStepsRegistry.getStep(indexMetadata, firstStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask( + state.projectId(), + mixedPolicyName, + index, + startStep, + policyStepsRegistry, + null, + () -> now + ); ClusterState newState = task.execute(state); LifecycleExecutionState lifecycleState = getLifecycleExecutionState(newState); StepKey currentStepKey = Step.getCurrentStepKey(lifecycleState); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java index ac4738fb787b2..b2c8de98ed80c 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -140,7 +141,7 @@ public void testRunPolicyTerminalPolicyStep() { IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetadata indexMetadata = createIndex("my_index"); - runner.runPolicyAfterStateChange(policyName, indexMetadata); + runner.runPolicyAfterStateChange(randomProjectIdOrDefault(), policyName, indexMetadata); Mockito.verify(clusterService, times(1)).createTaskQueue(anyString(), any(), any()); Mockito.verifyNoMoreInteractions(clusterService); @@ -154,8 +155,9 @@ public void testRunPolicyPhaseCompletePolicyStep() { IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetadata indexMetadata = createIndex("my_index"); - runner.runPolicyAfterStateChange(policyName, indexMetadata); - runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata); + final var state = projectStateFromProject(ProjectMetadata.builder(randomProjectIdOrDefault()).put(indexMetadata, true)); + runner.runPolicyAfterStateChange(state.projectId(), policyName, indexMetadata); + runner.runPeriodicStep(state, policyName, indexMetadata); Mockito.verify(clusterService, times(1)).createTaskQueue(anyString(), any(), any()); Mockito.verifyNoMoreInteractions(clusterService); @@ -180,8 +182,9 @@ public void testRunPolicyPhaseCompleteWithMoreStepsPolicyStep() { IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetadata indexMetadata = createIndex("my_index"); - runner.runPolicyAfterStateChange(policyName, indexMetadata); - runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata); + final var state = projectStateFromProject(ProjectMetadata.builder(randomProjectIdOrDefault()).put(indexMetadata, true)); + runner.runPolicyAfterStateChange(state.projectId(), policyName, indexMetadata); + runner.runPeriodicStep(state, policyName, indexMetadata); Mockito.verify(taskQueue, times(1)).submitTask(anyString(), any(), any()); } @@ -212,7 +215,7 @@ public void testRunPolicyErrorStep() { .putCustom(ILM_CUSTOM_METADATA_KEY, newState.build().asMap()) .build(); - runner.runPolicyAfterStateChange(policyName, indexMetadata); + runner.runPolicyAfterStateChange(randomProjectIdOrDefault(), policyName, indexMetadata); Mockito.verify(clusterService).createTaskQueue(anyString(), any(Priority.class), any()); Mockito.verifyNoMoreInteractions(clusterService); @@ -226,7 +229,7 @@ public void testSkip_afterStateChange() { .settings(randomIndexSettings().put(LifecycleSettings.LIFECYCLE_SKIP, true)) .build(); - runner.runPolicyAfterStateChange(policyName, index); + runner.runPolicyAfterStateChange(randomProjectIdOrDefault(), policyName, index); Mockito.verify(clusterService).createTaskQueue(anyString(), any(Priority.class), any()); Mockito.verifyNoMoreInteractions(clusterService); @@ -240,7 +243,7 @@ public void testSkip_periodicRun() { .settings(randomIndexSettings().put(LifecycleSettings.LIFECYCLE_SKIP, true)) .build(); - runner.runPeriodicStep(policyName, null, index); + runner.runPeriodicStep(null, policyName, index); Mockito.verify(clusterService).createTaskQueue(anyString(), any(Priority.class), any()); Mockito.verifyNoMoreInteractions(clusterService); @@ -254,7 +257,8 @@ public void testSkip_asyncAction() { .settings(randomIndexSettings().put(LifecycleSettings.LIFECYCLE_SKIP, true)) .build(); - runner.maybeRunAsyncAction(null, index, policyName, null); + final var state = projectStateFromProject(ProjectMetadata.builder(randomProjectIdOrDefault()).put(index, true)); + runner.maybeRunAsyncAction(state, index, policyName, null); Mockito.verify(clusterService).createTaskQueue(anyString(), any(Priority.class), any()); Mockito.verifyNoMoreInteractions(clusterService); @@ -296,7 +300,8 @@ public void testRunPolicyErrorStepOnRetryableFailedStep() { .putCustom(ILM_CUSTOM_METADATA_KEY, newState.build().asMap()) .build(); - runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata); + final var state = projectStateFromProject(ProjectMetadata.builder(randomProjectIdOrDefault()).put(indexMetadata, true)); + runner.runPeriodicStep(state, policyName, indexMetadata); Mockito.verify(taskQueue, times(1)).submitTask(anyString(), any(), any()); } @@ -313,8 +318,12 @@ public void testRunStateChangePolicyWithNoNextStep() throws Exception { ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); DiscoveryNode node = clusterService.localNode(); IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Map.of(), OperationMode.RUNNING); - ClusterState state = ClusterState.builder(new ClusterName("cluster")) - .metadata(Metadata.builder().put(indexMetadata, true).putCustom(IndexLifecycleMetadata.TYPE, ilm)) + final var project = ProjectMetadata.builder(randomProjectIdOrDefault()) + .put(indexMetadata, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm) + .build(); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .putProjectMetadata(project) .nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId())) .build(); ClusterServiceUtils.setState(clusterService, state); @@ -323,7 +332,7 @@ public void testRunStateChangePolicyWithNoNextStep() throws Exception { ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); step.setLatch(latch); - runner.runPolicyAfterStateChange(policyName, indexMetadata); + runner.runPolicyAfterStateChange(project.id(), policyName, indexMetadata); awaitLatch(latch, 5, TimeUnit.SECONDS); ClusterState after = clusterService.state(); @@ -365,8 +374,12 @@ public void testRunStateChangePolicyWithNextStep() throws Exception { ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); DiscoveryNode node = clusterService.localNode(); IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Map.of(), OperationMode.RUNNING); - ClusterState state = ClusterState.builder(new ClusterName("cluster")) - .metadata(Metadata.builder().put(indexMetadata, true).putCustom(IndexLifecycleMetadata.TYPE, ilm)) + final var project = ProjectMetadata.builder(randomProjectIdOrDefault()) + .put(indexMetadata, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm) + .build(); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .putProjectMetadata(project) .nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId())) .build(); ClusterServiceUtils.setState(clusterService, state); @@ -376,7 +389,7 @@ public void testRunStateChangePolicyWithNextStep() throws Exception { ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); nextStep.setLatch(latch); - runner.runPolicyAfterStateChange(policyName, indexMetadata); + runner.runPolicyAfterStateChange(project.id(), policyName, indexMetadata); awaitLatch(latch, 5, TimeUnit.SECONDS); @@ -384,7 +397,7 @@ public void testRunStateChangePolicyWithNextStep() throws Exception { assertBusy(() -> assertNotEquals(before, clusterService.state())); LifecycleExecutionState newExecutionState = clusterService.state() .metadata() - .getProject() + .getProject(project.id()) .index(indexMetadata.getIndex()) .getLifecycleExecutionState(); assertThat(newExecutionState.phase(), equalTo("phase")); @@ -453,28 +466,36 @@ public void doTestRunPolicyWithFailureToReadPolicy(boolean asyncAction, boolean ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); DiscoveryNode node = clusterService.localNode(); IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Map.of(), OperationMode.RUNNING); - ClusterState state = ClusterState.builder(new ClusterName("cluster")) - .metadata(Metadata.builder().put(indexMetadata, true).putCustom(IndexLifecycleMetadata.TYPE, ilm)) - .nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId())) + final var project = ProjectMetadata.builder(randomProjectIdOrDefault()) + .put(indexMetadata, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm) .build(); - ClusterServiceUtils.setState(clusterService, state); + ProjectState state = ClusterState.builder(ClusterName.DEFAULT) + .putProjectMetadata(project) + .nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId())) + .build() + .projectState(project.id()); + ClusterServiceUtils.setState(clusterService, state.cluster()); long stepTime = randomLong(); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> stepTime); - ClusterState before = clusterService.state(); if (asyncAction) { - runner.maybeRunAsyncAction(before, indexMetadata, policyName, stepKey); + runner.maybeRunAsyncAction(state, indexMetadata, policyName, stepKey); } else if (periodicAction) { - runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata); + runner.runPeriodicStep(state, policyName, indexMetadata); } else { - runner.runPolicyAfterStateChange(policyName, indexMetadata); + runner.runPolicyAfterStateChange(project.id(), policyName, indexMetadata); } // The cluster state can take a few extra milliseconds to update after the steps are executed - assertBusy(() -> assertNotEquals(before, clusterService.state())); + ClusterServiceUtils.awaitClusterState( + logger, + s -> s.metadata().getProject(state.projectId()).index(indexMetadata.getIndex()).getLifecycleExecutionState().stepInfo() != null, + clusterService + ); LifecycleExecutionState newExecutionState = clusterService.state() .metadata() - .getProject() + .getProject(state.projectId()) .index(indexMetadata.getIndex()) .getLifecycleExecutionState(); assertThat(newExecutionState.phase(), equalTo("phase")); @@ -503,8 +524,12 @@ public void testRunAsyncActionDoesNotRun() { ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); DiscoveryNode node = clusterService.localNode(); IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Map.of(), OperationMode.RUNNING); - ClusterState state = ClusterState.builder(new ClusterName("cluster")) - .metadata(Metadata.builder().put(indexMetadata, true).putCustom(IndexLifecycleMetadata.TYPE, ilm)) + final var project = ProjectMetadata.builder(randomProjectIdOrDefault()) + .put(indexMetadata, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm) + .build(); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .putProjectMetadata(project) .nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId())) .build(); ClusterServiceUtils.setState(clusterService, state); @@ -512,7 +537,7 @@ public void testRunAsyncActionDoesNotRun() { ClusterState before = clusterService.state(); // State changes should not run AsyncAction steps - runner.runPolicyAfterStateChange(policyName, indexMetadata); + runner.runPolicyAfterStateChange(project.id(), policyName, indexMetadata); ClusterState after = clusterService.state(); @@ -553,8 +578,12 @@ public void testRunStateChangePolicyWithAsyncActionNextStep() throws Exception { ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); DiscoveryNode node = clusterService.localNode(); IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Map.of(), OperationMode.RUNNING); - ClusterState state = ClusterState.builder(new ClusterName("cluster")) - .metadata(Metadata.builder().put(indexMetadata, true).putCustom(IndexLifecycleMetadata.TYPE, ilm)) + final var project = ProjectMetadata.builder(randomProjectIdOrDefault()) + .put(indexMetadata, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm) + .build(); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .putProjectMetadata(project) .nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId())) .build(); logger.info("--> state: {}", state); @@ -566,7 +595,7 @@ public void testRunStateChangePolicyWithAsyncActionNextStep() throws Exception { step.setLatch(latch); CountDownLatch asyncLatch = new CountDownLatch(1); nextStep.setLatch(asyncLatch); - runner.runPolicyAfterStateChange(policyName, indexMetadata); + runner.runPolicyAfterStateChange(project.id(), policyName, indexMetadata); // Wait for the cluster state action step awaitLatch(latch, 5, TimeUnit.SECONDS); @@ -630,18 +659,23 @@ public void testRunPeriodicStep() throws Exception { ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); DiscoveryNode node = clusterService.localNode(); IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Map.of(), OperationMode.RUNNING); - ClusterState state = ClusterState.builder(new ClusterName("cluster")) - .metadata(Metadata.builder().put(indexMetadata, true).putCustom(IndexLifecycleMetadata.TYPE, ilm)) - .nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId())) + final var project = ProjectMetadata.builder(randomProjectIdOrDefault()) + .put(indexMetadata, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm) .build(); + ProjectState state = ClusterState.builder(ClusterName.DEFAULT) + .putProjectMetadata(project) + .nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId())) + .build() + .projectState(project.id()); logger.info("--> state: {}", state); - ClusterServiceUtils.setState(clusterService, state); + ClusterServiceUtils.setState(clusterService, state.cluster()); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); step.setLatch(latch); - runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata); + runner.runPeriodicStep(state, policyName, indexMetadata); awaitLatch(latch, 5, TimeUnit.SECONDS); ClusterState after = clusterService.state(); @@ -664,7 +698,7 @@ public void testRunPolicyClusterStateActionStep() { IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetadata indexMetadata = createIndex("my_index"); - runner.runPolicyAfterStateChange(policyName, indexMetadata); + runner.runPolicyAfterStateChange(randomProjectIdOrDefault(), policyName, indexMetadata); final ExecuteStepsUpdateTaskMatcher taskMatcher = new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step); Mockito.verify(taskQueue, Mockito.times(1)) @@ -689,7 +723,7 @@ public void testRunPolicyClusterStateWaitStep() { IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetadata indexMetadata = createIndex("my_index"); - runner.runPolicyAfterStateChange(policyName, indexMetadata); + runner.runPolicyAfterStateChange(randomProjectIdOrDefault(), policyName, indexMetadata); final ExecuteStepsUpdateTaskMatcher taskMatcher = new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step); Mockito.verify(taskQueue, Mockito.times(1)) @@ -715,7 +749,7 @@ public void testRunPolicyAsyncActionStepClusterStateChangeIgnored() { IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetadata indexMetadata = createIndex("my_index"); - runner.runPolicyAfterStateChange(policyName, indexMetadata); + runner.runPolicyAfterStateChange(randomProjectIdOrDefault(), policyName, indexMetadata); assertEquals(0, step.getExecuteCount()); Mockito.verify(clusterService, Mockito.times(1)).createTaskQueue(any(), any(), any()); @@ -733,7 +767,7 @@ public void testRunPolicyAsyncWaitStepClusterStateChangeIgnored() { IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetadata indexMetadata = createIndex("my_index"); - runner.runPolicyAfterStateChange(policyName, indexMetadata); + runner.runPolicyAfterStateChange(randomProjectIdOrDefault(), policyName, indexMetadata); assertEquals(0, step.getExecuteCount()); Mockito.verify(clusterService, Mockito.times(1)).createTaskQueue(any(), any(), any()); @@ -753,7 +787,7 @@ public void testRunPolicyThatDoesntExist() { ); IndexMetadata indexMetadata = createIndex("my_index"); // verify that no exception is thrown - runner.runPolicyAfterStateChange(policyName, indexMetadata); + runner.runPolicyAfterStateChange(randomProjectIdOrDefault(), policyName, indexMetadata); final SetStepInfoUpdateTaskMatcher taskMatcher = new SetStepInfoUpdateTaskMatcher( indexMetadata.getIndex(), policyName, diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java index bba21d5a3a170..3151d39eed61b 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java @@ -73,6 +73,7 @@ public void testExecuteSuccessfullyMoved() throws Exception { setStateToKey(currentStepKey); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask( + state.projectId(), index, policy, currentStepKey, @@ -104,6 +105,7 @@ public void testExecuteNoopDifferentStep() throws Exception { Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE"); setStateToKey(notCurrentStepKey); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask( + state.projectId(), index, policy, currentStepKey, @@ -123,6 +125,7 @@ public void testExecuteNoopDifferentPolicy() throws Exception { setStateToKey(currentStepKey); setStatePolicy("not-" + policy); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask( + state.projectId(), index, policy, currentStepKey, diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToNextStepUpdateTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToNextStepUpdateTaskTests.java index a830246aaecd6..841b5445a18b9 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToNextStepUpdateTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToNextStepUpdateTaskTests.java @@ -96,6 +96,7 @@ public void testExecuteSuccessfullyMoved() throws Exception { ) ); MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask( + state.projectId(), index, policy, currentStepKey, @@ -124,6 +125,7 @@ public void testExecuteDifferentCurrentStep() throws Exception { long now = randomNonNegativeLong(); setStateToKey(notCurrentStepKey, now); MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask( + state.projectId(), index, policy, currentStepKey, @@ -142,6 +144,7 @@ public void testExecuteDifferentPolicy() throws Exception { setStateToKey(currentStepKey, now); setStatePolicy("not-" + policy); MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask( + state.projectId(), index, policy, currentStepKey, @@ -173,6 +176,7 @@ public void testExecuteSuccessfulMoveWithInvalidNextStep() throws Exception { ) ); MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask( + state.projectId(), index, policy, currentStepKey, @@ -203,6 +207,7 @@ public void testOnFailure() { setStateToKey(currentStepKey, now); MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask( + state.projectId(), index, policy, currentStepKey, diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/SetStepInfoUpdateTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/SetStepInfoUpdateTaskTests.java index 1ec19a2632b35..eb8dc7bab2c4b 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/SetStepInfoUpdateTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/SetStepInfoUpdateTaskTests.java @@ -59,7 +59,7 @@ public void testExecuteSuccessfullySet() throws Exception { ToXContentObject stepInfo = getRandomStepInfo(); setStateToKey(currentStepKey); - SetStepInfoUpdateTask task = new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo); + SetStepInfoUpdateTask task = new SetStepInfoUpdateTask(state.projectId(), index, policy, currentStepKey, stepInfo); ClusterState newState = task.execute(state); LifecycleExecutionState lifecycleState = newState.metadata() .getProject(state.projectId()) @@ -93,7 +93,7 @@ public void testExecuteNoopDifferentStep() throws Exception { StepKey notCurrentStepKey = new StepKey("not-current", "not-current", "not-current"); ToXContentObject stepInfo = getRandomStepInfo(); setStateToKey(notCurrentStepKey); - SetStepInfoUpdateTask task = new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo); + SetStepInfoUpdateTask task = new SetStepInfoUpdateTask(state.projectId(), index, policy, currentStepKey, stepInfo); ClusterState newState = task.execute(state); assertThat(newState, sameInstance(state.cluster())); } @@ -103,7 +103,7 @@ public void testExecuteNoopDifferentPolicy() throws Exception { ToXContentObject stepInfo = getRandomStepInfo(); setStateToKey(currentStepKey); setStatePolicy("not-" + policy); - SetStepInfoUpdateTask task = new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo); + SetStepInfoUpdateTask task = new SetStepInfoUpdateTask(state.projectId(), index, policy, currentStepKey, stepInfo); ClusterState newState = task.execute(state); assertThat(newState, sameInstance(state.cluster())); } @@ -115,7 +115,7 @@ public void testOnFailure() throws IllegalAccessException { setStateToKey(currentStepKey); - SetStepInfoUpdateTask task = new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo); + SetStepInfoUpdateTask task = new SetStepInfoUpdateTask(state.projectId(), index, policy, currentStepKey, stepInfo); try (var mockLog = MockLog.capture(SetStepInfoUpdateTask.class)) { mockLog.addExpectation(