Skip to content

Commit cb43c45

Browse files
author
Max Hniebergall
committed
Wait for up to 2 seconds for yellow before starting search
1 parent 78a531b commit cb43c45

File tree

1 file changed

+79
-53
lines changed

1 file changed

+79
-53
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/persistence/TrainedModelProvider.java

Lines changed: 79 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import org.elasticsearch.ResourceNotFoundException;
1515
import org.elasticsearch.action.ActionListener;
1616
import org.elasticsearch.action.DocWriteRequest;
17+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
18+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
1719
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
1820
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1921
import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -28,6 +30,7 @@
2830
import org.elasticsearch.action.search.SearchResponse;
2931
import org.elasticsearch.action.search.TransportSearchAction;
3032
import org.elasticsearch.action.support.IndicesOptions;
33+
import org.elasticsearch.action.support.SubscribableListener;
3134
import org.elasticsearch.action.support.WriteRequest;
3235
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
3336
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -43,6 +46,7 @@
4346
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
4447
import org.elasticsearch.common.xcontent.XContentHelper;
4548
import org.elasticsearch.core.Nullable;
49+
import org.elasticsearch.core.TimeValue;
4650
import org.elasticsearch.core.Tuple;
4751
import org.elasticsearch.index.IndexNotFoundException;
4852
import org.elasticsearch.index.engine.VersionConflictEngineException;
@@ -105,6 +109,7 @@
105109
import java.util.Objects;
106110
import java.util.Set;
107111
import java.util.TreeSet;
112+
import java.util.concurrent.TimeUnit;
108113
import java.util.function.Function;
109114
import java.util.stream.Collectors;
110115

@@ -1053,63 +1058,84 @@ public void expandIds(
10531058
}
10541059

