Skip to content

Commit 2b73a6c

Browse files
authored
Don't run async actions when ILM is stopped (#133683)
Today ILM will run `AsyncActionStep`s even if ILM is stopped. These actions are started either as callbacks after previous actions complete or when the move-to-step API is used. By checking the ILM operation mode before running the action in `IndexLifecycleRunner#maybeRunAsyncAction`, we prevent these actions from being executed while ILM is stopped. `AsyncActionStep`s are currently only automatically started as callbacks after previous actions complete or after a master failover. To ensure that these steps will be executed when ILM is restarted after a stop, we loop over all the managed indices and start all async action steps. Fixes #81234. Fixes #85097. Fixes #99859.
1 parent 374b96b commit 2b73a6c

File tree

5 files changed

+186
-73
lines changed

5 files changed

+186
-73
lines changed

docs/changelog/133683.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
pr: 133683
2+
summary: Avoid running asynchronous ILM actions while ILM is stopped
3+
area: ILM+SLM
4+
type: bug
5+
issues:
6+
- 99859
7+
- 81234
8+
- 85097

x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeseriesMoveToStepIT.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.http.util.EntityUtils;
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
1314
import org.elasticsearch.client.Request;
1415
import org.elasticsearch.client.ResponseException;
1516
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -19,18 +20,22 @@
1920
import org.elasticsearch.xpack.core.ilm.DeleteAction;
2021
import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
2122
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
23+
import org.elasticsearch.xpack.core.ilm.Phase;
2224
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
25+
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
2326
import org.elasticsearch.xpack.core.ilm.RolloverAction;
2427
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
2528
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
2629
import org.junit.Before;
2730

2831
import java.util.Locale;
32+
import java.util.Map;
2933
import java.util.concurrent.TimeUnit;
3034

3135
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy;
3236
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createIndexWithSettings;
3337
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy;
38+
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createPolicy;
3439
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
3540
import static org.elasticsearch.xpack.TimeSeriesRestDriver.index;
3641
import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
@@ -51,6 +56,7 @@ public void refreshIndex() {
5156
index = "index-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
5257
policy = "policy-" + randomAlphaOfLength(5);
5358
alias = "alias-" + randomAlphaOfLength(5);
59+
logger.info("--> running [{}] with index [{}], alias [{}] and policy [{}]", getTestName(), index, alias, policy);
5460
}
5561

