@@ -400,7 +400,7 @@ private void addComposableTemplatesIfMissing(ClusterState state) {
400400 }
401401 } else if (Objects .isNull (currentTemplate )) {
402402 logger .debug ("adding composable template [{}] for [{}], because it doesn't exist" , templateName , getOrigin ());
403- putComposableTemplate (state , templateName , newTemplate .getValue (), creationCheck , false );
403+ putComposableTemplate (state , templateName , newTemplate .getValue (), creationCheck );
404404 } else if (Objects .isNull (currentTemplate .version ()) || newTemplate .getValue ().version () > currentTemplate .version ()) {
405405 // IndexTemplateConfig now enforces templates contain a `version` property, so if the template doesn't have one we can
406406 // safely assume it's an old version of the template.
@@ -411,7 +411,7 @@ private void addComposableTemplatesIfMissing(ClusterState state) {
411411 currentTemplate .version (),
412412 newTemplate .getValue ().version ()
413413 );
414- putComposableTemplate (state , templateName , newTemplate .getValue (), creationCheck , true );
414+ putComposableTemplate (state , templateName , newTemplate .getValue (), creationCheck );
415415 } else {
416416 creationCheck .set (false );
417417 logger .trace (
@@ -433,11 +433,11 @@ private void addComposableTemplatesIfMissing(ClusterState state) {
433433
434434 /**
435435 * Returns true if the cluster state contains all of the component templates needed by the composable template. If this registry
436- * requires automatic rollover after index template upgrades (see {@link #applyRolloverAfterTemplateV2Upgrade ()}), this method also
436+ * requires automatic rollover after index template upgrades (see {@link #applyRolloverAfterTemplateV2Update ()}), this method also
437437 * verifies that the installed components templates are of the right version.
438438 */
439439 private boolean componentTemplatesInstalled (ClusterState state , ComposableIndexTemplate indexTemplate ) {
440- if (applyRolloverAfterTemplateV2Upgrade () == false ) {
440+ if (applyRolloverAfterTemplateV2Update () == false ) {
441441 // component templates and index templates can be updated independently, we only need to know that the required component
442442 // templates are available
443443 return state .metadata ().componentTemplates ().keySet ().containsAll (indexTemplate .getRequiredComponentTemplates ());
@@ -533,8 +533,7 @@ private void putComposableTemplate(
533533 ClusterState state ,
534534 final String templateName ,
535535 final ComposableIndexTemplate indexTemplate ,
536- final AtomicBoolean creationCheck ,
537- final boolean isUpgrade
536+ final AtomicBoolean creationCheck
538537 ) {
539538 final Executor executor = threadPool .generic ();
540539 executor .execute (() -> {
@@ -549,8 +548,8 @@ private void putComposableTemplate(
549548 @ Override
550549 public void onResponse (AcknowledgedResponse response ) {
551550 if (response .isAcknowledged ()) {
552- if (isUpgrade && applyRolloverAfterTemplateV2Upgrade ()) {
553- invokeRollover (state , templateName , indexTemplate , creationCheck );
551+ if (applyRolloverAfterTemplateV2Update ()) {
552+ invokeRollover (state , templateName , indexTemplate , () -> creationCheck . set (( false )) );
554553 } else {
555554 creationCheck .set (false );
556555 }
@@ -763,12 +762,13 @@ public void onFailure(Exception e) {
763762
764763 /**
765764 * Allows registries to opt-in for automatic rollover of "relevant" data streams immediately after a composable index template gets
766- * upgraded. If set to {@code true}, then every time a composable index template is being upgraded, all data streams of which name
767- * matches this template's index patterns AND of all matching templates the upgraded one has the highest priority, will be rolled over.
765+ * updated, including its initial installation. If set to {@code true}, then every time a composable index template is being updated,
766+ * all data streams of which name matches this template's index patterns AND of all matching templates the upgraded one has the highest
767+ * priority, will be rolled over.
768768 *
769769 * @return {@code true} if this registry wants to apply automatic rollovers after template V2 upgrades
770770 */
771- protected boolean applyRolloverAfterTemplateV2Upgrade () {
771+ protected boolean applyRolloverAfterTemplateV2Update () {
772772 return false ;
773773 }
774774
@@ -782,50 +782,56 @@ protected void onPutPipelineFailure(String pipelineId, Exception e) {
782782 logger .error (() -> format ("error adding ingest pipeline template [%s] for [%s]" , pipelineId , getOrigin ()), e );
783783 }
784784
785+ /**
786+ * invokeRollover rolls over any data streams matching the index template,
787+ * and then invokes runAfter.
788+ */
785789 private void invokeRollover (
786790 final ClusterState state ,
787791 final String templateName ,
788792 final ComposableIndexTemplate indexTemplate ,
789- final AtomicBoolean creationCheck
793+ final Runnable runAfter
790794 ) {
791795 final Executor executor = threadPool .generic ();
792796 executor .execute (() -> {
793797 List <String > rolloverTargets = findRolloverTargetDataStreams (state , templateName , indexTemplate );
794- if (rolloverTargets .isEmpty () == false ) {
795- GroupedActionListener <RolloverResponse > groupedActionListener = new GroupedActionListener <>(
796- rolloverTargets .size (),
797- new ActionListener <>() {
798- @ Override
799- public void onResponse (Collection <RolloverResponse > rolloverResponses ) {
800- creationCheck .set (false );
801- onRolloversBulkResponse (rolloverResponses );
802- }
798+ if (rolloverTargets .isEmpty ()) {
799+ runAfter .run ();
800+ return ;
801+ }
802+ GroupedActionListener <RolloverResponse > groupedActionListener = new GroupedActionListener <>(
803+ rolloverTargets .size (),
804+ new ActionListener <>() {
805+ @ Override
806+ public void onResponse (Collection <RolloverResponse > rolloverResponses ) {
807+ runAfter .run ();
808+ onRolloversBulkResponse (rolloverResponses );
809+ }
803810
804- @ Override
805- public void onFailure (Exception e ) {
806- creationCheck .set (false );
807- onRolloverFailure (e );
808- }
811+ @ Override
812+ public void onFailure (Exception e ) {
813+ runAfter .run ();
814+ onRolloverFailure (e );
809815 }
810- );
811- for (String rolloverTarget : rolloverTargets ) {
812- logger .info (
813- "rolling over data stream [{}] lazily as a followup to the upgrade of the [{}] index template [{}]" ,
814- rolloverTarget ,
815- getOrigin (),
816- templateName
817- );
818- RolloverRequest request = new RolloverRequest (rolloverTarget , null );
819- request .lazy (true );
820- request .masterNodeTimeout (TimeValue .MAX_VALUE );
821- executeAsyncWithOrigin (
822- client .threadPool ().getThreadContext (),
823- getOrigin (),
824- request ,
825- groupedActionListener ,
826- (req , listener ) -> client .execute (RolloverAction .INSTANCE , req , listener )
827- );
828816 }
817+ );
818+ for (String rolloverTarget : rolloverTargets ) {
819+ logger .info (
820+ "rolling over data stream [{}] lazily as a followup to the upgrade of the [{}] index template [{}]" ,
821+ rolloverTarget ,
822+ getOrigin (),
823+ templateName
824+ );
825+ RolloverRequest request = new RolloverRequest (rolloverTarget , null );
826+ request .lazy (true );
827+ request .masterNodeTimeout (TimeValue .MAX_VALUE );
828+ executeAsyncWithOrigin (
829+ client .threadPool ().getThreadContext (),
830+ getOrigin (),
831+ request ,
832+ groupedActionListener ,
833+ (req , listener ) -> client .execute (RolloverAction .INSTANCE , req , listener )
834+ );
829835 }
830836 });
831837 }
@@ -865,7 +871,21 @@ static List<String> findRolloverTargetDataStreams(ClusterState state, String tem
865871 .stream ()
866872 // Limit to checking data streams that match any of the index template's index patterns
867873 .filter (ds -> indexTemplate .indexPatterns ().stream ().anyMatch (pattern -> Regex .simpleMatch (pattern , ds .getName ())))
868- .filter (ds -> templateName .equals (MetadataIndexTemplateService .findV2Template (metadata , ds .getName (), ds .isHidden ())))
874+ .filter (ds -> {
875+ final String dsTemplateName = MetadataIndexTemplateService .findV2Template (metadata , ds .getName (), ds .isHidden ());
876+ if (templateName .equals (dsTemplateName )) {
877+ return true ;
878+ }
879+ // findV2Template did not match templateName, which implies one of two things:
880+ // - indexTemplate has a lower priority than the index template matching for ds, OR
881+ // - indexTemplate does not yet exist in cluster state (i.e. because it's in the process of being
882+ // installed or updated)
883+ //
884+ // Because of the second case, we must check if indexTemplate's priority is greater than the matching
885+ // index template, in case it would take precedence after installation/update.
886+ final ComposableIndexTemplate dsTemplate = metadata .templatesV2 ().get (dsTemplateName );
887+ return dsTemplate == null || indexTemplate .priorityOrZero () > dsTemplate .priorityOrZero ();
888+ })
869889 .map (DataStream ::getName )
870890 .collect (Collectors .toList ());
871891 }
0 commit comments