diff --git a/docs/changelog/121920.yaml b/docs/changelog/121920.yaml new file mode 100644 index 0000000000000..6f0f4462d52ae --- /dev/null +++ b/docs/changelog/121920.yaml @@ -0,0 +1,6 @@ +pr: 121920 +summary: Account for the `SearchHit` source in circuit breaker +area: Search +type: enhancement +issues: + - 89656 diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 7397382866388..c09430e668a7f 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -535,6 +535,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchModule.SCRIPTED_METRICS_AGG_ALLOWED_STORED_SCRIPTS, SearchService.SEARCH_WORKER_THREADS_ENABLED, SearchService.QUERY_PHASE_PARALLEL_COLLECTION_ENABLED, + SearchService.MEMORY_ACCOUNTING_BUFFER_SIZE, ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING, ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING, ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING, diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index baba9e94db7a7..6824f0d668c02 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -834,6 +834,10 @@ public ScriptService getScriptService() { return scriptService; } + public CircuitBreakerService breakerService() { + return circuitBreakerService; + } + List getIndexOperationListeners() { // pkg private for testing return indexingOperationListeners; } diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 7095d3ec92c72..a6c15fc6b1a57 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -23,6 +23,7 @@ import org.apache.lucene.util.NumericUtils; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.routing.IndexRouting; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -103,6 +104,7 @@ final class DefaultSearchContext extends SearchContext { private final IndexShard indexShard; private final IndexService indexService; private final ContextIndexSearcher searcher; + private final long memoryAccountingBufferSize; private DfsSearchResult dfsResult; private QuerySearchResult queryResult; private RankFeatureResult rankFeatureResult; @@ -168,7 +170,8 @@ final class DefaultSearchContext extends SearchContext { Executor executor, SearchService.ResultsType resultsType, boolean enableQueryPhaseParallelCollection, - int minimumDocsPerSlice + int minimumDocsPerSlice, + long memoryAccountingBufferSize ) throws IOException { this.readerContext = readerContext; this.request = request; @@ -179,6 +182,7 @@ final class DefaultSearchContext extends SearchContext { this.shardTarget = shardTarget; this.indexService = readerContext.indexService(); this.indexShard = readerContext.indexShard(); + this.memoryAccountingBufferSize = memoryAccountingBufferSize; Engine.Searcher engineSearcher = readerContext.acquireSearcher("search"); int maximumNumberOfSlices = 
determineMaximumNumberOfSlices( @@ -902,6 +906,16 @@ public Profilers getProfilers() { return profilers; } + @Override + public CircuitBreaker circuitBreaker() { + return indexService.breakerService().getBreaker(CircuitBreaker.REQUEST); + } + + @Override + public long memAccountingBufferSize() { + return memoryAccountingBufferSize; + } + public void setProfilers(Profilers profilers) { this.profilers = profilers; } diff --git a/server/src/main/java/org/elasticsearch/search/SearchHit.java b/server/src/main/java/org/elasticsearch/search/SearchHit.java index 8a70f8a7f41a6..b603b346e9b15 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchHit.java +++ b/server/src/main/java/org/elasticsearch/search/SearchHit.java @@ -111,7 +111,6 @@ public final class SearchHit implements Writeable, ToXContentObject, RefCounted private final RefCounted refCounted; - // used only in tests public SearchHit(int docId) { this(docId, null); } @@ -293,6 +292,7 @@ public static SearchHit unpooled(int docId, String id) { } public static SearchHit unpooled(int nestedTopDocId, String id, NestedIdentity nestedIdentity) { + // always-referenced search hits do NOT call #deallocate return new SearchHit(nestedTopDocId, id, nestedIdentity, ALWAYS_REFERENCED); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 9a6860349a8b3..855e099a57d3f 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -274,6 +274,20 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.NodeScope ); + /** + * The size of the buffer used for memory accounting. + * This buffer is used to locally track the memory accumulated during the execution of + * a search request before submitting the accumulated value to the circuit breaker.
+ */ + public static final Setting<ByteSizeValue> MEMORY_ACCOUNTING_BUFFER_SIZE = Setting.byteSizeSetting( + "search.memory_accounting_buffer_size", + ByteSizeValue.of(1, ByteSizeUnit.MB), + ByteSizeValue.of(1, ByteSizeUnit.MB), + ByteSizeValue.ofBytes(Long.MAX_VALUE), + Property.Dynamic, + Property.NodeScope + ); + public static final int DEFAULT_SIZE = 10; public static final int DEFAULT_FROM = 0; private static final StackTraceElement[] EMPTY_STACK_TRACE_ARRAY = new StackTraceElement[0]; @@ -291,6 +305,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final BigArrays bigArrays; private final FetchPhase fetchPhase; + private final CircuitBreaker circuitBreaker; private volatile Executor searchExecutor; private volatile boolean enableQueryPhaseParallelCollection; @@ -310,6 +325,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private volatile boolean enableRewriteAggsToFilterByFilter; + private volatile long memoryAccountingBufferSize; + private final Cancellable keepAliveReaper; private final AtomicLong idGenerator = new AtomicLong(); @@ -341,11 +358,8 @@ public SearchService( this.scriptService = scriptService; this.bigArrays = bigArrays; this.fetchPhase = fetchPhase; - this.multiBucketConsumerService = new MultiBucketConsumerService( - clusterService, - settings, - circuitBreakerService.getBreaker(CircuitBreaker.REQUEST) - ); + circuitBreaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST); + this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreaker); this.executorSelector = executorSelector; this.tracer = tracer; @@ -390,6 +404,10 @@ public SearchService( enableQueryPhaseParallelCollection = QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.get(settings); clusterService.getClusterSettings() .addSettingsUpdateConsumer(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED, this::setEnableQueryPhaseParallelCollection); + + memoryAccountingBufferSize = MEMORY_ACCOUNTING_BUFFER_SIZE.get(settings).getBytes(); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(MEMORY_ACCOUNTING_BUFFER_SIZE, newValue -> this.memoryAccountingBufferSize = newValue.getBytes()); } private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) { @@ -1195,7 +1213,8 @@ private DefaultSearchContext createSearchContext( searchExecutor, resultsType, enableQueryPhaseParallelCollection, - minimumDocsPerSlice + minimumDocsPerSlice, + memoryAccountingBufferSize ); // we clone the query shard context here just for rewriting otherwise we // might end up with incorrect state since we are using now() or script services diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java index 2fbe3c1fc1532..dbf11ece9e82a 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.TotalHits; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader; import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; import org.elasticsearch.index.mapper.IdLoader; @@ -129,7 +130,7 @@ private SearchHits buildSearchHits(SearchContext context, int[] docIdsToLoad, Pr StoredFieldLoader storedFieldLoader =
profiler.storedFields(StoredFieldLoader.fromSpec(storedFieldsSpec)); IdLoader idLoader = context.newIdLoader(); boolean requiresSource = storedFieldsSpec.requiresSource(); - + final int[] locallyAccumulatedBytes = new int[1]; NestedDocuments nestedDocuments = context.getSearchExecutionContext().getNestedDocuments(); FetchPhaseDocsIterator docsIterator = new FetchPhaseDocsIterator() { @@ -162,6 +163,11 @@ protected SearchHit nextDoc(int doc) throws IOException { if (context.isCancelled()) { throw new TaskCancelledException("cancelled"); } + if (context.checkRealMemoryCB(locallyAccumulatedBytes[0], "fetch source")) { + // if we checked the real memory breaker, we restart our local accounting + locallyAccumulatedBytes[0] = 0; + } + HitContext hit = prepareHitContext( context, requiresSource, @@ -181,6 +187,11 @@ protected SearchHit nextDoc(int doc) throws IOException { for (FetchSubPhaseProcessor processor : processors) { processor.process(hit); } + + BytesReference sourceRef = hit.hit().getSourceRef(); + if (sourceRef != null) { + locallyAccumulatedBytes[0] += sourceRef.length(); + } success = true; return hit.hit(); } finally { @@ -369,8 +380,8 @@ private static HitContext prepareNestedHitContext( assert nestedIdentity != null; Source nestedSource = nestedIdentity.extractSource(rootSource); - SearchHit hit = new SearchHit(topDocId, rootId, nestedIdentity); - return new HitContext(hit, subReaderContext, nestedInfo.doc(), childFieldLoader.storedFields(), nestedSource, rankDoc); + SearchHit nestedHit = new SearchHit(topDocId, rootId, nestedIdentity); + return new HitContext(nestedHit, subReaderContext, nestedInfo.doc(), childFieldLoader.storedFields(), nestedSource, rankDoc); } interface Profiler { diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java index 4a242f70e8d02..552f9e339bd7c 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.ReaderUtil; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchShardTarget; @@ -100,6 +101,9 @@ public final SearchHit[] iterate( } } catch (SearchTimeoutException e) { throw e; + } catch (CircuitBreakingException e) { + purgeSearchHits(searchHits); + throw e; } catch (Exception e) { purgeSearchHits(searchHits); throw new FetchPhaseExecutionException(shardTarget, "Error running fetch phase for doc [" + currentDoc + "]", e); diff --git a/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java index 5bad06d08f96b..3ed2bf0fff5c0 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java @@ -13,6 +13,7 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; import 
org.elasticsearch.index.mapper.IdLoader; @@ -460,4 +461,14 @@ public SourceLoader newSourceLoader() { public IdLoader newIdLoader() { return in.newIdLoader(); } + + @Override + public CircuitBreaker circuitBreaker() { + return in.circuitBreaker(); + } + + @Override + public long memAccountingBufferSize() { + return in.memAccountingBufferSize(); + } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java index 5a8c280e12f85..d5e26b9c91a8d 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -12,6 +12,7 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -369,6 +370,31 @@ public Query rewrittenQuery() { */ public abstract Profilers getProfilers(); + /** + * The circuit breaker used to account for memory usage during the search operation. + */ + public abstract CircuitBreaker circuitBreaker(); + + /** + * Returns the amount of memory, in bytes, to buffer locally before accounting for it in the breaker. + */ + public abstract long memAccountingBufferSize(); + + /** + * Checks whether the locally accumulated bytes have reached the buffer size and, if so, checks the available memory in the parent + * breaker (the real memory breaker). + * @param locallyAccumulatedBytes the number of bytes accumulated locally + * @param label the label to use in the breaker + * @return true if the real memory breaker was checked, false otherwise + */ + public final boolean checkRealMemoryCB(int locallyAccumulatedBytes, String label) { + if (locallyAccumulatedBytes >= memAccountingBufferSize()) { + circuitBreaker().addEstimateBytesAndMaybeBreak(0, label); + return true; + } + return false; + } + /** * Adds a releasable that will be freed when this context is closed.
*/ diff --git a/server/src/main/java/org/elasticsearch/search/rank/RankSearchContext.java b/server/src/main/java/org/elasticsearch/search/rank/RankSearchContext.java index 951a9b0cf3520..57a9b9282023f 100644 --- a/server/src/main/java/org/elasticsearch/search/rank/RankSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/rank/RankSearchContext.java @@ -13,6 +13,7 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; import org.elasticsearch.index.mapper.IdLoader; @@ -203,6 +204,16 @@ public Profilers getProfilers() { return null; } + @Override + public CircuitBreaker circuitBreaker() { + return parent.circuitBreaker(); + } + + @Override + public long memAccountingBufferSize() { + return parent.memAccountingBufferSize(); + } + @Override public long getRelativeTimeInMillis() { return parent.getRelativeTimeInMillis(); diff --git a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java index 2be679c91bd36..1978189c9dde4 100644 --- a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java @@ -9,6 +9,9 @@ package org.elasticsearch.action.search; import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.StringField; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.Query; @@ -20,16 +23,19 @@ import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.tests.store.MockDirectoryWrapper; import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; @@ -55,6 +61,7 @@ import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.search.lookup.Source; import org.elasticsearch.search.profile.ProfileResult; import org.elasticsearch.search.profile.SearchProfileQueryPhaseResult; import org.elasticsearch.search.profile.SearchProfileShardResult; @@ -72,10 +79,12 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; +import java.util.stream.IntStream; import static org.hamcrest.Matchers.arrayWithSize; 
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; public class FetchSearchPhaseTests extends ESTestCase { @@ -808,6 +817,62 @@ public void testFetchTimeoutNoPartialResults() throws IOException { } } + public void testFetchPhaseChecksMemoryBreaker() throws IOException { + Directory dir = newDirectory(); + RandomIndexWriter w = new RandomIndexWriter(random(), dir); + + // each of the 100 documents has a ~48KB source, so the fetch phase accumulates roughly 1MiB every 22 documents and + // should check the memory breaker 4 times (about 4.1MiB in total after 88 documents); the remaining 12 documents never + // reach another full 1MiB, so no further breaker check happens + + String body = "{ \"thefield\": \" " + randomAlphaOfLength(48_000) + "\" }"; + for (int i = 0; i < 100; i++) { + Document document = new Document(); + document.add(new StringField("id", Integer.toString(i), Field.Store.YES)); + document.add(new StoredField("_source", new BytesRef(body))); + w.addDocument(document); + } + // accounting is per fetch phase, so it doesn't matter whether the index has one or multiple segments; let's test both + if (randomBoolean()) { + w.forceMerge(1); + } + IndexReader r = w.getReader(); + w.close(); + ContextIndexSearcher contextIndexSearcher = createSearcher(r); + AtomicInteger breakerCalledCount = new AtomicInteger(0); + NoopCircuitBreaker breakingCircuitBreaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST) { + @Override + public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { + breakerCalledCount.incrementAndGet(); + } + }; + try (SearchContext searchContext = createSearchContext(contextIndexSearcher, true, breakingCircuitBreaker)) { + FetchPhase fetchPhase = new FetchPhase(List.of(fetchContext -> new FetchSubPhaseProcessor() { + @Override + public void setNextReader(LeafReaderContext readerContext) throws IOException { + + } + + @Override + public void process(FetchSubPhase.HitContext hitContext) throws IOException { + Source source = hitContext.source(); + hitContext.hit().sourceRef(source.internalSourceRef()); + } + + @Override + public StoredFieldsSpec storedFieldsSpec() { + return StoredFieldsSpec.NEEDS_SOURCE; + } + })); + fetchPhase.execute(searchContext, IntStream.range(0, 100).toArray(), null); + assertThat(breakerCalledCount.get(), is(4)); + } finally { + r.close(); + dir.close(); + } + } + private static ContextIndexSearcher createSearcher(IndexReader reader) throws IOException { return new ContextIndexSearcher(reader, null, null, new QueryCachingPolicy() { @Override @@ -845,6 +910,14 @@ public StoredFieldsSpec storedFieldsSpec() { } private static SearchContext createSearchContext(ContextIndexSearcher contextIndexSearcher, boolean allowPartialResults) { + return createSearchContext(contextIndexSearcher, allowPartialResults, null); + } + + private static SearchContext createSearchContext( + ContextIndexSearcher contextIndexSearcher, + boolean allowPartialResults, + @Nullable CircuitBreaker circuitBreaker + ) { IndexSettings indexSettings = new IndexSettings( IndexMetadata.builder("index") .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())) @@ -917,6 +990,15 @@ public FetchSearchResult fetchResult() { public ShardSearchRequest request() { return request; } + + @Override + public CircuitBreaker circuitBreaker() { + if (circuitBreaker != null) { + return
circuitBreaker; + } else { + return super.circuitBreaker(); + } + } }; searchContext.addReleasable(searchContext.fetchResult()::decRef); searchContext.setTask(new SearchShardTask(-1, "type", "action", "description", null, Collections.emptyMap())); diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 82737b13a1a46..335815fcea445 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -33,6 +33,8 @@ import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -58,6 +60,7 @@ import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; @@ -99,6 +102,8 @@ public class DefaultSearchContextTests extends MapperServiceTestCase { + private static final long MEMORY_ACCOUNTING_BUFFER_SIZE = 1024 * 1024L; + public void testPreProcess() throws Exception { TimeValue timeout = new TimeValue(randomIntBetween(1, 100)); ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class); @@ -184,7 +189,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) { null, randomFrom(SearchService.ResultsType.values()), randomBoolean(), - randomInt() + randomInt(), + MEMORY_ACCOUNTING_BUFFER_SIZE ); contextWithoutScroll.from(300); contextWithoutScroll.close(); @@ -226,7 +232,9 @@ protected Engine.Searcher acquireSearcherInternal(String source) { null, randomFrom(SearchService.ResultsType.values()), randomBoolean(), - randomInt() + randomInt(), + MEMORY_ACCOUNTING_BUFFER_SIZE + ) ) { context1.from(300); @@ -308,7 +316,8 @@ public ScrollContext scrollContext() { null, randomFrom(SearchService.ResultsType.values()), randomBoolean(), - randomInt() + randomInt(), + MEMORY_ACCOUNTING_BUFFER_SIZE ) ) { @@ -350,7 +359,8 @@ public ScrollContext scrollContext() { null, randomFrom(SearchService.ResultsType.values()), randomBoolean(), - randomInt() + randomInt(), + MEMORY_ACCOUNTING_BUFFER_SIZE ) ) { context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(); @@ -381,7 +391,8 @@ public ScrollContext scrollContext() { null, randomFrom(SearchService.ResultsType.values()), randomBoolean(), - randomInt() + randomInt(), + MEMORY_ACCOUNTING_BUFFER_SIZE ) ) { context4.sliceBuilder(new SliceBuilder(1, 2)).parsedQuery(parsedQuery).preProcess(); @@ -452,7 +463,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) { null, randomFrom(SearchService.ResultsType.values()), randomBoolean(), - randomInt() + randomInt(), + MEMORY_ACCOUNTING_BUFFER_SIZE ); assertThat(context.searcher().hasCancellations(), is(false)); @@ -963,6 +975,21 @@ public void testGetFieldCardinalityRuntimeField() { assertEquals(-1, 
DefaultSearchContext.getFieldCardinality("field", indexService, null)); } + public void testCheckRealMemoryCB() throws Exception { + IndexShard indexShard = null; + try (DefaultSearchContext context = createDefaultSearchContext(Settings.EMPTY)) { + indexShard = context.indexShard(); + // allocated more than the 1MiB buffer + assertThat(context.checkRealMemoryCB(1024 * 1800, "test"), is(true)); + // allocated less than the 1MiB buffer + assertThat(context.checkRealMemoryCB(1024 * 5, "test"), is(false)); + } finally { + if (indexShard != null) { + indexShard.getThreadPool().shutdown(); + } + } + } + private DefaultSearchContext createDefaultSearchContext(Settings providedIndexSettings) throws IOException { return createDefaultSearchContext(providedIndexSettings, null); } @@ -990,7 +1017,9 @@ private DefaultSearchContext createDefaultSearchContext(Settings providedIndexSe SearchExecutionContext searchExecutionContext = mock(SearchExecutionContext.class); when(indexService.newSearchExecutionContext(eq(shardId.id()), eq(shardId.id()), any(), any(), nullable(String.class), any())) .thenReturn(searchExecutionContext); - + CircuitBreakerService breakerService = mock(CircuitBreakerService.class); + when(indexService.breakerService()).thenReturn(breakerService); + when(breakerService.getBreaker(anyString())).thenReturn(new NoopCircuitBreaker(CircuitBreaker.REQUEST)); IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build(); IndexSettings indexSettings; MapperService mapperService; @@ -1054,7 +1083,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) { null, randomFrom(SearchService.ResultsType.values()), randomBoolean(), - randomInt() + randomInt(), + MEMORY_ACCOUNTING_BUFFER_SIZE ); } } diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index d034e6e6679c1..03921dfb4359b 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -437,7 +437,12 @@ public Iterable dimensionFields() { () -> query, null, maxBucket, - () -> buildSubSearchContext(indexSettings, searchExecutionContext, bitsetFilterCache), + () -> buildSubSearchContext( + indexSettings, + searchExecutionContext, + bitsetFilterCache, + breakerService.getBreaker(CircuitBreaker.REQUEST) + ), bitsetFilterCache, randomInt(), () -> 0L, @@ -471,7 +476,8 @@ protected List objectMappers() { private SubSearchContext buildSubSearchContext( IndexSettings indexSettings, SearchExecutionContext searchExecutionContext, - BitsetFilterCache bitsetFilterCache + BitsetFilterCache bitsetFilterCache, + CircuitBreaker breaker ) { SearchContext ctx = mock(SearchContext.class); try { @@ -488,7 +494,8 @@ private SubSearchContext buildSubSearchContext( throw new RuntimeException(e); } when(ctx.fetchPhase()).thenReturn(new FetchPhase(Arrays.asList(new FetchSourcePhase(), new FetchDocValuesPhase()))); - + when(ctx.circuitBreaker()).thenReturn(breaker); + when(ctx.memAccountingBufferSize()).thenReturn(1024 * 1024L); /* * Use a QueryShardContext that doesn't contain nested documents so we * don't try to fetch them which would require mocking a whole menagerie diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java index 
c46442485ff9e..bf612ebc70bc4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java @@ -12,6 +12,7 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; @@ -500,6 +501,16 @@ public Profilers getProfilers() { return null; // no profiling } + @Override + public CircuitBreaker circuitBreaker() { + return indexService.getBigArrays().breakerService().getBreaker(CircuitBreaker.REQUEST); + } + + @Override + public long memAccountingBufferSize() { + return 1024 * 1024; + } + @Override public SearchExecutionContext getSearchExecutionContext() { return searchExecutionContext;
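
The heart of this change is a buffered accounting pattern: the fetch phase sums per-hit `_source` sizes locally and only consults the request circuit breaker once the local sum reaches `search.memory_accounting_buffer_size` (1MiB by default), at which point the local counter restarts. The sketch below is a minimal, standalone illustration of that pattern, assuming the Elasticsearch server classes are on the classpath; the `NoopCircuitBreaker` is just a stand-in for the real request breaker, and the class and method names are illustrative, not part of the change.

```java
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;

public class BufferedBreakerAccountingSketch {

    // Mirrors SearchContext#checkRealMemoryCB: the breaker is only consulted once the locally
    // accumulated bytes reach the buffer size; the caller then resets its local counter.
    static boolean maybeCheckRealMemory(CircuitBreaker breaker, long bufferSize, long locallyAccumulatedBytes, String label) {
        if (locallyAccumulatedBytes >= bufferSize) {
            breaker.addEstimateBytesAndMaybeBreak(0, label); // 0 bytes: just trigger a real-memory check
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // NoopCircuitBreaker stands in for the real request breaker so the sketch runs anywhere.
        CircuitBreaker breaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST);
        long bufferSize = 1024 * 1024;   // default of search.memory_accounting_buffer_size (1MiB)
        long locallyAccumulatedBytes = 0;
        int breakerChecks = 0;

        // Simulate fetching 100 hits whose sources are ~48KB each, like the new unit test does.
        for (int doc = 0; doc < 100; doc++) {
            if (maybeCheckRealMemory(breaker, bufferSize, locallyAccumulatedBytes, "fetch source")) {
                locallyAccumulatedBytes = 0; // restart local accounting after touching the breaker
                breakerChecks++;
            }
            locallyAccumulatedBytes += 48_000; // account for this hit's source length
        }
        System.out.println("breaker consulted " + breakerChecks + " times"); // prints 4
    }
}
```

Because the check runs at the top of `nextDoc`, before the hit is built, the final partial buffer (here 12 x ~48KB, below 1MiB) is never reported, which is why the new unit test expects exactly four breaker calls for 100 documents.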
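The new `CircuitBreakingException` catch in `FetchPhaseDocsIterator` exists so that hits already built, which may hold pooled source buffers, are released before the exception propagates. A rough sketch of that clean-up, assuming `purgeSearchHits` simply drops the reference of every non-null hit (the helper already exists in `FetchPhaseDocsIterator`; this body is an illustration, not the verbatim implementation):

```java
import org.elasticsearch.search.SearchHit;

final class PurgeOnBreakSketch {
    // Assumed shape of the existing purge helper: release whatever was built before the failure so
    // pooled SearchHit instances (and the source bytes they reference) are not leaked when a
    // CircuitBreakingException, or any other error, escapes the fetch loop.
    static void purgeSearchHits(SearchHit[] searchHits) {
        for (SearchHit hit : searchHits) {
            if (hit != null) {
                hit.decRef(); // SearchHit is RefCounted; dropping the reference frees pooled buffers
            }
        }
    }
}
```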