Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/changelog/126073.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
pr: 126073
summary: Add fallback in ILM to run cluster state steps periodically
area: ILM+SLM
type: enhancement
issues:
- 125683
- 126053
- 125911
- 125867
- 125789
9 changes: 0 additions & 9 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,6 @@ tests:
- class: org.elasticsearch.packaging.test.DockerTests
method: test010Install
issue: https://github.com/elastic/elasticsearch/issues/125680
- class: org.elasticsearch.xpack.ilm.actions.SearchableSnapshotActionIT
method: testSearchableSnapshotsInHotPhasePinnedToHotNodes
issue: https://github.com/elastic/elasticsearch/issues/125683
- class: org.elasticsearch.xpack.migrate.action.ReindexDataStreamTransportActionIT
method: testAlreadyUpToDateDataStream
issue: https://github.com/elastic/elasticsearch/issues/125727
Expand All @@ -362,9 +359,6 @@ tests:
- class: org.elasticsearch.xpack.core.common.notifications.AbstractAuditorTests
method: testRecreateTemplateWhenDeleted
issue: https://github.com/elastic/elasticsearch/issues/123232
- class: org.elasticsearch.xpack.ilm.TimeSeriesDataStreamsIT
method: testSearchableSnapshotAction
issue: https://github.com/elastic/elasticsearch/issues/125867
- class: org.elasticsearch.xpack.downsample.DataStreamLifecycleDownsampleDisruptionIT
method: testDataStreamLifecycleDownsampleRollingRestart
issue: https://github.com/elastic/elasticsearch/issues/123769
Expand All @@ -380,9 +374,6 @@ tests:
- class: org.elasticsearch.xpack.esql.action.ManyShardsIT
method: testCancelUnnecessaryRequests
issue: https://github.com/elastic/elasticsearch/issues/125947
- class: org.elasticsearch.xpack.ilm.actions.SearchableSnapshotActionIT
method: testResumingSearchableSnapshotFromPartialToFull
issue: https://github.com/elastic/elasticsearch/issues/125789
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_stats/Test get transform stats with timeout}
issue: https://github.com/elastic/elasticsearch/issues/125975
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,11 @@ public void testSearchableSnapshotAction() throws Exception {
}
}, 30, TimeUnit.SECONDS));

assertBusy(() -> {
triggerStateChange();
assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME));
}, 30, TimeUnit.SECONDS);
assertBusy(
() -> { assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)); },
30,
TimeUnit.SECONDS
);
}

public void testSearchableSnapshotForceMergesIndexToOneSegment() throws Exception {
Expand Down Expand Up @@ -174,10 +175,11 @@ public void testSearchableSnapshotForceMergesIndexToOneSegment() throws Exceptio
}
}, 60, TimeUnit.SECONDS));

assertBusy(() -> {
triggerStateChange();
assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME));
}, 30, TimeUnit.SECONDS);
assertBusy(
() -> { assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)); },
30,
TimeUnit.SECONDS
);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -315,7 +317,6 @@ public void testUpdatePolicyToAddPhasesYieldsInvalidActionsToBeSkipped() throws
}, 30, TimeUnit.SECONDS));

assertBusy(() -> {
triggerStateChange();
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), restoredIndexName);
assertThat(stepKeyForIndex.phase(), is("hot"));
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
Expand All @@ -338,7 +339,6 @@ public void testUpdatePolicyToAddPhasesYieldsInvalidActionsToBeSkipped() throws
// even though the index is now mounted as a searchable snapshot, the actions that can't operate on it should
// skip and ILM should not be blocked (nor should the managed index move into the ERROR step)
assertBusy(() -> {
triggerStateChange();
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), restoredIndexName);
assertThat(stepKeyForIndex.phase(), is("cold"));
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
Expand Down Expand Up @@ -394,7 +394,6 @@ public void testRestoredIndexManagedByLocalPolicySkipsIllegalActions() throws Ex
}, 30, TimeUnit.SECONDS));

assertBusy(() -> {
triggerStateChange();
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName);
assertThat(stepKeyForIndex.phase(), is("hot"));
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
Expand Down Expand Up @@ -499,7 +498,6 @@ public void testIdenticalSearchableSnapshotActionIsNoop() throws Exception {
}, 30, TimeUnit.SECONDS);

