diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry/ModelRegistry.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry/ModelRegistry.java index 837a66b3ad24c..5c99135e1dd22 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry/ModelRegistry.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry/ModelRegistry.java @@ -45,6 +45,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; +import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -625,8 +626,8 @@ public void storeModel(Model model, ActionListener listener, TimeValue storeModel(model, true, listener, timeout); } - private void storeModel(Model model, boolean addToClusterState, ActionListener listener, TimeValue timeout) { - ActionListener bulkResponseActionListener = getStoreIndexListener(model, addToClusterState, listener, timeout); + private void storeModel(Model model, boolean updateClusterState, ActionListener listener, TimeValue timeout) { + ActionListener bulkResponseActionListener = getStoreIndexListener(model, updateClusterState, listener, timeout); IndexRequest configRequest = createIndexRequest( Model.documentId(model.getConfigurations().getInferenceEntityId()), @@ -651,7 +652,7 @@ private void storeModel(Model model, boolean addToClusterState, ActionListener getStoreIndexListener( Model model, - boolean addToClusterState, + boolean updateClusterState, ActionListener listener, TimeValue timeout ) { @@ -678,7 +679,7 @@ private ActionListener getStoreIndexListener( BulkItemResponse.Failure failure = getFirstBulkFailure(bulkItemResponses); if (failure == null) { - if (addToClusterState) { + if (updateClusterState) { var storeListener = getStoreMetadataListener(inferenceEntityId, listener); try { metadataTaskQueue.submitTask( @@ -778,7 +779,8 @@ public synchronized void removeDefaultConfigs(Set inferenceEntityIds, Ac } defaultConfigIds.keySet().removeAll(inferenceEntityIds); - deleteModels(inferenceEntityIds, listener); + // default models are not stored in the cluster state. + deleteModels(inferenceEntityIds, false, listener); } public void deleteModel(String inferenceEntityId, ActionListener listener) { @@ -786,6 +788,10 @@ public void deleteModel(String inferenceEntityId, ActionListener listen } public void deleteModels(Set inferenceEntityIds, ActionListener listener) { + deleteModels(inferenceEntityIds, true, listener); + } + + private void deleteModels(Set inferenceEntityIds, boolean updateClusterState, ActionListener listener) { var lockedInferenceIds = new HashSet<>(inferenceEntityIds); lockedInferenceIds.retainAll(preventDeletionLock); @@ -804,16 +810,25 @@ public void deleteModels(Set inferenceEntityIds, ActionListener } var request = createDeleteRequest(inferenceEntityIds); - client.execute(DeleteByQueryAction.INSTANCE, request, getDeleteModelClusterStateListener(inferenceEntityIds, listener)); + client.execute( + DeleteByQueryAction.INSTANCE, + request, + getDeleteModelClusterStateListener(inferenceEntityIds, updateClusterState, listener) + ); } private ActionListener getDeleteModelClusterStateListener( Set inferenceEntityIds, + boolean updateClusterState, ActionListener listener ) { return new ActionListener<>() { @Override public void onResponse(BulkByScrollResponse bulkByScrollResponse) { + if (updateClusterState == false) { + listener.onResponse(Boolean.TRUE); + return; + } var clusterStateListener = new ActionListener() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { @@ -921,6 +936,12 @@ public void clusterChanged(ClusterChangedEvent event) { } var state = ModelRegistryMetadata.fromState(event.state().metadata()); + + // wait for the cluster state to be recovered + if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + return; + } + if (state.isUpgraded()) { return; }