2525import org .opensearch .action .support .ActionFilters ;
2626import org .opensearch .action .support .HandledTransportAction ;
2727import org .opensearch .action .support .WriteRequest ;
28- import org .opensearch .client .Client ;
2928import org .opensearch .cluster .service .ClusterService ;
3029import org .opensearch .common .inject .Inject ;
3130import org .opensearch .common .settings .Settings ;
6665import org .opensearch .tasks .Task ;
6766import org .opensearch .threadpool .ThreadPool ;
6867import org .opensearch .transport .TransportService ;
68+ import org .opensearch .transport .client .Client ;
6969
7070import com .google .common .annotations .VisibleForTesting ;
7171
@@ -218,6 +218,7 @@ private void undeployModels(
218218 return modelCacheMissForModelIds ;
219219 });
220220 if (response .getNodes ().isEmpty () || modelNotFoundInNodesCache ) {
221+ log .warn ("No nodes service these models, performing manual `UNDEPLOY` write to model index" );
221222 bulkSetModelIndexToUndeploy (modelIds , tenantId , listener , response );
222223 return ;
223224 }
@@ -227,10 +228,10 @@ private void undeployModels(
227228 }
228229
229230 private void bulkSetModelIndexToUndeploy (
230- String [] modelIds ,
231- String tenantId ,
232- ActionListener <MLUndeployModelsResponse > listener ,
233- MLUndeployModelNodesResponse mlUndeployModelNodesResponse
231+ String [] modelIds ,
232+ String tenantId ,
233+ ActionListener <MLUndeployModelsResponse > listener ,
234+ MLUndeployModelNodesResponse mlUndeployModelNodesResponse
234235 ) {
235236 BulkDataObjectRequest bulkRequest = BulkDataObjectRequest .builder ().globalIndex (ML_MODEL_INDEX ).build ();
236237
@@ -245,11 +246,11 @@ private void bulkSetModelIndexToUndeploy(
245246 updateDocument .put (MLModel .CURRENT_WORKER_NODE_COUNT_FIELD , 0 );
246247
247248 UpdateDataObjectRequest updateRequest = UpdateDataObjectRequest
248- .builder ()
249- .id (modelId )
250- .tenantId (tenantId )
251- .dataObject (updateDocument )
252- .build ();
249+ .builder ()
250+ .id (modelId )
251+ .tenantId (tenantId )
252+ .dataObject (updateDocument )
253+ .build ();
253254 bulkRequest .add (updateRequest ).setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE );
254255 }
255256
@@ -263,12 +264,12 @@ private void bulkSetModelIndexToUndeploy(
263264 listenerWithContextRestoration .onResponse (new MLUndeployModelsResponse (mlUndeployModelNodesResponse ));
264265 }, e -> {
265266 String modelsNotFoundMessage = String
266- .format ("Failed to set the following modelId(s) to UNDEPLOY in index: %s" , Arrays .toString (modelIds ));
267+ .format ("Failed to set the following modelId(s) to UNDEPLOY in index: %s" , Arrays .toString (modelIds ));
267268 log .error (modelsNotFoundMessage , e );
268269
269270 OpenSearchStatusException exception = new OpenSearchStatusException (
270- modelsNotFoundMessage + e .getMessage (),
271- RestStatus .INTERNAL_SERVER_ERROR
271+ modelsNotFoundMessage + e .getMessage (),
272+ RestStatus .INTERNAL_SERVER_ERROR
272273 );
273274 listenerWithContextRestoration .onFailure (exception );
274275 });
@@ -283,19 +284,24 @@ private void bulkSetModelIndexToUndeploy(
283284 try {
284285 BulkResponse bulkResponse = BulkResponse .fromXContent (response .parser ());
285286 log
286- .info (
287- "Executed {} bulk operations with {} failures, Took: {}" ,
288- bulkResponse .getItems ().length ,
289- bulkResponse .hasFailures ()
290- ? Arrays .stream (bulkResponse .getItems ()).filter (BulkItemResponse ::isFailed ).count ()
291- : 0 ,
292- bulkResponse .getTook ()
293- );
294- List <String > unemployedModelIds = Arrays .stream (bulkResponse .getItems ())
295- .filter (bulkItemResponse -> !bulkItemResponse .isFailed ())
296- .map (BulkItemResponse ::getId )
297- .collect (Collectors .toList ());
298- log .debug ("Successfully set the following modelId(s) to UNDEPLOY in index: {}" , Arrays .toString (unemployedModelIds .toArray ()));
287+ .info (
288+ "Executed {} bulk operations with {} failures, Took: {}" ,
289+ bulkResponse .getItems ().length ,
290+ bulkResponse .hasFailures ()
291+ ? Arrays .stream (bulkResponse .getItems ()).filter (BulkItemResponse ::isFailed ).count ()
292+ : 0 ,
293+ bulkResponse .getTook ()
294+ );
295+ List <String > unemployedModelIds = Arrays
296+ .stream (bulkResponse .getItems ())
297+ .filter (bulkItemResponse -> !bulkItemResponse .isFailed ())
298+ .map (BulkItemResponse ::getId )
299+ .collect (Collectors .toList ());
300+ log
301+ .debug (
302+ "Successfully set the following modelId(s) to UNDEPLOY in index: {}" ,
303+ Arrays .toString (unemployedModelIds .toArray ())
304+ );
299305
300306 bulkResponseListener .onResponse (bulkResponse );
301307 } catch (Exception e ) {
0 commit comments