Skip to content

Commit 227e800

Browse files
committed
Don't run async actions when ILM is stopped
Today ILM will run `AsyncActionStep`s even if ILM is stopped. These actions are started either as callbacks after previous actions complete or when the move-to-step API is used. Fixes #81234 Fixes #85097 Fixes #99859
1 parent 0814e2f commit 227e800

File tree

3 files changed

+135
-60
lines changed

3 files changed

+135
-60
lines changed

x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeseriesMoveToStepIT.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.http.util.EntityUtils;
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
1314
import org.elasticsearch.client.Request;
1415
import org.elasticsearch.client.ResponseException;
1516
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -19,18 +20,22 @@
1920
import org.elasticsearch.xpack.core.ilm.DeleteAction;
2021
import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
2122
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
23+
import org.elasticsearch.xpack.core.ilm.Phase;
2224
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
25+
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
2326
import org.elasticsearch.xpack.core.ilm.RolloverAction;
2427
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
2528
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
2629
import org.junit.Before;
2730

2831
import java.util.Locale;
32+
import java.util.Map;
2933
import java.util.concurrent.TimeUnit;
3034

3135
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy;
3236
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createIndexWithSettings;
3337
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy;
38+
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createPolicy;
3439
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
3540
import static org.elasticsearch.xpack.TimeSeriesRestDriver.index;
3641
import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
@@ -51,6 +56,7 @@ public void refreshIndex() {
5156
index = "index-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
5257
policy = "policy-" + randomAlphaOfLength(5);
5358
alias = "alias-" + randomAlphaOfLength(5);
59+
logger.info("--> running [{}] with index [{}], alias [{}] and policy [{}]", getTestName(), index, alias, policy);
5460
}
5561

