From e4a748174c696fee1b4c5adfc5aced2cb3bfd7fb Mon Sep 17 00:00:00 2001 From: dan-rubinstein Date: Thu, 29 May 2025 11:08:18 -0400 Subject: [PATCH 1/3] Allow timeout during trained model download process --- .../ModelDeploymentTimeoutException.java | 2 +- .../BaseElasticsearchInternalService.java | 53 ++++++++++++++----- ...portStartTrainedModelDeploymentAction.java | 2 +- .../inference/InferenceWaitForAllocation.java | 2 +- .../TrainedModelAssignmentService.java | 1 + 5 files changed, 43 insertions(+), 17 deletions(-) rename x-pack/plugin/{ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment => core/src/main/java/org/elasticsearch/xpack/core/ml/inference}/ModelDeploymentTimeoutException.java (90%) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/ModelDeploymentTimeoutException.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/inference/ModelDeploymentTimeoutException.java similarity index 90% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/ModelDeploymentTimeoutException.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/inference/ModelDeploymentTimeoutException.java index 13d16515866e0..8f6625f5802ae 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/ModelDeploymentTimeoutException.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/inference/ModelDeploymentTimeoutException.java @@ -5,7 +5,7 @@ * 2.0. */ -package org.elasticsearch.xpack.ml.inference.assignment; +package org.elasticsearch.xpack.core.ml.inference; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.rest.RestStatus; diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/BaseElasticsearchInternalService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/BaseElasticsearchInternalService.java index a6823d65da107..e85b294c1934d 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/BaseElasticsearchInternalService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/BaseElasticsearchInternalService.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; @@ -22,6 +23,7 @@ import org.elasticsearch.inference.InputType; import org.elasticsearch.inference.Model; import org.elasticsearch.inference.TaskType; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.action.GetTrainedModelsAction; @@ -29,6 +31,7 @@ import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAction; import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction; import org.elasticsearch.xpack.core.ml.action.StopTrainedModelDeploymentAction; +import org.elasticsearch.xpack.core.ml.inference.ModelDeploymentTimeoutException; import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig; import org.elasticsearch.xpack.core.ml.inference.TrainedModelInput; import org.elasticsearch.xpack.core.ml.inference.TrainedModelPrefixStrings; @@ -41,12 +44,14 @@ import java.util.concurrent.ExecutorService; import java.util.function.Consumer; +import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.xpack.core.ClientHelper.INFERENCE_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; public abstract class BaseElasticsearchInternalService implements InferenceService { protected final OriginSettingClient client; + protected final ThreadPool threadPool; protected final ExecutorService inferenceExecutor; protected final Consumer> preferredModelVariantFn; private final ClusterService clusterService; @@ -60,6 +65,7 @@ public enum PreferredModelVariant { public BaseElasticsearchInternalService(InferenceServiceExtension.InferenceServiceFactoryContext context) { this.client = new OriginSettingClient(context.client(), ClientHelper.INFERENCE_ORIGIN); + this.threadPool = context.threadPool(); this.inferenceExecutor = context.threadPool().executor(InferencePlugin.UTILITY_THREAD_POOL_NAME); this.preferredModelVariantFn = this::preferredVariantFromPlatformArchitecture; this.clusterService = context.clusterService(); @@ -75,6 +81,7 @@ public BaseElasticsearchInternalService( Consumer> preferredModelVariantFn ) { this.client = new OriginSettingClient(context.client(), ClientHelper.INFERENCE_ORIGIN); + this.threadPool = context.threadPool(); this.inferenceExecutor = context.threadPool().executor(InferencePlugin.UTILITY_THREAD_POOL_NAME); this.preferredModelVariantFn = preferredModelVariantFn; this.clusterService = context.clusterService(); @@ -96,20 +103,38 @@ public void start(Model model, TimeValue timeout, ActionListener finalL return; } - SubscribableListener.newForked(forkedListener -> { isBuiltinModelPut(model, forkedListener); }) - .andThen((l, modelConfigExists) -> { - if (modelConfigExists == false) { - putModel(model, l); - } else { - l.onResponse(true); - } - }) - .andThen((l2, modelDidPut) -> { - var startRequest = esModel.getStartTrainedModelDeploymentActionRequest(timeout); - var responseListener = esModel.getCreateTrainedModelAssignmentActionListener(model, l2); - client.execute(StartTrainedModelDeploymentAction.INSTANCE, startRequest, responseListener); - }) - .addListener(finalListener); + // instead of a subscribably listener, use some wait to wait for the first one. + var subscribableListener = SubscribableListener.newForked( + forkedListener -> { isBuiltinModelPut(model, forkedListener); } + ).andThen((l, modelConfigExists) -> { + if (modelConfigExists == false) { + putModel(model, l); + } else { + l.onResponse(true); + } + }).andThen((l2, modelDidPut) -> { + var startRequest = esModel.getStartTrainedModelDeploymentActionRequest(timeout); + var responseListener = esModel.getCreateTrainedModelAssignmentActionListener(model, l2); + client.execute(StartTrainedModelDeploymentAction.INSTANCE, startRequest, responseListener); + }); + subscribableListener.addTimeout(timeout, threadPool, inferenceExecutor); + subscribableListener.addListener(finalListener.delegateResponse((l, e) -> { + if (e instanceof ElasticsearchTimeoutException) { + l.onFailure( + new ModelDeploymentTimeoutException( + format( + "Timed out after [%s] waiting for trained model deployment for inference endpoint [%s] to start. " + + "Use the trained model stats API to track the state of the deployment " + + "and try again once it has started.", + timeout, + model.getInferenceEntityId() + ) + ) + ); + } else { + l.onFailure(e); + } + })); } else { finalListener.onFailure(notElasticsearchModelException(model)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java index f7d83dfd62145..5dd2001ab6e64 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java @@ -51,6 +51,7 @@ import org.elasticsearch.xpack.core.ml.action.GetTrainedModelsAction; import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction; import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction.TaskParams; +import org.elasticsearch.xpack.core.ml.inference.ModelDeploymentTimeoutException; import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig; import org.elasticsearch.xpack.core.ml.inference.TrainedModelType; import org.elasticsearch.xpack.core.ml.inference.assignment.AllocationStatus; @@ -64,7 +65,6 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.TransportVersionUtils; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.inference.assignment.ModelDeploymentTimeoutException; import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentService; import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelDefinitionDoc; import org.elasticsearch.xpack.ml.notifications.InferenceAuditor; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/InferenceWaitForAllocation.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/InferenceWaitForAllocation.java index a0c57c9b3abc4..4e42f172e9879 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/InferenceWaitForAllocation.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/InferenceWaitForAllocation.java @@ -16,12 +16,12 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.ml.action.InferModelAction; +import org.elasticsearch.xpack.core.ml.inference.ModelDeploymentTimeoutException; import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo; import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingState; import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment; import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignmentMetadata; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.ml.inference.assignment.ModelDeploymentTimeoutException; import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentService; import java.util.HashMap; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentService.java index cdb3e1f6fa64f..12f402ff18f9f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentService.java @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.ml.action.CreateTrainedModelAssignmentAction; import org.elasticsearch.xpack.core.ml.action.DeleteTrainedModelAssignmentAction; import org.elasticsearch.xpack.core.ml.action.UpdateTrainedModelAssignmentRoutingInfoAction; +import org.elasticsearch.xpack.core.ml.inference.ModelDeploymentTimeoutException; import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment; import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignmentMetadata; From 1af3137a505c306da3c5c6277f35790bbaef06c8 Mon Sep 17 00:00:00 2001 From: Dan Rubinstein Date: Thu, 5 Jun 2025 13:45:43 -0400 Subject: [PATCH 2/3] Update docs/changelog/129003.yaml --- docs/changelog/129003.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/129003.yaml diff --git a/docs/changelog/129003.yaml b/docs/changelog/129003.yaml new file mode 100644 index 0000000000000..0cb98105a7dca --- /dev/null +++ b/docs/changelog/129003.yaml @@ -0,0 +1,5 @@ +pr: 129003 +summary: Allow timeout during trained model download process +area: Machine Learning +type: bug +issues: [] From bf2d6c5809fd807c35984f44e58b92d11669e85b Mon Sep 17 00:00:00 2001 From: dan-rubinstein Date: Wed, 2 Jul 2025 10:12:00 -0400 Subject: [PATCH 3/3] Update timeout message --- .../elasticsearch/BaseElasticsearchInternalService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/BaseElasticsearchInternalService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/BaseElasticsearchInternalService.java index e85b294c1934d..53e859b7f7a4d 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/BaseElasticsearchInternalService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elasticsearch/BaseElasticsearchInternalService.java @@ -124,8 +124,8 @@ public void start(Model model, TimeValue timeout, ActionListener finalL new ModelDeploymentTimeoutException( format( "Timed out after [%s] waiting for trained model deployment for inference endpoint [%s] to start. " - + "Use the trained model stats API to track the state of the deployment " - + "and try again once it has started.", + + "The inference endpoint can not be used to perform inference until the deployment has started. " + + "Use the trained model stats API to track the state of the deployment.", timeout, model.getInferenceEntityId() )