2626import  org .elasticsearch .common .scheduler .SchedulerEngine ;
2727import  org .elasticsearch .common .scheduler .TimeValueSchedule ;
2828import  org .elasticsearch .common .settings .Settings ;
29+ import  org .elasticsearch .common .util .concurrent .AbstractRunnable ;
2930import  org .elasticsearch .core .FixForMultiProject ;
3031import  org .elasticsearch .core .Nullable ;
3132import  org .elasticsearch .core .SuppressForbidden ;
6061import  java .util .List ;
6162import  java .util .Map ;
6263import  java .util .Set ;
64+ import  java .util .concurrent .ExecutorService ;
65+ import  java .util .concurrent .atomic .AtomicReference ;
6366import  java .util .function .LongSupplier ;
6467import  java .util .stream .Collectors ;
6568
@@ -90,7 +93,12 @@ public class IndexLifecycleService
9093    private  final  IndexLifecycleRunner  lifecycleRunner ;
9194    private  final  Settings  settings ;
9295    private  final  ClusterService  clusterService ;
96+     private  final  ThreadPool  threadPool ;
9397    private  final  LongSupplier  nowSupplier ;
98+     private  final  ExecutorService  managementExecutor ;
99+     /** A reference to the last seen cluster state. If it's not null, we're currently processing a cluster state. */ 
100+     private  final  AtomicReference <ClusterState > lastSeenState  = new  AtomicReference <>();
101+ 
94102    private  SchedulerEngine .Job  scheduledJob ;
95103
96104    @ SuppressWarnings ("this-escape" )
@@ -108,12 +116,14 @@ public IndexLifecycleService(
108116        super ();
109117        this .settings  = settings ;
110118        this .clusterService  = clusterService ;
119+         this .threadPool  = threadPool ;
111120        this .clock  = clock ;
112121        this .nowSupplier  = nowSupplier ;
113122        this .scheduledJob  = null ;
114123        this .policyRegistry  = new  PolicyStepsRegistry (xContentRegistry , client , licenseState );
115124        this .lifecycleRunner  = new  IndexLifecycleRunner (policyRegistry , ilmHistoryStore , clusterService , threadPool , nowSupplier );
116125        this .pollInterval  = LifecycleSettings .LIFECYCLE_POLL_INTERVAL_SETTING .get (settings );
126+         this .managementExecutor  = threadPool .executor (ThreadPool .Names .MANAGEMENT );
117127        clusterService .addStateApplier (this );
118128        clusterService .addListener (this );
119129        clusterService .getClusterSettings ()
@@ -332,17 +342,76 @@ public void clusterChanged(ClusterChangedEvent event) {
332342            // ClusterChangedEvent.indicesDeleted uses an equality check to skip computation if necessary. 
333343            final  List <Index > indicesDeleted  = event .indicesDeleted ();
334344            if  (indicesDeleted .isEmpty () == false ) {
335-                 clusterService . getClusterApplierService (). threadPool (). executor ( ThreadPool . Names . MANAGEMENT ) .execute (() -> {
345+                 managementExecutor .execute (() -> {
336346                    for  (Index  index  : indicesDeleted ) {
337347                        policyRegistry .delete (index );
338348                    }
339349                });
340350            }
341351
342-             triggerPolicies (event .state (), true );
352+             // Only start processing the new cluster state if we're not already processing one. 
353+             // Note that we might override the last seen state with a new one, even if the previous one hasn't been processed yet. 
354+             // This means that when ILM's cluster state processing takes longer than the overall cluster state application or when 
355+             // the forked thread is waiting in the thread pool queue (e.g. when the master node is swamped), we might skip some 
356+             // cluster state updates. Since ILM does not depend on "deltas" in cluster states, we can skip some cluster states just fine. 
357+             if  (lastSeenState .getAndSet (event .state ()) == null ) {
358+                 processClusterState ();
359+             } else  {
360+                 logger .trace ("ILM state processor still running, not starting new thread" );
361+             }
343362        }
344363    }
345364
365+     /** 
366+      * Instead of processing cluster state updates on the cluster state applier thread, we fork to a different thread where 
367+      * ILM's runtime of processing the cluster state update does not affect the speed at which the cluster can apply new cluster states. 
368+      * That does not mean we don't need to optimize ILM's cluster state processing, as the overall amount of processing is generally 
369+      * unaffected by this fork approach (unless we skip some cluster states), but it does mean we're saving a significant amount 
370+      * of processing on the critical cluster state applier thread. 
371+      */ 
372+     private  void  processClusterState () {
373+         managementExecutor .execute (new  AbstractRunnable () {
374+ 
375+             private  final  SetOnce <ClusterState > currentState  = new  SetOnce <>();
376+ 
377+             @ Override 
378+             protected  void  doRun () throws  Exception  {
379+                 final  ClusterState  currentState  = lastSeenState .get ();
380+                 // This should never be null, but we're checking anyway to be sure. 
381+                 if  (currentState  == null ) {
382+                     assert  false  : "Expected current state to non-null when processing cluster state in ILM" ;
383+                     return ;
384+                 }
385+                 this .currentState .set (currentState );
386+                 triggerPolicies (currentState , true );
387+             }
388+ 
389+             @ Override 
390+             public  void  onFailure (Exception  e ) {
391+                 logger .warn ("ILM failed to process cluster state" , e );
392+             }
393+ 
394+             @ Override 
395+             public  void  onAfter () {
396+                 // If the last seen state is unchanged, we set it to null to indicate that processing has finished and we return. 
397+                 if  (lastSeenState .compareAndSet (currentState .get (), null )) {
398+                     return ;
399+                 }
400+                 // If the last seen cluster state changed while this thread was running, it means a new cluster state came in and we need to 
401+                 // process it. We do that by kicking off a new thread, which will pick up the new cluster state when the thread gets 
402+                 // executed. 
403+                 processClusterState ();
404+             }
405+ 
406+             @ Override 
407+             public  boolean  isForceExecution () {
408+                 // Without force execution, we risk ILM state processing being postponed arbitrarily long, which in turn could cause 
409+                 // thundering herd issues if there's significant time between ILM runs. 
410+                 return  true ;
411+             }
412+         });
413+     }
414+ 
346415    @ Override 
347416    public  void  applyClusterState (ClusterChangedEvent  event ) {
348417        // only act if we are master, otherwise keep idle until elected 
0 commit comments