-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Don't run async actions when ILM is stopped #133683
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
3b77520
930155f
6aadbb3
26b3dc4
370d966
0ed022c
ebe7f23
80f5e81
02e978b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
pr: 133683 | ||
summary: Don't run async actions when ILM is stopped | ||
area: ILM+SLM | ||
type: bug | ||
issues: | ||
- 99859 | ||
- 81234 | ||
- 85097 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ | |
import org.apache.http.util.EntityUtils; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions; | ||
import org.elasticsearch.client.Request; | ||
import org.elasticsearch.client.ResponseException; | ||
import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
|
@@ -19,18 +20,22 @@ | |
import org.elasticsearch.xpack.core.ilm.DeleteAction; | ||
import org.elasticsearch.xpack.core.ilm.ForceMergeAction; | ||
import org.elasticsearch.xpack.core.ilm.LifecycleSettings; | ||
import org.elasticsearch.xpack.core.ilm.Phase; | ||
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep; | ||
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; | ||
import org.elasticsearch.xpack.core.ilm.RolloverAction; | ||
import org.elasticsearch.xpack.core.ilm.ShrinkAction; | ||
import org.elasticsearch.xpack.core.ilm.Step.StepKey; | ||
import org.junit.Before; | ||
|
||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy; | ||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createIndexWithSettings; | ||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy; | ||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createPolicy; | ||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex; | ||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.index; | ||
import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument; | ||
|
@@ -51,6 +56,7 @@ public void refreshIndex() { | |
index = "index-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); | ||
policy = "policy-" + randomAlphaOfLength(5); | ||
alias = "alias-" + randomAlphaOfLength(5); | ||
logger.info("--> running [{}] with index [{}], alias [{}] and policy [{}]", getTestName(), index, alias, policy); | ||
} | ||
|
||
public void testMoveToAllocateStep() throws Exception { | ||
|
@@ -245,6 +251,66 @@ public void testMoveToStepRereadsPolicy() throws Exception { | |
assertBusy(() -> { indexExists("test-000002"); }); | ||
} | ||
|
||
/** | ||
* Test that an async action does not execute when the Move To Step API is used while ILM is stopped. | ||
* Unfortunately, this test doesn't prove that the async action never executes, as it's hard to prove that an asynchronous process | ||
* never happens - waiting for a certain period would only increase our confidence but not actually prove it, and it would increase the | ||
* runtime of the test significantly. We also assert that the remainder of the policy executes after ILM is started again to ensure that | ||
* the index is not stuck in the async action step. | ||
*/ | ||
public void testAsyncActionDoesNotExecuteAfterILMStop() throws Exception { | ||
String originalIndex = index + "-000001"; | ||
// Create a simply policy with the most important aspect being the readonly action, which contains the ReadOnlyStep AsyncActionStep. | ||
var actions = Map.of( | ||
"rollover", | ||
new RolloverAction(RolloverConditions.newBuilder().addMaxIndexAgeCondition(TimeValue.timeValueHours(1)).build()), | ||
"readonly", | ||
new ReadOnlyAction() | ||
); | ||
Phase phase = new Phase("hot", TimeValue.ZERO, actions); | ||
createPolicy(client(), policy, phase, null, null, null, null); | ||
|
||
createIndexWithSettings( | ||
client(), | ||
originalIndex, | ||
alias, | ||
Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias) | ||
); | ||
|
||
// Wait for ILM to do everything it can for this index | ||
assertBusy(() -> assertEquals(new StepKey("hot", "rollover", "check-rollover-ready"), getStepKeyForIndex(client(), originalIndex))); | ||
|
||
// Stop ILM | ||
client().performRequest(new Request("POST", "/_ilm/stop")); | ||
|
||
// Move ILM to the readonly step, which is an async action step. | ||
Request moveToStepRequest = new Request("POST", "_ilm/move/" + originalIndex); | ||
moveToStepRequest.setJsonEntity(""" | ||
{ | ||
"current_step": { | ||
"phase": "hot", | ||
"action": "rollover", | ||
"name": "check-rollover-ready" | ||
}, | ||
"next_step": { | ||
"phase": "hot", | ||
"action": "readonly", | ||
"name": "readonly" | ||
} | ||
}"""); | ||
client().performRequest(moveToStepRequest); | ||
|
||
// Since ILM is stopped, the async action should not execute and the index should remain in the readonly step. | ||
// This is the tricky part of the test, as we can't really verify that the async action will never happen. | ||
assertEquals(new StepKey("hot", "readonly", "readonly"), getStepKeyForIndex(client(), originalIndex)); | ||
Comment on lines
+303
to
+305
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any suggestions for improved assertions are welcome. Adding a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we write a test that wraps the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a test to |
||
|
||
// Restart ILM | ||
client().performRequest(new Request("POST", "/_ilm/start")); | ||
|
||
// Make sure we actually complete the remainder of the policy after ILM is started again. | ||
assertBusy(() -> assertEquals(new StepKey("hot", "complete", "complete"), getStepKeyForIndex(client(), originalIndex))); | ||
} | ||
|
||
public void testMoveToStepWithInvalidNextStep() throws Exception { | ||
createNewSingletonPolicy(client(), policy, "delete", DeleteAction.WITH_SNAPSHOT_DELETE, TimeValue.timeValueDays(100)); | ||
createIndexWithSettings( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -184,11 +184,14 @@ void onMaster(ClusterState clusterState) { | |
maybeScheduleJob(); | ||
|
||
for (var projectId : clusterState.metadata().projects().keySet()) { | ||
onMaster(clusterState.projectState(projectId)); | ||
maybeRunAsyncActions(clusterState.projectState(projectId)); | ||
} | ||
} | ||
|
||
void onMaster(ProjectState state) { | ||
/** | ||
* Kicks off any async actions that may not have been run due to either master failover or ILM being manually stopped. | ||
*/ | ||
private void maybeRunAsyncActions(ProjectState state) { | ||
final ProjectMetadata projectMetadata = state.metadata(); | ||
final IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE); | ||
if (currentMetadata != null) { | ||
|
@@ -198,67 +201,51 @@ void onMaster(ProjectState state) { | |
} | ||
|
||
boolean safeToStop = true; // true until proven false by a run policy | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The changes in this method aren't stricly necessary, I just took this opportunity to clean this method up a bit, as it was becoming hard to read. If there are concerns with this refactor, I can revert these stylistic changes and stick to the bug fix. |
||
// If we just became master, we need to kick off any async actions that | ||
// may have not been run due to master rollover | ||
for (IndexMetadata idxMeta : projectMetadata.indices().values()) { | ||
if (projectMetadata.isIndexManagedByILM(idxMeta)) { | ||
String policyName = idxMeta.getLifecyclePolicyName(); | ||
final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState(); | ||
StepKey stepKey = Step.getCurrentStepKey(lifecycleState); | ||
|
||
try { | ||
if (OperationMode.STOPPING == currentMode) { | ||
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) { | ||
logger.info( | ||
"waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]", | ||
idxMeta.getIndex().getName(), | ||
policyName, | ||
stepKey.name() | ||
); | ||
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey); | ||
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop | ||
safeToStop = false; | ||
} else { | ||
logger.info( | ||
"skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping", | ||
stepKey == null ? "n/a" : stepKey.name(), | ||
idxMeta.getIndex().getName(), | ||
policyName | ||
); | ||
} | ||
} else { | ||
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey); | ||
} | ||
} catch (Exception e) { | ||
if (logger.isTraceEnabled()) { | ||
logger.warn( | ||
() -> format( | ||
"async action execution failed during master election trigger" | ||
+ " for index [%s] with policy [%s] in step [%s], lifecycle state: [%s]", | ||
idxMeta.getIndex().getName(), | ||
policyName, | ||
stepKey, | ||
lifecycleState.asMap() | ||
), | ||
e | ||
); | ||
} else { | ||
logger.warn( | ||
() -> format( | ||
"async action execution failed during master election trigger" | ||
+ " for index [%s] with policy [%s] in step [%s]", | ||
idxMeta.getIndex().getName(), | ||
policyName, | ||
stepKey | ||
), | ||
e | ||
); | ||
if (projectMetadata.isIndexManagedByILM(idxMeta) == false) { | ||
continue; | ||
} | ||
String policyName = idxMeta.getLifecyclePolicyName(); | ||
final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState(); | ||
StepKey stepKey = Step.getCurrentStepKey(lifecycleState); | ||
|
||
} | ||
// Don't rethrow the exception, we don't want a failure for one index to be | ||
// called to cause actions not to be triggered for further indices | ||
try { | ||
if (currentMode == OperationMode.RUNNING) { | ||
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey); | ||
continue; | ||
} | ||
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) { | ||
|
||
logger.info( | ||
"waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]", | ||
idxMeta.getIndex().getName(), | ||
policyName, | ||
stepKey.name() | ||
); | ||
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey); | ||
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop | ||
safeToStop = false; | ||
} else { | ||
logger.info( | ||
"skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping", | ||
stepKey == null ? "n/a" : stepKey.name(), | ||
idxMeta.getIndex().getName(), | ||
policyName | ||
); | ||
} | ||
} catch (Exception e) { | ||
String logMessage = format( | ||
"async action execution failed during master election trigger for index [%s] with policy [%s] in step [%s]", | ||
idxMeta.getIndex().getName(), | ||
policyName, | ||
stepKey | ||
); | ||
if (logger.isTraceEnabled()) { | ||
logMessage += format(", lifecycle state: [%s]", lifecycleState.asMap()); | ||
} | ||
logger.warn(logMessage, e); | ||
|
||
// Don't rethrow the exception, we don't want a failure for one index to be | ||
// called to cause actions not to be triggered for further indices | ||
} | ||
} | ||
|
||
|
@@ -333,6 +320,20 @@ public void clusterChanged(ClusterChangedEvent event) { | |
cancelJob(); | ||
policyRegistry.clear(); | ||
} | ||
} else if (this.isMaster) { | ||
// If we are the master and we were before, check if any projects changed their ILM mode from non-RUNNING to RUNNING. | ||
// If so, kick off any async actions that may not have run while not in RUNNING mode. | ||
for (ProjectMetadata project : event.state().metadata().projects().values()) { | ||
final var previousProject = event.previousState().metadata().projects().get(project.id()); | ||
if (previousProject == null || project == previousProject) { | ||
continue; | ||
} | ||
final OperationMode currentMode = currentILMMode(project); | ||
final OperationMode previousMode = currentILMMode(previousProject); | ||
if (currentMode == OperationMode.RUNNING && previousMode != OperationMode.RUNNING) { | ||
maybeRunAsyncActions(event.state().projectState(project.id())); | ||
} | ||
} | ||
} | ||
|
||
// if we're the master, then process deleted indices and trigger policies | ||
|
Uh oh!
There was an error while loading. Please reload this page.