@@ -401,7 +401,7 @@ private void addComposableTemplatesIfMissing(ClusterState state) {
401401                    }
402402                } else  if  (Objects .isNull (currentTemplate )) {
403403                    logger .debug ("adding composable template [{}] for [{}], because it doesn't exist" , templateName , getOrigin ());
404-                     putComposableTemplate (state , templateName , newTemplate .getValue (), creationCheck ,  false );
404+                     putComposableTemplate (state , templateName , newTemplate .getValue (), creationCheck );
405405                } else  if  (Objects .isNull (currentTemplate .version ()) || newTemplate .getValue ().version () > currentTemplate .version ()) {
406406                    // IndexTemplateConfig now enforces templates contain a `version` property, so if the template doesn't have one we can 
407407                    // safely assume it's an old version of the template. 
@@ -412,7 +412,7 @@ private void addComposableTemplatesIfMissing(ClusterState state) {
412412                        currentTemplate .version (),
413413                        newTemplate .getValue ().version ()
414414                    );
415-                     putComposableTemplate (state , templateName , newTemplate .getValue (), creationCheck ,  true );
415+                     putComposableTemplate (state , templateName , newTemplate .getValue (), creationCheck );
416416                } else  {
417417                    creationCheck .set (false );
418418                    logger .trace (
@@ -434,11 +434,11 @@ private void addComposableTemplatesIfMissing(ClusterState state) {
434434
435435    /** 
436436     * Returns true if the cluster state contains all of the component templates needed by the composable template. If this registry 
437-      * requires automatic rollover after index template upgrades (see {@link #applyRolloverAfterTemplateV2Upgrade ()}), this method also 
437+      * requires automatic rollover after index template upgrades (see {@link #applyRolloverAfterTemplateV2Update ()}), this method also 
438438     * verifies that the installed components templates are of the right version. 
439439     */ 
440440    private  boolean  componentTemplatesInstalled (ClusterState  state , ComposableIndexTemplate  indexTemplate ) {
441-         if  (applyRolloverAfterTemplateV2Upgrade () == false ) {
441+         if  (applyRolloverAfterTemplateV2Update () == false ) {
442442            // component templates and index templates can be updated independently, we only need to know that the required component 
443443            // templates are available 
444444            return  state .metadata ().componentTemplates ().keySet ().containsAll (indexTemplate .getRequiredComponentTemplates ());
@@ -534,8 +534,7 @@ private void putComposableTemplate(
534534        ClusterState  state ,
535535        final  String  templateName ,
536536        final  ComposableIndexTemplate  indexTemplate ,
537-         final  AtomicBoolean  creationCheck ,
538-         final  boolean  isUpgrade 
537+         final  AtomicBoolean  creationCheck 
539538    ) {
540539        final  Executor  executor  = threadPool .generic ();
541540        executor .execute (() -> {
@@ -550,8 +549,8 @@ private void putComposableTemplate(
550549                    @ Override 
551550                    public  void  onResponse (AcknowledgedResponse  response ) {
552551                        if  (response .isAcknowledged ()) {
553-                             if  (isUpgrade  &&  applyRolloverAfterTemplateV2Upgrade ()) {
554-                                 invokeRollover (state , templateName , indexTemplate , creationCheck );
552+                             if  (applyRolloverAfterTemplateV2Update ()) {
553+                                 invokeRollover (state , templateName , indexTemplate , () ->  creationCheck . set (( false )) );
555554                            } else  {
556555                                creationCheck .set (false );
557556                            }
@@ -765,12 +764,13 @@ public void onFailure(Exception e) {
765764
766765    /** 
767766     * Allows registries to opt-in for automatic rollover of "relevant" data streams immediately after a composable index template gets 
768-      * upgraded. If set to {@code true}, then every time a composable index template is being upgraded, all data streams of which name 
769-      * matches this template's index patterns AND of all matching templates the upgraded one has the highest priority, will be rolled over. 
767+      * updated, including its initial installation. If set to {@code true}, then every time a composable index template is being updated, 
768+      * all data streams of which name matches this template's index patterns AND of all matching templates the upgraded one has the highest 
769+      * priority, will be rolled over. 
770770     * 
771771     * @return {@code true} if this registry wants to apply automatic rollovers after template V2 upgrades 
772772     */ 
773-     protected  boolean  applyRolloverAfterTemplateV2Upgrade () {
773+     protected  boolean  applyRolloverAfterTemplateV2Update () {
774774        return  false ;
775775    }
776776
@@ -784,50 +784,56 @@ protected void onPutPipelineFailure(String pipelineId, Exception e) {
784784        logger .error (() -> format ("error adding ingest pipeline template [%s] for [%s]" , pipelineId , getOrigin ()), e );
785785    }
786786
787+     /** 
788+      * invokeRollover rolls over any data streams matching the index template, 
789+      * and then invokes runAfter. 
790+      */ 
787791    private  void  invokeRollover (
788792        final  ClusterState  state ,
789793        final  String  templateName ,
790794        final  ComposableIndexTemplate  indexTemplate ,
791-         final  AtomicBoolean   creationCheck 
795+         final  Runnable   runAfter 
792796    ) {
793797        final  Executor  executor  = threadPool .generic ();
794798        executor .execute (() -> {
795799            List <String > rolloverTargets  = findRolloverTargetDataStreams (state , templateName , indexTemplate );
796-             if  (rolloverTargets .isEmpty () == false ) {
797-                 GroupedActionListener <RolloverResponse > groupedActionListener  = new  GroupedActionListener <>(
798-                     rolloverTargets .size (),
799-                     new  ActionListener <>() {
800-                         @ Override 
801-                         public  void  onResponse (Collection <RolloverResponse > rolloverResponses ) {
802-                             creationCheck .set (false );
803-                             onRolloversBulkResponse (rolloverResponses );
804-                         }
800+             if  (rolloverTargets .isEmpty ()) {
801+                 runAfter .run ();
802+                 return ;
803+             }
804+             GroupedActionListener <RolloverResponse > groupedActionListener  = new  GroupedActionListener <>(
805+                 rolloverTargets .size (),
806+                 new  ActionListener <>() {
807+                     @ Override 
808+                     public  void  onResponse (Collection <RolloverResponse > rolloverResponses ) {
809+                         runAfter .run ();
810+                         onRolloversBulkResponse (rolloverResponses );
811+                     }
805812
806-                         @ Override 
807-                         public  void  onFailure (Exception  e ) {
808-                             creationCheck .set (false );
809-                             onRolloverFailure (e );
810-                         }
813+                     @ Override 
814+                     public  void  onFailure (Exception  e ) {
815+                         runAfter .run ();
816+                         onRolloverFailure (e );
811817                    }
812-                 );
813-                 for  (String  rolloverTarget  : rolloverTargets ) {
814-                     logger .info (
815-                         "rolling over data stream [{}] lazily as a followup to the upgrade of the [{}] index template [{}]" ,
816-                         rolloverTarget ,
817-                         getOrigin (),
818-                         templateName 
819-                     );
820-                     RolloverRequest  request  = new  RolloverRequest (rolloverTarget , null );
821-                     request .lazy (true );
822-                     request .masterNodeTimeout (TimeValue .MAX_VALUE );
823-                     executeAsyncWithOrigin (
824-                         client .threadPool ().getThreadContext (),
825-                         getOrigin (),
826-                         request ,
827-                         groupedActionListener ,
828-                         (req , listener ) -> client .execute (RolloverAction .INSTANCE , req , listener )
829-                     );
830818                }
819+             );
820+             for  (String  rolloverTarget  : rolloverTargets ) {
821+                 logger .info (
822+                     "rolling over data stream [{}] lazily as a followup to the upgrade of the [{}] index template [{}]" ,
823+                     rolloverTarget ,
824+                     getOrigin (),
825+                     templateName 
826+                 );
827+                 RolloverRequest  request  = new  RolloverRequest (rolloverTarget , null );
828+                 request .lazy (true );
829+                 request .masterNodeTimeout (TimeValue .MAX_VALUE );
830+                 executeAsyncWithOrigin (
831+                     client .threadPool ().getThreadContext (),
832+                     getOrigin (),
833+                     request ,
834+                     groupedActionListener ,
835+                     (req , listener ) -> client .execute (RolloverAction .INSTANCE , req , listener )
836+                 );
831837            }
832838        });
833839    }
@@ -867,7 +873,21 @@ static List<String> findRolloverTargetDataStreams(ClusterState state, String tem
867873            .stream ()
868874            // Limit to checking data streams that match any of the index template's index patterns 
869875            .filter (ds  -> indexTemplate .indexPatterns ().stream ().anyMatch (pattern  -> Regex .simpleMatch (pattern , ds .getName ())))
870-             .filter (ds  -> templateName .equals (MetadataIndexTemplateService .findV2Template (metadata , ds .getName (), ds .isHidden ())))
876+             .filter (ds  -> {
877+                 final  String  dsTemplateName  = MetadataIndexTemplateService .findV2Template (metadata , ds .getName (), ds .isHidden ());
878+                 if  (templateName .equals (dsTemplateName )) {
879+                     return  true ;
880+                 }
881+                 // findV2Template did not match templateName, which implies one of two things: 
882+                 // - indexTemplate has a lower priority than the index template matching for ds, OR 
883+                 // - indexTemplate does not yet exist in cluster state (i.e. because it's in the process of being 
884+                 // installed or updated) 
885+                 // 
886+                 // Because of the second case, we must check if indexTemplate's priority is greater than the matching 
887+                 // index template, in case it would take precedence after installation/update. 
888+                 final  ComposableIndexTemplate  dsTemplate  = metadata .templatesV2 ().get (dsTemplateName );
889+                 return  dsTemplate  == null  || indexTemplate .priorityOrZero () > dsTemplate .priorityOrZero ();
890+             })
871891            .map (DataStream ::getName )
872892            .collect (Collectors .toList ());
873893    }
0 commit comments