1717import org .elasticsearch .action .TaskOperationFailure ;
1818import org .elasticsearch .action .support .ActionFilters ;
1919import org .elasticsearch .action .support .tasks .TransportTasksAction ;
20+ import org .elasticsearch .client .internal .Client ;
21+ import org .elasticsearch .client .internal .OriginSettingClient ;
2022import org .elasticsearch .cluster .ClusterState ;
2123import org .elasticsearch .cluster .node .DiscoveryNode ;
2224import org .elasticsearch .cluster .node .DiscoveryNodes ;
2325import org .elasticsearch .cluster .service .ClusterService ;
2426import org .elasticsearch .common .util .concurrent .EsExecutors ;
27+ import org .elasticsearch .common .xcontent .XContentHelper ;
2528import org .elasticsearch .discovery .MasterNotDiscoveredException ;
29+ import org .elasticsearch .inference .TaskType ;
2630import org .elasticsearch .ingest .IngestMetadata ;
27- import org .elasticsearch .ingest .IngestService ;
2831import org .elasticsearch .injection .guice .Inject ;
2932import org .elasticsearch .rest .RestStatus ;
3033import org .elasticsearch .tasks .CancellableTask ;
3134import org .elasticsearch .tasks .Task ;
3235import org .elasticsearch .transport .TransportResponseHandler ;
3336import org .elasticsearch .transport .TransportService ;
37+ import org .elasticsearch .xcontent .XContentType ;
38+ import org .elasticsearch .xpack .core .inference .action .GetInferenceModelAction ;
3439import org .elasticsearch .xpack .core .ml .action .StopTrainedModelDeploymentAction ;
3540import org .elasticsearch .xpack .core .ml .inference .assignment .TrainedModelAssignment ;
3641import org .elasticsearch .xpack .core .ml .inference .assignment .TrainedModelAssignmentMetadata ;
4752import java .util .Set ;
4853
4954import static org .elasticsearch .core .Strings .format ;
55+ import static org .elasticsearch .xpack .core .ClientHelper .ML_ORIGIN ;
5056import static org .elasticsearch .xpack .ml .action .TransportDeleteTrainedModelAction .getModelAliases ;
5157
5258/**
@@ -63,7 +69,7 @@ public class TransportStopTrainedModelDeploymentAction extends TransportTasksAct
6369
6470 private static final Logger logger = LogManager .getLogger (TransportStopTrainedModelDeploymentAction .class );
6571
66- private final IngestService ingestService ;
72+ private final OriginSettingClient client ;
6773 private final TrainedModelAssignmentClusterService trainedModelAssignmentClusterService ;
6874 private final InferenceAuditor auditor ;
6975
@@ -72,7 +78,7 @@ public TransportStopTrainedModelDeploymentAction(
7278 ClusterService clusterService ,
7379 TransportService transportService ,
7480 ActionFilters actionFilters ,
75- IngestService ingestService ,
81+ Client client ,
7682 TrainedModelAssignmentClusterService trainedModelAssignmentClusterService ,
7783 InferenceAuditor auditor
7884 ) {
@@ -85,7 +91,7 @@ public TransportStopTrainedModelDeploymentAction(
8591 StopTrainedModelDeploymentAction .Response ::new ,
8692 EsExecutors .DIRECT_EXECUTOR_SERVICE
8793 );
88- this .ingestService = ingestService ;
94+ this .client = new OriginSettingClient ( client , ML_ORIGIN ) ;
8995 this .trainedModelAssignmentClusterService = trainedModelAssignmentClusterService ;
9096 this .auditor = Objects .requireNonNull (auditor );
9197 }
@@ -154,21 +160,84 @@ protected void doExecute(
154160
155161 // NOTE, should only run on Master node
156162 assert clusterService .localNode ().isMasterNode ();
163+
164+ if (request .isForce () == false ) {
165+ checkIfUsedByInferenceEndpoint (
166+ request .getId (),
167+ ActionListener .wrap (canStop -> stopDeployment (task , request , maybeAssignment .get (), listener ), listener ::onFailure )
168+ );
169+ } else {
170+ stopDeployment (task , request , maybeAssignment .get (), listener );
171+ }
172+ }
173+
174+ private void stopDeployment (
175+ Task task ,
176+ StopTrainedModelDeploymentAction .Request request ,
177+ TrainedModelAssignment assignment ,
178+ ActionListener <StopTrainedModelDeploymentAction .Response > listener
179+ ) {
157180 trainedModelAssignmentClusterService .setModelAssignmentToStopping (
158181 request .getId (),
159- ActionListener .wrap (
160- setToStopping -> normalUndeploy (task , request .getId (), maybeAssignment .get (), request , listener ),
161- failure -> {
162- if (ExceptionsHelper .unwrapCause (failure ) instanceof ResourceNotFoundException ) {
163- listener .onResponse (new StopTrainedModelDeploymentAction .Response (true ));
164- return ;
165- }
166- listener .onFailure (failure );
182+ ActionListener .wrap (setToStopping -> normalUndeploy (task , request .getId (), assignment , request , listener ), failure -> {
183+ if (ExceptionsHelper .unwrapCause (failure ) instanceof ResourceNotFoundException ) {
184+ listener .onResponse (new StopTrainedModelDeploymentAction .Response (true ));
185+ return ;
167186 }
168- )
187+ listener .onFailure (failure );
188+ })
169189 );
170190 }
171191
192+ private void checkIfUsedByInferenceEndpoint (String deploymentId , ActionListener <Boolean > listener ) {
193+
194+ GetInferenceModelAction .Request getAllEndpoints = new GetInferenceModelAction .Request ("*" , TaskType .ANY );
195+ client .execute (GetInferenceModelAction .INSTANCE , getAllEndpoints , listener .delegateFailureAndWrap ((l , response ) -> {
196+ // filter by the ml node services
197+ var mlNodeEndpoints = response .getEndpoints ()
198+ .stream ()
199+ .filter (model -> model .getService ().equals ("elasticsearch" ) || model .getService ().equals ("elser" ))
200+ .toList ();
201+
202+ var endpointOwnsDeployment = mlNodeEndpoints .stream ()
203+ .filter (model -> model .getInferenceEntityId ().equals (deploymentId ))
204+ .findFirst ();
205+ if (endpointOwnsDeployment .isPresent ()) {
206+ l .onFailure (
207+ new ElasticsearchStatusException (
208+ "Cannot stop deployment [{}] as it was created by inference endpoint [{}]" ,
209+ RestStatus .CONFLICT ,
210+ deploymentId ,
211+ endpointOwnsDeployment .get ().getInferenceEntityId ()
212+ )
213+ );
214+ return ;
215+ }
216+
217+ // The inference endpoint may have been created by attaching to an existing deployment.
218+ for (var endpoint : mlNodeEndpoints ) {
219+ var serviceSettingsXContent = XContentHelper .toXContent (endpoint .getServiceSettings (), XContentType .JSON , false );
220+ var settingsMap = XContentHelper .convertToMap (serviceSettingsXContent , false , XContentType .JSON ).v2 ();
221+ // Endpoints with the deployment_id setting are attached to an existing deployment.
222+ var deploymentIdFromSettings = (String ) settingsMap .get ("deployment_id" );
223+ if (deploymentIdFromSettings != null && deploymentIdFromSettings .equals (deploymentId )) {
224+ // The endpoint was created to use this deployment
225+ l .onFailure (
226+ new ElasticsearchStatusException (
227+ "Cannot stop deployment [{}] as it is used by inference endpoint [{}]" ,
228+ RestStatus .CONFLICT ,
229+ deploymentId ,
230+ endpoint .getInferenceEntityId ()
231+ )
232+ );
233+ return ;
234+ }
235+ }
236+
237+ l .onResponse (true );
238+ }));
239+ }
240+
172241 private void redirectToMasterNode (
173242 DiscoveryNode masterNode ,
174243 StopTrainedModelDeploymentAction .Request request ,
0 commit comments