Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/changelog/133683.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pr: 133683
summary: Avoid running asynchronous ILM actions while ILM is stopped
area: ILM+SLM
type: bug
issues:
- 99859
- 81234
- 85097
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -19,18 +20,22 @@
import org.elasticsearch.xpack.core.ilm.DeleteAction;
import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.junit.Before;

import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createIndexWithSettings;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.index;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
Expand All @@ -51,6 +56,7 @@ public void refreshIndex() {
index = "index-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
policy = "policy-" + randomAlphaOfLength(5);
alias = "alias-" + randomAlphaOfLength(5);
logger.info("--> running [{}] with index [{}], alias [{}] and policy [{}]", getTestName(), index, alias, policy);
}

public void testMoveToAllocateStep() throws Exception {
Expand Down Expand Up @@ -245,6 +251,66 @@ public void testMoveToStepRereadsPolicy() throws Exception {
assertBusy(() -> { indexExists("test-000002"); });
}

/**
* Test that an async action does not execute when the Move To Step API is used while ILM is stopped.
* Unfortunately, this test doesn't prove that the async action never executes, as it's hard to prove that an asynchronous process
* never happens - waiting for a certain period would only increase our confidence but not actually prove it, and it would increase the
* runtime of the test significantly. We also assert that the remainder of the policy executes after ILM is started again to ensure that
* the index is not stuck in the async action step.
*/
public void testAsyncActionDoesNotExecuteAfterILMStop() throws Exception {
String originalIndex = index + "-000001";
// Create a simply policy with the most important aspect being the readonly action, which contains the ReadOnlyStep AsyncActionStep.
var actions = Map.of(
"rollover",
new RolloverAction(RolloverConditions.newBuilder().addMaxIndexAgeCondition(TimeValue.timeValueHours(1)).build()),
"readonly",
new ReadOnlyAction()
);
Phase phase = new Phase("hot", TimeValue.ZERO, actions);
createPolicy(client(), policy, phase, null, null, null, null);

createIndexWithSettings(
client(),
originalIndex,
alias,
Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)
);

// Wait for ILM to do everything it can for this index
assertBusy(() -> assertEquals(new StepKey("hot", "rollover", "check-rollover-ready"), getStepKeyForIndex(client(), originalIndex)));

// Stop ILM
client().performRequest(new Request("POST", "/_ilm/stop"));

// Move ILM to the readonly step, which is an async action step.
Request moveToStepRequest = new Request("POST", "_ilm/move/" + originalIndex);
moveToStepRequest.setJsonEntity("""
{
"current_step": {
"phase": "hot",
"action": "rollover",
"name": "check-rollover-ready"
},
"next_step": {
"phase": "hot",
"action": "readonly",
"name": "readonly"
}
}""");
client().performRequest(moveToStepRequest);

// Since ILM is stopped, the async action should not execute and the index should remain in the readonly step.
// This is the tricky part of the test, as we can't really verify that the async action will never happen.
assertEquals(new StepKey("hot", "readonly", "readonly"), getStepKeyForIndex(client(), originalIndex));
Comment on lines +303 to +305
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggestions for improved assertions are welcome. Adding a Thread.sleep here would increase our confidence, but still wouldn't make any guarantee.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we write a test that wraps the Client passed to IndexLifecycleService with a wrapper that asserts that we never perform some particular ILM action?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test to IndexLifecycleRunnerTests in 370d966 that verifies the action isn't executed when ILM is stopped. Let me know if that's what you had in mind. I chose for testing IndexLifecycleRunner#maybeRunAsyncAction instead of IndexLifecycleService#maybeRunAsyncAction, as the latter is only used by APIs and the former is also used internally by ILM.


// Restart ILM
client().performRequest(new Request("POST", "/_ilm/start"));

// Make sure we actually complete the remainder of the policy after ILM is started again.
assertBusy(() -> assertEquals(new StepKey("hot", "complete", "complete"), getStepKeyForIndex(client(), originalIndex)));
}