10551060
public void getInferenceStats(String[] modelIds, @Nullable TaskId parentTaskId, ActionListener<List<InferenceStats>> listener) {
1056-
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
1057-
Arrays.stream(modelIds).map(TrainedModelProvider::buildStatsSearchRequest).forEach(multiSearchRequest::add);
1058-
if (multiSearchRequest.requests().isEmpty()) {
1059-
listener.onResponse(Collections.emptyList());
1060-
return;
1061-
}
1062-
if (parentTaskId != null) {
1063-
multiSearchRequest.setParentTask(parentTaskId);
1064-
}
1065-
executeAsyncWithOrigin(
1066-
client.threadPool().getThreadContext(),
1067-
ML_ORIGIN,
1068-
multiSearchRequest,
1069-
ActionListener.<MultiSearchResponse>wrap(responses -> {
1070-
List<InferenceStats> allStats = new ArrayList<>(modelIds.length);
1071-
int modelIndex = 0;
1072-
assert responses.getResponses().length == modelIds.length : "mismatch between search response size and models requested";
1073-
for (MultiSearchResponse.Item response : responses.getResponses()) {
1074-
if (response.isFailure()) {
1075-
if (ExceptionsHelper.unwrapCause(response.getFailure()) instanceof ResourceNotFoundException) {
1076-
modelIndex++;
1077-
continue;
1061+
1062+
SubscribableListener.<ClusterHealthResponse>newForked((delegate) -> {
1063+
// first wait for the index to be available
1064+
executeAsyncWithOrigin(
1065+
client.threadPool().getThreadContext(),
1066+
ML_ORIGIN,
1067+
new ClusterHealthRequest(new TimeValue(2, TimeUnit.SECONDS), MlStatsIndex.indexPattern()).waitForYellowStatus(),
1068+
delegate,
1069+
client.admin().cluster()::health
1070+
);
1071+
}).<List<InferenceStats>>andThen((delegate, clusterHealthResponse) -> {
1072+
if (clusterHealthResponse.isTimedOut()) {
1073+
logger.error(
1074+
"getInferenceStats Timed out waiting for index [{}] to be available, this will probably cause the request to fail",
1075+
MlStatsIndex.indexPattern()
1076+
);
1077+
}
1078+
1079+
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
1080+
Arrays.stream(modelIds).map(TrainedModelProvider::buildStatsSearchRequest).forEach(multiSearchRequest::add);
1081+
if (multiSearchRequest.requests().isEmpty()) {
1082+
listener.onResponse(Collections.emptyList());
1083+
return;
1084+
}
1085+
if (parentTaskId != null) {
1086+
multiSearchRequest.setParentTask(parentTaskId);
1087+
}
1088+
executeAsyncWithOrigin(
1089+
client.threadPool().getThreadContext(),
1090+
ML_ORIGIN,
1091+
multiSearchRequest,
1092+
ActionListener.<MultiSearchResponse>wrap(responses -> {
1093+
List<InferenceStats> allStats = new ArrayList<>(modelIds.length);
1094+
int modelIndex = 0;
1095+
assert responses.getResponses().length == modelIds.length
1096+
: "mismatch between search response size and models requested";
1097+
for (MultiSearchResponse.Item response : responses.getResponses()) {
1098+
if (response.isFailure()) {
1099+
if (ExceptionsHelper.unwrapCause(response.getFailure()) instanceof ResourceNotFoundException) {
1100+
modelIndex++;
1101+
continue;
1102+
}
1103+
logger.error(
1104+
() -> "[" + Strings.arrayToCommaDelimitedString(modelIds) + "] search failed for models",
1105+
response.getFailure()
1106+
);
1107+
listener.onFailure(
1108+
ExceptionsHelper.serverError(
1109+
"Searching for stats for models [{}] failed",
1110+
response.getFailure(),
1111+
Strings.arrayToCommaDelimitedString(modelIds)
1112+
)
1113+
);
1114+
return;
10781115
}
1079-
logger.error(
1080-
() -> "[" + Strings.arrayToCommaDelimitedString(modelIds) + "] search failed for models",
1081-
response.getFailure()
1082-
);
1083-
listener.onFailure(
1084-
ExceptionsHelper.serverError(
1085-
"Searching for stats for models [{}] failed",
1086-
response.getFailure(),
1087-
Strings.arrayToCommaDelimitedString(modelIds)
1088-
)
1089-
);
1090-
return;
1091-
}
1092-
try {
1093-
InferenceStats inferenceStats = handleMultiNodeStatsResponse(response.getResponse(), modelIds[modelIndex++]);
1094-
if (inferenceStats != null) {
1095-
allStats.add(inferenceStats);
1116+
try {
1117+
InferenceStats inferenceStats = handleMultiNodeStatsResponse(response.getResponse(), modelIds[modelIndex++]);
1118+
if (inferenceStats != null) {
1119+
allStats.add(inferenceStats);
1120+
}
1121+
} catch (Exception e) {
1122+
listener.onFailure(e);
1123+
return;
10961124
}
1097-
} catch (Exception e) {
1098-
listener.onFailure(e);
1125+
}
1126+
listener.onResponse(allStats);
1127+
}, e -> {
1128+
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
1129+
if (unwrapped instanceof ResourceNotFoundException) {
1130+
listener.onResponse(Collections.emptyList());
10991131
return;
11001132
}
1101-
}
1102-
listener.onResponse(allStats);
1103-
}, e -> {
1104-
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
1105-
if (unwrapped instanceof ResourceNotFoundException) {
1106-
listener.onResponse(Collections.emptyList());
1107-
return;
1108-
}
1109-
listener.onFailure((Exception) unwrapped);
1110-
}),
1111-
client::multiSearch
1112-
);
1133+
listener.onFailure((Exception) unwrapped);
1134+
}),
1135+
client::multiSearch
1136+
);
1137+
1138+
}).addListener(listener);
11131139
}
11141140

11151141
private static SearchRequest buildStatsSearchRequest(String modelId) {

0 commit comments

Comments
 (0)