diff --git a/docs/changelog/126073.yaml b/docs/changelog/126073.yaml new file mode 100644 index 0000000000000..3f4f6af87af5d --- /dev/null +++ b/docs/changelog/126073.yaml @@ -0,0 +1,11 @@ +pr: 126073 +summary: Add fallback in ILM to run cluster state steps periodically +area: ILM+SLM +type: enhancement +issues: + - 125683 + - 126354 + - 126053 + - 125911 + - 125867 + - 125789 diff --git a/muted-tests.yml b/muted-tests.yml index 102ed16b5b87a..102772021344e 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -338,9 +338,6 @@ tests: - class: org.elasticsearch.packaging.test.DockerTests method: test010Install issue: https://github.com/elastic/elasticsearch/issues/125680 -- class: org.elasticsearch.xpack.ilm.actions.SearchableSnapshotActionIT - method: testSearchableSnapshotsInHotPhasePinnedToHotNodes - issue: https://github.com/elastic/elasticsearch/issues/125683 - class: org.elasticsearch.xpack.migrate.action.ReindexDataStreamTransportActionIT method: testAlreadyUpToDateDataStream issue: https://github.com/elastic/elasticsearch/issues/125727 @@ -362,9 +359,6 @@ tests: - class: org.elasticsearch.xpack.core.common.notifications.AbstractAuditorTests method: testRecreateTemplateWhenDeleted issue: https://github.com/elastic/elasticsearch/issues/123232 -- class: org.elasticsearch.xpack.ilm.TimeSeriesDataStreamsIT - method: testSearchableSnapshotAction - issue: https://github.com/elastic/elasticsearch/issues/125867 - class: org.elasticsearch.xpack.downsample.DataStreamLifecycleDownsampleDisruptionIT method: testDataStreamLifecycleDownsampleRollingRestart issue: https://github.com/elastic/elasticsearch/issues/123769 @@ -380,9 +374,6 @@ tests: - class: org.elasticsearch.xpack.esql.action.ManyShardsIT method: testCancelUnnecessaryRequests issue: https://github.com/elastic/elasticsearch/issues/125947 -- class: org.elasticsearch.xpack.ilm.actions.SearchableSnapshotActionIT - method: testResumingSearchableSnapshotFromPartialToFull - issue: https://github.com/elastic/elasticsearch/issues/125789 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_stats/Test get transform stats with timeout} issue: https://github.com/elastic/elasticsearch/issues/125975 diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java index 4fe3a075ec03c..b95ea9b2745cb 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java @@ -114,10 +114,11 @@ public void testSearchableSnapshotAction() throws Exception { } }, 30, TimeUnit.SECONDS)); - assertBusy(() -> { - triggerStateChange(); - assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)); - }, 30, TimeUnit.SECONDS); + assertBusy( + () -> { assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)); }, + 30, + TimeUnit.SECONDS + ); } public void testSearchableSnapshotForceMergesIndexToOneSegment() throws Exception { @@ -174,10 +175,11 @@ public void testSearchableSnapshotForceMergesIndexToOneSegment() throws Exceptio } }, 60, TimeUnit.SECONDS)); - assertBusy(() -> { - triggerStateChange(); - assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)); - }, 30, TimeUnit.SECONDS); + assertBusy( + () -> { assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)); }, + 30, + TimeUnit.SECONDS + ); } @SuppressWarnings("unchecked") @@ -315,7 +317,6 @@ public void testUpdatePolicyToAddPhasesYieldsInvalidActionsToBeSkipped() throws }, 30, TimeUnit.SECONDS)); assertBusy(() -> { - triggerStateChange(); Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), restoredIndexName); assertThat(stepKeyForIndex.phase(), is("hot")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); @@ -338,7 +339,6 @@ public void testUpdatePolicyToAddPhasesYieldsInvalidActionsToBeSkipped() throws // even though the index is now mounted as a searchable snapshot, the actions that can't operate on it should // skip and ILM should not be blocked (not should the managed index move into the ERROR step) assertBusy(() -> { - triggerStateChange(); Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), restoredIndexName); assertThat(stepKeyForIndex.phase(), is("cold")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); @@ -394,7 +394,6 @@ public void testRestoredIndexManagedByLocalPolicySkipsIllegalActions() throws Ex }, 30, TimeUnit.SECONDS)); assertBusy(() -> { - triggerStateChange(); Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName); assertThat(stepKeyForIndex.phase(), is("hot")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); @@ -499,7 +498,6 @@ public void testIdenticalSearchableSnapshotActionIsNoop() throws Exception { }, 30, TimeUnit.SECONDS); assertBusy(() -> { - triggerStateChange(); Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName); assertThat(stepKeyForIndex.phase(), is("cold")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); @@ -561,7 +559,6 @@ public void testConvertingSearchableSnapshotFromFullToPartial() throws Exception }, 30, TimeUnit.SECONDS); assertBusy(() -> { - triggerStateChange(); Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName); assertThat(stepKeyForIndex.phase(), is("frozen")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); @@ -644,7 +641,6 @@ public void testResumingSearchableSnapshotFromFullToPartial() throws Exception { }, 30, TimeUnit.SECONDS); assertBusy(() -> { - triggerStateChange(); Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), fullMountedIndexName); assertThat(stepKeyForIndex.phase(), is("cold")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); @@ -665,7 +661,6 @@ public void testResumingSearchableSnapshotFromFullToPartial() throws Exception { }, 30, TimeUnit.SECONDS); assertBusy(() -> { - triggerStateChange(); Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), partiallyMountedIndexName); assertThat(stepKeyForIndex.phase(), is("frozen")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); @@ -755,7 +750,6 @@ public void testResumingSearchableSnapshotFromPartialToFull() throws Exception { }, 30, TimeUnit.SECONDS); assertBusy(() -> { - triggerStateChange(); Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), partialMountedIndexName); assertThat(stepKeyForIndex.phase(), is("frozen")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); @@ -776,7 +770,6 @@ public void testResumingSearchableSnapshotFromPartialToFull() throws Exception { }, 30, TimeUnit.SECONDS); assertBusy(() -> { - triggerStateChange(); Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), restoredPartiallyMountedIndexName); assertThat(stepKeyForIndex.phase(), is("cold")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); @@ -936,10 +929,11 @@ public void testSearchableSnapshotInvokesAsyncActionOnNewIndex() throws Exceptio } }, 30, TimeUnit.SECONDS)); - assertBusy(() -> { - triggerStateChange(); - assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)); - }, 30, TimeUnit.SECONDS); + assertBusy( + () -> { assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)); }, + 30, + TimeUnit.SECONDS + ); } public void testSearchableSnapshotTotalShardsPerNode() throws Exception { @@ -980,7 +974,6 @@ public void testSearchableSnapshotTotalShardsPerNode() throws Exception { assertTrue(indexExists(searchableSnapMountedIndexName)); }, 30, TimeUnit.SECONDS); assertBusy(() -> { - triggerStateChange(); Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName); assertThat(stepKeyForIndex.phase(), is("frozen")); assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME)); @@ -1044,7 +1037,6 @@ public void testSearchableSnapshotReplicateFor() throws Exception { // check that the index is in the expected step and has the expected step_info.message assertBusy(() -> { - triggerStateChange(); Map explainResponse = explainIndex(client(), restoredIndexName); assertThat(explainResponse.get("step"), is(WaitUntilReplicateForTimePassesStep.NAME)); @SuppressWarnings("unchecked") @@ -1082,7 +1074,6 @@ public void testSearchableSnapshotReplicateFor() throws Exception { // check that the index has progressed because enough time has passed now that the policy is different assertBusy(() -> { - triggerStateChange(); Map explainResponse = explainIndex(client(), restoredIndexName); assertThat(explainResponse.get("phase"), is("cold")); assertThat(explainResponse.get("step"), is(PhaseCompleteStep.NAME)); @@ -1097,15 +1088,6 @@ public void testSearchableSnapshotReplicateFor() throws Exception { } } - /** - * Cause a bit of cluster activity using an empty reroute call in case the `wait-for-index-colour` ILM step missed the - * notification that partial-index is now GREEN. - */ - private void triggerStateChange() throws IOException { - Request rerouteRequest = new Request("POST", "/_cluster/reroute"); - client().performRequest(rerouteRequest); - } - private Step.StepKey getKeyForIndex(Response response, String indexName) throws IOException { Map responseMap; try (InputStream is = response.getEntity().getContent()) { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java index 5b5f692674723..07b2ed0745014 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java @@ -62,6 +62,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -93,11 +94,12 @@ public class IndexLifecycleService private final IndexLifecycleRunner lifecycleRunner; private final Settings settings; private final ClusterService clusterService; - private final ThreadPool threadPool; private final LongSupplier nowSupplier; private final ExecutorService managementExecutor; /** A reference to the last seen cluster state. If it's not null, we're currently processing a cluster state. */ private final AtomicReference lastSeenState = new AtomicReference<>(); + /** A boolean indicating whether we received a cluster state since the last periodic run. */ + private final AtomicBoolean didReceiveClusterStateSinceLastPeriodicRun = new AtomicBoolean(); private SchedulerEngine.Job scheduledJob; @@ -116,7 +118,6 @@ public IndexLifecycleService( super(); this.settings = settings; this.clusterService = clusterService; - this.threadPool = threadPool; this.clock = clock; this.nowSupplier = nowSupplier; this.scheduledJob = null; @@ -349,6 +350,9 @@ public void clusterChanged(ClusterChangedEvent event) { }); } + // Store that a new custer state update came in. + didReceiveClusterStateSinceLastPeriodicRun.set(true); + // Only start processing the new cluster state if we're not already processing one. // Note that we might override the last seen state with a new one, even if the previous one hasn't been processed yet. // This means that when ILM's cluster state processing takes longer than the overall cluster state application or when @@ -445,9 +449,34 @@ private void cancelJob() { @Override public void triggered(SchedulerEngine.Event event) { - if (event.jobName().equals(XPackField.INDEX_LIFECYCLE)) { - logger.trace("job triggered: {}, {}, {}", event.jobName(), event.scheduledTime(), event.triggeredTime()); - triggerPolicies(clusterService.state(), false); + if (event.jobName().equals(XPackField.INDEX_LIFECYCLE) == false) { + assert false : "Expected scheduler event to be for ILM"; + return; + } + logger.trace("job triggered: {}, {}, {}", event.jobName(), event.scheduledTime(), event.triggeredTime()); + + triggerPolicies(clusterService.state(), false); + + // Check if we've received at least one cluster state update since the last periodic run. + // If not, we run all policies as if we just received a cluster state update. If there are any policies/indices currently in a step + // that is waiting for the next cluster state update (e.g. `wait-for-index-color`), they will get unstuck with this fallback. + if (didReceiveClusterStateSinceLastPeriodicRun.getAndSet(false) == false) { + // If a new cluster state came in while/after running the above check, or if ILM is still processing the last cluster state + // update that came in before the previous periodic run (i.e. when the processing thread is still in the queue), + // `lastSeenState` will be non-null, meaning we don't need to trigger the polices. That's why we only update `lastSeenState` + // if it was null before - to avoid redundant processing. + final var stateCurrentlyBeingProcessed = lastSeenState.compareAndExchange(null, clusterService.state()); + if (stateCurrentlyBeingProcessed == null) { + logger.info("ILM didn't receive a new cluster state for [{}]. Running cluster state steps now", pollInterval); + processClusterState(); + } else { + logger.warn( + "ILM didn't receive a new cluster state for [{}] but it was still processing cluster state version [{}]", + pollInterval, + stateCurrentlyBeingProcessed.version() + ); + } + } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java index e70b1be1d108d..05962ffcc8b5c 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.Lifecycle.State; -import org.elasticsearch.common.scheduler.SchedulerEngine; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -482,13 +481,6 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) ilmService.clusterChanged(new ClusterChangedEvent("_source", currentState, ClusterState.EMPTY_STATE)); } - public void testTriggeredDifferentJob() { - Mockito.reset(clusterService); - SchedulerEngine.Event schedulerEvent = new SchedulerEngine.Event("foo", randomLong(), randomLong()); - indexLifecycleService.triggered(schedulerEvent); - Mockito.verifyNoMoreInteractions(indicesClient, clusterService); - } - public void testParsingOriginationDateBeforeIndexCreation() { Settings indexSettings = Settings.builder().put(IndexSettings.LIFECYCLE_PARSE_ORIGINATION_DATE, true).build(); Index index = new Index("invalid_index_name", UUID.randomUUID().toString());