Skip to content

Commit 9f9bdb2

Browse files
committed
Add fallback in ILM to run cluster state steps periodically
1 parent a97e006 commit 9f9bdb2

File tree

3 files changed

+37
-45
lines changed

3 files changed

+37
-45
lines changed

muted-tests.yml

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -338,9 +338,6 @@ tests:
338338
- class: org.elasticsearch.packaging.test.DockerTests
339339
method: test010Install
340340
issue: https://github.com/elastic/elasticsearch/issues/125680
341-
- class: org.elasticsearch.xpack.ilm.actions.SearchableSnapshotActionIT
342-
method: testSearchableSnapshotsInHotPhasePinnedToHotNodes
343-
issue: https://github.com/elastic/elasticsearch/issues/125683
344341
- class: org.elasticsearch.xpack.migrate.action.ReindexDataStreamTransportActionIT
345342
method: testAlreadyUpToDateDataStream
346343
issue: https://github.com/elastic/elasticsearch/issues/125727
@@ -362,9 +359,6 @@ tests:
362359
- class: org.elasticsearch.xpack.core.common.notifications.AbstractAuditorTests
363360
method: testRecreateTemplateWhenDeleted
364361
issue: https://github.com/elastic/elasticsearch/issues/123232
365-
- class: org.elasticsearch.xpack.ilm.TimeSeriesDataStreamsIT
366-
method: testSearchableSnapshotAction
367-
issue: https://github.com/elastic/elasticsearch/issues/125867
368362
- class: org.elasticsearch.xpack.downsample.DataStreamLifecycleDownsampleDisruptionIT
369363
method: testDataStreamLifecycleDownsampleRollingRestart
370364
issue: https://github.com/elastic/elasticsearch/issues/123769
@@ -380,9 +374,6 @@ tests:
380374
- class: org.elasticsearch.xpack.esql.action.ManyShardsIT
381375
method: testCancelUnnecessaryRequests
382376
issue: https://github.com/elastic/elasticsearch/issues/125947
383-
- class: org.elasticsearch.xpack.ilm.actions.SearchableSnapshotActionIT
384-
method: testResumingSearchableSnapshotFromPartialToFull
385-
issue: https://github.com/elastic/elasticsearch/issues/125789
386377
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
387378
method: test {p0=transform/transforms_stats/Test get transform stats with timeout}
388379
issue: https://github.com/elastic/elasticsearch/issues/125975

x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java

Lines changed: 15 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,11 @@ public void testSearchableSnapshotAction() throws Exception {
114114
}
115115
}, 30, TimeUnit.SECONDS));
116116

117-
assertBusy(() -> {
118-
triggerStateChange();
119-
assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME));
120-
}, 30, TimeUnit.SECONDS);
117+
assertBusy(
118+
() -> { assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)); },
119+
30,
120+
TimeUnit.SECONDS
121+
);
121122
}
122123

123124
public void testSearchableSnapshotForceMergesIndexToOneSegment() throws Exception {
@@ -174,10 +175,11 @@ public void testSearchableSnapshotForceMergesIndexToOneSegment() throws Exceptio
174175
}
175176
}, 60, TimeUnit.SECONDS));
176177

177-
assertBusy(() -> {
178-
triggerStateChange();
179-
assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME));
180-
}, 30, TimeUnit.SECONDS);
178+
assertBusy(
179+
() -> { assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)); },
180+
30,
181+
TimeUnit.SECONDS
182+
);
181183
}
182184

183185
@SuppressWarnings("unchecked")
@@ -315,7 +317,6 @@ public void testUpdatePolicyToAddPhasesYieldsInvalidActionsToBeSkipped() throws
315317
}, 30, TimeUnit.SECONDS));
316318

317319
assertBusy(() -> {
318-
triggerStateChange();
319320
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), restoredIndexName);
320321
assertThat(stepKeyForIndex.phase(), is("hot"));
321322
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
@@ -338,7 +339,6 @@ public void testUpdatePolicyToAddPhasesYieldsInvalidActionsToBeSkipped() throws
338339
// even though the index is now mounted as a searchable snapshot, the actions that can't operate on it should
339340
// skip and ILM should not be blocked (nor should the managed index move into the ERROR step)
340341
assertBusy(() -> {
341-
triggerStateChange();
342342
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), restoredIndexName);
343343
assertThat(stepKeyForIndex.phase(), is("cold"));
344344
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
@@ -394,7 +394,6 @@ public void testRestoredIndexManagedByLocalPolicySkipsIllegalActions() throws Ex
394394
}, 30, TimeUnit.SECONDS));
395395