assertBusy(() -> {
triggerStateChange();
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName);
assertThat(stepKeyForIndex.phase(), is("cold"));
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
Expand Down Expand Up @@ -561,7 +559,6 @@ public void testConvertingSearchableSnapshotFromFullToPartial() throws Exception
}, 30, TimeUnit.SECONDS);

assertBusy(() -> {
triggerStateChange();
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName);
assertThat(stepKeyForIndex.phase(), is("frozen"));
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
Expand Down Expand Up @@ -644,7 +641,6 @@ public void testResumingSearchableSnapshotFromFullToPartial() throws Exception {
}, 30, TimeUnit.SECONDS);

assertBusy(() -> {
triggerStateChange();
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), fullMountedIndexName);
assertThat(stepKeyForIndex.phase(), is("cold"));
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
Expand All @@ -665,7 +661,6 @@ public void testResumingSearchableSnapshotFromFullToPartial() throws Exception {
}, 30, TimeUnit.SECONDS);

assertBusy(() -> {
triggerStateChange();
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), partiallyMountedIndexName);
assertThat(stepKeyForIndex.phase(), is("frozen"));
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
Expand Down Expand Up @@ -755,7 +750,6 @@ public void testResumingSearchableSnapshotFromPartialToFull() throws Exception {
}, 30, TimeUnit.SECONDS);

assertBusy(() -> {
triggerStateChange();
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), partialMountedIndexName);
assertThat(stepKeyForIndex.phase(), is("frozen"));
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
Expand All @@ -776,7 +770,6 @@ public void testResumingSearchableSnapshotFromPartialToFull() throws Exception {
}, 30, TimeUnit.SECONDS);

assertBusy(() -> {
triggerStateChange();
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), restoredPartiallyMountedIndexName);
assertThat(stepKeyForIndex.phase(), is("cold"));
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
Expand Down Expand Up @@ -936,10 +929,11 @@ public void testSearchableSnapshotInvokesAsyncActionOnNewIndex() throws Exceptio
}
}, 30, TimeUnit.SECONDS));

assertBusy(() -> {
triggerStateChange();
assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME));
}, 30, TimeUnit.SECONDS);
assertBusy(
() -> { assertThat(explainIndex(client(), restoredIndexName).get("step"), is(PhaseCompleteStep.NAME)); },
30,
TimeUnit.SECONDS
);
}

