1010import org .apache .logging .log4j .LogManager ;
1111import org .apache .logging .log4j .Logger ;
1212import org .elasticsearch .ElasticsearchStatusException ;
13+ import org .elasticsearch .ElasticsearchTimeoutException ;
1314import org .elasticsearch .ExceptionsHelper ;
1415import org .elasticsearch .ResourceNotFoundException ;
1516import org .elasticsearch .action .ActionListener ;
2223import org .elasticsearch .inference .InputType ;
2324import org .elasticsearch .inference .Model ;
2425import org .elasticsearch .inference .TaskType ;
26+ import org .elasticsearch .threadpool .ThreadPool ;
2527import org .elasticsearch .xpack .core .ClientHelper ;
2628import org .elasticsearch .xpack .core .ml .MachineLearningField ;
2729import org .elasticsearch .xpack .core .ml .action .GetTrainedModelsAction ;
2830import org .elasticsearch .xpack .core .ml .action .InferModelAction ;
2931import org .elasticsearch .xpack .core .ml .action .PutTrainedModelAction ;
3032import org .elasticsearch .xpack .core .ml .action .StartTrainedModelDeploymentAction ;
3133import org .elasticsearch .xpack .core .ml .action .StopTrainedModelDeploymentAction ;
34+ import org .elasticsearch .xpack .core .ml .inference .ModelDeploymentTimeoutException ;
3235import org .elasticsearch .xpack .core .ml .inference .TrainedModelConfig ;
3336import org .elasticsearch .xpack .core .ml .inference .TrainedModelInput ;
3437import org .elasticsearch .xpack .core .ml .inference .TrainedModelPrefixStrings ;
4144import java .util .concurrent .ExecutorService ;
4245import java .util .function .Consumer ;
4346
47+ import static org .elasticsearch .core .Strings .format ;
4448import static org .elasticsearch .xpack .core .ClientHelper .INFERENCE_ORIGIN ;
4549import static org .elasticsearch .xpack .core .ClientHelper .executeAsyncWithOrigin ;
4650
4751public abstract class BaseElasticsearchInternalService implements InferenceService {
4852
4953 protected final OriginSettingClient client ;
54+ protected final ThreadPool threadPool ;
5055 protected final ExecutorService inferenceExecutor ;
5156 protected final Consumer <ActionListener <PreferredModelVariant >> preferredModelVariantFn ;
5257 private final ClusterService clusterService ;
@@ -60,6 +65,7 @@ public enum PreferredModelVariant {
6065
6166 public BaseElasticsearchInternalService (InferenceServiceExtension .InferenceServiceFactoryContext context ) {
6267 this .client = new OriginSettingClient (context .client (), ClientHelper .INFERENCE_ORIGIN );
68+ this .threadPool = context .threadPool ();
6369 this .inferenceExecutor = context .threadPool ().executor (InferencePlugin .UTILITY_THREAD_POOL_NAME );
6470 this .preferredModelVariantFn = this ::preferredVariantFromPlatformArchitecture ;
6571 this .clusterService = context .clusterService ();
@@ -75,6 +81,7 @@ public BaseElasticsearchInternalService(
7581 Consumer <ActionListener <PreferredModelVariant >> preferredModelVariantFn
7682 ) {
7783 this .client = new OriginSettingClient (context .client (), ClientHelper .INFERENCE_ORIGIN );
84+ this .threadPool = context .threadPool ();
7885 this .inferenceExecutor = context .threadPool ().executor (InferencePlugin .UTILITY_THREAD_POOL_NAME );
7986 this .preferredModelVariantFn = preferredModelVariantFn ;
8087 this .clusterService = context .clusterService ();
@@ -96,20 +103,38 @@ public void start(Model model, TimeValue timeout, ActionListener<Boolean> finalL
96103 return ;
97104 }
98105
99- SubscribableListener .<Boolean >newForked (forkedListener -> { isBuiltinModelPut (model , forkedListener ); })
100- .<Boolean >andThen ((l , modelConfigExists ) -> {
101- if (modelConfigExists == false ) {
102- putModel (model , l );
103- } else {
104- l .onResponse (true );
105- }
106- })
107- .<Boolean >andThen ((l2 , modelDidPut ) -> {
108- var startRequest = esModel .getStartTrainedModelDeploymentActionRequest (timeout );
109- var responseListener = esModel .getCreateTrainedModelAssignmentActionListener (model , l2 );
110- client .execute (StartTrainedModelDeploymentAction .INSTANCE , startRequest , responseListener );
111- })
112- .addListener (finalListener );
106+ // instead of a subscribable listener, use some way to wait for the first one.
107+ var subscribableListener = SubscribableListener .<Boolean >newForked (
108+ forkedListener -> { isBuiltinModelPut (model , forkedListener ); }
109+ ).<Boolean >andThen ((l , modelConfigExists ) -> {
110+ if (modelConfigExists == false ) {
111+ putModel (model , l );
112+ } else {
113+ l .onResponse (true );
114+ }
115+ }).<Boolean >andThen ((l2 , modelDidPut ) -> {
116+ var startRequest = esModel .getStartTrainedModelDeploymentActionRequest (timeout );
117+ var responseListener = esModel .getCreateTrainedModelAssignmentActionListener (model , l2 );
118+ client .execute (StartTrainedModelDeploymentAction .INSTANCE , startRequest , responseListener );
119+ });
120+ subscribableListener .addTimeout (timeout , threadPool , inferenceExecutor );
121+ subscribableListener .addListener (finalListener .delegateResponse ((l , e ) -> {
122+ if (e instanceof ElasticsearchTimeoutException ) {
123+ l .onFailure (
124+ new ModelDeploymentTimeoutException (
125+ format (
126+ "Timed out after [%s] waiting for trained model deployment for inference endpoint [%s] to start. "
127+ + "The inference endpoint can not be used to perform inference until the deployment has started. "
128+ + "Use the trained model stats API to track the state of the deployment." ,
129+ timeout ,
130+ model .getInferenceEntityId ()
131+ )
132+ )
133+ );
134+ } else {
135+ l .onFailure (e );
136+ }
137+ }));
113138
114139 } else {
115140 finalListener .onFailure (notElasticsearchModelException (model ));
0 commit comments