396396
assertBusy(() -> {
397-
triggerStateChange();
398397
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName);
399398
assertThat(stepKeyForIndex.phase(), is("hot"));
400399
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
@@ -499,7 +498,6 @@ public void testIdenticalSearchableSnapshotActionIsNoop() throws Exception {
499498
}, 30, TimeUnit.SECONDS);
500499

501500
assertBusy(() -> {
502-
triggerStateChange();
503501
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName);
504502
assertThat(stepKeyForIndex.phase(), is("cold"));
505503
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
@@ -561,7 +559,6 @@ public void testConvertingSearchableSnapshotFromFullToPartial() throws Exception
561559
}, 30, TimeUnit.SECONDS);
562560

563561
assertBusy(() -> {
564-
triggerStateChange();
565562
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName);
566563
assertThat(stepKeyForIndex.phase(), is("frozen"));
567564
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
@@ -644,7 +641,6 @@ public void testResumingSearchableSnapshotFromFullToPartial() throws Exception {
644641
}, 30, TimeUnit.SECONDS);
645642

646643
assertBusy(() -> {
647-
triggerStateChange();
648644
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), fullMountedIndexName);
649645
assertThat(stepKeyForIndex.phase(), is("cold"));
650646
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
@@ -665,7 +661,6 @@ public void testResumingSearchableSnapshotFromFullToPartial() throws Exception {
665661
}, 30, TimeUnit.SECONDS);
666662

667663
assertBusy(() -> {
668-
triggerStateChange();
669664
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), partiallyMountedIndexName);
670665
assertThat(stepKeyForIndex.phase(), is("frozen"));
671666
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
@@ -755,7 +750,6 @@ public void testResumingSearchableSnapshotFromPartialToFull() throws Exception {
755750
}, 30, TimeUnit.SECONDS);
756751

757752
assertBusy(() -> {
758-
triggerStateChange();
759753
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), partialMountedIndexName);
760754
assertThat(stepKeyForIndex.phase(), is("frozen"));
761755
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
@@ -776,7 +770,6 @@ public void testResumingSearchableSnapshotFromPartialToFull() throws Exception {
776770
}, 30, TimeUnit.SECONDS);
777771

778772
assertBusy(() -> {
779-
triggerStateChange();
780773
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), restoredPartiallyMountedIndexName);
781774
assertThat(stepKeyForIndex.phase(), is("cold"));
782775
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
@@ -936,10 +929,11 @@ public void testSearchableSnapshotInvokesAsyncActionOnNewIndex() throws Exceptio
936929
}
937930
}, 30, TimeUnit.SECONDS));
938931

939-
assertBusy(() -> {
940-
triggerStateChange();
941-
assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME));
942-
}, 30, TimeUnit.SECONDS);
932+
assertBusy(
933+
() -> { assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)); },
934+
30,
935+
TimeUnit.SECONDS
936+
);
943937
}
944938

