Skip to content

Commit 89dab70

Browse files
authored
add more stats: connector count, connector/config index status; fix model count bug (#1181)
Signed-off-by: Yaliang Wu <[email protected]>
1 parent 5c2dc5d commit 89dab70

File tree

10 files changed

+134
-31
lines changed

10 files changed

+134
-31
lines changed

ml-algorithms/src/main/java/org/opensearch/ml/engine/encryptor/EncryptorImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
@Log4j2
3535
public class EncryptorImpl implements Encryptor {
3636

37+
public static final String MASTER_KEY_NOT_READY_ERROR = "The ML encryption master key has not been initialized yet. Please retry after waiting for 10 seconds.";
3738
private ClusterService clusterService;
3839
private Client client;
3940
private volatile String masterKey;
@@ -114,15 +115,15 @@ private void initMasterKey() {
114115
String masterKey = (String) r.getSourceAsMap().get(MASTER_KEY);
115116
this.masterKey = masterKey;
116117
} else {
117-
exceptionRef.set(new ResourceNotFoundException("ML encryption master key not initialized yet"));
118+
exceptionRef.set(new ResourceNotFoundException(MASTER_KEY_NOT_READY_ERROR));
118119
}
119120
}, e -> {
120121
log.error("Failed to get ML encryption master key", e);
121122
exceptionRef.set(e);
122123
}), latch));
123124
}
124125
} else {
125-
exceptionRef.set(new ResourceNotFoundException("ML encryption master key not initialized yet"));
126+
exceptionRef.set(new ResourceNotFoundException(MASTER_KEY_NOT_READY_ERROR));
126127
latch.countDown();
127128
}
128129

@@ -141,7 +142,7 @@ private void initMasterKey() {
141142
}
142143
}
143144
if (masterKey == null) {
144-
throw new ResourceNotFoundException("ML encryption master key not initialized yet");
145+
throw new ResourceNotFoundException(MASTER_KEY_NOT_READY_ERROR);
145146
}
146147
}
147148
}

