Skip to content

Commit 831fc95

Browse files
add acknowledge check for index creation in missing places (#2715) (#2721)
* add acknowledge check for index creation in missing places Signed-off-by: Bhavana Ramaram <[email protected]> (cherry picked from commit 0cf731c) Co-authored-by: Bhavana Ramaram <[email protected]>
1 parent c9ce062 commit 831fc95

File tree

9 files changed

+159
-57
lines changed

9 files changed

+159
-57
lines changed

ml-algorithms/src/main/java/org/opensearch/ml/engine/encryptor/EncryptorImpl.java

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -114,58 +114,65 @@ private void initMasterKey() {
114114

115115
CountDownLatch latch = new CountDownLatch(1);
116116
mlIndicesHandler.initMLConfigIndex(ActionListener.wrap(r -> {
117-
GetRequest getRequest = new GetRequest(ML_CONFIG_INDEX).id(MASTER_KEY);
118-
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
119-
client.get(getRequest, ActionListener.wrap(getResponse -> {
120-
if (getResponse == null || !getResponse.isExists()) {
121-
IndexRequest indexRequest = new IndexRequest(ML_CONFIG_INDEX).id(MASTER_KEY);
122-
final String generatedMasterKey = generateMasterKey();
123-
indexRequest
124-
.source(ImmutableMap.of(MASTER_KEY, generatedMasterKey, CREATE_TIME_FIELD, Instant.now().toEpochMilli()));
125-
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
126-
indexRequest.opType(DocWriteRequest.OpType.CREATE);
127-
client.index(indexRequest, ActionListener.wrap(indexResponse -> {
128-
this.masterKey = generatedMasterKey;
129-
log.info("ML encryption master key initialized successfully");
130-
latch.countDown();
131-
}, e -> {
132-
133-
if (ExceptionUtils.getRootCause(e) instanceof VersionConflictEngineException) {
134-
GetRequest getMasterKeyRequest = new GetRequest(ML_CONFIG_INDEX).id(MASTER_KEY);
135-
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
136-
client.get(getMasterKeyRequest, ActionListener.wrap(getMasterKeyResponse -> {
137-
if (getMasterKeyResponse != null && getMasterKeyResponse.isExists()) {
138-
final String masterKey = (String) getMasterKeyResponse.getSourceAsMap().get(MASTER_KEY);
139-
this.masterKey = masterKey;
140-
log.info("ML encryption master key already initialized, no action needed");
141-
latch.countDown();
142-
} else {
143-
exceptionRef.set(new ResourceNotFoundException(MASTER_KEY_NOT_READY_ERROR));
117+
if (!r) {
118+
exceptionRef.set(new RuntimeException("No response to create ML Config index"));
119+
latch.countDown();
120+
} else {
121+
GetRequest getRequest = new GetRequest(ML_CONFIG_INDEX).id(MASTER_KEY);
122+
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
123+
client.get(getRequest, ActionListener.wrap(getResponse -> {
124+
if (getResponse == null || !getResponse.isExists()) {
125+
IndexRequest indexRequest = new IndexRequest(ML_CONFIG_INDEX).id(MASTER_KEY);
126+
final String generatedMasterKey = generateMasterKey();
127+
indexRequest
128+
.source(ImmutableMap.of(MASTER_KEY, generatedMasterKey, CREATE_TIME_FIELD, Instant.now().toEpochMilli()));
129+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
130+
indexRequest.opType(DocWriteRequest.OpType.CREATE);
131+
client.index(indexRequest, ActionListener.wrap(indexResponse -> {
132+
this.masterKey = generatedMasterKey;
133+
log.info("ML encryption master key initialized successfully");
134+
latch.countDown();
135+
}, e -> {
136+
137+
if (ExceptionUtils.getRootCause(e) instanceof VersionConflictEngineException) {
138+
GetRequest getMasterKeyRequest = new GetRequest(ML_CONFIG_INDEX).id(MASTER_KEY);
139+
try (
140+
ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()
141+
) {
142+
client.get(getMasterKeyRequest, ActionListener.wrap(getMasterKeyResponse -> {
143+
if (getMasterKeyResponse != null && getMasterKeyResponse.isExists()) {
144+
final String masterKey = (String) getMasterKeyResponse.getSourceAsMap().get(MASTER_KEY);
145+
this.masterKey = masterKey;
146+
log.info("ML encryption master key already initialized, no action needed");
147+
latch.countDown();
148+
} else {
149+
exceptionRef.set(new ResourceNotFoundException(MASTER_KEY_NOT_READY_ERROR));
150+
latch.countDown();
151+
}
152+
}, error -> {
153+
log.debug("Failed to get ML encryption master key", e);
154+
exceptionRef.set(error);
144155
latch.countDown();
145-
}
146-
}, error -> {
147-
log.debug("Failed to get ML encryption master key", e);
148-
exceptionRef.set(error);
149-
latch.countDown();
150-
}));
156+
}));
157+
}
158+
} else {
159+
log.debug("Failed to index ML encryption master key", e);
160+
exceptionRef.set(e);
161+
latch.countDown();
151162
}
152-
} else {
153-
log.debug("Failed to index ML encryption master key", e);
154-
exceptionRef.set(e);
155-
latch.countDown();
156-
}
157-
}));
158-
} else {
159-
final String masterKey = (String) getResponse.getSourceAsMap().get(MASTER_KEY);
160-
this.masterKey = masterKey;
161-
log.info("ML encryption master key already initialized, no action needed");
163+
}));
164+
} else {
165+
final String masterKey = (String) getResponse.getSourceAsMap().get(MASTER_KEY);
166+
this.masterKey = masterKey;
167+
log.info("ML encryption master key already initialized, no action needed");
168+
latch.countDown();
169+
}
170+
}, e -> {
171+
log.debug("Failed to get ML encryption master key from config index", e);
172+
exceptionRef.set(e);
162173
latch.countDown();
163-
}
164-
}, e -> {
165-
log.debug("Failed to get ML encryption master key from config index", e);
166-
exceptionRef.set(e);
167-
latch.countDown();
168-
}));
174+
}));
175+
}
169176
}
170177
}, e -> {
171178
log.debug("Failed to init ML config index", e);

ml-algorithms/src/test/java/org/opensearch/ml/engine/encryptor/EncryptorImplTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,22 @@ public void decrypt_NullMasterKey_GetMasterKey_Exception() {
499499
encryptor.decrypt("test");
500500
}
501501

502+
@Test
503+
public void decrypt_NoResponseToInitConfigIndex() {
504+
exceptionRule.expect(RuntimeException.class);
505+
exceptionRule.expectMessage("No response to create ML Config index");
506+
507+
doAnswer(invocation -> {
508+
ActionListener<Boolean> actionListener = (ActionListener) invocation.getArgument(0);
509+
actionListener.onResponse(false);
510+
return null;
511+
}).when(mlIndicesHandler).initMLConfigIndex(any());
512+
513+
Encryptor encryptor = new EncryptorImpl(clusterService, client, mlIndicesHandler);
514+
Assert.assertNull(encryptor.getMasterKey());
515+
encryptor.decrypt("test");
516+
}
517+
502518
@Test
503519
public void decrypt_MLConfigIndexNotFound() {
504520
exceptionRule.expect(ResourceNotFoundException.class);

plugin/src/main/java/org/opensearch/ml/action/upload_chunk/MLModelChunkUploader.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ public void uploadModelChunk(MLUploadModelChunkInput uploadModelChunkInput, Acti
103103
throw new Exception("Chunk size exceeds 10MB");
104104
}
105105
mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(res -> {
106+
if (!res) {
107+
wrappedListener.onFailure(new RuntimeException("No response to create ML Model index"));
108+
return;
109+
}
106110
int chunkNum = uploadModelChunkInput.getChunkNumber();
107111
MLModel mlModel = MLModel
108112
.builder()

plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -186,10 +186,13 @@ public void run() {
186186
return;
187187
}
188188
// refresh model status
189-
mlIndicesHandler
190-
.initModelIndexIfAbsent(ActionListener.wrap(res -> { refreshModelState(modelWorkerNodes, deployingModels); }, e -> {
191-
log.error("Failed to init model index", e);
192-
}));
189+
mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(res -> {
190+
if (!res) {
191+
log.error("No response to create ML model index");
192+
return;
193+
}
194+
refreshModelState(modelWorkerNodes, deployingModels);
195+
}, e -> { log.error("Failed to init model index", e); }));
193196
}, ex -> { log.error("Failed to sync model routing", ex); }));
194197
}, e -> { log.error("Failed to sync model routing", e); }));
195198
}
@@ -211,10 +214,13 @@ private void undeployExpiredModels(
211214
log.debug("Received failures in undeploying expired models", mlUndeployModelNodesResponse.failures());
212215
}
213216

