diff --git a/docs/changelog/115938.yaml b/docs/changelog/115938.yaml new file mode 100644 index 0000000000000..e096d8821a1d7 --- /dev/null +++ b/docs/changelog/115938.yaml @@ -0,0 +1,9 @@ +pr: 115938 +summary: Wait for up to 2 seconds for yellow status before starting search +area: Machine Learning +type: bug +issues: + - 107777 + - 105955 + - 107815 + - 112191 diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/persistence/TrainedModelProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/persistence/TrainedModelProvider.java index ff5f37427b18f..5cf349b96a4f7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/persistence/TrainedModelProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/persistence/TrainedModelProvider.java @@ -14,6 +14,8 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -28,6 +30,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -43,6 +46,7 @@ import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.VersionConflictEngineException; @@ -105,6 +109,7 @@ import java.util.Objects; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -1053,63 +1058,98 @@ public void expandIds( } public void getInferenceStats(String[] modelIds, @Nullable TaskId parentTaskId, ActionListener> listener) { - MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); - Arrays.stream(modelIds).map(TrainedModelProvider::buildStatsSearchRequest).forEach(multiSearchRequest::add); - if (multiSearchRequest.requests().isEmpty()) { - listener.onResponse(Collections.emptyList()); - return; - } - if (parentTaskId != null) { - multiSearchRequest.setParentTask(parentTaskId); - } - executeAsyncWithOrigin( - client.threadPool().getThreadContext(), - ML_ORIGIN, - multiSearchRequest, - ActionListener.wrap(responses -> { - List allStats = new ArrayList<>(modelIds.length); - int modelIndex = 0; - assert responses.getResponses().length == modelIds.length : "mismatch between search response size and models requested"; - for (MultiSearchResponse.Item response : responses.getResponses()) { - if (response.isFailure()) { - if (ExceptionsHelper.unwrapCause(response.getFailure()) instanceof ResourceNotFoundException) { - modelIndex++; - continue; - } + + SubscribableListener.newForked((delegate) -> { + // first wait for the index to be available + executeAsyncWithOrigin( + client.threadPool().getThreadContext(), + ML_ORIGIN, + new ClusterHealthRequest(new TimeValue(2, TimeUnit.SECONDS), MlStatsIndex.indexPattern()).waitForYellowStatus(), + delegate, + client.admin().cluster()::health + ); + }) + .>andThen( + client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME), + client.threadPool().getThreadContext(), + (delegate, clusterHealthResponse) -> { + if (clusterHealthResponse.isTimedOut()) { logger.error( - () -> "[" + Strings.arrayToCommaDelimitedString(modelIds) + "] search failed for models", - response.getFailure() - ); - listener.onFailure( - ExceptionsHelper.serverError( - "Searching for stats for models [{}] failed", - response.getFailure(), - Strings.arrayToCommaDelimitedString(modelIds) - ) + "getInferenceStats Timed out waiting for index [{}] to be available, " + + "this will probably cause the request to fail", + MlStatsIndex.indexPattern() ); - return; } - try { - InferenceStats inferenceStats = handleMultiNodeStatsResponse(response.getResponse(), modelIds[modelIndex++]); - if (inferenceStats != null) { - allStats.add(inferenceStats); - } - } catch (Exception e) { - listener.onFailure(e); + + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + Arrays.stream(modelIds).map(TrainedModelProvider::buildStatsSearchRequest).forEach(multiSearchRequest::add); + if (multiSearchRequest.requests().isEmpty()) { + delegate.onResponse(Collections.emptyList()); return; } + if (parentTaskId != null) { + multiSearchRequest.setParentTask(parentTaskId); + } + executeAsyncWithOrigin( + client.threadPool().getThreadContext(), + ML_ORIGIN, + multiSearchRequest, + ActionListener.wrap(responses -> { + List allStats = new ArrayList<>(modelIds.length); + int modelIndex = 0; + assert responses.getResponses().length == modelIds.length + : "mismatch between search response size and models requested"; + for (MultiSearchResponse.Item response : responses.getResponses()) { + if (response.isFailure()) { + if (ExceptionsHelper.unwrapCause(response.getFailure()) instanceof ResourceNotFoundException) { + modelIndex++; + continue; + } + logger.error( + () -> "[" + Strings.arrayToCommaDelimitedString(modelIds) + "] search failed for models", + response.getFailure() + ); + delegate.onFailure( + ExceptionsHelper.serverError( + "Searching for stats for models [{}] failed", + response.getFailure(), + Strings.arrayToCommaDelimitedString(modelIds) + ) + ); + return; + } + try { + InferenceStats inferenceStats = handleMultiNodeStatsResponse( + response.getResponse(), + modelIds[modelIndex++] + ); + if (inferenceStats != null) { + allStats.add(inferenceStats); + } + } catch (Exception e) { + delegate.onFailure(e); + return; + } + } + delegate.onResponse(allStats); + }, e -> { + Throwable unwrapped = ExceptionsHelper.unwrapCause(e); + if (unwrapped instanceof ResourceNotFoundException) { + delegate.onResponse(Collections.emptyList()); + return; + } + delegate.onFailure((Exception) unwrapped); + }), + client::multiSearch + ); + } - listener.onResponse(allStats); - }, e -> { - Throwable unwrapped = ExceptionsHelper.unwrapCause(e); - if (unwrapped instanceof ResourceNotFoundException) { - listener.onResponse(Collections.emptyList()); - return; - } - listener.onFailure((Exception) unwrapped); - }), - client::multiSearch - ); + ) + .addListener( + listener, + client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME), + client.threadPool().getThreadContext() + ); } private static SearchRequest buildStatsSearchRequest(String modelId) {