public void testSearchableSnapshotTotalShardsPerNode() throws Exception {
Expand Down Expand Up @@ -980,7 +974,6 @@ public void testSearchableSnapshotTotalShardsPerNode() throws Exception {
assertTrue(indexExists(searchableSnapMountedIndexName));
}, 30, TimeUnit.SECONDS);
assertBusy(() -> {
triggerStateChange();
Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName);
assertThat(stepKeyForIndex.phase(), is("frozen"));
assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
Expand Down Expand Up @@ -1044,7 +1037,6 @@ public void testSearchableSnapshotReplicateFor() throws Exception {

// check that the index is in the expected step and has the expected step_info.message
assertBusy(() -> {
triggerStateChange();
Map<String, Object> explainResponse = explainIndex(client(), restoredIndexName);
assertThat(explainResponse.get("step"), is(WaitUntilReplicateForTimePassesStep.NAME));
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -1082,7 +1074,6 @@ public void testSearchableSnapshotReplicateFor() throws Exception {

// check that the index has progressed because enough time has passed now that the policy is different
assertBusy(() -> {
triggerStateChange();
Map<String, Object> explainResponse = explainIndex(client(), restoredIndexName);
assertThat(explainResponse.get("phase"), is("cold"));
assertThat(explainResponse.get("step"), is(PhaseCompleteStep.NAME));
Expand All @@ -1097,15 +1088,6 @@ public void testSearchableSnapshotReplicateFor() throws Exception {
}
}

/**
* Cause a bit of cluster activity using an empty reroute call in case the `wait-for-index-color` ILM step missed the
* notification that partial-index is now GREEN.
*/
private void triggerStateChange() throws IOException {
Request rerouteRequest = new Request("POST", "/_cluster/reroute");
client().performRequest(rerouteRequest);
}

private Step.StepKey getKeyForIndex(Response response, String indexName) throws IOException {
Map<String, Object> responseMap;
try (InputStream is = response.getEntity().getContent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -93,11 +94,12 @@ public class IndexLifecycleService
private final IndexLifecycleRunner lifecycleRunner;
private final Settings settings;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final LongSupplier nowSupplier;
private final ExecutorService managementExecutor;
/** A reference to the last seen cluster state. If it's not null, we're currently processing a cluster state. */
private final AtomicReference<ClusterState> lastSeenState = new AtomicReference<>();
/** A boolean indicating whether we received a cluster state since the last periodic run. */
private final AtomicBoolean didReceiveClusterStateSinceLastPeriodicRun = new AtomicBoolean();

private SchedulerEngine.Job scheduledJob;

Expand All @@ -116,7 +118,6 @@ public IndexLifecycleService(
super();
this.settings = settings;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.clock = clock;
this.nowSupplier = nowSupplier;
this.scheduledJob = null;
Expand Down Expand Up @@ -349,6 +350,9 @@ public void clusterChanged(ClusterChangedEvent event) {
});
}

// Store that a new cluster state update came in.
didReceiveClusterStateSinceLastPeriodicRun.set(true);

// Only start processing the new cluster state if we're not already processing one.
// Note that we might override the last seen state with a new one, even if the previous one hasn't been processed yet.
// This means that when ILM's cluster state processing takes longer than the overall cluster state application or when
Expand Down Expand Up @@ -445,9 +449,34 @@ private void cancelJob() {

@Override
public void triggered(SchedulerEngine.Event event) {
if (event.jobName().equals(XPackField.INDEX_LIFECYCLE)) {
logger.trace("job triggered: {}, {}, {}", event.jobName(), event.scheduledTime(), event.triggeredTime());
triggerPolicies(clusterService.state(), false);
if (event.jobName().equals(XPackField.INDEX_LIFECYCLE) == false) {
assert false : "Expected scheduler event to be for ILM";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert false : "Expected scheduler event to be for ILM";
assert false : "Expected scheduler event to be for ILM but it was for " + event.jobName();

return;
}
logger.trace("job triggered: {}, {}, {}", event.jobName(), event.scheduledTime(), event.triggeredTime());

triggerPolicies(clusterService.state(), false);

// Check if we've received at least one cluster state update since the last periodic run.
// If not, we run all policies as if we just received a cluster state update. If there are any policies/indices currently in a step
// that is waiting for the next cluster state update (e.g. `wait-for-index-color`), they will get unstuck with this fallback.
if (didReceiveClusterStateSinceLastPeriodicRun.getAndSet(false) == false) {
// If a new cluster state came in while/after running the above check, or if ILM is still processing the last cluster state
// update that came in before the previous periodic run (i.e. when the processing thread is still in the queue),
// `lastSeenState` will be non-null, meaning we don't need to trigger the policies. That's why we only update `lastSeenState`
// if it was null before - to avoid redundant processing.
final var stateCurrentlyBeingProcessed = lastSeenState.compareAndExchange(null, clusterService.state());
if (stateCurrentlyBeingProcessed == null) {
logger.info("ILM didn't receive a new cluster state for [{}]. Running cluster state steps now", pollInterval);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can make this debug level

processClusterState();
} else {
logger.warn(
"ILM didn't receive a new cluster state for [{}] but it was still processing cluster state version [{}]",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we clarify this to say something like "the poll interval should be increased" in the log message? In order to give a hint to users running on-prem.

pollInterval,
stateCurrentlyBeingProcessed.version()
);
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.Lifecycle.State;
import org.elasticsearch.common.scheduler.SchedulerEngine;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
Expand Down Expand Up @@ -482,13 +481,6 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange)
ilmService.clusterChanged(new ClusterChangedEvent("_source", currentState, ClusterState.EMPTY_STATE));
}

public void testTriggeredDifferentJob() {
Mockito.reset(clusterService);
SchedulerEngine.Event schedulerEvent = new SchedulerEngine.Event("foo", randomLong(), randomLong());
indexLifecycleService.triggered(schedulerEvent);
Mockito.verifyNoMoreInteractions(indicesClient, clusterService);
}

public void testParsingOriginationDateBeforeIndexCreation() {
Settings indexSettings = Settings.builder().put(IndexSettings.LIFECYCLE_PARSE_ORIGINATION_DATE, true).build();
Index index = new Index("invalid_index_name", UUID.randomUUID().toString());
Expand Down
Loading