Skip to content

Conversation

@jonathan-buttner
Copy link
Contributor

This PR adds functionality to the ModelRegistry to store multiple inference endpoints at the same time by using a bulk index operation. This will be useful for when the master handles polling EIS for the authorized preconfigured endpoints so that it can create the new ones in one operation.

This doesn't leverage the ability to create multiple endpoints (beyond the storeModel using it internally). It will be used in a follow up PR.

@jonathan-buttner jonathan-buttner added >non-issue :ml Machine learning Team:ML Meta label for the ML team v9.3.0 labels Oct 14, 2025
new ElasticsearchStatusException(
"Inference endpoint [{}] already exists",
RestStatus.BAD_REQUEST,
failureItem.failureCause,
Copy link
Contributor Author

@jonathan-buttner jonathan-buttner Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this from a ResourceAlreadyExistsException so we could include the cause, but maybe we don't want to include it 🤷‍♂️


private void updateClusterState(List<Model> models, ActionListener<AcknowledgedResponse> listener, TimeValue timeout) {
var inferenceIdsSet = models.stream().map(Model::getInferenceEntityId).collect(Collectors.toSet());
var storeListener = listener.delegateResponse((delegate, exc) -> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I switched to this instead of creating an anonymous class.

@jonathan-buttner jonathan-buttner marked this pull request as ready for review October 17, 2025 12:51
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/ml-core (Team:ML)

Copy link
Contributor

@DonalEvans DonalEvans left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strictly related to this PR, but it seems like a lot of the tests in ModelRegistryTests could/should be moved to ModelRegistryIT since they're integration tests rather than unit tests (at least, as far as I understand those terms to be defined).

};

var secretsItem = bulkItems[i + 1];
var secretsStoreResponse = createModelStoreResponse(secretsItem, docIdToInferenceId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth doing some kind of check that the inference ID for the secrets item matches the inference ID for the configuration item? Or is it not possible to lose items from part way through the bulk response, only at the end?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably don't need the check for not getting an even number of responses. I was mostly trying to create a nicer error message in the very very unlikely event that it happened. I'll add an assertion that the inference IDs are the same.

return;
}

var failureItem = firstFailureResponse.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When creating multiple inference endpoints in one go, it's possible that there may be multiple failures with multiple reasons. Would it be better to report all of the failures in the exception? It could be misleading if we only report the first endpoint that failed to be created when in fact none of them were created. Also, what would be the correct way to handle the case where one of the endpoints wasn't created because it already exists (400 error), but another wasn't created due to some other issue not caused by VersionConflictEngineException (500 error)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is within the storeModel() method. It only allows one endpoint to be created. The previous behavior to this PR for the storeModel() method was to retrieve the first failure (at most there will be 2 failures, one for each of the bulk items).

Your point about combining the errors is a good idea though. I'd rather create an issue and address that later though if that's ok.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The storeModels() does report each failure/success but it does it using the onResponse() of the listener and returns a list of responses which includes the rest status and an exception if one occurred.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, what would be the correct way to handle the case where one of the endpoints wasn't created because it already exists (400 error), but another wasn't created due to some other issue not caused by VersionConflictEngineException (500 error)?

In this situation, VersionConflictEngineException represents the case where the endpoint already exists. The change I made here was to return a raw ElasticsearchStatusException instead of a ResourceExistsException. The reason I chose that was so we could return the actual cause but we don't necessarily need to do that. I figured that might be more informative but maybe it's unnecessarily information for the user.

I believe VersionConflictEngineException could also occur if we were trying to do an update and sequence number we're using is incorrect (some other request occurred before ours). I don't think we need to handle that scenario in this case though, that'd be in the update flow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, my mistake, I missed that this was the case where there was only one endpoint. Combining the errors doesn't need to be done as part of this PR.

@jonathan-buttner
Copy link
Contributor Author

Not strictly related to this PR, but it seems like a lot of the tests in ModelRegistryTests could/should be moved to ModelRegistryIT since they're integration tests rather than unit tests (at least, as far as I understand those terms to be defined).

I took a stab at moving over the tests to ModelRegistryIT. I agree that most of them should live in that file now. I left a few that didn't seem to be leveraging elasticsearch and needed package private access to ModelRegistry.

format("Storing inference endpoint [%s] failed, no items were received from the bulk response", inferenceEntityId)
);
var inferenceEntityIds = String.join(", ", models.stream().map(Model::getInferenceEntityId).toList());
logger.warn("Storing inference endpoints [{}] failed, no items were received from the bulk response", inferenceEntityIds);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check that the bulk request or models list is not empty in the storeModels function and trivially return success if that is the case. Otherwise an empty request would return a 500 error code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call 👍

var inferenceEntityId = model.getInferenceEntityId();
var docIdToInferenceId = models.stream()
.collect(Collectors.toMap(m -> Model.documentId(m.getInferenceEntityId()), Model::getInferenceEntityId, (id1, id2) -> {
logger.warn("Encountered duplicate inference ids when storing endpoints: [{}]", id1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid one config overwriting another (or throwing a version conflict exception) the check for duplicate Ids should be performed before indexing. The storeModels function is called automatically by internal code and we want it to be resilient so maybe filter out the duplicates if the id and model config are exactly the same.


var storageResponses = responses.stream().map(StoreResponseWithIndexInfo::modelStoreResponse).toList();

deleteModels(inferenceIdsToBeRemoved, ActionListener.running(() -> delegate.onResponse(storageResponses)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for inferenceIdsToBeRemoved to be an empty set and in which case can deleteModels be skipped?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I'll have deleteModels return early if the set is empty.

Copy link
Member

@davidkyle davidkyle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@jonathan-buttner jonathan-buttner enabled auto-merge (squash) October 29, 2025 12:51
@jonathan-buttner jonathan-buttner merged commit 22fe5be into elastic:main Oct 29, 2025
34 checks passed
shmuelhanoch pushed a commit to shmuelhanoch/elasticsearch that referenced this pull request Oct 29, 2025
* Adding bulk storage of multiple models

* Adding tests

* Adding log for duplicate ids

* [CI] Auto commit changes from spotless

* Removing unused code

* Removing constructor

* Adding more tests

* Adding in logic to delete models when a failure occurs

* revert rename changes

* formatting

* Starting on feedback

* Improving tests

* Moving most tests to ModelRegistryIT

* [CI] Auto commit changes from spotless

* Fixing test

* Removing duplicate tests

* Handling empty list and duplicates

* Fixing empty delete

---------

Co-authored-by: elasticsearchmachine <[email protected]>
chrisparrinello pushed a commit to chrisparrinello/elasticsearch that referenced this pull request Nov 3, 2025
* Adding bulk storage of multiple models

* Adding tests

* Adding log for duplicate ids

* [CI] Auto commit changes from spotless

* Removing unused code

* Removing constructor

* Adding more tests

* Adding in logic to delete models when a failure occurs

* revert rename changes

* formatting

* Starting on feedback

* Improving tests

* Moving most tests to ModelRegistryIT

* [CI] Auto commit changes from spotless

* Fixing test

* Removing duplicate tests

* Handling empty list and duplicates

* Fixing empty delete

---------

Co-authored-by: elasticsearchmachine <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:ml Machine learning >non-issue Team:ML Meta label for the ML team v9.3.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants