| 
14 | 14 | import org.elasticsearch.ResourceNotFoundException;  | 
15 | 15 | import org.elasticsearch.action.ActionListener;  | 
16 | 16 | import org.elasticsearch.action.DocWriteRequest;  | 
 | 17 | +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;  | 
 | 18 | +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;  | 
17 | 19 | import org.elasticsearch.action.admin.indices.refresh.RefreshAction;  | 
18 | 20 | import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;  | 
19 | 21 | import org.elasticsearch.action.bulk.BulkItemResponse;  | 
 | 
28 | 30 | import org.elasticsearch.action.search.SearchResponse;  | 
29 | 31 | import org.elasticsearch.action.search.TransportSearchAction;  | 
30 | 32 | import org.elasticsearch.action.support.IndicesOptions;  | 
 | 33 | +import org.elasticsearch.action.support.SubscribableListener;  | 
31 | 34 | import org.elasticsearch.action.support.WriteRequest;  | 
32 | 35 | import org.elasticsearch.action.support.broadcast.BroadcastResponse;  | 
33 | 36 | import org.elasticsearch.action.support.master.AcknowledgedResponse;  | 
 | 
43 | 46 | import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;  | 
44 | 47 | import org.elasticsearch.common.xcontent.XContentHelper;  | 
45 | 48 | import org.elasticsearch.core.Nullable;  | 
 | 49 | +import org.elasticsearch.core.TimeValue;  | 
46 | 50 | import org.elasticsearch.core.Tuple;  | 
47 | 51 | import org.elasticsearch.index.IndexNotFoundException;  | 
48 | 52 | import org.elasticsearch.index.engine.VersionConflictEngineException;  | 
 | 
105 | 109 | import java.util.Objects;  | 
106 | 110 | import java.util.Set;  | 
107 | 111 | import java.util.TreeSet;  | 
 | 112 | +import java.util.concurrent.TimeUnit;  | 
108 | 113 | import java.util.function.Function;  | 
109 | 114 | import java.util.stream.Collectors;  | 
110 | 115 | 
 
  | 
@@ -1053,63 +1058,98 @@ public void expandIds(  | 
1053 | 1058 |     }  | 
1054 | 1059 | 
 
  | 
