|
7 | 7 |
|
8 | 8 | import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX; |
9 | 9 |
|
| 10 | +import java.time.Instant; |
10 | 11 | import java.util.Arrays; |
11 | 12 | import java.util.List; |
12 | 13 | import java.util.stream.Collectors; |
13 | 14 |
|
14 | 15 | import org.opensearch.OpenSearchStatusException; |
15 | 16 | import org.opensearch.action.ActionRequest; |
| 17 | +import org.opensearch.action.bulk.BulkRequest; |
16 | 18 | import org.opensearch.action.search.SearchRequest; |
17 | 19 | import org.opensearch.action.search.SearchResponse; |
18 | 20 | import org.opensearch.action.support.ActionFilters; |
19 | 21 | import org.opensearch.action.support.HandledTransportAction; |
| 22 | +import org.opensearch.action.support.WriteRequest; |
| 23 | +import org.opensearch.action.update.UpdateRequest; |
20 | 24 | import org.opensearch.client.Client; |
21 | 25 | import org.opensearch.cluster.service.ClusterService; |
22 | 26 | import org.opensearch.common.inject.Inject; |
|
32 | 36 | import org.opensearch.index.query.TermsQueryBuilder; |
33 | 37 | import org.opensearch.ml.cluster.DiscoveryNodeHelper; |
34 | 38 | import org.opensearch.ml.common.MLModel; |
| 39 | +import org.opensearch.ml.common.model.MLModelState; |
35 | 40 | import org.opensearch.ml.common.transport.deploy.MLDeployModelRequest; |
36 | 41 | import org.opensearch.ml.common.transport.undeploy.MLUndeployModelAction; |
37 | 42 | import org.opensearch.ml.common.transport.undeploy.MLUndeployModelNodesRequest; |
|
51 | 56 | import org.opensearch.transport.TransportService; |
52 | 57 |
|
53 | 58 | import com.google.common.annotations.VisibleForTesting; |
| 59 | +import com.google.common.collect.ImmutableMap; |
54 | 60 |
|
55 | 61 | import lombok.extern.log4j.Log4j2; |
56 | 62 |
|
@@ -157,10 +163,36 @@ private void undeployModels(String[] targetNodeIds, String[] modelIds, ActionLis |
157 | 163 | MLUndeployModelNodesRequest mlUndeployModelNodesRequest = new MLUndeployModelNodesRequest(targetNodeIds, modelIds); |
158 | 164 |
|
159 | 165 | client.execute(MLUndeployModelAction.INSTANCE, mlUndeployModelNodesRequest, ActionListener.wrap(r -> { |
| 166 | + if (r.getNodes().isEmpty()) { |
| 167 | + bulkSetModelIndexToUndeploy(modelIds); |
| 168 | + } |
160 | 169 | listener.onResponse(new MLUndeployModelsResponse(r)); |
161 | 170 | }, listener::onFailure)); |
162 | 171 | } |
163 | 172 |
|
| 173 | + private void bulkSetModelIndexToUndeploy(String[] modelIds) { |
| 174 | + BulkRequest bulkUpdateRequest = new BulkRequest(); |
| 175 | + for (String modelId : modelIds) { |
| 176 | + UpdateRequest updateRequest = new UpdateRequest(); |
| 177 | + Instant now = Instant.now(); |
| 178 | + ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder(); |
| 179 | + builder.put(MLModel.MODEL_STATE_FIELD, MLModelState.UNDEPLOYED.name()); |
| 180 | + |
| 181 | + builder.put(MLModel.PLANNING_WORKER_NODES_FIELD, List.of()); |
| 182 | + builder.put(MLModel.PLANNING_WORKER_NODE_COUNT_FIELD, 0); |
| 183 | + |
| 184 | + builder.put(MLModel.LAST_UPDATED_TIME_FIELD, now.toEpochMilli()); |
| 185 | + builder.put(MLModel.CURRENT_WORKER_NODE_COUNT_FIELD, 0); |
| 186 | + updateRequest.index(ML_MODEL_INDEX).id(modelId).doc(builder.build()); |
| 187 | + bulkUpdateRequest.add(updateRequest); |
| 188 | + } |
| 189 | + bulkUpdateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); |
| 190 | + log.info("No models service: {}", modelIds.toString()); |
| 191 | + client.bulk(bulkUpdateRequest, ActionListener.wrap(br -> { log.debug("Successfully set modelIds to UNDEPLOY in index"); }, e -> { |
| 192 | + log.error("Failed to set modelIds to UNDEPLOY in index", e); |
| 193 | + })); |
| 194 | + } |
| 195 | + |
164 | 196 | private void validateAccess(String modelId, ActionListener<Boolean> listener) { |
165 | 197 | User user = RestActionUtils.getUserContext(client); |
166 | 198 | boolean isSuperAdmin = isSuperAdminUserWrapper(clusterService, client); |
|
0 commit comments