Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/changelog/115938.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Changelog entry for pull request 115938 (the diff header above names the file docs/changelog/115938.yaml).
pr: 115938
# Short description of the change.
summary: Wait for up to 2 seconds for yellow status before starting search
area: Machine Learning
type: bug
# Issue numbers listed for this entry.
issues:
- 107777
- 105955
- 107815
- 112191
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
Expand All @@ -28,6 +30,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -43,6 +46,7 @@
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
Expand Down Expand Up @@ -105,6 +109,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -1053,63 +1058,98 @@ public void expandIds(
}

/**
 * Looks up inference statistics for the given model ids and hands the collected list to {@code listener}.
 *
 * NOTE(review): this span comes from a rendered diff (see the hunk header just above) and appears to
 * interleave the removed and the added implementation without +/- markers. The multi-search logic
 * therefore shows up twice and the text is not compilable as-is. Confirm against the real file.
 *
 * @param modelIds     ids of the models; one stats search request is built per id
 * @param parentTaskId if non-null, set as the parent task of the multi-search request
 * @param listener     receives the collected stats, an empty list, or a failure
 */
public void getInferenceStats(String[] modelIds, @Nullable TaskId parentTaskId, ActionListener<List<InferenceStats>> listener) {
// NOTE(review): the statements from here down to the first SubscribableListener call look like the
// removed side of the diff (they answer `listener` directly rather than `delegate`) — confirm.
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
Arrays.stream(modelIds).map(TrainedModelProvider::buildStatsSearchRequest).forEach(multiSearchRequest::add);
// No requests were built: answer with an empty list without searching.
if (multiSearchRequest.requests().isEmpty()) {
listener.onResponse(Collections.emptyList());
return;
}
if (parentTaskId != null) {
multiSearchRequest.setParentTask(parentTaskId);
}
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
ML_ORIGIN,
multiSearchRequest,
ActionListener.<MultiSearchResponse>wrap(responses -> {
List<InferenceStats> allStats = new ArrayList<>(modelIds.length);
int modelIndex = 0;
assert responses.getResponses().length == modelIds.length : "mismatch between search response size and models requested";
for (MultiSearchResponse.Item response : responses.getResponses()) {
if (response.isFailure()) {
// A ResourceNotFoundException for one item is skipped; modelIndex still advances so it stays aligned with modelIds.
if (ExceptionsHelper.unwrapCause(response.getFailure()) instanceof ResourceNotFoundException) {
modelIndex++;
continue;
}

// Issue a cluster health request for the stats index pattern that waits for yellow status, with a 2 second timeout.
SubscribableListener.<ClusterHealthResponse>newForked((delegate) -> {
// first wait for the index to be available
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
ML_ORIGIN,
new ClusterHealthRequest(new TimeValue(2, TimeUnit.SECONDS), MlStatsIndex.indexPattern()).waitForYellowStatus(),
delegate,
client.admin().cluster()::health
);
})
// The next step is given the ML utility thread pool executor and the thread context
// (presumably it runs there once the health response arrives — confirm against SubscribableListener docs).
.<List<InferenceStats>>andThen(
client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME),
client.threadPool().getThreadContext(),
(delegate, clusterHealthResponse) -> {
// NOTE(review): the statements inside this branch mix removed and added diff lines; which of them
// belong to the current implementation cannot be determined from this view.
if (clusterHealthResponse.isTimedOut()) {
logger.error(
() -> "[" + Strings.arrayToCommaDelimitedString(modelIds) + "] search failed for models",
response.getFailure()
);
listener.onFailure(
ExceptionsHelper.serverError(
"Searching for stats for models [{}] failed",
response.getFailure(),
Strings.arrayToCommaDelimitedString(modelIds)
)
"getInferenceStats Timed out waiting for index [{}] to be available, "
+ "this will probably cause the request to fail",
MlStatsIndex.indexPattern()
);
return;
}
try {
InferenceStats inferenceStats = handleMultiNodeStatsResponse(response.getResponse(), modelIds[modelIndex++]);
if (inferenceStats != null) {
allStats.add(inferenceStats);
}
} catch (Exception e) {
listener.onFailure(e);

// Build one stats search per model id and run them as a single multi-search.
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
Arrays.stream(modelIds).map(TrainedModelProvider::buildStatsSearchRequest).forEach(multiSearchRequest::add);
// No requests were built: complete with an empty list without searching.
if (multiSearchRequest.requests().isEmpty()) {
delegate.onResponse(Collections.emptyList());
return;
}
if (parentTaskId != null) {
multiSearchRequest.setParentTask(parentTaskId);
}
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
ML_ORIGIN,
multiSearchRequest,
ActionListener.<MultiSearchResponse>wrap(responses -> {
List<InferenceStats> allStats = new ArrayList<>(modelIds.length);
// Walks modelIds in step with the response items.
int modelIndex = 0;
assert responses.getResponses().length == modelIds.length
: "mismatch between search response size and models requested";
for (MultiSearchResponse.Item response : responses.getResponses()) {
if (response.isFailure()) {
// A ResourceNotFoundException for one item is skipped; modelIndex still advances to stay aligned.
if (ExceptionsHelper.unwrapCause(response.getFailure()) instanceof ResourceNotFoundException) {
modelIndex++;
continue;
}
// Any other per-item failure is logged and fails the whole request.
logger.error(
() -> "[" + Strings.arrayToCommaDelimitedString(modelIds) + "] search failed for models",
response.getFailure()
);
delegate.onFailure(
ExceptionsHelper.serverError(
"Searching for stats for models [{}] failed",
response.getFailure(),
Strings.arrayToCommaDelimitedString(modelIds)
)
);
return;
}
try {
InferenceStats inferenceStats = handleMultiNodeStatsResponse(
response.getResponse(),
modelIds[modelIndex++]
);
// A null result is left out of the returned list.
if (inferenceStats != null) {
allStats.add(inferenceStats);
}
} catch (Exception e) {
delegate.onFailure(e);
return;
}
}
delegate.onResponse(allStats);
}, e -> {
// Failure of the multi-search as a whole: ResourceNotFoundException yields an empty list,
// anything else is passed on as the unwrapped cause.
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof ResourceNotFoundException) {
delegate.onResponse(Collections.emptyList());
return;
}
delegate.onFailure((Exception) unwrapped);
}),
client::multiSearch
);

}
// NOTE(review): the lines from here to the closing `);` of this executeAsyncWithOrigin call look like
// the tail of the removed implementation (they answer `listener` directly) — confirm.
listener.onResponse(allStats);
}, e -> {
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof ResourceNotFoundException) {
listener.onResponse(Collections.emptyList());
return;
}
listener.onFailure((Exception) unwrapped);
}),
client::multiSearch
);
)
// The caller's listener is attached to the chain, together with the ML utility thread pool executor and the thread context.
.addListener(
listener,
client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME),
client.threadPool().getThreadContext()
);
}

private static SearchRequest buildStatsSearchRequest(String modelId) {
Expand Down
Loading