1055 | 1060 |     public void getInferenceStats(String[] modelIds, @Nullable TaskId parentTaskId, ActionListener<List<InferenceStats>> listener) {  | 
1056 |  | -        MultiSearchRequest multiSearchRequest = new MultiSearchRequest();  | 
1057 |  | -        Arrays.stream(modelIds).map(TrainedModelProvider::buildStatsSearchRequest).forEach(multiSearchRequest::add);  | 
1058 |  | -        if (multiSearchRequest.requests().isEmpty()) {  | 
1059 |  | -            listener.onResponse(Collections.emptyList());  | 
1060 |  | -            return;  | 
1061 |  | -        }  | 
1062 |  | -        if (parentTaskId != null) {  | 
1063 |  | -            multiSearchRequest.setParentTask(parentTaskId);  | 
1064 |  | -        }  | 
1065 |  | -        executeAsyncWithOrigin(  | 
1066 |  | -            client.threadPool().getThreadContext(),  | 
1067 |  | -            ML_ORIGIN,  | 
1068 |  | -            multiSearchRequest,  | 
1069 |  | -            ActionListener.<MultiSearchResponse>wrap(responses -> {  | 
1070 |  | -                List<InferenceStats> allStats = new ArrayList<>(modelIds.length);  | 
1071 |  | -                int modelIndex = 0;  | 
1072 |  | -                assert responses.getResponses().length == modelIds.length : "mismatch between search response size and models requested";  | 
1073 |  | -                for (MultiSearchResponse.Item response : responses.getResponses()) {  | 
1074 |  | -                    if (response.isFailure()) {  | 
1075 |  | -                        if (ExceptionsHelper.unwrapCause(response.getFailure()) instanceof ResourceNotFoundException) {  | 
1076 |  | -                            modelIndex++;  | 
1077 |  | -                            continue;  | 
1078 |  | -                        }  | 
 | 1061 | + | 
 | 1062 | +        SubscribableListener.<ClusterHealthResponse>newForked((delegate) -> {  | 
 | 1063 | +            // first wait for the index to be available  | 
 | 1064 | +            executeAsyncWithOrigin(  | 
 | 1065 | +                client.threadPool().getThreadContext(),  | 
 | 1066 | +                ML_ORIGIN,  | 
 | 1067 | +                new ClusterHealthRequest(new TimeValue(2, TimeUnit.SECONDS), MlStatsIndex.indexPattern()).waitForYellowStatus(),  | 
 | 1068 | +                delegate,  | 
 | 1069 | +                client.admin().cluster()::health  | 
 | 1070 | +            );  | 
 | 1071 | +        })  | 
 | 1072 | +            .<List<InferenceStats>>andThen(  | 
 | 1073 | +                client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME),  | 
 | 1074 | +                client.threadPool().getThreadContext(),  | 
 | 1075 | +                (delegate, clusterHealthResponse) -> {  | 
 | 1076 | +                    if (clusterHealthResponse.isTimedOut()) {  | 
1079 | 1077 |                         logger.error(  | 
1080 |  | -                            () -> "[" + Strings.arrayToCommaDelimitedString(modelIds) + "] search failed for models",  | 
1081 |  | -                            response.getFailure()  | 
1082 |  | -                        );  | 
1083 |  | -                        listener.onFailure(  | 
1084 |  | -                            ExceptionsHelper.serverError(  | 
1085 |  | -                                "Searching for stats for models [{}] failed",  | 
1086 |  | -                                response.getFailure(),  | 
1087 |  | -                                Strings.arrayToCommaDelimitedString(modelIds)  | 
1088 |  | -                            )  | 
 | 1078 | +                            "getInferenceStats Timed out waiting for index [{}] to be available, "  | 
 | 1079 | +                                + "this will probably cause the request to fail",  | 
 | 1080 | +                            MlStatsIndex.indexPattern()  | 
1089 | 1081 |                         );  | 
1090 |  | -                        return;  | 
1091 | 1082 |                     }  | 
1092 |  | -                    try {  | 
1093 |  | -                        InferenceStats inferenceStats = handleMultiNodeStatsResponse(response.getResponse(), modelIds[modelIndex++]);  | 
1094 |  | -                        if (inferenceStats != null) {  | 
1095 |  | -                            allStats.add(inferenceStats);  | 
1096 |  | -                        }  | 
1097 |  | -                    } catch (Exception e) {  | 
1098 |  | -                        listener.onFailure(e);  | 
 | 1083 | + | 
 | 1084 | +                    MultiSearchRequest multiSearchRequest = new MultiSearchRequest();  | 
 | 1085 | +                    Arrays.stream(modelIds).map(TrainedModelProvider::buildStatsSearchRequest).forEach(multiSearchRequest::add);  | 
 | 1086 | +                    if (multiSearchRequest.requests().isEmpty()) {  | 
 | 1087 | +                        delegate.onResponse(Collections.emptyList());  | 
1099 | 1088 |                         return;  | 
1100 | 1089 |                     }  | 
 | 1090 | +                    if (parentTaskId != null) {  | 
 | 1091 | +                        multiSearchRequest.setParentTask(parentTaskId);  | 
 | 1092 | +                    }  | 
 | 1093 | +                    executeAsyncWithOrigin(  | 
 | 1094 | +                        client.threadPool().getThreadContext(),  | 
 | 1095 | +                        ML_ORIGIN,  | 
 | 1096 | +                        multiSearchRequest,  | 
 | 1097 | +                        ActionListener.<MultiSearchResponse>wrap(responses -> {  | 
 | 1098 | +                            List<InferenceStats> allStats = new ArrayList<>(modelIds.length);  | 
 | 1099 | +                            int modelIndex = 0;  | 
 | 1100 | +                            assert responses.getResponses().length == modelIds.length  | 
 | 1101 | +                                : "mismatch between search response size and models requested";  | 
 | 1102 | +                            for (MultiSearchResponse.Item response : responses.getResponses()) {  | 
 | 1103 | +                                if (response.isFailure()) {  | 
 | 1104 | +                                    if (ExceptionsHelper.unwrapCause(response.getFailure()) instanceof ResourceNotFoundException) {  | 
 | 1105 | +                                        modelIndex++;  | 
 | 1106 | +                                        continue;  | 
 | 1107 | +                                    }  | 
 | 1108 | +                                    logger.error(  | 
 | 1109 | +                                        () -> "[" + Strings.arrayToCommaDelimitedString(modelIds) + "] search failed for models",  | 
 | 1110 | +                                        response.getFailure()  | 
 | 1111 | +                                    );  | 
 | 1112 | +                                    delegate.onFailure(  | 
 | 1113 | +                                        ExceptionsHelper.serverError(  | 
 | 1114 | +                                            "Searching for stats for models [{}] failed",  | 
 | 1115 | +                                            response.getFailure(),  | 
 | 1116 | +                                            Strings.arrayToCommaDelimitedString(modelIds)  | 
 | 1117 | +                                        )  | 
 | 1118 | +                                    );  | 
 | 1119 | +                                    return;  | 
 | 1120 | +                                }  | 
 | 1121 | +                                try {  | 
 | 1122 | +                                    InferenceStats inferenceStats = handleMultiNodeStatsResponse(  | 
 | 1123 | +                                        response.getResponse(),  | 
 | 1124 | +                                        modelIds[modelIndex++]  | 
 | 1125 | +                                    );  | 
 | 1126 | +                                    if (inferenceStats != null) {  | 
 | 1127 | +                                        allStats.add(inferenceStats);  | 
 | 1128 | +                                    }  | 
 | 1129 | +                                } catch (Exception e) {  | 
 | 1130 | +                                    delegate.onFailure(e);  | 
 | 1131 | +                                    return;  | 
 | 1132 | +                                }  | 
 | 1133 | +                            }  | 
 | 1134 | +                            delegate.onResponse(allStats);  | 
 | 1135 | +                        }, e -> {  | 
 | 1136 | +                            Throwable unwrapped = ExceptionsHelper.unwrapCause(e);  | 
 | 1137 | +                            if (unwrapped instanceof ResourceNotFoundException) {  | 
 | 1138 | +                                delegate.onResponse(Collections.emptyList());  | 
 | 1139 | +                                return;  | 
 | 1140 | +                            }  | 
 | 1141 | +                            delegate.onFailure((Exception) unwrapped);  | 
 | 1142 | +                        }),  | 
 | 1143 | +                        client::multiSearch  | 
 | 1144 | +                    );  | 
 | 1145 | + | 
1101 | 1146 |                 }  | 
1102 |  | -                listener.onResponse(allStats);  | 
1103 |  | -            }, e -> {  | 
1104 |  | -                Throwable unwrapped = ExceptionsHelper.unwrapCause(e);  | 
1105 |  | -                if (unwrapped instanceof ResourceNotFoundException) {  | 
1106 |  | -                    listener.onResponse(Collections.emptyList());  | 
1107 |  | -                    return;  | 
1108 |  | -                }  | 
1109 |  | -                listener.onFailure((Exception) unwrapped);  | 
1110 |  | -            }),  | 
1111 |  | -            client::multiSearch  | 
1112 |  | -        );  | 
 | 1147 | +            )  | 
 | 1148 | +            .addListener(  | 
 | 1149 | +                listener,  | 
 | 1150 | +                client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME),  | 
 | 1151 | +                client.threadPool().getThreadContext()  | 
 | 1152 | +            );  | 
1113 | 1153 |     }  | 
1114 | 1154 | 
 
  | 
1115 | 1155 |     private static SearchRequest buildStatsSearchRequest(String modelId) {  | 
 | 
0 commit comments