|
50 | 50 | import org.elasticsearch.search.SearchPhaseResult;
|
51 | 51 | import org.elasticsearch.search.SearchShardTarget;
|
52 | 52 | import org.elasticsearch.search.fetch.FetchPhase;
|
| 53 | +import org.elasticsearch.search.fetch.FetchPhaseExecutionException; |
53 | 54 | import org.elasticsearch.search.fetch.FetchSearchResult;
|
54 | 55 | import org.elasticsearch.search.fetch.FetchSubPhase;
|
55 | 56 | import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
|
|
63 | 64 | import org.elasticsearch.search.internal.ShardSearchRequest;
|
64 | 65 | import org.elasticsearch.search.lookup.Source;
|
65 | 66 | import org.elasticsearch.search.profile.ProfileResult;
|
| 67 | +import org.elasticsearch.search.profile.Profilers; |
66 | 68 | import org.elasticsearch.search.profile.SearchProfileQueryPhaseResult;
|
67 | 69 | import org.elasticsearch.search.profile.SearchProfileShardResult;
|
68 | 70 | import org.elasticsearch.search.query.QuerySearchResult;
|
@@ -873,6 +875,63 @@ public StoredFieldsSpec storedFieldsSpec() {
|
873 | 875 | }
|
874 | 876 | }
|
875 | 877 |
|
| 878 | + public void testTimerStoppedAndSubPhasesExceptionsPropagate() throws IOException { |
| 879 | + // if the timer is not stopped properly whilst profiling the fetch phase the exceptions |
| 880 | + // in sub phases#setNextReader will not propagate as the cause that failed the fetch phase (instead a timer illegal state exception |
| 881 | + // will propagate) |
| 882 | + // this tests ensures that exceptions in sub phases are propagated correctly as the cause of the fetch phase failure (which in turn |
| 883 | + // implies the timer was handled correctly) |
| 884 | + Directory dir = newDirectory(); |
| 885 | + RandomIndexWriter w = new RandomIndexWriter(random(), dir); |
| 886 | + |
| 887 | + String body = "{ \"thefield\": \" " + randomAlphaOfLength(48_000) + "\" }"; |
| 888 | + for (int i = 0; i < 10; i++) { |
| 889 | + Document document = new Document(); |
| 890 | + document.add(new StringField("id", Integer.toString(i), Field.Store.YES)); |
| 891 | + w.addDocument(document); |
| 892 | + } |
| 893 | + if (randomBoolean()) { |
| 894 | + w.forceMerge(1); |
| 895 | + } |
| 896 | + IndexReader r = w.getReader(); |
| 897 | + w.close(); |
| 898 | + ContextIndexSearcher contextIndexSearcher = createSearcher(r); |
| 899 | + try ( |
| 900 | + SearchContext searchContext = createSearchContext( |
| 901 | + contextIndexSearcher, |
| 902 | + true, |
| 903 | + new NoopCircuitBreaker(CircuitBreaker.REQUEST), |
| 904 | + true |
| 905 | + ) |
| 906 | + ) { |
| 907 | + FetchPhase fetchPhase = new FetchPhase(List.of(fetchContext -> new FetchSubPhaseProcessor() { |
| 908 | + @Override |
| 909 | + public void setNextReader(LeafReaderContext readerContext) throws IOException { |
| 910 | + throw new IOException("bad things"); |
| 911 | + } |
| 912 | + |
| 913 | + @Override |
| 914 | + public void process(FetchSubPhase.HitContext hitContext) throws IOException { |
| 915 | + Source source = hitContext.source(); |
| 916 | + hitContext.hit().sourceRef(source.internalSourceRef()); |
| 917 | + } |
| 918 | + |
| 919 | + @Override |
| 920 | + public StoredFieldsSpec storedFieldsSpec() { |
| 921 | + return StoredFieldsSpec.NEEDS_SOURCE; |
| 922 | + } |
| 923 | + })); |
| 924 | + FetchPhaseExecutionException fetchPhaseExecutionException = assertThrows( |
| 925 | + FetchPhaseExecutionException.class, |
| 926 | + () -> fetchPhase.execute(searchContext, IntStream.range(0, 100).toArray(), null) |
| 927 | + ); |
| 928 | + assertThat(fetchPhaseExecutionException.getCause().getMessage(), is("bad things")); |
| 929 | + } finally { |
| 930 | + r.close(); |
| 931 | + dir.close(); |
| 932 | + } |
| 933 | + } |
| 934 | + |
876 | 935 | private static ContextIndexSearcher createSearcher(IndexReader reader) throws IOException {
|
877 | 936 | return new ContextIndexSearcher(reader, null, null, new QueryCachingPolicy() {
|
878 | 937 | @Override
|
@@ -910,13 +969,22 @@ public StoredFieldsSpec storedFieldsSpec() {
|
910 | 969 | }
|
911 | 970 |
|
912 | 971 | private static SearchContext createSearchContext(ContextIndexSearcher contextIndexSearcher, boolean allowPartialResults) {
|
913 |
| - return createSearchContext(contextIndexSearcher, allowPartialResults, null); |
| 972 | + return createSearchContext(contextIndexSearcher, allowPartialResults, null, false); |
914 | 973 | }
|
915 | 974 |
|
916 | 975 | private static SearchContext createSearchContext(
|
917 | 976 | ContextIndexSearcher contextIndexSearcher,
|
918 | 977 | boolean allowPartialResults,
|
919 | 978 | @Nullable CircuitBreaker circuitBreaker
|
| 979 | + ) { |
| 980 | + return createSearchContext(contextIndexSearcher, allowPartialResults, circuitBreaker, false); |
| 981 | + } |
| 982 | + |
| 983 | + private static SearchContext createSearchContext( |
| 984 | + ContextIndexSearcher contextIndexSearcher, |
| 985 | + boolean allowPartialResults, |
| 986 | + @Nullable CircuitBreaker circuitBreaker, |
| 987 | + boolean profileEnabled |
920 | 988 | ) {
|
921 | 989 | IndexSettings indexSettings = new IndexSettings(
|
922 | 990 | IndexMetadata.builder("index")
|
@@ -999,6 +1067,11 @@ public CircuitBreaker circuitBreaker() {
|
999 | 1067 | return super.circuitBreaker();
|
1000 | 1068 | }
|
1001 | 1069 | }
|
| 1070 | + |
| 1071 | + @Override |
| 1072 | + public Profilers getProfilers() { |
| 1073 | + return profileEnabled ? new Profilers(contextIndexSearcher) : null; |
| 1074 | + } |
1002 | 1075 | };
|
1003 | 1076 | searchContext.addReleasable(searchContext.fetchResult()::decRef);
|
1004 | 1077 | searchContext.setTask(new SearchShardTask(-1, "type", "action", "description", null, Collections.emptyMap()));
|
|
0 commit comments