Add fallback in ILM to run cluster state steps periodically #126073
New changelog entry (`@@ -0,0 +1,11 @@`):

```yaml
pr: 126073
summary: Add fallback in ILM to run cluster state steps periodically
area: ILM+SLM
type: enhancement
issues:
- 125683
- 126354
- 126053
- 125911
- 125867
- 125789
```
Changes to `IndexLifecycleService`:
@@ -62,6 +62,7 @@ | |
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.function.LongSupplier; | ||
| import java.util.stream.Collectors; | ||
|
|
@@ -93,11 +94,12 @@ public class IndexLifecycleService | |
| private final IndexLifecycleRunner lifecycleRunner; | ||
| private final Settings settings; | ||
| private final ClusterService clusterService; | ||
| private final ThreadPool threadPool; | ||
| private final LongSupplier nowSupplier; | ||
| private final ExecutorService managementExecutor; | ||
| /** A reference to the last seen cluster state. If it's not null, we're currently processing a cluster state. */ | ||
| private final AtomicReference<ClusterState> lastSeenState = new AtomicReference<>(); | ||
| /** A boolean indicating whether we received a cluster state since the last periodic run. */ | ||
| private final AtomicBoolean didReceiveClusterStateSinceLastPeriodicRun = new AtomicBoolean(); | ||
|
|
||
| private SchedulerEngine.Job scheduledJob; | ||
|
|
||
|
|
@@ -116,7 +118,6 @@ public IndexLifecycleService( | |
| super(); | ||
| this.settings = settings; | ||
| this.clusterService = clusterService; | ||
| this.threadPool = threadPool; | ||
| this.clock = clock; | ||
| this.nowSupplier = nowSupplier; | ||
| this.scheduledJob = null; | ||
|
|
@@ -349,6 +350,9 @@ public void clusterChanged(ClusterChangedEvent event) { | |
| }); | ||
| } | ||
|
|
||
| // Store that a new custer state update came in. | ||
| didReceiveClusterStateSinceLastPeriodicRun.set(true); | ||
|
|
||
| // Only start processing the new cluster state if we're not already processing one. | ||
| // Note that we might override the last seen state with a new one, even if the previous one hasn't been processed yet. | ||
| // This means that when ILM's cluster state processing takes longer than the overall cluster state application or when | ||
|
|
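The comments above describe a coalescing scheme: every update is recorded, but a new processing task is only started when none is already in flight, and a pending state may be overwritten by a newer one before it is processed. Below is a minimal, self-contained sketch of that pattern under stated assumptions: the class name, the method names, and the use of `String` in place of `ClusterState` are illustrative only, not the actual Elasticsearch implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Toy model of the coalescing pattern described above; String stands in for ClusterState. */
class CoalescingStateProcessor {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicReference<String> lastSeenState = new AtomicReference<>();
    private final AtomicBoolean receivedSinceLastPeriodicRun = new AtomicBoolean();

    /** Called for every cluster state update (analogous to clusterChanged). */
    void onClusterState(String state) {
        // Record that an update arrived; checked by the periodic fallback (see the second sketch below).
        receivedSinceLastPeriodicRun.set(true);
        // Overwrite any pending state; only submit a new task if nothing was pending or being processed.
        if (lastSeenState.getAndSet(state) == null) {
            executor.execute(this::drainAndProcess);
        }
    }

    /** Processes the pending state, picking up any newer state that arrived in the meantime. */
    private void drainAndProcess() {
        String state = lastSeenState.get();
        while (state != null) {
            System.out.println("processing state " + state);
            // Clear the reference only if it still holds the state we just processed;
            // otherwise a newer state replaced it, so loop and process that one too.
            state = lastSeenState.compareAndSet(state, null) ? null : lastSeenState.get();
        }
    }
}
```

The key property is that at most one drain task is in flight at a time and the newest state always wins, which is why the comment notes that an earlier, unprocessed state can be overwritten.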
@@ -445,9 +449,34 @@ private void cancelJob() { | |
|
|
||
| @Override | ||
| public void triggered(SchedulerEngine.Event event) { | ||
| if (event.jobName().equals(XPackField.INDEX_LIFECYCLE)) { | ||
| logger.trace("job triggered: {}, {}, {}", event.jobName(), event.scheduledTime(), event.triggeredTime()); | ||
| triggerPolicies(clusterService.state(), false); | ||
| if (event.jobName().equals(XPackField.INDEX_LIFECYCLE) == false) { | ||
| assert false : "Expected scheduler event to be for ILM"; | ||
| return; | ||
| } | ||
| logger.trace("job triggered: {}, {}, {}", event.jobName(), event.scheduledTime(), event.triggeredTime()); | ||
|
|
||
| triggerPolicies(clusterService.state(), false); | ||
|
|
||
| // Check if we've received at least one cluster state update since the last periodic run. | ||
| // If not, we run all policies as if we just received a cluster state update. If there are any policies/indices currently in a step | ||
| // that is waiting for the next cluster state update (e.g. `wait-for-index-color`), they will get unstuck with this fallback. | ||
| if (didReceiveClusterStateSinceLastPeriodicRun.getAndSet(false) == false) { | ||
| // If a new cluster state came in while/after running the above check, or if ILM is still processing the last cluster state | ||
| // update that came in before the previous periodic run (i.e. when the processing thread is still in the queue), | ||
| // `lastSeenState` will be non-null, meaning we don't need to trigger the polices. That's why we only update `lastSeenState` | ||
| // if it was null before - to avoid redundant processing. | ||
| final var stateCurrentlyBeingProcessed = lastSeenState.compareAndExchange(null, clusterService.state()); | ||
| if (stateCurrentlyBeingProcessed == null) { | ||
| logger.info("ILM didn't receive a new cluster state for [{}]. Running cluster state steps now", pollInterval); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can make this |
||
| processClusterState(); | ||
| } else { | ||
| logger.warn( | ||
| "ILM didn't receive a new cluster state for [{}] but it was still processing cluster state version [{}]", | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we clarify this to say something like "the poll interval should be increased" in the log message? In order to give a hint to users running on-prem. |
||
| pollInterval, | ||
| stateCurrentlyBeingProcessed.version() | ||
| ); | ||
| } | ||
|
|
||
| } | ||
| } | ||
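To show the periodic fallback in isolation, here is a companion sketch of the decision made on each scheduler tick, using the same two atomics as the toy model above. As before, the class and method names, the `String` stand-in for `ClusterState`, and the hard-coded poll interval are assumptions made for illustration, not the real API.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Toy model of the periodic fallback; String stands in for ClusterState. */
class PeriodicFallback {
    private final AtomicReference<String> lastSeenState = new AtomicReference<>();
    private final AtomicBoolean receivedSinceLastPeriodicRun = new AtomicBoolean();
    private final String pollInterval = "10m"; // stand-in for the ILM poll interval setting

    /** Called on every scheduler tick (analogous to triggered). */
    void onPeriodicTrigger(String currentState) {
        // The time-based policy checks always run on a tick.
        runPeriodicSteps(currentState);

        // Reset the flag; if no cluster state arrived since the previous tick, fall back to
        // running the cluster-state-driven steps as if an update had just come in.
        if (receivedSinceLastPeriodicRun.getAndSet(false) == false) {
            // Claim the processing slot only if nothing is pending or currently being processed.
            String inFlight = lastSeenState.compareAndExchange(null, currentState);
            if (inFlight == null) {
                System.out.println("no cluster state seen for " + pollInterval + ", running cluster state steps");
                runClusterStateSteps();
            } else {
                System.out.println("still processing state " + inFlight + " after " + pollInterval);
            }
        }
    }

    private void runPeriodicSteps(String state) { /* analogous to triggerPolicies(state, false) */ }

    private void runClusterStateSteps() { /* analogous to processClusterState() */ }
}
```

Resetting the flag with `getAndSet(false)` and claiming the slot with `compareAndExchange` are each single atomic operations, so the tick never has to block against the thread that applies and processes cluster states.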