diff --git a/docs/changelog/133683.yaml b/docs/changelog/133683.yaml new file mode 100644 index 0000000000000..2782b8c2c6c19 --- /dev/null +++ b/docs/changelog/133683.yaml @@ -0,0 +1,8 @@ +pr: 133683 +summary: Avoid running asynchronous ILM actions while ILM is stopped +area: ILM+SLM +type: bug +issues: + - 99859 + - 81234 + - 85097 diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeseriesMoveToStepIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeseriesMoveToStepIT.java index 5a61af793d907..3bd23ce9e2d2e 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeseriesMoveToStepIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeseriesMoveToStepIT.java @@ -10,6 +10,7 @@ import org.apache.http.util.EntityUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.admin.indices.rollover.RolloverConditions; import org.elasticsearch.client.Request; import org.elasticsearch.client.ResponseException; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -19,18 +20,22 @@ import org.elasticsearch.xpack.core.ilm.DeleteAction; import org.elasticsearch.xpack.core.ilm.ForceMergeAction; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.Phase; import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep; +import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.ilm.ShrinkAction; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.junit.Before; import java.util.Locale; +import java.util.Map; import java.util.concurrent.TimeUnit; import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy; import static org.elasticsearch.xpack.TimeSeriesRestDriver.createIndexWithSettings; import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.createPolicy; import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex; import static org.elasticsearch.xpack.TimeSeriesRestDriver.index; import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument; @@ -51,6 +56,7 @@ public void refreshIndex() { index = "index-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); policy = "policy-" + randomAlphaOfLength(5); alias = "alias-" + randomAlphaOfLength(5); + logger.info("--> running [{}] with index [{}], alias [{}] and policy [{}]", getTestName(), index, alias, policy); } public void testMoveToAllocateStep() throws Exception { @@ -245,6 +251,66 @@ public void testMoveToStepRereadsPolicy() throws Exception { assertBusy(() -> { indexExists("test-000002"); }); } + /** + * Test that an async action does not execute when the Move To Step API is used while ILM is stopped. + * Unfortunately, this test doesn't prove that the async action never executes, as it's hard to prove that an asynchronous process + * never happens - waiting for a certain period would only increase our confidence but not actually prove it, and it would increase the + * runtime of the test significantly. We also assert that the remainder of the policy executes after ILM is started again to ensure that + * the index is not stuck in the async action step. + */ + public void testAsyncActionDoesNotExecuteAfterILMStop() throws Exception { + String originalIndex = index + "-000001"; + // Create a simply policy with the most important aspect being the readonly action, which contains the ReadOnlyStep AsyncActionStep. + var actions = Map.of( + "rollover", + new RolloverAction(RolloverConditions.newBuilder().addMaxIndexAgeCondition(TimeValue.timeValueHours(1)).build()), + "readonly", + new ReadOnlyAction() + ); + Phase phase = new Phase("hot", TimeValue.ZERO, actions); + createPolicy(client(), policy, phase, null, null, null, null); + + createIndexWithSettings( + client(), + originalIndex, + alias, + Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias) + ); + + // Wait for ILM to do everything it can for this index + assertBusy(() -> assertEquals(new StepKey("hot", "rollover", "check-rollover-ready"), getStepKeyForIndex(client(), originalIndex))); + + // Stop ILM + client().performRequest(new Request("POST", "/_ilm/stop")); + + // Move ILM to the readonly step, which is an async action step. + Request moveToStepRequest = new Request("POST", "_ilm/move/" + originalIndex); + moveToStepRequest.setJsonEntity(""" + { + "current_step": { + "phase": "hot", + "action": "rollover", + "name": "check-rollover-ready" + }, + "next_step": { + "phase": "hot", + "action": "readonly", + "name": "readonly" + } + }"""); + client().performRequest(moveToStepRequest); + + // Since ILM is stopped, the async action should not execute and the index should remain in the readonly step. + // This is the tricky part of the test, as we can't really verify that the async action will never happen. + assertEquals(new StepKey("hot", "readonly", "readonly"), getStepKeyForIndex(client(), originalIndex)); + + // Restart ILM + client().performRequest(new Request("POST", "/_ilm/start")); + + // Make sure we actually complete the remainder of the policy after ILM is started again. + assertBusy(() -> assertEquals(new StepKey("hot", "complete", "complete"), getStepKeyForIndex(client(), originalIndex))); + } + public void testMoveToStepWithInvalidNextStep() throws Exception { createNewSingletonPolicy(client(), policy, "delete", DeleteAction.WITH_SNAPSHOT_DELETE, TimeValue.timeValueDays(100)); createIndexWithSettings( diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java index 79dc7cb25696e..97ed875c280a9 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java @@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep; import org.elasticsearch.xpack.core.ilm.ErrorStep; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.OperationMode; import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep; import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.Step.StepKey; @@ -48,6 +49,7 @@ import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE; +import static org.elasticsearch.xpack.core.ilm.LifecycleOperationMetadata.currentILMMode; class IndexLifecycleRunner { private static final Logger logger = LogManager.getLogger(IndexLifecycleRunner.class); @@ -308,6 +310,12 @@ void onErrorMaybeRetryFailedStep(ProjectId projectId, String policy, StepKey cur void maybeRunAsyncAction(ProjectState state, IndexMetadata indexMetadata, String policy, StepKey expectedStepKey) { final var projectId = state.projectId(); String index = indexMetadata.getIndex().getName(); + OperationMode currentMode = currentILMMode(state.metadata()); + if (OperationMode.RUNNING.equals(currentMode) == false) { + logger.info("[{}] not running async action in policy [{}] because ILM is [{}]", index, policy, currentMode); + return; + } + if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexMetadata.getSettings())) { logger.info("[{}] skipping policy [{}] because [{}] is true", index, policy, LifecycleSettings.LIFECYCLE_SKIP); return; diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java index 8e5ede13128f9..6dcccd103db39 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java @@ -184,88 +184,78 @@ void onMaster(ClusterState clusterState) { maybeScheduleJob(); for (var projectId : clusterState.metadata().projects().keySet()) { - onMaster(clusterState.projectState(projectId)); + maybeRunAsyncActions(clusterState.projectState(projectId)); } } - void onMaster(ProjectState state) { + /** + * Kicks off any async actions that may not have been run due to either master failover or ILM being manually stopped. + */ + private void maybeRunAsyncActions(ProjectState state) { final ProjectMetadata projectMetadata = state.metadata(); final IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE); - if (currentMetadata != null) { - OperationMode currentMode = currentILMMode(projectMetadata); - if (OperationMode.STOPPED.equals(currentMode)) { - return; - } - - boolean safeToStop = true; // true until proven false by a run policy - - // If we just became master, we need to kick off any async actions that - // may have not been run due to master rollover - for (IndexMetadata idxMeta : projectMetadata.indices().values()) { - if (projectMetadata.isIndexManagedByILM(idxMeta)) { - String policyName = idxMeta.getLifecyclePolicyName(); - final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState(); - StepKey stepKey = Step.getCurrentStepKey(lifecycleState); - - try { - if (OperationMode.STOPPING == currentMode) { - if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) { - logger.info( - "waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]", - idxMeta.getIndex().getName(), - policyName, - stepKey.name() - ); - lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey); - // ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop - safeToStop = false; - } else { - logger.info( - "skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping", - stepKey == null ? "n/a" : stepKey.name(), - idxMeta.getIndex().getName(), - policyName - ); - } - } else { - lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey); - } - } catch (Exception e) { - if (logger.isTraceEnabled()) { - logger.warn( - () -> format( - "async action execution failed during master election trigger" - + " for index [%s] with policy [%s] in step [%s], lifecycle state: [%s]", - idxMeta.getIndex().getName(), - policyName, - stepKey, - lifecycleState.asMap() - ), - e - ); - } else { - logger.warn( - () -> format( - "async action execution failed during master election trigger" - + " for index [%s] with policy [%s] in step [%s]", - idxMeta.getIndex().getName(), - policyName, - stepKey - ), - e - ); + if (currentMetadata == null) { + return; + } + OperationMode currentMode = currentILMMode(projectMetadata); + if (OperationMode.STOPPED.equals(currentMode)) { + return; + } - } - // Don't rethrow the exception, we don't want a failure for one index to be - // called to cause actions not to be triggered for further indices - } - } + boolean safeToStop = true; // true until proven false by a run policy + for (IndexMetadata idxMeta : projectMetadata.indices().values()) { + if (projectMetadata.isIndexManagedByILM(idxMeta) == false) { + continue; } + String policyName = idxMeta.getLifecyclePolicyName(); + final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState(); + StepKey stepKey = Step.getCurrentStepKey(lifecycleState); + + try { + if (currentMode == OperationMode.RUNNING) { + lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey); + continue; + } + // We only get here if ILM is in STOPPING mode. In that case, we need to check if there is any index that is in a step + // that we can't stop ILM in. If there is, we don't stop ILM yet. + if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) { + logger.info( + "waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]", + idxMeta.getIndex().getName(), + policyName, + stepKey.name() + ); + lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey); + // ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop + safeToStop = false; + } else { + logger.info( + "skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping", + stepKey == null ? "n/a" : stepKey.name(), + idxMeta.getIndex().getName(), + policyName + ); + } + } catch (Exception e) { + String logMessage = format( + "async action execution failed during master election trigger for index [%s] with policy [%s] in step [%s]", + idxMeta.getIndex().getName(), + policyName, + stepKey + ); + if (logger.isTraceEnabled()) { + logMessage += format(", lifecycle state: [%s]", lifecycleState.asMap()); + } + logger.warn(logMessage, e); - if (safeToStop && OperationMode.STOPPING == currentMode) { - stopILM(state.projectId()); + // Don't rethrow the exception, we don't want a failure for one index to be + // called to cause actions not to be triggered for further indices } } + + if (safeToStop && OperationMode.STOPPING == currentMode) { + stopILM(state.projectId()); + } } private void stopILM(ProjectId projectId) { @@ -333,6 +323,20 @@ public void clusterChanged(ClusterChangedEvent event) { cancelJob(); policyRegistry.clear(); } + } else if (this.isMaster) { + // If we are the master and we were before, check if any projects changed their ILM mode from non-RUNNING to RUNNING. + // If so, kick off any async actions that may not have run while not in RUNNING mode. + for (ProjectMetadata project : event.state().metadata().projects().values()) { + final var previousProject = event.previousState().metadata().projects().get(project.id()); + if (previousProject == null || project == previousProject) { + continue; + } + final OperationMode currentMode = currentILMMode(project); + final OperationMode previousMode = currentILMMode(previousProject); + if (currentMode == OperationMode.RUNNING && previousMode != OperationMode.RUNNING) { + maybeRunAsyncActions(event.state().projectState(project.id())); + } + } } // if we're the master, then process deleted indices and trigger policies diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java index f3f96b77e3ec3..8bda79a77dd42 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java @@ -265,6 +265,34 @@ public void testSkip_asyncAction() { Mockito.verifyNoMoreInteractions(clusterService); } + /** + * Test that an async action step is not executed when ILM is stopped. + */ + public void testNotRunningAsyncActionWhenILMIsStopped() { + String policyName = "stopped_policy"; + Step.StepKey stepKey = new Step.StepKey("phase", "action", "async_action_step"); + + MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null); + + PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); + ClusterService clusterService = mock(ClusterService.class); + newMockTaskQueue(clusterService); // ensure constructor call to createTaskQueue is satisfied + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); + + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings(randomIndexSettings().put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .build(); + + IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Map.of(), OperationMode.STOPPED); + final var project = ProjectMetadata.builder(randomProjectIdOrDefault()) + .put(indexMetadata, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm) + .build(); + runner.maybeRunAsyncAction(projectStateFromProject(project), indexMetadata, policyName, stepKey); + + assertThat(step.getExecuteCount(), equalTo(0L)); + } + public void testRunPolicyErrorStepOnRetryableFailedStep() { String policyName = "rollover_policy"; String phaseName = "hot"; @@ -586,7 +614,6 @@ public void testRunStateChangePolicyWithAsyncActionNextStep() throws Exception { .putProjectMetadata(project) .nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId())) .build(); - logger.info("--> state: {}", state); ClusterServiceUtils.setState(clusterService, state); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);