|
7 | 7 |
|
8 | 8 | import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX; |
9 | 9 |
|
| 10 | +import java.time.Instant; |
10 | 11 | import java.util.Arrays; |
11 | 12 | import java.util.List; |
12 | 13 | import java.util.stream.Collectors; |
13 | 14 |
|
14 | 15 | import org.opensearch.ExceptionsHelper; |
15 | 16 | import org.opensearch.OpenSearchStatusException; |
16 | 17 | import org.opensearch.action.ActionRequest; |
| 18 | +import org.opensearch.action.bulk.BulkRequest; |
17 | 19 | import org.opensearch.action.search.SearchRequest; |
18 | 20 | import org.opensearch.action.search.SearchResponse; |
19 | 21 | import org.opensearch.action.support.ActionFilters; |
20 | 22 | import org.opensearch.action.support.HandledTransportAction; |
| 23 | +import org.opensearch.action.support.WriteRequest; |
| 24 | +import org.opensearch.action.update.UpdateRequest; |
21 | 25 | import org.opensearch.client.Client; |
22 | 26 | import org.opensearch.cluster.service.ClusterService; |
23 | 27 | import org.opensearch.common.inject.Inject; |
|
33 | 37 | import org.opensearch.index.query.TermsQueryBuilder; |
34 | 38 | import org.opensearch.ml.cluster.DiscoveryNodeHelper; |
35 | 39 | import org.opensearch.ml.common.MLModel; |
| 40 | +import org.opensearch.ml.common.model.MLModelState; |
36 | 41 | import org.opensearch.ml.common.transport.deploy.MLDeployModelRequest; |
37 | 42 | import org.opensearch.ml.common.transport.undeploy.MLUndeployModelAction; |
38 | 43 | import org.opensearch.ml.common.transport.undeploy.MLUndeployModelNodesRequest; |
|
57 | 62 | import org.opensearch.transport.TransportService; |
58 | 63 |
|
59 | 64 | import com.google.common.annotations.VisibleForTesting; |
| 65 | +import com.google.common.collect.ImmutableMap; |
60 | 66 |
|
61 | 67 | import lombok.extern.log4j.Log4j2; |
62 | 68 |
|
@@ -180,10 +186,36 @@ private void undeployModels( |
180 | 186 | mlUndeployModelNodesRequest.setTenantId(tenantId); |
181 | 187 |
|
182 | 188 | client.execute(MLUndeployModelAction.INSTANCE, mlUndeployModelNodesRequest, ActionListener.wrap(r -> { |
| 189 | + if (r.getNodes().isEmpty()) { |
| 190 | + bulkSetModelIndexToUndeploy(modelIds); |
| 191 | + } |
183 | 192 | listener.onResponse(new MLUndeployModelsResponse(r)); |
184 | 193 | }, listener::onFailure)); |
185 | 194 | } |
186 | 195 |
|
| 196 | + private void bulkSetModelIndexToUndeploy(String[] modelIds) { |
| 197 | + BulkRequest bulkUpdateRequest = new BulkRequest(); |
| 198 | + for (String modelId : modelIds) { |
| 199 | + UpdateRequest updateRequest = new UpdateRequest(); |
| 200 | + Instant now = Instant.now(); |
| 201 | + ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder(); |
| 202 | + builder.put(MLModel.MODEL_STATE_FIELD, MLModelState.UNDEPLOYED.name()); |
| 203 | + |
| 204 | + builder.put(MLModel.PLANNING_WORKER_NODES_FIELD, List.of()); |
| 205 | + builder.put(MLModel.PLANNING_WORKER_NODE_COUNT_FIELD, 0); |
| 206 | + |
| 207 | + builder.put(MLModel.LAST_UPDATED_TIME_FIELD, now.toEpochMilli()); |
| 208 | + builder.put(MLModel.CURRENT_WORKER_NODE_COUNT_FIELD, 0); |
| 209 | + updateRequest.index(ML_MODEL_INDEX).id(modelId).doc(builder.build()); |
| 210 | + bulkUpdateRequest.add(updateRequest); |
| 211 | + } |
| 212 | + bulkUpdateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); |
| 213 | + log.info("No models service: {}", modelIds.toString()); |
| 214 | + client.bulk(bulkUpdateRequest, ActionListener.wrap(br -> { log.debug("Successfully set modelIds to UNDEPLOY in index"); }, e -> { |
| 215 | + log.error("Failed to set modelIds to UNDEPLOY in index", e); |
| 216 | + })); |
| 217 | + } |
| 218 | + |
187 | 219 | private void validateAccess(String modelId, String tenantId, ActionListener<Boolean> listener) { |
188 | 220 | User user = RestActionUtils.getUserContext(client); |
189 | 221 | boolean isSuperAdmin = isSuperAdminUserWrapper(clusterService, client); |
|
0 commit comments