Skip to content

Commit 4ccb046

Browse files
authored
Prevent default inference model to update the cluster state when deleting (#125369) (#125597)
The Elastic inference service removes the default models at startup if the node cannot access EIS. Since #125242 we don't store default models in the cluster state but we still try to delete them. This change ensures that we don't try to update the cluster state when a default model is deleted since the delete is not performed on the master node and default models are never stored in the cluster state.
1 parent bff1dff commit 4ccb046

File tree

1 file changed

+27
-6
lines changed
  • x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry

1 file changed

+27
-6
lines changed

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry/ModelRegistry.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.common.xcontent.XContentHelper;
4646
import org.elasticsearch.core.TimeValue;
4747
import org.elasticsearch.core.Tuple;
48+
import org.elasticsearch.gateway.GatewayService;
4849
import org.elasticsearch.index.engine.VersionConflictEngineException;
4950
import org.elasticsearch.index.query.QueryBuilder;
5051
import org.elasticsearch.index.query.QueryBuilders;
@@ -625,8 +626,8 @@ public void storeModel(Model model, ActionListener<Boolean> listener, TimeValue
625626
storeModel(model, true, listener, timeout);
626627
}
627628

628-
private void storeModel(Model model, boolean addToClusterState, ActionListener<Boolean> listener, TimeValue timeout) {
629-
ActionListener<BulkResponse> bulkResponseActionListener = getStoreIndexListener(model, addToClusterState, listener, timeout);
629+
private void storeModel(Model model, boolean updateClusterState, ActionListener<Boolean> listener, TimeValue timeout) {
630+
ActionListener<BulkResponse> bulkResponseActionListener = getStoreIndexListener(model, updateClusterState, listener, timeout);
630631

631632
IndexRequest configRequest = createIndexRequest(
632633
Model.documentId(model.getConfigurations().getInferenceEntityId()),
@@ -651,7 +652,7 @@ private void storeModel(Model model, boolean addToClusterState, ActionListener<B
651652

652653
private ActionListener<BulkResponse> getStoreIndexListener(
653654
Model model,
654-
boolean addToClusterState,
655+
boolean updateClusterState,
655656
ActionListener<Boolean> listener,
656657
TimeValue timeout
657658
) {
@@ -678,7 +679,7 @@ private ActionListener<BulkResponse> getStoreIndexListener(
678679
BulkItemResponse.Failure failure = getFirstBulkFailure(bulkItemResponses);
679680

680681
if (failure == null) {
681-
if (addToClusterState) {
682+
if (updateClusterState) {
682683
var storeListener = getStoreMetadataListener(inferenceEntityId, listener);
683684
try {
684685
metadataTaskQueue.submitTask(
@@ -778,14 +779,19 @@ public synchronized void removeDefaultConfigs(Set<String> inferenceEntityIds, Ac
778779
}
779780

780781
defaultConfigIds.keySet().removeAll(inferenceEntityIds);
781-
deleteModels(inferenceEntityIds, listener);
782+
// default models are not stored in the cluster state.
783+
deleteModels(inferenceEntityIds, false, listener);
782784
}
783785

784786
public void deleteModel(String inferenceEntityId, ActionListener<Boolean> listener) {
785787
deleteModels(Set.of(inferenceEntityId), listener);
786788
}
787789

788790
public void deleteModels(Set<String> inferenceEntityIds, ActionListener<Boolean> listener) {
791+
deleteModels(inferenceEntityIds, true, listener);
792+
}
793+
794+
private void deleteModels(Set<String> inferenceEntityIds, boolean updateClusterState, ActionListener<Boolean> listener) {
789795
var lockedInferenceIds = new HashSet<>(inferenceEntityIds);
790796
lockedInferenceIds.retainAll(preventDeletionLock);
791797

@@ -804,16 +810,25 @@ public void deleteModels(Set<String> inferenceEntityIds, ActionListener<Boolean>
804810
}
805811

806812
var request = createDeleteRequest(inferenceEntityIds);
807-
client.execute(DeleteByQueryAction.INSTANCE, request, getDeleteModelClusterStateListener(inferenceEntityIds, listener));
813+
client.execute(
814+
DeleteByQueryAction.INSTANCE,
815+
request,
816+
getDeleteModelClusterStateListener(inferenceEntityIds, updateClusterState, listener)
817+
);
808818
}
809819

810820
private ActionListener<BulkByScrollResponse> getDeleteModelClusterStateListener(
811821
Set<String> inferenceEntityIds,
822+
boolean updateClusterState,
812823
ActionListener<Boolean> listener
813824
) {
814825
return new ActionListener<>() {
815826
@Override
816827
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
828+
if (updateClusterState == false) {
829+
listener.onResponse(Boolean.TRUE);
830+
return;
831+
}
817832
var clusterStateListener = new ActionListener<AcknowledgedResponse>() {
818833
@Override
819834
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
@@ -921,6 +936,12 @@ public void clusterChanged(ClusterChangedEvent event) {
921936
}
922937

923938
var state = ModelRegistryMetadata.fromState(event.state().metadata());
939+
940+
// wait for the cluster state to be recovered
941+
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
942+
return;
943+
}
944+
924945
if (state.isUpgraded()) {
925946
return;
926947
}

0 commit comments

Comments
 (0)