Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/changelog/115938.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
pr: 115938
summary: Wait for up to 2 seconds for yellow status before starting search
area: Machine Learning
type: bug
issues:
- 107777
- 105955
- 107815
- 112191
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
Expand All @@ -28,6 +30,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -43,6 +46,7 @@
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
Expand Down Expand Up @@ -105,6 +109,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -1053,63 +1058,84 @@ public void expandIds(
}

public void getInferenceStats(String[] modelIds, @Nullable TaskId parentTaskId, ActionListener<List<InferenceStats>> listener) {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
Arrays.stream(modelIds).map(TrainedModelProvider::buildStatsSearchRequest).forEach(multiSearchRequest::add);
if (multiSearchRequest.requests().isEmpty()) {
listener.onResponse(Collections.emptyList());
return;
}
if (parentTaskId != null) {
multiSearchRequest.setParentTask(parentTaskId);
}
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
ML_ORIGIN,
multiSearchRequest,
ActionListener.<MultiSearchResponse>wrap(responses -> {
List<InferenceStats> allStats = new ArrayList<>(modelIds.length);
int modelIndex = 0;
assert responses.getResponses().length == modelIds.length : "mismatch between search response size and models requested";
for (MultiSearchResponse.Item response : responses.getResponses()) {
if (response.isFailure()) {
if (ExceptionsHelper.unwrapCause(response.getFailure()) instanceof ResourceNotFoundException) {
modelIndex++;
continue;

SubscribableListener.<ClusterHealthResponse>newForked((delegate) -> {
// first wait for the index to be available
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
ML_ORIGIN,
new ClusterHealthRequest(new TimeValue(2, TimeUnit.SECONDS), MlStatsIndex.indexPattern()).waitForYellowStatus(),
delegate,
client.admin().cluster()::health
);
}).<List<InferenceStats>>andThen((delegate, clusterHealthResponse) -> {
if (clusterHealthResponse.isTimedOut()) {
logger.error(
"getInferenceStats Timed out waiting for index [{}] to be available, this will probably cause the request to fail",
MlStatsIndex.indexPattern()
);
}

MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
Arrays.stream(modelIds).map(TrainedModelProvider::buildStatsSearchRequest).forEach(multiSearchRequest::add);
if (multiSearchRequest.requests().isEmpty()) {
listener.onResponse(Collections.emptyList());
return;
}
if (parentTaskId != null) {
multiSearchRequest.setParentTask(parentTaskId);
}
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
ML_ORIGIN,
multiSearchRequest,
ActionListener.<MultiSearchResponse>wrap(responses -> {
List<InferenceStats> allStats = new ArrayList<>(modelIds.length);
int modelIndex = 0;
assert responses.getResponses().length == modelIds.length
: "mismatch between search response size and models requested";
for (MultiSearchResponse.Item response : responses.getResponses()) {
if (response.isFailure()) {
if (ExceptionsHelper.unwrapCause(response.getFailure()) instanceof ResourceNotFoundException) {
modelIndex++;
continue;
}
logger.error(
() -> "[" + Strings.arrayToCommaDelimitedString(modelIds) + "] search failed for models",
response.getFailure()
);
listener.onFailure(
ExceptionsHelper.serverError(
"Searching for stats for models [{}] failed",
response.getFailure(),
Strings.arrayToCommaDelimitedString(modelIds)
)
);
return;
}
logger.error(
() -> "[" + Strings.arrayToCommaDelimitedString(modelIds) + "] search failed for models",
response.getFailure()
);
listener.onFailure(
ExceptionsHelper.serverError(
"Searching for stats for models [{}] failed",
response.getFailure(),
Strings.arrayToCommaDelimitedString(modelIds)
)
);
return;
}
try {
InferenceStats inferenceStats = handleMultiNodeStatsResponse(response.getResponse(), modelIds[modelIndex++]);
if (inferenceStats != null) {
allStats.add(inferenceStats);
try {
InferenceStats inferenceStats = handleMultiNodeStatsResponse(response.getResponse(), modelIds[modelIndex++]);
if (inferenceStats != null) {
allStats.add(inferenceStats);
}
} catch (Exception e) {
listener.onFailure(e);
return;
}
} catch (Exception e) {
listener.onFailure(e);
}
listener.onResponse(allStats);
}, e -> {
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof ResourceNotFoundException) {
listener.onResponse(Collections.emptyList());
return;
}
}
listener.onResponse(allStats);
}, e -> {
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof ResourceNotFoundException) {
listener.onResponse(Collections.emptyList());
return;
}
listener.onFailure((Exception) unwrapped);
}),
client::multiSearch
);
listener.onFailure((Exception) unwrapped);
}),
client::multiSearch
);

}).addListener(listener);
}

private static SearchRequest buildStatsSearchRequest(String modelId) {
Expand Down
Loading