214-
mlIndicesHandler
215-
.initModelIndexIfAbsent(ActionListener.wrap(res -> { refreshModelState(modelWorkerNodes, deployingModels); }, e -> {
216-
log.error("Failed to init model index", e);
217-
}));
217+
mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(res -> {
218+
if (!res) {
219+
log.error("No response to create ML model index");
220+
return;
221+
}
222+
refreshModelState(modelWorkerNodes, deployingModels);
223+
}, e -> { log.error("Failed to init model index", e); }));
218224
}, e -> { log.error("Failed to undeploy models {}", expiredModels, e); }));
219225
}
220226

@@ -224,6 +230,10 @@ void initMLConfig() {
224230
return;
225231
}
226232
mlIndicesHandler.initMLConfigIndex(ActionListener.wrap(r -> {
233+
if (!r) {
234+
log.error("Failed to initialize or update ML Config index");
235+
return;
236+
}
227237
GetRequest getRequest = new GetRequest(ML_CONFIG_INDEX).id(MASTER_KEY);
228238
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
229239
client.get(getRequest, ActionListener.wrap(getResponse -> {

plugin/src/main/java/org/opensearch/ml/model/MLModelGroupManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ public void createModelGroup(MLRegisterModelGroupInput input, ActionListener<Str
112112
}
113113

114114
mlIndicesHandler.initModelGroupIndexIfAbsent(ActionListener.wrap(res -> {
115+
if (!res) {
116+
wrappedListener.onFailure(new RuntimeException("No response to create ML Model Group index"));
117+
return;
118+
}
115119
IndexRequest indexRequest = new IndexRequest(ML_MODEL_GROUP_INDEX);
116120
indexRequest
117121
.source(

plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,10 @@ private void uploadMLModelMeta(MLRegisterModelMetaInput mlRegisterModelMetaInput
292292
ActionListener<String> wrappedListener = ActionListener.runBefore(listener, () -> context.restore());
293293
String modelName = mlRegisterModelMetaInput.getName();
294294
mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(res -> {
295+
if (!res) {
296+
wrappedListener.onFailure(new RuntimeException("No response to create ML Model index"));
297+
return;
298+
}
295299
Instant now = Instant.now();
296300
MLModel mlModelMeta = MLModel
297301
.builder()
@@ -527,6 +531,10 @@ private void indexRemoteModel(
527531
}
528532

529533
mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(boolResponse -> {
534+
if (!boolResponse) {
535+
listener.onFailure(new RuntimeException("No response to create ML Model index"));
536+
return;
537+
}
530538
MLModel mlModelMeta = MLModel
531539
.builder()
532540
.name(modelName)
@@ -594,6 +602,10 @@ void indexRemoteModel(MLRegisterModelInput registerModelInput, MLTask mlTask, St
594602
registerModelInput.getConnector().encrypt(mlEngine::encrypt);
595603
}
596604
mlIndicesHandler.initModelIndexIfAbsent(ActionListener.runBefore(ActionListener.wrap(res -> {
605+
if (!res) {
606+
handleException(functionName, taskId, new RuntimeException("No response to create ML Model index"));
607+
return;
608+
}
597609
MLModel mlModelMeta = MLModel
598610
.builder()
599611
.name(modelName)
@@ -663,6 +675,10 @@ private void registerModelFromUrl(MLRegisterModelInput registerModelInput, MLTas
663675
String modelGroupId = registerModelInput.getModelGroupId();
664676
Instant now = Instant.now();
665677
mlIndicesHandler.initModelIndexIfAbsent(ActionListener.runBefore(ActionListener.wrap(res -> {
678+
if (!res) {
679+
handleException(functionName, taskId, new RuntimeException("No response to create ML Model index"));
680+
return;
681+
}
666682
MLModel mlModelMeta = MLModel
667683
.builder()
668684
.name(modelName)

plugin/src/test/java/org/opensearch/ml/action/upload_chunk/MLModelChunkUploaderTests.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,20 @@ public void testUploadModelChunk() {
151151
verify(actionListener).onResponse(argumentCaptor.capture());
152152
}
153153

154+
public void test_NoResponseInitModelIndex() {
155+
doAnswer(invocation -> {
156+
ActionListener<Boolean> actionListener = invocation.getArgument(0);
157+
actionListener.onResponse(false);
158+
return null;
159+
}).when(mlIndicesHandler).initModelIndexIfAbsent(any());
160+
161+
MLUploadModelChunkInput uploadModelChunkInput = prepareRequest();
162+
mlModelChunkUploader.uploadModelChunk(uploadModelChunkInput, actionListener);
163+
ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
164+
verify(actionListener).onFailure(argumentCaptor.capture());
165+
assertEquals("No response to create ML Model index", argumentCaptor.getValue().getMessage());
166+
}
167+
154168
private MLUploadModelChunkInput prepareRequest() {
155169
final byte[] content = new byte[] { 1, 2, 3, 4 };
156170
MLUploadModelChunkInput input = MLUploadModelChunkInput.builder().chunkNumber(0).modelId("someModelId").content(content).build();

plugin/src/test/java/org/opensearch/ml/model/MLModelGroupManagerTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,23 @@ public void test_NotFoundGetModelGroup() throws IOException {
389389
assertEquals("Failed to find model group with ID: testModelGroupID", argumentCaptor.getValue().getMessage());
390390
}
391391

392+
public void test_NoResponseoInitModelGroup() throws IOException {
393+
doAnswer(invocation -> {
394+
ActionListener<Boolean> actionListener = invocation.getArgument(0);
395+
actionListener.onResponse(false);
396+
return null;
397+
}).when(mlIndicesHandler).initModelGroupIndexIfAbsent(any());
398+
399+
when(modelAccessControlHelper.isSecurityEnabledAndModelAccessControlEnabled(any())).thenReturn(false);
400+
401+
MLRegisterModelGroupInput mlRegisterModelGroupInput = prepareRequest(null, null, null);
402+
mlModelGroupManager.createModelGroup(mlRegisterModelGroupInput, actionListener);
403+
404+
ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
405+
verify(actionListener).onFailure(argumentCaptor.capture());
406+
assertEquals("No response to create ML Model Group index", argumentCaptor.getValue().getMessage());
407+
}
408+
392409
private MLRegisterModelGroupInput prepareRequest(List<String> backendRoles, AccessMode modelAccessMode, Boolean isAddAllBackendRoles) {
393410
return MLRegisterModelGroupInput
394411
.builder()

plugin/src/test/java/org/opensearch/ml/model/MLModelManagerTests.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,6 +1158,20 @@ public void testRegisterModelMeta_FailedToInitIndexIfPresent() {
11581158
verify(actionListener).onFailure(argumentCaptor.capture());
11591159
}
11601160

1161+
public void testRegisterModelMeta_NoResponseToInitIndex() {
1162+
setupForModelMeta();
1163+
doAnswer(invocation -> {
1164+
ActionListener<Boolean> actionListener = invocation.getArgument(0);
1165+
actionListener.onResponse(false);
1166+
return null;
1167+
}).when(mlIndicesHandler).initModelIndexIfAbsent(any());
1168+
MLRegisterModelMetaInput mlUploadInput = prepareRequest();
1169+
modelManager.registerModelMeta(mlUploadInput, actionListener);
1170+
ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
1171+
verify(actionListener).onFailure(argumentCaptor.capture());
1172+
assertEquals("No response to create ML Model index", argumentCaptor.getValue().getMessage());
1173+
}
1174+
11611175
public void test_trackPredictDuration_sync() {
11621176
Supplier<String> mockResult = () -> {
11631177
try {

0 commit comments

Comments
 (0)