4545import org .elasticsearch .common .xcontent .XContentHelper ;
4646import org .elasticsearch .core .TimeValue ;
4747import org .elasticsearch .core .Tuple ;
48+ import org .elasticsearch .gateway .GatewayService ;
4849import org .elasticsearch .index .engine .VersionConflictEngineException ;
4950import org .elasticsearch .index .query .QueryBuilder ;
5051import org .elasticsearch .index .query .QueryBuilders ;
@@ -625,8 +626,8 @@ public void storeModel(Model model, ActionListener<Boolean> listener, TimeValue
625626 storeModel (model , true , listener , timeout );
626627 }
627628
628- private void storeModel (Model model , boolean addToClusterState , ActionListener <Boolean > listener , TimeValue timeout ) {
629- ActionListener <BulkResponse > bulkResponseActionListener = getStoreIndexListener (model , addToClusterState , listener , timeout );
629+ private void storeModel (Model model , boolean updateClusterState , ActionListener <Boolean > listener , TimeValue timeout ) {
630+ ActionListener <BulkResponse > bulkResponseActionListener = getStoreIndexListener (model , updateClusterState , listener , timeout );
630631
631632 IndexRequest configRequest = createIndexRequest (
632633 Model .documentId (model .getConfigurations ().getInferenceEntityId ()),
@@ -651,7 +652,7 @@ private void storeModel(Model model, boolean addToClusterState, ActionListener<B
651652
652653 private ActionListener <BulkResponse > getStoreIndexListener (
653654 Model model ,
654- boolean addToClusterState ,
655+ boolean updateClusterState ,
655656 ActionListener <Boolean > listener ,
656657 TimeValue timeout
657658 ) {
@@ -678,7 +679,7 @@ private ActionListener<BulkResponse> getStoreIndexListener(
678679 BulkItemResponse .Failure failure = getFirstBulkFailure (bulkItemResponses );
679680
680681 if (failure == null ) {
681- if (addToClusterState ) {
682+ if (updateClusterState ) {
682683 var storeListener = getStoreMetadataListener (inferenceEntityId , listener );
683684 try {
684685 metadataTaskQueue .submitTask (
@@ -778,14 +779,19 @@ public synchronized void removeDefaultConfigs(Set<String> inferenceEntityIds, Ac
778779 }
779780
780781 defaultConfigIds .keySet ().removeAll (inferenceEntityIds );
781- deleteModels (inferenceEntityIds , listener );
782+ // default models are not stored in the cluster state.
783+ deleteModels (inferenceEntityIds , false , listener );
782784 }
783785
784786 public void deleteModel (String inferenceEntityId , ActionListener <Boolean > listener ) {
785787 deleteModels (Set .of (inferenceEntityId ), listener );
786788 }
787789
788790 public void deleteModels (Set <String > inferenceEntityIds , ActionListener <Boolean > listener ) {
791+ deleteModels (inferenceEntityIds , true , listener );
792+ }
793+
794+ private void deleteModels (Set <String > inferenceEntityIds , boolean updateClusterState , ActionListener <Boolean > listener ) {
789795 var lockedInferenceIds = new HashSet <>(inferenceEntityIds );
790796 lockedInferenceIds .retainAll (preventDeletionLock );
791797
@@ -804,16 +810,25 @@ public void deleteModels(Set<String> inferenceEntityIds, ActionListener<Boolean>
804810 }
805811
806812 var request = createDeleteRequest (inferenceEntityIds );
807- client .execute (DeleteByQueryAction .INSTANCE , request , getDeleteModelClusterStateListener (inferenceEntityIds , listener ));
813+ client .execute (
814+ DeleteByQueryAction .INSTANCE ,
815+ request ,
816+ getDeleteModelClusterStateListener (inferenceEntityIds , updateClusterState , listener )
817+ );
808818 }
809819
810820 private ActionListener <BulkByScrollResponse > getDeleteModelClusterStateListener (
811821 Set <String > inferenceEntityIds ,
822+ boolean updateClusterState ,
812823 ActionListener <Boolean > listener
813824 ) {
814825 return new ActionListener <>() {
815826 @ Override
816827 public void onResponse (BulkByScrollResponse bulkByScrollResponse ) {
828+ if (updateClusterState == false ) {
829+ listener .onResponse (Boolean .TRUE );
830+ return ;
831+ }
817832 var clusterStateListener = new ActionListener <AcknowledgedResponse >() {
818833 @ Override
819834 public void onResponse (AcknowledgedResponse acknowledgedResponse ) {
@@ -921,6 +936,12 @@ public void clusterChanged(ClusterChangedEvent event) {
921936 }
922937
923938 var state = ModelRegistryMetadata .fromState (event .state ().metadata ());
939+
940+ // wait for the cluster state to be recovered
941+ if (event .state ().blocks ().hasGlobalBlock (GatewayService .STATE_NOT_RECOVERED_BLOCK )) {
942+ return ;
943+ }
944+
924945 if (state .isUpgraded ()) {
925946 return ;
926947 }
0 commit comments