Skip to content

Commit b23024f

Browse files
Adding comments and extracting method
1 parent 7aab191 commit b23024f

File tree

1 file changed

+41
-18
lines changed
  • x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry

1 file changed

+41
-18
lines changed

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry/ModelRegistry.java

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,14 @@ public Tuple<ClusterState, ClusterStateAckListener> executeTask(MetadataTask tas
177177
* @return true if we find a match and false if not
178178
*/
179179
public boolean containsPreconfiguredInferenceEndpointId(String inferenceEntityId) {
180+
// This checks an in-memory cache local to the node. The cache should be the same across all nodes as it is populated in the
181+
// inference plugin on boot up (excluding an upgrade scenario where the plugins could be different).
182+
// This primarily holds endpoints registered by the ElasticsearchInternalService
180183
if (defaultConfigIds.containsKey(inferenceEntityId)) {
181184
return true;
182185
}
183186

187+
// This checks the cluster state for user created endpoints as well as EIS preconfigured endpoints
184188
if (lastMetadata.get() != null) {
185189
var project = lastMetadata.get().getProject(ProjectId.DEFAULT);
186190
var state = ModelRegistryMetadata.fromState(project);
@@ -960,6 +964,18 @@ private void updateClusterState(ResponseInfo responseInfo, ActionListener<Acknow
960964
.addListener(listener);
961965
}
962966

967+
/**
968+
* An out-of-sync endpoint is one that exists in the model store index but is not present in the cluster state. This doesn't usually
969+
* happen. It did happen when we transitioned the EIS preconfigured endpoints from being stored in the in-memory cache local to
970+
* each node in {@link #defaultConfigIds} to being stored in cluster state. The issue was that, after we transitioned the logic,
971+
* the {@link ElasticInferenceService} no longer registered the preconfigured endpoints in the in-memory cache on an authorization
972+
* poll. The polling logic was moved to a persistent task that only runs on a single node. Since we're only running on one node
973+
* we need to store the information in a way that all nodes can access it, hence storing in cluster state. The logic to store the
974+
* information in cluster state was only triggered when a preconfigured endpoint was missing from cluster state (it gets flagged as
975+
* being new). So the authorization logic would receive the preconfigured endpoints from EIS and attempt to store them. When it
976+
* stored them, a version conflict would occur since the index already had the documents. So this logic identifies that scenario and
977+
* adds the missing information to cluster state.
978+
*/
963979
private void handleOutOfSyncEndpoints(ResponseInfo responseInfo, ActionListener<Void> listener, TimeValue timeout) {
964980
var outOfSyncEndpointsExist = responseInfo.responses.stream()
965981
.anyMatch(
@@ -975,12 +991,12 @@ && containsInferenceEndpointId(response.modelStoreResponse().inferenceId()) == f
975991
}
976992

977993
var fixOutOfSyncListener = ActionListener.<GetInferenceModelAction.Response>wrap((response) -> {
978-
var endpointsToFix = new ArrayList<ModelAndSettings>();
994+
var outOfSyncEndpoints = new ArrayList<ModelAndSettings>();
979995

980996
for (var model : response.getEndpoints()) {
981997
// If the inference id can't be found in the in-memory hash map or the cluster state, then it is out of sync
982998
if (containsInferenceEndpointId(model.getInferenceEntityId()) == false) {
983-
endpointsToFix.add(
999+
outOfSyncEndpoints.add(
9841000
new ModelAndSettings(
9851001
model.getInferenceEntityId(),
9861002
new MinimalServiceSettings(
@@ -995,26 +1011,14 @@ && containsInferenceEndpointId(response.modelStoreResponse().inferenceId()) == f
9951011
}
9961012
}
9971013

998-
if (endpointsToFix.isEmpty()) {
1014+
if (outOfSyncEndpoints.isEmpty()) {
9991015
listener.onResponse(null);
10001016
return;
10011017
}
10021018

1003-
metadataTaskQueue.submitTask(
1004-
format(
1005-
"adding out of sync endpoint metadata for %s",
1006-
endpointsToFix.stream().map(ModelAndSettings::inferenceEntityId).toList()
1007-
),
1008-
new AddModelMetadataTask(
1009-
ProjectId.DEFAULT,
1010-
endpointsToFix,
1011-
ActionListener.wrap((result) -> listener.onResponse(null), e -> {
1012-
logger.atWarn().withThrowable(e).log("Failed while submitting task to fix out of sync endpoints");
1013-
listener.onResponse(null);
1014-
})
1015-
),
1016-
timeout
1017-
);
1019+
// Add the missing endpoints information from the index to the cluster state
1020+
// This only updates cluster state and not the index
1021+
submitEndpointMetadataToClusterState(outOfSyncEndpoints, listener, timeout);
10181022
}, e -> {
10191023
logger.atWarn().withThrowable(e).log("Failed to retrieve all endpoints to fix out of sync ones");
10201024
listener.onResponse(null);
@@ -1034,10 +1038,14 @@ && containsInferenceEndpointId(response.modelStoreResponse().inferenceId()) == f
10341038
* @return true if we find a match and false if not
10351039
*/
10361040
private boolean containsInferenceEndpointId(String inferenceEntityId) {
1041+
// This checks an in-memory cache local to the node. The cache should be the same across all nodes as it is populated in the
1042+
// inference plugin on boot up (excluding an upgrade scenario where the plugins could be different).
1043+
// This primarily holds endpoints registered by the ElasticsearchInternalService
10371044
if (defaultConfigIds.containsKey(inferenceEntityId)) {
10381045
return true;
10391046
}
10401047

1048+
// This checks the cluster state for user created endpoints as well as EIS preconfigured endpoints
10411049
if (lastMetadata.get() != null) {
10421050
var project = lastMetadata.get().getProject(ProjectId.DEFAULT);
10431051
var state = ModelRegistryMetadata.fromState(project);
@@ -1048,6 +1056,21 @@ private boolean containsInferenceEndpointId(String inferenceEntityId) {
10481056
return false;
10491057
}
10501058

1059+
/**
 * Submits an {@link AddModelMetadataTask} for {@code ProjectId.DEFAULT} to the metadata task queue so that the metadata of the
 * given endpoints is added to cluster state.
 * <p>
 * Failures are not propagated: if the task fails, the exception is logged at warn level and the listener is still completed
 * via {@code onResponse(null)}, exactly as it is on success.
 * <p>
 * NOTE(review): the task description and the warning message are worded for the out-of-sync endpoint fix; confirm they still
 * read correctly if this helper gains other callers.
 *
 * @param endpoints the endpoints to add; their inference entity ids are included in the task description
 * @param listener completed with {@code null} once the task finishes, whether it succeeded or failed
 * @param timeout passed through unchanged to {@code metadataTaskQueue.submitTask}
 */
private void submitEndpointMetadataToClusterState(
1060+
ArrayList<ModelAndSettings> endpoints,
1061+
ActionListener<Void> listener,
1062+
TimeValue timeout
1063+
) {
1064+
metadataTaskQueue.submitTask(
1065+
format("adding out of sync endpoint metadata for %s", endpoints.stream().map(ModelAndSettings::inferenceEntityId).toList()),
1066+
new AddModelMetadataTask(ProjectId.DEFAULT, endpoints, ActionListener.wrap((result) -> listener.onResponse(null), e -> {
1067+
logger.atWarn().withThrowable(e).log("Failed while submitting task to fix out of sync endpoints");
1068+
listener.onResponse(null);
1069+
})),
1070+
timeout
1071+
);
1072+
}
1073+
10511074
public boolean isReady() {
10521075
if (lastMetadata.get() == null) {
10531076
return false;

0 commit comments

Comments
 (0)