Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -19,18 +20,22 @@
import org.elasticsearch.xpack.core.ilm.DeleteAction;
import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.junit.Before;

import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createIndexWithSettings;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.index;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
Expand All @@ -51,6 +56,7 @@ public void refreshIndex() {
index = "index-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
policy = "policy-" + randomAlphaOfLength(5);
alias = "alias-" + randomAlphaOfLength(5);
logger.info("--> running [{}] with index [{}], alias [{}] and policy [{}]", getTestName(), index, alias, policy);
}

public void testMoveToAllocateStep() throws Exception {
Expand Down Expand Up @@ -245,6 +251,66 @@ public void testMoveToStepRereadsPolicy() throws Exception {
assertBusy(() -> { indexExists("test-000002"); });
}

/**
* Test that an async action does not execute when the Move To Step API is used while ILM is stopped.
* Unfortunately, this test doesn't prove that the async action never executes, as it's hard to prove that an asynchronous process
* never happens - waiting for a certain period would only increase our confidence but not actually prove it, and it would increase the
* runtime of the test significantly. We also assert that the remainder of the policy executes after ILM is started again to ensure that
* the index is not stuck in the async action step.
*/
public void testAsyncActionDoesNotExecuteAfterILMStop() throws Exception {
String originalIndex = index + "-000001";
// Create a simple policy with the most important aspect being the readonly action, which contains the ReadOnlyStep AsyncActionStep.
var actions = Map.of(
"rollover",
new RolloverAction(RolloverConditions.newBuilder().addMaxIndexAgeCondition(TimeValue.timeValueHours(1)).build()),
"readonly",
new ReadOnlyAction()
);
Phase phase = new Phase("hot", TimeValue.ZERO, actions);
createPolicy(client(), policy, phase, null, null, null, null);

createIndexWithSettings(
client(),
originalIndex,
alias,
Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)
);

// Wait for ILM to do everything it can for this index
assertBusy(() -> assertEquals(new StepKey("hot", "rollover", "check-rollover-ready"), getStepKeyForIndex(client(), originalIndex)));

// Stop ILM
client().performRequest(new Request("POST", "/_ilm/stop"));

// Move ILM to the readonly step, which is an async action step.
Request moveToStepRequest = new Request("POST", "_ilm/move/" + originalIndex);
moveToStepRequest.setJsonEntity("""
{
"current_step": {
"phase": "hot",
"action": "rollover",
"name": "check-rollover-ready"
},
"next_step": {
"phase": "hot",
"action": "readonly",
"name": "readonly"
}
}""");
client().performRequest(moveToStepRequest);

// Since ILM is stopped, the async action should not execute and the index should remain in the readonly step.
// This is the tricky part of the test, as we can't really verify that the async action will never happen.
assertEquals(new StepKey("hot", "readonly", "readonly"), getStepKeyForIndex(client(), originalIndex));
Comment on lines +303 to +305
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggestions for improved assertions are welcome. Adding a Thread.sleep here would increase our confidence, but still wouldn't make any guarantee.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we write a test that wraps the Client passed to IndexLifecycleService with a wrapper that asserts that we never perform some particular ILM action?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test to IndexLifecycleRunnerTests in 370d966 that verifies the action isn't executed when ILM is stopped. Let me know if that's what you had in mind. I chose to test IndexLifecycleRunner#maybeRunAsyncAction instead of IndexLifecycleService#maybeRunAsyncAction, as the latter is only used by APIs, whereas the former is also used internally by ILM.


// Restart ILM
client().performRequest(new Request("POST", "/_ilm/start"));

// Make sure we actually complete the remainder of the policy after ILM is started again.
assertBusy(() -> assertEquals(new StepKey("hot", "complete", "complete"), getStepKeyForIndex(client(), originalIndex)));
}

