Skip to content

Commit 2b1edc7

Browse files
jimczismalyshev
authored andcommitted
Prevent default inference model to update the cluster state when deleting (#125369)
The Elastic inference service removes the default models at startup if the node cannot access EIS. Since #125242 we don't store default models in the cluster state but we still try to delete them. This change ensures that we don't try to update the cluster state when a default model is deleted since the delete is not performed on the master node and default models are never stored in the cluster state.
1 parent 926fa87 commit 2b1edc7

File tree

1 file changed

+26
-6
lines changed
  • x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry

1 file changed

+26
-6
lines changed

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry/ModelRegistry.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.common.xcontent.XContentHelper;
4747
import org.elasticsearch.core.TimeValue;
4848
import org.elasticsearch.core.Tuple;
49+
import org.elasticsearch.gateway.GatewayService;
4950
import org.elasticsearch.index.engine.VersionConflictEngineException;
5051
import org.elasticsearch.index.query.QueryBuilder;
5152
import org.elasticsearch.index.query.QueryBuilders;
@@ -627,8 +628,8 @@ public void storeModel(Model model, ActionListener<Boolean> listener, TimeValue
627628
storeModel(model, true, listener, timeout);
628629
}
629630

630-
private void storeModel(Model model, boolean addToClusterState, ActionListener<Boolean> listener, TimeValue timeout) {
631-
ActionListener<BulkResponse> bulkResponseActionListener = getStoreIndexListener(model, addToClusterState, listener, timeout);
631+
private void storeModel(Model model, boolean updateClusterState, ActionListener<Boolean> listener, TimeValue timeout) {
632+
ActionListener<BulkResponse> bulkResponseActionListener = getStoreIndexListener(model, updateClusterState, listener, timeout);
632633

633634
IndexRequest configRequest = createIndexRequest(
634635
Model.documentId(model.getConfigurations().getInferenceEntityId()),
@@ -653,7 +654,7 @@ private void storeModel(Model model, boolean addToClusterState, ActionListener<B
653654

654655
private ActionListener<BulkResponse> getStoreIndexListener(
655656
Model model,
656-
boolean addToClusterState,
657+
boolean updateClusterState,
657658
ActionListener<Boolean> listener,
658659
TimeValue timeout
659660
) {
@@ -680,7 +681,7 @@ private ActionListener<BulkResponse> getStoreIndexListener(
680681
BulkItemResponse.Failure failure = getFirstBulkFailure(bulkItemResponses);
681682

682683
if (failure == null) {
683-
if (addToClusterState) {
684+
if (updateClusterState) {
684685
var storeListener = getStoreMetadataListener(inferenceEntityId, listener);
685686
try {
686687
var projectId = clusterService.state().projectState().projectId();
@@ -781,14 +782,19 @@ public synchronized void removeDefaultConfigs(Set<String> inferenceEntityIds, Ac
781782
}
782783

783784
defaultConfigIds.keySet().removeAll(inferenceEntityIds);
784-
deleteModels(inferenceEntityIds, listener);
785+
// default models are not stored in the cluster state.
786+
deleteModels(inferenceEntityIds, false, listener);
785787
}
786788

787789
public void deleteModel(String inferenceEntityId, ActionListener<Boolean> listener) {
788790
deleteModels(Set.of(inferenceEntityId), listener);
789791
}
790792

791793
public void deleteModels(Set<String> inferenceEntityIds, ActionListener<Boolean> listener) {
794+
deleteModels(inferenceEntityIds, true, listener);
795+
}
796+
797+
private void deleteModels(Set<String> inferenceEntityIds, boolean updateClusterState, ActionListener<Boolean> listener) {
792798
var lockedInferenceIds = new HashSet<>(inferenceEntityIds);
793799
lockedInferenceIds.retainAll(preventDeletionLock);
794800

@@ -807,16 +813,25 @@ public void deleteModels(Set<String> inferenceEntityIds, ActionListener<Boolean>
807813
}
808814

809815
var request = createDeleteRequest(inferenceEntityIds);
810-
client.execute(DeleteByQueryAction.INSTANCE, request, getDeleteModelClusterStateListener(inferenceEntityIds, listener));
816+
client.execute(
817+
DeleteByQueryAction.INSTANCE,
818+
request,
819+
getDeleteModelClusterStateListener(inferenceEntityIds, updateClusterState, listener)
820+
);
811821
}
812822

813823
private ActionListener<BulkByScrollResponse> getDeleteModelClusterStateListener(
814824
Set<String> inferenceEntityIds,
825+
boolean updateClusterState,
815826
ActionListener<Boolean> listener
816827
) {
817828
return new ActionListener<>() {
818829
@Override
819830
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
831+
if (updateClusterState == false) {
832+
listener.onResponse(Boolean.TRUE);
833+
return;
834+
}
820835
var clusterStateListener = new ActionListener<AcknowledgedResponse>() {
821836
@Override
822837
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
@@ -924,6 +939,11 @@ public void clusterChanged(ClusterChangedEvent event) {
924939
return;
925940
}
926941

942+
// wait for the cluster state to be recovered
943+
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
944+
return;
945+
}
946+
927947
if (event.state().metadata().projects().size() > 1) {
928948
// TODO: Add support to handle multi-projects
929949
return;

0 commit comments

Comments
 (0)