2626import org .elasticsearch .common .scheduler .SchedulerEngine ;
2727import org .elasticsearch .common .scheduler .TimeValueSchedule ;
2828import org .elasticsearch .common .settings .Settings ;
29+ import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
2930import org .elasticsearch .core .FixForMultiProject ;
3031import org .elasticsearch .core .Nullable ;
3132import org .elasticsearch .core .SuppressForbidden ;
6061import java .util .List ;
6162import java .util .Map ;
6263import java .util .Set ;
64+ import java .util .concurrent .ExecutorService ;
65+ import java .util .concurrent .atomic .AtomicReference ;
6366import java .util .function .LongSupplier ;
6467import java .util .stream .Collectors ;
6568
@@ -90,7 +93,12 @@ public class IndexLifecycleService
9093 private final IndexLifecycleRunner lifecycleRunner ;
9194 private final Settings settings ;
9295 private final ClusterService clusterService ;
96+ private final ThreadPool threadPool ;
9397 private final LongSupplier nowSupplier ;
98+ private final ExecutorService managementExecutor ;
99+ /** A reference to the last seen cluster state. If it's not null, we're currently processing a cluster state. */
100+ private final AtomicReference <ClusterState > lastSeenState = new AtomicReference <>();
101+
94102 private SchedulerEngine .Job scheduledJob ;
95103
96104 @ SuppressWarnings ("this-escape" )
@@ -108,12 +116,14 @@ public IndexLifecycleService(
108116 super ();
109117 this .settings = settings ;
110118 this .clusterService = clusterService ;
119+ this .threadPool = threadPool ;
111120 this .clock = clock ;
112121 this .nowSupplier = nowSupplier ;
113122 this .scheduledJob = null ;
114123 this .policyRegistry = new PolicyStepsRegistry (xContentRegistry , client , licenseState );
115124 this .lifecycleRunner = new IndexLifecycleRunner (policyRegistry , ilmHistoryStore , clusterService , threadPool , nowSupplier );
116125 this .pollInterval = LifecycleSettings .LIFECYCLE_POLL_INTERVAL_SETTING .get (settings );
126+ this .managementExecutor = threadPool .executor (ThreadPool .Names .MANAGEMENT );
117127 clusterService .addStateApplier (this );
118128 clusterService .addListener (this );
119129 clusterService .getClusterSettings ()
@@ -332,17 +342,76 @@ public void clusterChanged(ClusterChangedEvent event) {
332342 // ClusterChangedEvent.indicesDeleted uses an equality check to skip computation if necessary.
333343 final List <Index > indicesDeleted = event .indicesDeleted ();
334344 if (indicesDeleted .isEmpty () == false ) {
335- clusterService . getClusterApplierService (). threadPool (). executor ( ThreadPool . Names . MANAGEMENT ) .execute (() -> {
345+ managementExecutor .execute (() -> {
336346 for (Index index : indicesDeleted ) {
337347 policyRegistry .delete (index );
338348 }
339349 });
340350 }
341351
342- triggerPolicies (event .state (), true );
352+ // Only start processing the new cluster state if we're not already processing one.
353+ // Note that we might override the last seen state with a new one, even if the previous one hasn't been processed yet.
354+ // This means that when ILM's cluster state processing takes longer than the overall cluster state application or when
355+ // the forked thread is waiting in the thread pool queue (e.g. when the master node is swamped), we might skip some
356+ // cluster state updates. Since ILM does not depend on "deltas" in cluster states, we can skip some cluster states just fine.
357+ if (lastSeenState .getAndSet (event .state ()) == null ) {
358+ processClusterState ();
359+ } else {
360+ logger .trace ("ILM state processor still running, not starting new thread" );
361+ }
343362 }
344363 }
345364
365+ /**
366+ * Instead of processing cluster state updates on the cluster state applier thread, we fork to a different thread where
367+ * ILM's runtime of processing the cluster state update does not affect the speed at which the cluster can apply new cluster states.
368+ * That does not mean we don't need to optimize ILM's cluster state processing, as the overall amount of processing is generally
369+ * unaffected by this fork approach (unless we skip some cluster states), but it does mean we're saving a significant amount
370+ * of processing on the critical cluster state applier thread.
371+ */
372+ private void processClusterState () {
373+ managementExecutor .execute (new AbstractRunnable () {
374+
375+ private final SetOnce <ClusterState > currentState = new SetOnce <>();
376+
377+ @ Override
378+ protected void doRun () throws Exception {
379+ final ClusterState currentState = lastSeenState .get ();
380+ // This should never be null, but we're checking anyway to be sure.
381+ if (currentState == null ) {
382+ assert false : "Expected current state to non-null when processing cluster state in ILM" ;
383+ return ;
384+ }
385+ this .currentState .set (currentState );
386+ triggerPolicies (currentState , true );
387+ }
388+
389+ @ Override
390+ public void onFailure (Exception e ) {
391+ logger .warn ("ILM failed to process cluster state" , e );
392+ }
393+
394+ @ Override
395+ public void onAfter () {
396+ // If the last seen state is unchanged, we set it to null to indicate that processing has finished and we return.
397+ if (lastSeenState .compareAndSet (currentState .get (), null )) {
398+ return ;
399+ }
400+ // If the last seen cluster state changed while this thread was running, it means a new cluster state came in and we need to
401+ // process it. We do that by kicking off a new thread, which will pick up the new cluster state when the thread gets
402+ // executed.
403+ processClusterState ();
404+ }
405+
406+ @ Override
407+ public boolean isForceExecution () {
408+ // Without force execution, we risk ILM state processing being postponed arbitrarily long, which in turn could cause
409+ // thundering herd issues if there's significant time between ILM runs.
410+ return true ;
411+ }
412+ });
413+ }
414+
346415 @ Override
347416 public void applyClusterState (ClusterChangedEvent event ) {
348417 // only act if we are master, otherwise keep idle until elected
0 commit comments