|
9 | 9 | package org.elasticsearch.action.search; |
10 | 10 |
|
11 | 11 | import org.apache.lucene.document.Document; |
| 12 | +import org.apache.lucene.document.Field; |
| 13 | +import org.apache.lucene.document.StoredField; |
| 14 | +import org.apache.lucene.document.StringField; |
12 | 15 | import org.apache.lucene.index.IndexReader; |
13 | 16 | import org.apache.lucene.index.LeafReaderContext; |
14 | 17 | import org.apache.lucene.search.Query; |
|
20 | 23 | import org.apache.lucene.tests.index.RandomIndexWriter; |
21 | 24 | import org.apache.lucene.tests.store.MockDirectoryWrapper; |
22 | 25 | import org.apache.lucene.util.Accountable; |
| 26 | +import org.apache.lucene.util.BytesRef; |
23 | 27 | import org.elasticsearch.action.ActionListener; |
24 | 28 | import org.elasticsearch.action.OriginalIndices; |
25 | 29 | import org.elasticsearch.cluster.metadata.IndexMetadata; |
26 | 30 | import org.elasticsearch.common.UUIDs; |
27 | 31 | import org.elasticsearch.common.breaker.CircuitBreaker; |
| 32 | +import org.elasticsearch.common.breaker.CircuitBreakingException; |
28 | 33 | import org.elasticsearch.common.breaker.NoopCircuitBreaker; |
29 | 34 | import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; |
30 | 35 | import org.elasticsearch.common.settings.Settings; |
31 | 36 | import org.elasticsearch.common.util.concurrent.AtomicArray; |
32 | 37 | import org.elasticsearch.common.util.concurrent.EsExecutors; |
| 38 | +import org.elasticsearch.core.Nullable; |
33 | 39 | import org.elasticsearch.index.IndexSettings; |
34 | 40 | import org.elasticsearch.index.IndexVersion; |
35 | 41 | import org.elasticsearch.index.cache.bitset.BitsetFilterCache; |
|
55 | 61 | import org.elasticsearch.search.internal.SearchContext; |
56 | 62 | import org.elasticsearch.search.internal.ShardSearchContextId; |
57 | 63 | import org.elasticsearch.search.internal.ShardSearchRequest; |
| 64 | +import org.elasticsearch.search.lookup.Source; |
58 | 65 | import org.elasticsearch.search.profile.ProfileResult; |
59 | 66 | import org.elasticsearch.search.profile.SearchProfileQueryPhaseResult; |
60 | 67 | import org.elasticsearch.search.profile.SearchProfileShardResult; |
|
72 | 79 | import java.util.concurrent.CountDownLatch; |
73 | 80 | import java.util.concurrent.atomic.AtomicInteger; |
74 | 81 | import java.util.function.BiFunction; |
| 82 | +import java.util.stream.IntStream; |
75 | 83 |
|
76 | 84 | import static org.hamcrest.Matchers.arrayWithSize; |
77 | 85 | import static org.hamcrest.Matchers.equalTo; |
78 | 86 | import static org.hamcrest.Matchers.hasSize; |
| 87 | +import static org.hamcrest.Matchers.is; |
79 | 88 | import static org.hamcrest.Matchers.nullValue; |
80 | 89 |
|
81 | 90 | public class FetchSearchPhaseTests extends ESTestCase { |
@@ -820,6 +829,57 @@ public void testFetchTimeoutNoPartialResults() throws IOException { |
820 | 829 | } |
821 | 830 | } |
822 | 831 |
|
| 832 | + public void testFetchPhaseChecksMemoryBreaker() throws IOException { |
| 833 | + Directory dir = newDirectory(); |
| 834 | + RandomIndexWriter w = new RandomIndexWriter(random(), dir); |
| 835 | + |
| 836 | + // we're indexing 100 documents with a field that is 48KB long so the fetch phase should check the memory breaker 5 times |
| 837 | + // (every 22 documents that accumulate 1MiB in source sizes, and then a final time when we finished processing the one segment) |
| 838 | + |
| 839 | + String body = "{ \"thefield\": \" " + randomAlphaOfLength(48_000) + "\" }"; |
| 840 | + for (int i = 0; i < 100; i++) { |
| 841 | + Document document = new Document(); |
| 842 | + document.add(new StringField("id", Integer.toString(i), Field.Store.YES)); |
| 843 | + document.add(new StoredField("_source", new BytesRef(body))); |
| 844 | + w.addDocument(document); |
| 845 | + } |
| 846 | + w.forceMerge(1); |
| 847 | + IndexReader r = w.getReader(); |
| 848 | + w.close(); |
| 849 | + ContextIndexSearcher contextIndexSearcher = createSearcher(r); |
| 850 | + AtomicInteger breakerCalledCount = new AtomicInteger(0); |
| 851 | + NoopCircuitBreaker breakingCircuitBreaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST) { |
| 852 | + @Override |
| 853 | + public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { |
| 854 | + breakerCalledCount.incrementAndGet(); |
| 855 | + } |
| 856 | + }; |
| 857 | + try (SearchContext searchContext = createSearchContext(contextIndexSearcher, true, breakingCircuitBreaker)) { |
| 858 | + FetchPhase fetchPhase = new FetchPhase(List.of(fetchContext -> new FetchSubPhaseProcessor() { |
| 859 | + @Override |
| 860 | + public void setNextReader(LeafReaderContext readerContext) throws IOException { |
| 861 | + |
| 862 | + } |
| 863 | + |
| 864 | + @Override |
| 865 | + public void process(FetchSubPhase.HitContext hitContext) throws IOException { |
| 866 | + Source source = hitContext.source(); |
| 867 | + hitContext.hit().sourceRef(source.internalSourceRef()); |
| 868 | + } |
| 869 | + |
| 870 | + @Override |
| 871 | + public StoredFieldsSpec storedFieldsSpec() { |
| 872 | + return StoredFieldsSpec.NEEDS_SOURCE; |
| 873 | + } |
| 874 | + })); |
| 875 | + fetchPhase.execute(searchContext, IntStream.range(0, 100).toArray(), null); |
| 876 | + assertThat(breakerCalledCount.get(), is(5)); |
| 877 | + } finally { |
| 878 | + r.close(); |
| 879 | + dir.close(); |
| 880 | + } |
| 881 | + } |
| 882 | + |
823 | 883 | private static ContextIndexSearcher createSearcher(IndexReader reader) throws IOException { |
824 | 884 | return new ContextIndexSearcher(reader, null, null, new QueryCachingPolicy() { |
825 | 885 | @Override |
@@ -857,6 +917,14 @@ public StoredFieldsSpec storedFieldsSpec() { |
857 | 917 | } |
858 | 918 |
|
859 | 919 | private static SearchContext createSearchContext(ContextIndexSearcher contextIndexSearcher, boolean allowPartialResults) { |
| 920 | + return createSearchContext(contextIndexSearcher, allowPartialResults, null); |
| 921 | + } |
| 922 | + |
| 923 | + private static SearchContext createSearchContext( |
| 924 | + ContextIndexSearcher contextIndexSearcher, |
| 925 | + boolean allowPartialResults, |
| 926 | + @Nullable CircuitBreaker circuitBreaker |
| 927 | + ) { |
860 | 928 | IndexSettings indexSettings = new IndexSettings( |
861 | 929 | IndexMetadata.builder("index") |
862 | 930 | .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())) |
@@ -929,6 +997,15 @@ public FetchSearchResult fetchResult() { |
929 | 997 | public ShardSearchRequest request() { |
930 | 998 | return request; |
931 | 999 | } |
| 1000 | + |
| 1001 | + @Override |
| 1002 | + public CircuitBreaker circuitBreaker() { |
| 1003 | + if (circuitBreaker != null) { |
| 1004 | + return circuitBreaker; |
| 1005 | + } else { |
| 1006 | + return super.circuitBreaker(); |
| 1007 | + } |
| 1008 | + } |
932 | 1009 | }; |
933 | 1010 | searchContext.addReleasable(searchContext.fetchResult()::decRef); |
934 | 1011 | searchContext.setTask(new SearchShardTask(-1, "type", "action", "description", null, Collections.emptyMap())); |
|
0 commit comments