945939
public void testSearchableSnapshotTotalShardsPerNode() throws Exception {
@@ -980,7 +974,6 @@ public void testSearchableSnapshotTotalShardsPerNode() throws Exception {
980974
assertTrue(indexExists(searchableSnapMountedIndexName));
981975
}, 30, TimeUnit.SECONDS);
982976
assertBusy(() -> {
983-
triggerStateChange();
984977
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName);
985978
assertThat(stepKeyForIndex.phase(), is("frozen"));
986979
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
@@ -1044,7 +1037,6 @@ public void testSearchableSnapshotReplicateFor() throws Exception {
10441037

10451038
// check that the index is in the expected step and has the expected step_info.message
10461039
assertBusy(() -> {
1047-
triggerStateChange();
10481040
Map<String, Object> explainResponse = explainIndex(client(), restoredIndexName);
10491041
assertThat(explainResponse.get("step"), is(WaitUntilReplicateForTimePassesStep.NAME));
10501042
@SuppressWarnings("unchecked")
@@ -1082,7 +1074,6 @@ public void testSearchableSnapshotReplicateFor() throws Exception {
10821074

10831075
// check that the index has progressed because enough time has passed now that the policy is different
10841076
assertBusy(() -> {
1085-
triggerStateChange();
10861077
Map<String, Object> explainResponse = explainIndex(client(), restoredIndexName);
10871078
assertThat(explainResponse.get("phase"), is("cold"));
10881079
assertThat(explainResponse.get("step"), is(PhaseCompleteStep.NAME));
@@ -1097,15 +1088,6 @@ public void testSearchableSnapshotReplicateFor() throws Exception {
10971088
}
10981089
}
10991090

1100-
/**
1101-
* Cause a bit of cluster activity using an empty reroute call in case the `wait-for-index-colour` ILM step missed the
1102-
* notification that partial-index is now GREEN.
1103-
*/
1104-
private void triggerStateChange() throws IOException {
1105-
Request rerouteRequest = new Request("POST", "/_cluster/reroute");
1106-
client().performRequest(rerouteRequest);
1107-
}
1108-
11091091
private Step.StepKey getKeyForIndex(Response response, String indexName) throws IOException {
11101092
Map<String, Object> responseMap;
11111093
try (InputStream is = response.getEntity().getContent()) {

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.Map;
6363
import java.util.Set;
6464
import java.util.concurrent.ExecutorService;
65+
import java.util.concurrent.atomic.AtomicBoolean;
6566
import java.util.concurrent.atomic.AtomicReference;
6667
import java.util.function.LongSupplier;
6768
import java.util.stream.Collectors;
@@ -98,6 +99,7 @@ public class IndexLifecycleService
9899
private final ExecutorService managementExecutor;
99100
/** A reference to the last seen cluster state. If it's not null, we're currently processing a cluster state. */
100101
private final AtomicReference<ClusterState> lastSeenState = new AtomicReference<>();
102+
private final AtomicBoolean didProcessClusterStateSinceLastPeriodicRun = new AtomicBoolean();
101103

102104
private SchedulerEngine.Job scheduledJob;
103105

@@ -354,6 +356,7 @@ public void clusterChanged(ClusterChangedEvent event) {
354356
// This means that when ILM's cluster state processing takes longer than the overall cluster state application or when
355357
// the forked thread is waiting in the thread pool queue (e.g. when the master node is swamped), we might skip some
356358
// cluster state updates. Since ILM does not depend on "deltas" in cluster states, we can skip some cluster states just fine.
359+
didProcessClusterStateSinceLastPeriodicRun.set(true);
357360
if (lastSeenState.getAndSet(event.state()) == null) {
358361
processClusterState();
359362
} else {
@@ -445,9 +448,25 @@ private void cancelJob() {
445448

446449
@Override
447450
public void triggered(SchedulerEngine.Event event) {
448-
if (event.jobName().equals(XPackField.INDEX_LIFECYCLE)) {
449-
logger.trace("job triggered: {}, {}, {}", event.jobName(), event.scheduledTime(), event.triggeredTime());
450-
triggerPolicies(clusterService.state(), false);
451+
if (event.jobName().equals(XPackField.INDEX_LIFECYCLE) == false) {
452+
assert false : "Expected scheduler event to be for ILM";
453+
return;
454+
}
455+
logger.trace("job triggered: {}, {}, {}", event.jobName(), event.scheduledTime(), event.triggeredTime());
456+
triggerPolicies(clusterService.state(), false);
457+
458+
// Check if we've processed at least one cluster state update since the last periodic run.
459+
// If not, we run all policies as if there was just a cluster state update to get them unstuck.
460+
if (didProcessClusterStateSinceLastPeriodicRun.getAndSet(false) == false) {
461+
// If a new cluster state came in while/after running the above check, `lastSeenState` would have become non-null, meaning
462+
// we don't need to trigger the policies - hence the `compareAndSet` instead of a `getAndSet`.
463+
if (lastSeenState.compareAndSet(null, clusterService.state())) {
464+
logger.info("ILM didn't process a cluster state for [{}]. Running cluster state steps now", pollInterval);
465+
processClusterState();
466+
} else {
467+
logger.trace("ILM didn't process a cluster state for [{}] but one just came in", pollInterval);
468+
}
469+
451470
}
452471
}
453472

0 commit comments

Comments
 (0)