ml-algorithms/src/main/java/org/opensearch/ml/engine/utils/ScriptUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public static Optional<String> executePreprocessFunction(ScriptService scriptSer
3737
}
3838
return Optional.empty();
3939
}
40+
4041
public static Optional<String> executePostprocessFunction(ScriptService scriptService,
4142
String postProcessFunction,
4243
String resultJson) {

ml-algorithms/src/test/java/org/opensearch/ml/engine/encryptor/EncryptorImplTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static org.opensearch.ml.common.CommonValue.CREATE_TIME_FIELD;
3232
import static org.opensearch.ml.common.CommonValue.MASTER_KEY;
3333
import static org.opensearch.ml.common.CommonValue.ML_CONFIG_INDEX;
34+
import static org.opensearch.ml.engine.encryptor.EncryptorImpl.MASTER_KEY_NOT_READY_ERROR;
3435

3536
public class EncryptorImplTest {
3637
@Rule
@@ -121,7 +122,7 @@ public void decrypt() {
121122
@Test
122123
public void encrypt_NullMasterKey_NullMasterKey_MasterKeyNotExistInIndex() {
123124
exceptionRule.expect(ResourceNotFoundException.class);
124-
exceptionRule.expectMessage("ML encryption master key not initialized yet");
125+
exceptionRule.expectMessage(MASTER_KEY_NOT_READY_ERROR);
125126

126127
doAnswer(invocation -> {
127128
ActionListener<GetResponse> listener = invocation.getArgument(1);
@@ -155,7 +156,7 @@ public void decrypt_NullMasterKey_GetMasterKey_Exception() {
155156
@Test
156157
public void decrypt_MLConfigIndexNotFound() {
157158
exceptionRule.expect(ResourceNotFoundException.class);
158-
exceptionRule.expectMessage("ML encryption master key not initialized yet");
159+
exceptionRule.expectMessage(MASTER_KEY_NOT_READY_ERROR);
159160

160161
Metadata metadata = new Metadata.Builder().indices(ImmutableMap.of()).build();
161162
when(clusterState.metadata()).thenReturn(metadata);

plugin/src/main/java/org/opensearch/ml/model/MLModelGroupManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ public void createModelGroup(MLRegisterModelGroupInput input, ActionListener<Str
7777
new IllegalArgumentException(
7878
"The name you provided is already being used by another model with ID: "
7979
+ id
80-
+ ". Please provide a different name"
80+
+ ". Please provide a different name or add \"model_group_id\": \""
81+
+ id
82+
+ "\" to request body"
8183
)
8284
);
8385
}

plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.ml.plugin;
77

8+
import static org.opensearch.ml.common.CommonValue.ML_CONFIG_INDEX;
9+
import static org.opensearch.ml.common.CommonValue.ML_CONNECTOR_INDEX;
810
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
911
import static org.opensearch.ml.common.CommonValue.ML_TASK_INDEX;
1012

@@ -293,8 +295,12 @@ public Collection<Object> createComponents(
293295
Map<Enum, MLStat<?>> stats = new ConcurrentHashMap<>();
294296
// cluster level stats
295297
stats.put(MLClusterLevelStat.ML_MODEL_INDEX_STATUS, new MLStat<>(true, new IndexStatusSupplier(indexUtils, ML_MODEL_INDEX)));
298+
stats
299+
.put(MLClusterLevelStat.ML_CONNECTOR_INDEX_STATUS, new MLStat<>(true, new IndexStatusSupplier(indexUtils, ML_CONNECTOR_INDEX)));
300+
stats.put(MLClusterLevelStat.ML_CONFIG_INDEX_STATUS, new MLStat<>(true, new IndexStatusSupplier(indexUtils, ML_CONFIG_INDEX)));
296301
stats.put(MLClusterLevelStat.ML_TASK_INDEX_STATUS, new MLStat<>(true, new IndexStatusSupplier(indexUtils, ML_TASK_INDEX)));
297302
stats.put(MLClusterLevelStat.ML_MODEL_COUNT, new MLStat<>(true, new CounterSupplier()));
303+
stats.put(MLClusterLevelStat.ML_CONNECTOR_COUNT, new MLStat<>(true, new CounterSupplier()));
298304
// node level stats
299305
stats.put(MLNodeLevelStat.ML_NODE_EXECUTING_TASK_COUNT, new MLStat<>(false, new CounterSupplier()));
300306
stats.put(MLNodeLevelStat.ML_NODE_TOTAL_REQUEST_COUNT, new MLStat<>(false, new CounterSupplier()));
@@ -452,7 +458,7 @@ public List<RestHandler> getRestHandlers(
452458
IndexNameExpressionResolver indexNameExpressionResolver,
453459
Supplier<DiscoveryNodes> nodesInCluster
454460
) {
455-
RestMLStatsAction restMLStatsAction = new RestMLStatsAction(mlStats, clusterService, indexUtils);
461+
RestMLStatsAction restMLStatsAction = new RestMLStatsAction(mlStats, clusterService, indexUtils, xContentRegistry);
456462
RestMLTrainingAction restMLTrainingAction = new RestMLTrainingAction();
457463
RestMLTrainAndPredictAction restMLTrainAndPredictAction = new RestMLTrainAndPredictAction();
458464
RestMLPredictionAction restMLPredictionAction = new RestMLPredictionAction(mlModelManager);

plugin/src/main/java/org/opensearch/ml/rest/RestMLStatsAction.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.ml.rest;
77

88
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
9+
import static org.opensearch.ml.common.CommonValue.ML_CONNECTOR_INDEX;
910
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
1011
import static org.opensearch.ml.plugin.MachineLearningPlugin.ML_BASE_URI;
1112
import static org.opensearch.ml.utils.RestActionUtils.splitCommaSeparatedParam;
@@ -27,6 +28,7 @@
2728
import org.opensearch.cluster.service.ClusterService;
2829
import org.opensearch.core.action.ActionListener;
2930
import org.opensearch.core.rest.RestStatus;
31+
import org.opensearch.core.xcontent.NamedXContentRegistry;
3032
import org.opensearch.core.xcontent.ToXContent;
3133
import org.opensearch.core.xcontent.XContentBuilder;
3234
import org.opensearch.core.xcontent.XContentParser;
@@ -54,17 +56,26 @@ public class RestMLStatsAction extends BaseRestHandler {
5456
private MLStats mlStats;
5557
private ClusterService clusterService;
5658
private IndexUtils indexUtils;
59+
private NamedXContentRegistry xContentRegistry;
60+
private static final String QUERY_ALL_MODEL_META_DOC =
61+
"{\"query\":{\"bool\":{\"must_not\":{\"exists\":{\"field\":\"chunk_number\"}}}}}";
5762

5863
/**
5964
* Constructor
6065
* @param mlStats MLStats object
6166
* @param clusterService cluster service
6267
* @param indexUtils index util
6368
*/
64-
public RestMLStatsAction(MLStats mlStats, ClusterService clusterService, IndexUtils indexUtils) {
69+
public RestMLStatsAction(
70+
MLStats mlStats,
71+
ClusterService clusterService,
72+
IndexUtils indexUtils,
73+
NamedXContentRegistry xContentRegistry
74+
) {
6575
this.mlStats = mlStats;
6676
this.clusterService = clusterService;
6777
this.indexUtils = indexUtils;
78+
this.xContentRegistry = xContentRegistry;
6879
}
6980

7081
@Override
@@ -109,14 +120,27 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
109120
if (finalMlStatsInput.getTargetStatLevels().contains(MLStatLevel.CLUSTER)
110121
&& (finalMlStatsInput.retrieveAllClusterLevelStats()
111122
|| finalMlStatsInput.getClusterLevelStats().contains(MLClusterLevelStat.ML_MODEL_COUNT))) {
112-
indexUtils.getNumberOfDocumentsInIndex(ML_MODEL_INDEX, ActionListener.wrap(count -> {
113-
clusterStatsMap.put(MLClusterLevelStat.ML_MODEL_COUNT, count);
114-
getNodeStats(finalMlStatsInput, clusterStatsMap, client, mlStatsNodesRequest, channel);
115-
}, e -> {
116-
String errorMessage = "Failed to get ML model count";
117-
log.error(errorMessage, e);
118-
onFailure(channel, RestStatus.INTERNAL_SERVER_ERROR, errorMessage, e);
119-
}));
123+
indexUtils
124+
.getNumberOfDocumentsInIndex(
125+
ML_MODEL_INDEX,
126+
QUERY_ALL_MODEL_META_DOC,
127+
xContentRegistry,
128+
ActionListener.wrap(modelCount -> {
129+
clusterStatsMap.put(MLClusterLevelStat.ML_MODEL_COUNT, modelCount);
130+
indexUtils.getNumberOfDocumentsInIndex(ML_CONNECTOR_INDEX, ActionListener.wrap(connectorCount -> {
131+
clusterStatsMap.put(MLClusterLevelStat.ML_CONNECTOR_COUNT, connectorCount);
132+
getNodeStats(finalMlStatsInput, clusterStatsMap, client, mlStatsNodesRequest, channel);
133+
}, e -> {
134+
String errorMessage = "Failed to get ML model count";
135+
log.error(errorMessage, e);
136+
onFailure(channel, RestStatus.INTERNAL_SERVER_ERROR, errorMessage, e);
137+
}));
138+
}, e -> {
139+
String errorMessage = "Failed to get ML model count";
140+
log.error(errorMessage, e);
141+
onFailure(channel, RestStatus.INTERNAL_SERVER_ERROR, errorMessage, e);
142+
})
143+
);
120144
} else {
121145
getNodeStats(finalMlStatsInput, clusterStatsMap, client, mlStatsNodesRequest, channel);
122146
}

plugin/src/main/java/org/opensearch/ml/stats/MLClusterLevelStat.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@
1111
*/
1212
public enum MLClusterLevelStat {
1313
ML_MODEL_INDEX_STATUS,
14+
ML_CONNECTOR_INDEX_STATUS,
15+
ML_CONFIG_INDEX_STATUS,
1416
ML_TASK_INDEX_STATUS,
15-
ML_MODEL_COUNT;
17+
ML_MODEL_COUNT,
18+
ML_CONNECTOR_COUNT;
1619

1720
public static MLClusterLevelStat from(String value) {
1821
try {

plugin/src/main/java/org/opensearch/ml/utils/IndexUtils.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,22 @@
88
import java.util.List;
99
import java.util.Locale;
1010

11+
import org.opensearch.OpenSearchStatusException;
1112
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
13+
import org.opensearch.action.search.SearchRequest;
1214
import org.opensearch.client.Client;
1315
import org.opensearch.cluster.health.ClusterIndexHealth;
1416
import org.opensearch.cluster.metadata.IndexMetadata;
1517
import org.opensearch.cluster.service.ClusterService;
1618
import org.opensearch.common.inject.Inject;
19+
import org.opensearch.common.util.concurrent.ThreadContext;
20+
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
21+
import org.opensearch.common.xcontent.XContentType;
1722
import org.opensearch.core.action.ActionListener;
23+
import org.opensearch.core.rest.RestStatus;
24+
import org.opensearch.core.xcontent.NamedXContentRegistry;
25+
import org.opensearch.core.xcontent.XContentParser;
26+
import org.opensearch.search.builder.SearchSourceBuilder;
1827

1928
public class IndexUtils {
2029
/**
@@ -96,4 +105,33 @@ public void getNumberOfDocumentsInIndex(String indexName, ActionListener<Long> l
96105
}
97106
}
98107

108+
// TODO: add connector count stats
109+
public void getNumberOfDocumentsInIndex(
110+
String indexName,
111+
String searchQuery,
112+
NamedXContentRegistry xContentRegistry,
113+
ActionListener<Long> listener
114+
) {
115+
if (clusterService.state().getRoutingTable().hasIndex(indexName)) {
116+
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
117+
SearchRequest searchRequest = new SearchRequest();
118+
XContentParser parser = XContentType.JSON
119+
.xContent()
120+
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchQuery);
121+
SearchSourceBuilder builder = SearchSourceBuilder.fromXContent(parser);
122+
builder.fetchSource(false);
123+
searchRequest.source(builder).indices(indexName);
124+
125+
client.search(searchRequest, ActionListener.runBefore(ActionListener.wrap(r -> {
126+
long count = r.getHits().getTotalHits().value;
127+
listener.onResponse(count);
128+
}, e -> { listener.onFailure(e); }), () -> context.restore()));
129+
} catch (Exception e) {
130+
throw new OpenSearchStatusException("Failed to search index " + indexName, RestStatus.BAD_REQUEST);
131+
}
132+
} else {
133+
listener.onResponse(0L);
134+
}
135+
}
136+
99137
}

plugin/src/test/java/org/opensearch/ml/model/MLModelGroupManagerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void test_SuccessAddAllBackendRolesTrue() {
129129
verify(actionListener).onResponse(argumentCaptor.capture());
130130
}
131131

132-
public void test_ModelGroupNameNotUnique() throws IOException {
132+
public void test_ModelGroupNameNotUnique() throws IOException {//
133133
SearchResponse searchResponse = createModelGroupSearchResponse(1);
134134
doAnswer(invocation -> {
135135
ActionListener<SearchResponse> listener = invocation.getArgument(1);
@@ -143,7 +143,7 @@ public void test_ModelGroupNameNotUnique() throws IOException {
143143
ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
144144
verify(actionListener).onFailure(argumentCaptor.capture());
145145
assertEquals(
146-
"The name you provided is already being used by another model with ID: model_group_ID. Please provide a different name",
146+
"The name you provided is already being used by another model with ID: model_group_ID. Please provide a different name or add \"model_group_id\": \"model_group_ID\" to request body",
147147
argumentCaptor.getValue().getMessage()
148148
);
149149

0 commit comments

Comments
 (0)