public void testMoveToStepWithInvalidNextStep() throws Exception {
createNewSingletonPolicy(client(), policy, "delete", DeleteAction.WITH_SNAPSHOT_DELETE, TimeValue.timeValueDays(100));
createIndexWithSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.ilm.ErrorStep;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.OperationMode;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
Expand All @@ -48,6 +49,7 @@

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
import static org.elasticsearch.xpack.core.ilm.LifecycleOperationMetadata.currentILMMode;

class IndexLifecycleRunner {
private static final Logger logger = LogManager.getLogger(IndexLifecycleRunner.class);
Expand Down Expand Up @@ -308,6 +310,12 @@ void onErrorMaybeRetryFailedStep(ProjectId projectId, String policy, StepKey cur
void maybeRunAsyncAction(ProjectState state, IndexMetadata indexMetadata, String policy, StepKey expectedStepKey) {
final var projectId = state.projectId();
String index = indexMetadata.getIndex().getName();
OperationMode currentMode = currentILMMode(state.metadata());
if (OperationMode.RUNNING.equals(currentMode) == false) {
logger.info("[{}] not running async action in policy [{}] because ILM is [{}]", index, policy, currentMode);
return;
}

if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexMetadata.getSettings())) {
logger.info("[{}] skipping policy [{}] because [{}] is true", index, policy, LifecycleSettings.LIFECYCLE_SKIP);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,88 +184,78 @@ void onMaster(ClusterState clusterState) {
maybeScheduleJob();

for (var projectId : clusterState.metadata().projects().keySet()) {
onMaster(clusterState.projectState(projectId));
maybeRunAsyncActions(clusterState.projectState(projectId));
}
}

void onMaster(ProjectState state) {
/**
* Kicks off any async actions that may not have been run due to either master failover or ILM being manually stopped.
*/
private void maybeRunAsyncActions(ProjectState state) {
final ProjectMetadata projectMetadata = state.metadata();
final IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE);
if (currentMetadata != null) {
OperationMode currentMode = currentILMMode(projectMetadata);
if (OperationMode.STOPPED.equals(currentMode)) {
return;
}

boolean safeToStop = true; // true until proven false by a run policy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this method aren't stricly necessary, I just took this opportunity to clean this method up a bit, as it was becoming hard to read. If there are concerns with this refactor, I can revert these stylistic changes and stick to the bug fix.

// If we just became master, we need to kick off any async actions that
// may have not been run due to master rollover
for (IndexMetadata idxMeta : projectMetadata.indices().values()) {
if (projectMetadata.isIndexManagedByILM(idxMeta)) {
String policyName = idxMeta.getLifecyclePolicyName();
final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
StepKey stepKey = Step.getCurrentStepKey(lifecycleState);

try {
if (OperationMode.STOPPING == currentMode) {
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) {
logger.info(
"waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
idxMeta.getIndex().getName(),
policyName,
stepKey.name()
);
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
safeToStop = false;
} else {
logger.info(
"skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping",
stepKey == null ? "n/a" : stepKey.name(),
idxMeta.getIndex().getName(),
policyName
);
}
} else {
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
}
} catch (Exception e) {
if (logger.isTraceEnabled()) {
logger.warn(
() -> format(
"async action execution failed during master election trigger"
+ " for index [%s] with policy [%s] in step [%s], lifecycle state: [%s]",
idxMeta.getIndex().getName(),
policyName,
stepKey,
lifecycleState.asMap()
),
e
);
} else {
logger.warn(
() -> format(
"async action execution failed during master election trigger"
+ " for index [%s] with policy [%s] in step [%s]",
idxMeta.getIndex().getName(),
policyName,
stepKey
),
e
);
if (currentMetadata == null) {
return;
}
OperationMode currentMode = currentILMMode(projectMetadata);
if (OperationMode.STOPPED.equals(currentMode)) {
return;
}

}
// Don't rethrow the exception, we don't want a failure for one index to be
// called to cause actions not to be triggered for further indices
}
}
boolean safeToStop = true; // true until proven false by a run policy
for (IndexMetadata idxMeta : projectMetadata.indices().values()) {
if (projectMetadata.isIndexManagedByILM(idxMeta) == false) {
continue;
}
String policyName = idxMeta.getLifecyclePolicyName();
final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
StepKey stepKey = Step.getCurrentStepKey(lifecycleState);

try {
if (currentMode == OperationMode.RUNNING) {
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
continue;
}
// We only get here if ILM is in STOPPING mode. In that case, we need to check if there is any index that is in a step
// that we can't stop ILM in. If there is, we don't stop ILM yet.
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) {
logger.info(
"waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
idxMeta.getIndex().getName(),
policyName,
stepKey.name()
);
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
safeToStop = false;
} else {
logger.info(
"skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping",
stepKey == null ? "n/a" : stepKey.name(),
idxMeta.getIndex().getName(),
policyName
);
}
} catch (Exception e) {
String logMessage = format(
"async action execution failed during master election trigger for index [%s] with policy [%s] in step [%s]",
idxMeta.getIndex().getName(),
policyName,
stepKey
);
if (logger.isTraceEnabled()) {
logMessage += format(", lifecycle state: [%s]", lifecycleState.asMap());
}
logger.warn(logMessage, e);

if (safeToStop && OperationMode.STOPPING == currentMode) {
stopILM(state.projectId());
// Don't rethrow the exception, we don't want a failure for one index to be
// called to cause actions not to be triggered for further indices
}
}

if (safeToStop && OperationMode.STOPPING == currentMode) {
stopILM(state.projectId());
}
}

private void stopILM(ProjectId projectId) {
Expand Down Expand Up @@ -333,6 +323,20 @@ public void clusterChanged(ClusterChangedEvent event) {
cancelJob();
policyRegistry.clear();
}
} else if (this.isMaster) {
// If we are the master and we were before, check if any projects changed their ILM mode from non-RUNNING to RUNNING.
// If so, kick off any async actions that may not have run while not in RUNNING mode.
for (ProjectMetadata project : event.state().metadata().projects().values()) {
final var previousProject = event.previousState().metadata().projects().get(project.id());
if (previousProject == null || project == previousProject) {
continue;
}
final OperationMode currentMode = currentILMMode(project);
final OperationMode previousMode = currentILMMode(previousProject);
if (currentMode == OperationMode.RUNNING && previousMode != OperationMode.RUNNING) {
maybeRunAsyncActions(event.state().projectState(project.id()));
}
}
}

// if we're the master, then process deleted indices and trigger policies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,34 @@ public void testSkip_asyncAction() {
Mockito.verifyNoMoreInteractions(clusterService);
}

/**
* Test that an async action step is not executed when ILM is stopped.
*/
public void testNotRunningAsyncActionWhenILMIsStopped() {
String policyName = "stopped_policy";
Step.StepKey stepKey = new Step.StepKey("phase", "action", "async_action_step");

MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null);

PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = mock(ClusterService.class);
newMockTaskQueue(clusterService); // ensure constructor call to createTaskQueue is satisfied
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);

IndexMetadata indexMetadata = IndexMetadata.builder("test")
.settings(randomIndexSettings().put(LifecycleSettings.LIFECYCLE_NAME, policyName))
.build();

IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Map.of(), OperationMode.STOPPED);
final var project = ProjectMetadata.builder(randomProjectIdOrDefault())
.put(indexMetadata, true)
.putCustom(IndexLifecycleMetadata.TYPE, ilm)
.build();
runner.maybeRunAsyncAction(projectStateFromProject(project), indexMetadata, policyName, stepKey);

assertThat(step.getExecuteCount(), equalTo(0L));
}

public void testRunPolicyErrorStepOnRetryableFailedStep() {
String policyName = "rollover_policy";
String phaseName = "hot";
Expand Down Expand Up @@ -586,7 +614,6 @@ public void testRunStateChangePolicyWithAsyncActionNextStep() throws Exception {
.putProjectMetadata(project)
.nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId()))
.build();
logger.info("--> state: {}", state);
ClusterServiceUtils.setState(clusterService, state);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);

Expand Down