2525import  org .opensearch .action .support .ActionFilters ;
2626import  org .opensearch .action .support .HandledTransportAction ;
2727import  org .opensearch .action .support .WriteRequest ;
28- import  org .opensearch .client .Client ;
2928import  org .opensearch .cluster .service .ClusterService ;
3029import  org .opensearch .common .inject .Inject ;
3130import  org .opensearch .common .settings .Settings ;
6665import  org .opensearch .tasks .Task ;
6766import  org .opensearch .threadpool .ThreadPool ;
6867import  org .opensearch .transport .TransportService ;
68+ import  org .opensearch .transport .client .Client ;
6969
7070import  com .google .common .annotations .VisibleForTesting ;
7171
@@ -218,6 +218,7 @@ private void undeployModels(
218218                return  modelCacheMissForModelIds ;
219219            });
220220            if  (response .getNodes ().isEmpty () || modelNotFoundInNodesCache ) {
221+                 log .warn ("No nodes service these models, performing manual `UNDEPLOY` write to model index" );
221222                bulkSetModelIndexToUndeploy (modelIds , tenantId , listener , response );
222223                return ;
223224            }
@@ -227,10 +228,10 @@ private void undeployModels(
227228    }
228229
229230    private  void  bulkSetModelIndexToUndeploy (
230-              String [] modelIds ,
231-              String  tenantId ,
232-              ActionListener <MLUndeployModelsResponse > listener ,
233-              MLUndeployModelNodesResponse  mlUndeployModelNodesResponse 
231+         String [] modelIds ,
232+         String  tenantId ,
233+         ActionListener <MLUndeployModelsResponse > listener ,
234+         MLUndeployModelNodesResponse  mlUndeployModelNodesResponse 
234235    ) {
235236        BulkDataObjectRequest  bulkRequest  = BulkDataObjectRequest .builder ().globalIndex (ML_MODEL_INDEX ).build ();
236237
@@ -245,11 +246,11 @@ private void bulkSetModelIndexToUndeploy(
245246            updateDocument .put (MLModel .CURRENT_WORKER_NODE_COUNT_FIELD , 0 );
246247
247248            UpdateDataObjectRequest  updateRequest  = UpdateDataObjectRequest 
248-                      .builder ()
249-                      .id (modelId )
250-                      .tenantId (tenantId )
251-                      .dataObject (updateDocument )
252-                      .build ();
249+                 .builder ()
250+                 .id (modelId )
251+                 .tenantId (tenantId )
252+                 .dataObject (updateDocument )
253+                 .build ();
253254            bulkRequest .add (updateRequest ).setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE );
254255        }
255256
@@ -263,12 +264,12 @@ private void bulkSetModelIndexToUndeploy(
263264                listenerWithContextRestoration .onResponse (new  MLUndeployModelsResponse (mlUndeployModelNodesResponse ));
264265            }, e  -> {
265266                String  modelsNotFoundMessage  = String 
266-                          .format ("Failed to set the following modelId(s) to UNDEPLOY in index: %s" , Arrays .toString (modelIds ));
267+                     .format ("Failed to set the following modelId(s) to UNDEPLOY in index: %s" , Arrays .toString (modelIds ));
267268                log .error (modelsNotFoundMessage , e );
268269
269270                OpenSearchStatusException  exception  = new  OpenSearchStatusException (
270-                          modelsNotFoundMessage  + e .getMessage (),
271-                          RestStatus .INTERNAL_SERVER_ERROR 
271+                     modelsNotFoundMessage  + e .getMessage (),
272+                     RestStatus .INTERNAL_SERVER_ERROR 
272273                );
273274                listenerWithContextRestoration .onFailure (exception );
274275            });
@@ -283,19 +284,24 @@ private void bulkSetModelIndexToUndeploy(
283284                try  {
284285                    BulkResponse  bulkResponse  = BulkResponse .fromXContent (response .parser ());
285286                    log 
286-                             .info (
287-                                     "Executed {} bulk operations with {} failures, Took: {}" ,
288-                                     bulkResponse .getItems ().length ,
289-                                     bulkResponse .hasFailures ()
290-                                             ? Arrays .stream (bulkResponse .getItems ()).filter (BulkItemResponse ::isFailed ).count ()
291-                                             : 0 ,
292-                                     bulkResponse .getTook ()
293-                             );
294-                     List <String > unemployedModelIds  = Arrays .stream (bulkResponse .getItems ())
295-                             .filter (bulkItemResponse  -> !bulkItemResponse .isFailed ())
296-                             .map (BulkItemResponse ::getId )
297-                             .collect (Collectors .toList ());
298-                     log .debug ("Successfully set the following modelId(s) to UNDEPLOY in index: {}" , Arrays .toString (unemployedModelIds .toArray ()));
287+                         .info (
288+                             "Executed {} bulk operations with {} failures, Took: {}" ,
289+                             bulkResponse .getItems ().length ,
290+                             bulkResponse .hasFailures ()
291+                                 ? Arrays .stream (bulkResponse .getItems ()).filter (BulkItemResponse ::isFailed ).count ()
292+                                 : 0 ,
293+                             bulkResponse .getTook ()
294+                         );
295+                     List <String > unemployedModelIds  = Arrays 
296+                         .stream (bulkResponse .getItems ())
297+                         .filter (bulkItemResponse  -> !bulkItemResponse .isFailed ())
298+                         .map (BulkItemResponse ::getId )
299+                         .collect (Collectors .toList ());
300+                     log 
301+                         .debug (
302+                             "Successfully set the following modelId(s) to UNDEPLOY in index: {}" ,
303+                             Arrays .toString (unemployedModelIds .toArray ())
304+                         );
299305
300306                    bulkResponseListener .onResponse (bulkResponse );
301307                } catch  (Exception  e ) {
0 commit comments