Skip to content

Commit 08c250a

Browse files
committed
Exit early when no nodes service the model
Now when entering this method its guaranteed to write to index first before sending back the MLUndeploy response. And will also send back a exception if the write back fails Signed-off-by: Brian Flores <[email protected]>
1 parent 359d56f commit 08c250a

File tree

1 file changed

+19
-7
lines changed

1 file changed

+19
-7
lines changed

plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.opensearch.ml.common.transport.deploy.MLDeployModelRequest;
4242
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelAction;
4343
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelNodesRequest;
44+
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelNodesResponse;
4445
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelsAction;
4546
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelsRequest;
4647
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelsResponse;
@@ -185,15 +186,20 @@ private void undeployModels(
185186
MLUndeployModelNodesRequest mlUndeployModelNodesRequest = new MLUndeployModelNodesRequest(targetNodeIds, modelIds);
186187
mlUndeployModelNodesRequest.setTenantId(tenantId);
187188

188-
client.execute(MLUndeployModelAction.INSTANCE, mlUndeployModelNodesRequest, ActionListener.wrap(r -> {
189-
if (r.getNodes().isEmpty()) {
190-
bulkSetModelIndexToUndeploy(modelIds);
189+
client.execute(MLUndeployModelAction.INSTANCE, mlUndeployModelNodesRequest, ActionListener.wrap(response -> {
190+
if (response.getNodes().isEmpty()) {
191+
bulkSetModelIndexToUndeploy(modelIds, listener, response);
192+
return;
191193
}
192-
listener.onResponse(new MLUndeployModelsResponse(r));
194+
listener.onResponse(new MLUndeployModelsResponse(response));
193195
}, listener::onFailure));
194196
}
195197

196-
private void bulkSetModelIndexToUndeploy(String[] modelIds) {
198+
private void bulkSetModelIndexToUndeploy(
199+
String[] modelIds,
200+
ActionListener<MLUndeployModelsResponse> listener,
201+
MLUndeployModelNodesResponse response
202+
) {
197203
BulkRequest bulkUpdateRequest = new BulkRequest();
198204
for (String modelId : modelIds) {
199205
UpdateRequest updateRequest = new UpdateRequest();
@@ -209,10 +215,16 @@ private void bulkSetModelIndexToUndeploy(String[] modelIds) {
209215
updateRequest.index(ML_MODEL_INDEX).id(modelId).doc(builder.build());
210216
bulkUpdateRequest.add(updateRequest);
211217
}
218+
212219
bulkUpdateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
213-
log.info("No models service: {}", modelIds.toString());
214-
client.bulk(bulkUpdateRequest, ActionListener.wrap(br -> { log.debug("Successfully set modelIds to UNDEPLOY in index"); }, e -> {
220+
log.info("No nodes service: {}", modelIds.toString());
221+
222+
client.bulk(bulkUpdateRequest, ActionListener.wrap(br -> {
223+
log.debug("Successfully set modelIds to UNDEPLOY in index");
224+
listener.onResponse(new MLUndeployModelsResponse(response));
225+
}, e -> {
215226
log.error("Failed to set modelIds to UNDEPLOY in index", e);
227+
listener.onFailure(e);
216228
}));
217229
}
218230

0 commit comments

Comments
 (0)