5662
public void testMoveToAllocateStep() throws Exception {
@@ -245,6 +251,66 @@ public void testMoveToStepRereadsPolicy() throws Exception {
245251
assertBusy(() -> { indexExists("test-000002"); });
246252
}
247253

254+
/**
255+
* Test that an async action does not execute when the Move To Step API is used while ILM is stopped.
256+
* Unfortunately, this test doesn't prove that the async action never executes, as it's hard to prove that an asynchronous process
257+
* never happens - waiting for a certain period would only increase our confidence but not actually prove it, and it would increase the
258+
* runtime of the test significantly. We also assert that the remainder of the policy executes after ILM is started again to ensure that
259+
* the index is not stuck in the async action step.
260+
*/
261+
public void testAsyncActionDoesNotExecuteAfterILMStop() throws Exception {
262+
String originalIndex = index + "-000001";
263+
// Create a simply policy with the most important aspect being the readonly action, which contains the ReadOnlyStep AsyncActionStep.
264+
var actions = Map.of(
265+
"rollover",
266+
new RolloverAction(RolloverConditions.newBuilder().addMaxIndexAgeCondition(TimeValue.timeValueHours(1)).build()),
267+
"readonly",
268+
new ReadOnlyAction()
269+
);
270+
Phase phase = new Phase("hot", TimeValue.ZERO, actions);
271+
createPolicy(client(), policy, phase, null, null, null, null);
272+
273+
createIndexWithSettings(
274+
client(),
275+
originalIndex,
276+
alias,
277+
Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)
278+
);
279+
280+
// Wait for ILM to do everything it can for this index
281+
assertBusy(() -> assertEquals(new StepKey("hot", "rollover", "check-rollover-ready"), getStepKeyForIndex(client(), originalIndex)));
282+
283+
// Stop ILM
284+
client().performRequest(new Request("POST", "/_ilm/stop"));
285+
286+
// Move ILM to the readonly step, which is an async action step.
287+
Request moveToStepRequest = new Request("POST", "_ilm/move/" + originalIndex);
288+
moveToStepRequest.setJsonEntity("""
289+
{
290+
"current_step": {
291+
"phase": "hot",
292+
"action": "rollover",
293+
"name": "check-rollover-ready"
294+
},
295+
"next_step": {
296+
"phase": "hot",
297+
"action": "readonly",
298+
"name": "readonly"
299+
}
300+
}""");
301+
client().performRequest(moveToStepRequest);
302+
303+
// Since ILM is stopped, the async action should not execute and the index should remain in the readonly step.
304+
// This is the tricky part of the test, as we can't really verify that the async action will never happen.
305+
assertEquals(new StepKey("hot", "readonly", "readonly"), getStepKeyForIndex(client(), originalIndex));
306+
307+
// Restart ILM
308+
client().performRequest(new Request("POST", "/_ilm/start"));
309+
310+
// Make sure we actually complete the remainder of the policy after ILM is started again.
311+
assertBusy(() -> assertEquals(new StepKey("hot", "complete", "complete"), getStepKeyForIndex(client(), originalIndex)));
312+
}
313+
248314
public void testMoveToStepWithInvalidNextStep() throws Exception {
249315
createNewSingletonPolicy(client(), policy, "delete", DeleteAction.WITH_SNAPSHOT_DELETE, TimeValue.timeValueDays(100));
250316
createIndexWithSettings(

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep;
3434
import org.elasticsearch.xpack.core.ilm.ErrorStep;
3535
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
36+
import org.elasticsearch.xpack.core.ilm.OperationMode;
3637
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
3738
import org.elasticsearch.xpack.core.ilm.Step;
3839
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
@@ -48,6 +49,7 @@
4849

4950
import static org.elasticsearch.core.Strings.format;
5051
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
52+
import static org.elasticsearch.xpack.core.ilm.LifecycleOperationMetadata.currentILMMode;
5153

5254
class IndexLifecycleRunner {
5355
private static final Logger logger = LogManager.getLogger(IndexLifecycleRunner.class);
@@ -308,6 +310,12 @@ void onErrorMaybeRetryFailedStep(ProjectId projectId, String policy, StepKey cur
308310
void maybeRunAsyncAction(ProjectState state, IndexMetadata indexMetadata, String policy, StepKey expectedStepKey) {
309311
final var projectId = state.projectId();
310312
String index = indexMetadata.getIndex().getName();
313+
OperationMode currentMode = currentILMMode(state.metadata());
314+
if (OperationMode.RUNNING.equals(currentMode) == false) {
315+
logger.info("[{}] not running async action in policy [{}] because ILM is [{}]", index, policy, currentMode);
316+
return;
317+
}
318+
311319
if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexMetadata.getSettings())) {
312320
logger.info("[{}] skipping policy [{}] because [{}] is true", index, policy, LifecycleSettings.LIFECYCLE_SKIP);
313321
return;

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java

Lines changed: 61 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,14 @@ void onMaster(ClusterState clusterState) {
184184
maybeScheduleJob();
185185

186186
for (var projectId : clusterState.metadata().projects().keySet()) {
187-
onMaster(clusterState.projectState(projectId));
187+
maybeRunAsyncActions(clusterState.projectState(projectId));
188188
}
189189
}
190190

191-
void onMaster(ProjectState state) {
191+
/**
192+
* Kicks off any async actions that may not have been run due to either master failover or ILM being manually stopped.
193+
*/
194+
private void maybeRunAsyncActions(ProjectState state) {
192195
final ProjectMetadata projectMetadata = state.metadata();
193196
final IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE);
194197
if (currentMetadata != null) {
@@ -198,67 +201,51 @@ void onMaster(ProjectState state) {
198201
}
199202

200203
boolean safeToStop = true; // true until proven false by a run policy
201-
202-
// If we just became master, we need to kick off any async actions that
203-
// may have not been run due to master rollover
204204
for (IndexMetadata idxMeta : projectMetadata.indices().values()) {
205-
if (projectMetadata.isIndexManagedByILM(idxMeta)) {
206-
String policyName = idxMeta.getLifecyclePolicyName();
207-
final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
208-
StepKey stepKey = Step.getCurrentStepKey(lifecycleState);
209-
210-
try {
211-
if (OperationMode.STOPPING == currentMode) {
212-
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) {
213-
logger.info(
214-
"waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
215-
idxMeta.getIndex().getName(),
216-
policyName,
217-
stepKey.name()
218-
);
219-
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
220-
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
221-
safeToStop = false;
222-
} else {
223-
logger.info(
224-
"skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping",
225-
stepKey == null ? "n/a" : stepKey.name(),
226-
idxMeta.getIndex().getName(),
227-
policyName
228-
);
229-
}
230-
} else {
231-
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
232-
}
233-
} catch (Exception e) {
234-
if (logger.isTraceEnabled()) {
235-
logger.warn(
236-
() -> format(
237-
"async action execution failed during master election trigger"
238-
+ " for index [%s] with policy [%s] in step [%s], lifecycle state: [%s]",
239-
idxMeta.getIndex().getName(),
240-
policyName,
241-
stepKey,
242-
lifecycleState.asMap()
243-
),
244-
e
245-
);
246-
} else {
247-
logger.warn(
248-
() -> format(
249-
"async action execution failed during master election trigger"
250-
+ " for index [%s] with policy [%s] in step [%s]",
251-
idxMeta.getIndex().getName(),
252-
policyName,
253-
stepKey
254-
),
255-
e
256-
);
205+
if (projectMetadata.isIndexManagedByILM(idxMeta) == false) {
206+
continue;
207+
}
208+
String policyName = idxMeta.getLifecyclePolicyName();
209+
final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
210+
StepKey stepKey = Step.getCurrentStepKey(lifecycleState);
257211

258-
}
259-
// Don't rethrow the exception, we don't want a failure for one index to be
260-
// called to cause actions not to be triggered for further indices
212+
try {
213+
if (currentMode == OperationMode.RUNNING) {
214+
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
215+
continue;
216+
}
217+
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) {
218+
logger.info(
219+
"waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
220+
idxMeta.getIndex().getName(),
221+
policyName,
222+
stepKey.name()
223+
);
224+
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
225+
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
226+
safeToStop = false;
227+
} else {
228+
logger.info(
229+
"skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping",
230+
stepKey == null ? "n/a" : stepKey.name(),
231+
idxMeta.getIndex().getName(),
232+
policyName
233+
);
234+
}
235+
} catch (Exception e) {
236+
String format = format(
237+
"async action execution failed during master election trigger for index [%s] with policy [%s] in step [%s]",
238+
idxMeta.getIndex().getName(),
239+
policyName,
240+
stepKey
241+
);
242+
if (logger.isTraceEnabled()) {
243+
format += format(", lifecycle state: [%s]", lifecycleState.asMap());
261244
}
245+
logger.warn(format, e);
246+
247+
// Don't rethrow the exception, we don't want a failure for one index to be
248+
// called to cause actions not to be triggered for further indices
262249
}
263250
}
264251

@@ -333,6 +320,20 @@ public void clusterChanged(ClusterChangedEvent event) {
333320
cancelJob();
334321
policyRegistry.clear();
335322
}
323+
} else if (this.isMaster) {
324+
// If we are the master and we were before, check if any projects changed their ILM mode from non-RUNNING to RUNNING.
325+
// If so, kick off any async actions that may not have run while not in RUNNING mode.
326+
for (ProjectMetadata project : event.state().metadata().projects().values()) {
327+
final var previousProject = event.previousState().metadata().projects().get(project.id());
328+
if (previousProject == null || project == previousProject) {
329+
continue;
330+
}
331+
final OperationMode currentMode = currentILMMode(project);
332+
final OperationMode previousMode = currentILMMode(previousProject);
333+
if (currentMode == OperationMode.RUNNING && previousMode != OperationMode.RUNNING) {
334+
maybeRunAsyncActions(event.state().projectState(project.id()));
335+
}
336+
}
336337
}
337338

338339
// if we're the master, then process deleted indices and trigger policies

0 commit comments

Comments
 (0)