public void testMoveToStepWithInvalidNextStep() throws Exception {
createNewSingletonPolicy(client(), policy, "delete", DeleteAction.WITH_SNAPSHOT_DELETE, TimeValue.timeValueDays(100));
createIndexWithSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.ilm.ErrorStep;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.OperationMode;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
Expand All @@ -48,6 +49,7 @@

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
import static org.elasticsearch.xpack.core.ilm.LifecycleOperationMetadata.currentILMMode;

class IndexLifecycleRunner {
private static final Logger logger = LogManager.getLogger(IndexLifecycleRunner.class);
Expand Down Expand Up @@ -308,6 +310,12 @@ void onErrorMaybeRetryFailedStep(ProjectId projectId, String policy, StepKey cur
void maybeRunAsyncAction(ProjectState state, IndexMetadata indexMetadata, String policy, StepKey expectedStepKey) {
final var projectId = state.projectId();
String index = indexMetadata.getIndex().getName();
OperationMode currentMode = currentILMMode(state.metadata());
if (OperationMode.RUNNING.equals(currentMode) == false) {
logger.info("[{}] not running async action in policy [{}] because ILM is [{}]", index, policy, currentMode);
return;
}

if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexMetadata.getSettings())) {
logger.info("[{}] skipping policy [{}] because [{}] is true", index, policy, LifecycleSettings.LIFECYCLE_SKIP);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,14 @@ void onMaster(ClusterState clusterState) {
maybeScheduleJob();

for (var projectId : clusterState.metadata().projects().keySet()) {
onMaster(clusterState.projectState(projectId));
maybeRunAsyncActions(clusterState.projectState(projectId));
}
}

void onMaster(ProjectState state) {
/**
* Kicks off any async actions that may not have been run due to either master failover or ILM being manually stopped.
*/
private void maybeRunAsyncActions(ProjectState state) {
final ProjectMetadata projectMetadata = state.metadata();
final IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE);
if (currentMetadata != null) {
Expand All @@ -198,67 +201,51 @@ void onMaster(ProjectState state) {
}

boolean safeToStop = true; // true until proven false by a run policy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this method aren't strictly necessary, I just took this opportunity to clean this method up a bit, as it was becoming hard to read. If there are concerns with this refactor, I can revert these stylistic changes and stick to the bug fix.

// If we just became master, we need to kick off any async actions that
// may have not been run due to master rollover
for (IndexMetadata idxMeta : projectMetadata.indices().values()) {
if (projectMetadata.isIndexManagedByILM(idxMeta)) {
String policyName = idxMeta.getLifecyclePolicyName();
final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
StepKey stepKey = Step.getCurrentStepKey(lifecycleState);

try {
if (OperationMode.STOPPING == currentMode) {
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) {
logger.info(
"waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
idxMeta.getIndex().getName(),
policyName,
stepKey.name()
);
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
safeToStop = false;
} else {
logger.info(
"skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping",
stepKey == null ? "n/a" : stepKey.name(),
idxMeta.getIndex().getName(),
policyName
);
}
} else {
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
}
} catch (Exception e) {
if (logger.isTraceEnabled()) {
logger.warn(
() -> format(
"async action execution failed during master election trigger"
+ " for index [%s] with policy [%s] in step [%s], lifecycle state: [%s]",
idxMeta.getIndex().getName(),
policyName,
stepKey,
lifecycleState.asMap()
),
e
);
} else {
logger.warn(
() -> format(
"async action execution failed during master election trigger"
+ " for index [%s] with policy [%s] in step [%s]",
idxMeta.getIndex().getName(),
policyName,
stepKey
),
e
);
if (projectMetadata.isIndexManagedByILM(idxMeta) == false) {
continue;
}
String policyName = idxMeta.getLifecyclePolicyName();
final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
StepKey stepKey = Step.getCurrentStepKey(lifecycleState);

}
// Don't rethrow the exception, we don't want a failure for one index to be
// called to cause actions not to be triggered for further indices
try {
if (currentMode == OperationMode.RUNNING) {
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
continue;
}
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this part is harder to read, because someone has to keep in mind that the second if statement executes only if currentMode != OperationMode.RUNNING.

What about factoring it into separate methods, so that it can look something like:

switch (currentMode) {
    case OperationMode.RUNNING:
        lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
        break;
    case OperationMode.STOPPING:
    case OperationMode.STOPPED:
        runOrCheckIfSafeToIgnore(…);
        break;
    default:
        throw new IllegalArgumentException("you need to handle the case for " + currentMode);
}

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because someone has to keep in mind that ...

I'm not sure I get what you mean. The two if statements are only a few lines apart, right? If they were far apart, I could see that it would be harder to understand/remember.

I do agree that it's easier to miss that we already do the check

        if (OperationMode.STOPPED.equals(currentMode)) {
            return;
        }

earlier in the method, which means that the second if statement you were referring to only executes if currentMode == STOPPING. Therefore, the switch you suggested looks a bit overkill to me, as there would only be two branches. A switch with two values is essentially just an if-else, which is basically what I have now.

I noticed another if statement at the start of this method that I could invert, and I added a comment between these two if statements, in the hope that that will make it easier to read, in 26b3dc4. Let me know what you think.

logger.info(
"waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
idxMeta.getIndex().getName(),
policyName,
stepKey.name()
);
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
safeToStop = false;
} else {
logger.info(
"skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping",
stepKey == null ? "n/a" : stepKey.name(),
idxMeta.getIndex().getName(),
policyName
);
}
} catch (Exception e) {
String format = format(
"async action execution failed during master election trigger for index [%s] with policy [%s] in step [%s]",
idxMeta.getIndex().getName(),
policyName,
stepKey
);
if (logger.isTraceEnabled()) {
format += format(", lifecycle state: [%s]", lifecycleState.asMap());
}
logger.warn(format, e);
Copy link

Copilot AI Aug 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using format as both a variable name and function call creates ambiguous code. The variable format is being concatenated with the result of calling format() function, which could be confusing and error-prone.

Copilot uses AI. Check for mistakes.


// Don't rethrow the exception: we don't want a failure for one index
// to prevent actions from being triggered for the remaining indices
}
}

Expand Down Expand Up @@ -333,6 +320,20 @@ public void clusterChanged(ClusterChangedEvent event) {
cancelJob();
policyRegistry.clear();
}
} else if (this.isMaster) {
// If we are the master and we were before, check if any projects changed their ILM mode from non-RUNNING to RUNNING.
// If so, kick off any async actions that may not have run while not in RUNNING mode.
for (ProjectMetadata project : event.state().metadata().projects().values()) {
final var previousProject = event.previousState().metadata().projects().get(project.id());
if (previousProject == null || project == previousProject) {
continue;
}
final OperationMode currentMode = currentILMMode(project);
final OperationMode previousMode = currentILMMode(previousProject);
if (currentMode == OperationMode.RUNNING && previousMode != OperationMode.RUNNING) {
maybeRunAsyncActions(event.state().projectState(project.id()));
}
}
}

// if we're the master, then process deleted indices and trigger policies
Expand Down