5662
public void testMoveToAllocateStep() throws Exception {
@@ -245,6 +251,66 @@ public void testMoveToStepRereadsPolicy() throws Exception {
245251
assertBusy(() -> { indexExists("test-000002"); });
246252
}
247253

254+
/**
255+
* Test that an async action does not execute when the Move To Step API is used while ILM is stopped.
256+
* Unfortunately, this test doesn't prove that the async action never executes, as it's hard to prove that an asynchronous process
257+
* never happens - waiting for a certain period would only increase our confidence but not actually prove it, and it would increase the
258+
* runtime of the test significantly. We also assert that the remainder of the policy executes after ILM is started again to ensure that
259+
* the index is not stuck in the async action step.
260+
*/
261+
public void testAsyncActionDoesNotExecuteAfterILMStop() throws Exception {
262+
String originalIndex = index + "-000001";
263+
// Create a simple policy with the most important aspect being the readonly action, which contains the ReadOnlyStep AsyncActionStep.
264+
var actions = Map.of(
265+
"rollover",
266+
new RolloverAction(RolloverConditions.newBuilder().addMaxIndexAgeCondition(TimeValue.timeValueHours(1)).build()),
267+
"readonly",
268+
new ReadOnlyAction()
269+
);
270+
Phase phase = new Phase("hot", TimeValue.ZERO, actions);
271+
createPolicy(client(), policy, phase, null, null, null, null);
272+
273+
createIndexWithSettings(
274+
client(),
275+
originalIndex,
276+
alias,
277+
Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)
278+
);
279+
280+
// Wait for ILM to do everything it can for this index
281+
assertBusy(() -> assertEquals(new StepKey("hot", "rollover", "check-rollover-ready"), getStepKeyForIndex(client(), originalIndex)));
282+
283+
// Stop ILM
284+
client().performRequest(new Request("POST", "/_ilm/stop"));
285+
286+
// Move ILM to the readonly step, which is an async action step.
287+
Request moveToStepRequest = new Request("POST", "_ilm/move/" + originalIndex);
288+
moveToStepRequest.setJsonEntity("""
289+
{
290+
"current_step": {
291+
"phase": "hot",
292+
"action": "rollover",
293+
"name": "check-rollover-ready"
294+
},
295+
"next_step": {
296+
"phase": "hot",
297+
"action": "readonly",
298+
"name": "readonly"
299+
}
300+
}""");
301+
client().performRequest(moveToStepRequest);
302+
303+
// Since ILM is stopped, the async action should not execute and the index should remain in the readonly step.
304+
// This is the tricky part of the test, as we can't really verify that the async action will never happen.
305+
assertEquals(new StepKey("hot", "readonly", "readonly"), getStepKeyForIndex(client(), originalIndex));
306+
307+
// Restart ILM
308+
client().performRequest(new Request("POST", "/_ilm/start"));
309+
310+
// Make sure we actually complete the remainder of the policy after ILM is started again.
311+
assertBusy(() -> assertEquals(new StepKey("hot", "complete", "complete"), getStepKeyForIndex(client(), originalIndex)));
312+
}
313+
248314
public void testMoveToStepWithInvalidNextStep() throws Exception {
249315
createNewSingletonPolicy(client(), policy, "delete", DeleteAction.WITH_SNAPSHOT_DELETE, TimeValue.timeValueDays(100));
250316
createIndexWithSettings(

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep;
3434
import org.elasticsearch.xpack.core.ilm.ErrorStep;
3535
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
36+
import org.elasticsearch.xpack.core.ilm.OperationMode;
3637
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
3738
import org.elasticsearch.xpack.core.ilm.Step;
3839
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
@@ -48,6 +49,7 @@
4849

4950
import static org.elasticsearch.core.Strings.format;
5051
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
52+
import static org.elasticsearch.xpack.core.ilm.LifecycleOperationMetadata.currentILMMode;
5153

5254
class IndexLifecycleRunner {
5355
private static final Logger logger = LogManager.getLogger(IndexLifecycleRunner.class);
@@ -308,6 +310,12 @@ void onErrorMaybeRetryFailedStep(ProjectId projectId, String policy, StepKey cur
308310
void maybeRunAsyncAction(ProjectState state, IndexMetadata indexMetadata, String policy, StepKey expectedStepKey) {
309311
final var projectId = state.projectId();
310312
String index = indexMetadata.getIndex().getName();
313+
OperationMode currentMode = currentILMMode(state.metadata());
314+
if (OperationMode.RUNNING.equals(currentMode) == false) {
315+
logger.info("[{}] not running async action in policy [{}] because ILM is [{}]", index, policy, currentMode);
316+
return;
317+
}
318+
311319
if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexMetadata.getSettings())) {
312320
logger.info("[{}] skipping policy [{}] because [{}] is true", index, policy, LifecycleSettings.LIFECYCLE_SKIP);
313321
return;

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java

Lines changed: 76 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -184,88 +184,78 @@ void onMaster(ClusterState clusterState) {
184184
maybeScheduleJob();
185185

186186
for (var projectId : clusterState.metadata().projects().keySet()) {
187-
onMaster(clusterState.projectState(projectId));
187+
maybeRunAsyncActions(clusterState.projectState(projectId));
188188
}
189189
}
190190

191-
void onMaster(ProjectState state) {
191+
/**
192+
* Kicks off any async actions that may not have been run due to either master failover or ILM being manually stopped.
193+
*/
194+
private void maybeRunAsyncActions(ProjectState state) {
192195
final ProjectMetadata projectMetadata = state.metadata();
193196
final IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE);
194-
if (currentMetadata != null) {
195-
OperationMode currentMode = currentILMMode(projectMetadata);
196-
if (OperationMode.STOPPED.equals(currentMode)) {
197-
return;
198-
}
199-
200-
boolean safeToStop = true; // true until proven false by a run policy
201-
202-
// If we just became master, we need to kick off any async actions that
203-
// may have not been run due to master rollover
204-
for (IndexMetadata idxMeta : projectMetadata.indices().values()) {
205-
if (projectMetadata.isIndexManagedByILM(idxMeta)) {
206-
String policyName = idxMeta.getLifecyclePolicyName();
207-
final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
208-
StepKey stepKey = Step.getCurrentStepKey(lifecycleState);
209-
210-
try {
211-
if (OperationMode.STOPPING == currentMode) {
212-
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) {
213-
logger.info(
214-
"waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
215-
idxMeta.getIndex().getName(),
216-
policyName,
217-
stepKey.name()
218-
);
219-
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
220-
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
221-
safeToStop = false;
222-
} else {
223-
logger.info(
224-
"skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping",
225-
stepKey == null ? "n/a" : stepKey.name(),
226-
idxMeta.getIndex().getName(),
227-
policyName
228-
);
229-
}
230-
} else {
231-
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
232-
}
233-
} catch (Exception e) {
234-
if (logger.isTraceEnabled()) {
235-
logger.warn(
236-
() -> format(
237-
"async action execution failed during master election trigger"
238-
+ " for index [%s] with policy [%s] in step [%s], lifecycle state: [%s]",
239-
idxMeta.getIndex().getName(),
240-
policyName,
241-
stepKey,
242-
lifecycleState.asMap()
243-
),
244-
e
245-
);
246-
} else {
247-
logger.warn(
248-
() -> format(
249-
"async action execution failed during master election trigger"
250-
+ " for index [%s] with policy [%s] in step [%s]",
251-
idxMeta.getIndex().getName(),
252-
policyName,
253-
stepKey
254-
),
255-
e
256-
);
197+
if (currentMetadata == null) {
198+
return;
199+
}
200+
OperationMode currentMode = currentILMMode(projectMetadata);
201+
if (OperationMode.STOPPED.equals(currentMode)) {
202+
return;
203+
}
257204

258-
}
259-
// Don't rethrow the exception, we don't want a failure for one index to be
260-
// called to cause actions not to be triggered for further indices
261-
}
262-
}
205+
boolean safeToStop = true; // true until proven false by a run policy
206+
for (IndexMetadata idxMeta : projectMetadata.indices().values()) {
207+
if (projectMetadata.isIndexManagedByILM(idxMeta) == false) {
208+
continue;
263209
}
210+
String policyName = idxMeta.getLifecyclePolicyName();
211+
final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
212+
StepKey stepKey = Step.getCurrentStepKey(lifecycleState);
213+
214+
try {
215+
if (currentMode == OperationMode.RUNNING) {
216+
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
217+
continue;
218+
}
219+
// We only get here if ILM is in STOPPING mode. In that case, we need to check if there is any index that is in a step
220+
// that we can't stop ILM in. If there is, we don't stop ILM yet.
221+
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) {
222+
logger.info(
223+
"waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
224+
idxMeta.getIndex().getName(),
225+
policyName,
226+
stepKey.name()
227+
);
228+
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
229+
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
230+
safeToStop = false;
231+
} else {
232+
logger.info(
233+
"skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping",
234+
stepKey == null ? "n/a" : stepKey.name(),
235+
idxMeta.getIndex().getName(),
236+
policyName
237+
);
238+
}
239+
} catch (Exception e) {
240+
String logMessage = format(
241+
"async action execution failed during master election trigger for index [%s] with policy [%s] in step [%s]",
242+
idxMeta.getIndex().getName(),
243+
policyName,
244+
stepKey
245+
);
246+
if (logger.isTraceEnabled()) {
247+
logMessage += format(", lifecycle state: [%s]", lifecycleState.asMap());
248+
}
249+
logger.warn(logMessage, e);
264250

265-
if (safeToStop && OperationMode.STOPPING == currentMode) {
266-
stopILM(state.projectId());
251+
// Don't rethrow the exception: we don't want a failure for one index to
252+
// prevent async actions from being triggered for the remaining indices
267253
}
268254
}
255+
256+
if (safeToStop && OperationMode.STOPPING == currentMode) {
257+
stopILM(state.projectId());
258+
}
269259
}
270260

271261
private void stopILM(ProjectId projectId) {
@@ -333,6 +323,20 @@ public void clusterChanged(ClusterChangedEvent event) {
333323
cancelJob();
334324
policyRegistry.clear();
335325
}
326+
} else if (this.isMaster) {
327+
// If we are the master and we were before, check if any projects changed their ILM mode from non-RUNNING to RUNNING.
328+
// If so, kick off any async actions that may not have run while not in RUNNING mode.
329+
for (ProjectMetadata project : event.state().metadata().projects().values()) {
330+
final var previousProject = event.previousState().metadata().projects().get(project.id());
331+
if (previousProject == null || project == previousProject) {
332+
continue;
333+
}
334+
final OperationMode currentMode = currentILMMode(project);
335+
final OperationMode previousMode = currentILMMode(previousProject);
336+
if (currentMode == OperationMode.RUNNING && previousMode != OperationMode.RUNNING) {
337+
maybeRunAsyncActions(event.state().projectState(project.id()));
338+
}
339+
}
336340
}
337341

338342
// if we're the master, then process deleted indices and trigger policies

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,34 @@ public void testSkip_asyncAction() {
265265
Mockito.verifyNoMoreInteractions(clusterService);
266266
}
267267

268+
/**
269+
* Test that an async action step is not executed when ILM is stopped.
270+
*/
271+
public void testNotRunningAsyncActionWhenILMIsStopped() {
272+
String policyName = "stopped_policy";
273+
Step.StepKey stepKey = new Step.StepKey("phase", "action", "async_action_step");
274+
275+
MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null);
276+
277+
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
278+
ClusterService clusterService = mock(ClusterService.class);
279+
newMockTaskQueue(clusterService); // ensure constructor call to createTaskQueue is satisfied
280+
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
281+
282+
IndexMetadata indexMetadata = IndexMetadata.builder("test")
283+
.settings(randomIndexSettings().put(LifecycleSettings.LIFECYCLE_NAME, policyName))
284+
.build();
285+
286+
IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Map.of(), OperationMode.STOPPED);
287+
final var project = ProjectMetadata.builder(randomProjectIdOrDefault())
288+
.put(indexMetadata, true)
289+
.putCustom(IndexLifecycleMetadata.TYPE, ilm)
290+
.build();
291+
runner.maybeRunAsyncAction(projectStateFromProject(project), indexMetadata, policyName, stepKey);
292+
293+
assertThat(step.getExecuteCount(), equalTo(0L));
294+
}
295+
268296
public void testRunPolicyErrorStepOnRetryableFailedStep() {
269297
String policyName = "rollover_policy";
270298
String phaseName = "hot";
@@ -586,7 +614,6 @@ public void testRunStateChangePolicyWithAsyncActionNextStep() throws Exception {
586614
.putProjectMetadata(project)
587615
.nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId()))
588616
.build();
589-
logger.info("--> state: {}", state);
590617
ClusterServiceUtils.setState(clusterService, state);
591618
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
592619

0 commit comments

Comments
 (0)