Periodically check the available memory when fetching search hits source #121920
Changes from 62 commits
Changelog entry (new file):

@@ -0,0 +1,6 @@
+pr: 121920
+summary: Account for the `SearchHit` source in circuit breaker
+area: Search
+type: enhancement
+issues:
+ - 89656
SearchHit.java:

@@ -111,7 +111,6 @@ public final class SearchHit implements Writeable, ToXContentObject, RefCounted
     private final RefCounted refCounted;

     // used only in tests
     public SearchHit(int docId) {
         this(docId, null);
     }

@@ -293,6 +292,7 @@ public static SearchHit unpooled(int docId, String id) {
     }

     public static SearchHit unpooled(int nestedTopDocId, String id, NestedIdentity nestedIdentity) {
+        // always referenced search hits do NOT call #deallocate
         return new SearchHit(nestedTopDocId, id, nestedIdentity, ALWAYS_REFERENCED);
     }

Review comment (on the added comment): This might seem like noise, but it helped me when making sense of the pool/unpool stuff, so I thought it might be useful to future spelunkers.
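For readers tracing the pooled/unpooled distinction the comment above documents: a hedged sketch (not the SearchHit internals) of what building on `ALWAYS_REFERENCED` implies, assuming the `org.elasticsearch.core.RefCounted` contract:

```java
import org.elasticsearch.core.RefCounted;

public class AlwaysReferencedDemo {
    public static void main(String[] args) {
        // ALWAYS_REFERENCED reports itself as live forever: decRef() never
        // drops the count to zero, so no deallocation hook ever runs. Hits
        // built on it therefore never need (or call) #deallocate.
        RefCounted rc = RefCounted.ALWAYS_REFERENCED;
        rc.incRef();
        boolean released = rc.decRef(); // false: never releases
        assert released == false && rc.hasReferences();
    }
}
```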
SearchService.java:

@@ -274,6 +274,18 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         Property.NodeScope
     );

+    /**
+     * The size of the buffer used for memory accounting. This buffer is used to locally track the memory
+     * accumulated during the execution of a search request before submitting the accumulated value to the
+     * circuit breaker.
+     */
+    public static final Setting<ByteSizeValue> MEMORY_ACCOUNTING_BUFFER_SIZE = Setting.byteSizeSetting(
+        "search.memory_accounting_buffer_size",
+        ByteSizeValue.of(1, ByteSizeUnit.MB),
+        Property.Dynamic,
+        Property.NodeScope
+    );
+
     public static final int DEFAULT_SIZE = 10;
     public static final int DEFAULT_FROM = 0;
     private static final StackTraceElement[] EMPTY_STACK_TRACE_ARRAY = new StackTraceElement[0];

Review thread on MEMORY_ACCOUNTING_BUFFER_SIZE:

Reviewer: Does it make sense to make this configurable? We're adding a lot of surface area for something that is supposed to be a stopgap solution, and in any case this feels like something we should be able to set to a useful value based on heap size and search thread count.

Author: I think a setting makes sense precisely to accommodate these kinds of guesses in production across various workloads / hardware configurations. I'm happy to go with a 1M default (the higher the value, the less granularity we have when accounting memory, but 1M doesn't sound too bad).

Reviewer: Sure, if we don't feel comfortable making a guess here, let's make it configurable (though I must say I have a hard time seeing the practicality). Also, isn't this really a node setting rather than a per-cluster thing, since it depends on the size of the node? As far as granularity goes, you want as little granularity as you can get away with. Take my example from a couple of days ago: say you have a budget of 2M but can survive allocating 2.5M, and 3 threads each want to load 1M during fetch, all running in parallel with at least some overlap. The math here seems rather easy. We need to leave a considerable safety margin anyway because the accounting is super approximate, and we should leverage the fact that it is so approximate, because that's an unchangeable fact today anyway.

Author: That's exactly right, which is why I think having it configurable is important. This kind of change is what I envision if we get a hardware profile + workload where the margin of 1MiB is not good enough. On node vs. cluster scope: this is a good question. Having it as a dynamic setting helps with operations (avoiding involved node restarts whilst maintaining HA), whilst local values can still be configured in elasticsearch.yml.

Reviewer: I think we should actually give this a lower bound of 1M, maybe an upper bound of 32M, and default it to the G1GC region size. If you think about it, checking at a finer granularity than the TLAB size really makes no sense whatsoever. Likewise, going way above that granularity makes little sense as well.
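To make the granularity trade-off in the thread concrete: each fetch thread may allocate up to the buffer size between breaker checks, so the worst-case unaccounted memory is roughly threads × buffer (the reviewer's example: 3 threads × 1M ≈ 3M against a 2M budget). Below is a minimal sketch of how the setting resolves, assuming standard `Settings` usage; the `4mb` value is an arbitrary example, not a recommendation:

```java
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.search.SearchService;

public class BufferSizeDemo {
    public static void main(String[] args) {
        // Resolve the buffer size from node settings; the setting falls back
        // to its 1mb default when the key is absent.
        Settings nodeSettings = Settings.builder()
            .put("search.memory_accounting_buffer_size", "4mb")
            .build();
        long bufferBytes = SearchService.MEMORY_ACCOUNTING_BUFFER_SIZE.get(nodeSettings).getBytes();
        assert bufferBytes == ByteSizeValue.of(4, ByteSizeUnit.MB).getBytes();
    }
}
```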
@@ -291,6 +303,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     private final BigArrays bigArrays;
     private final FetchPhase fetchPhase;
+    private final CircuitBreaker circuitBreaker;
     private volatile Executor searchExecutor;
     private volatile boolean enableQueryPhaseParallelCollection;

@@ -310,6 +323,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     private volatile boolean enableRewriteAggsToFilterByFilter;

+    private volatile long memoryAccountingBufferSize;
+
     private final Cancellable keepAliveReaper;

     private final AtomicLong idGenerator = new AtomicLong();

@@ -341,11 +356,8 @@ public SearchService(
         this.scriptService = scriptService;
         this.bigArrays = bigArrays;
         this.fetchPhase = fetchPhase;
-        this.multiBucketConsumerService = new MultiBucketConsumerService(
-            clusterService,
-            settings,
-            circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)
-        );
+        circuitBreaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);
+        this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreaker);
         this.executorSelector = executorSelector;
         this.tracer = tracer;

@@ -390,6 +402,10 @@ public SearchService(
         enableQueryPhaseParallelCollection = QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.get(settings);
         clusterService.getClusterSettings()
             .addSettingsUpdateConsumer(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED, this::setEnableQueryPhaseParallelCollection);
+
+        memoryAccountingBufferSize = MEMORY_ACCOUNTING_BUFFER_SIZE.get(settings).getBytes();
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(MEMORY_ACCOUNTING_BUFFER_SIZE, newValue -> this.memoryAccountingBufferSize = newValue.getBytes());
     }

     private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) {
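Because the setting carries `Property.Dynamic`, the consumer registered above keeps the volatile field in sync with cluster-settings updates without a restart. A hedged sketch of that update path using a standalone `ClusterSettings` instance (test-style wiring, not code from the PR):

```java
import java.util.Set;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchService;

public class DynamicUpdateDemo {
    public static void main(String[] args) {
        ClusterSettings clusterSettings = new ClusterSettings(
            Settings.EMPTY,
            Set.of(SearchService.MEMORY_ACCOUNTING_BUFFER_SIZE)
        );
        // Fires with the parsed ByteSizeValue whenever the setting changes,
        // mirroring how SearchService refreshes memoryAccountingBufferSize.
        clusterSettings.addSettingsUpdateConsumer(
            SearchService.MEMORY_ACCOUNTING_BUFFER_SIZE,
            newValue -> System.out.println("buffer is now " + newValue)
        );
        clusterSettings.applySettings(
            Settings.builder().put("search.memory_accounting_buffer_size", "2mb").build()
        );
    }
}
```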
@@ -1195,7 +1211,8 @@ private DefaultSearchContext createSearchContext(
         searchExecutor,
         resultsType,
         enableQueryPhaseParallelCollection,
-        minimumDocsPerSlice
+        minimumDocsPerSlice,
+        memoryAccountingBufferSize
     );
     // we clone the query shard context here just for rewriting otherwise we
     // might end up with incorrect state since we are using now() or script services
FetchPhase.java:

@@ -13,6 +13,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.TotalHits;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader;
 import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
 import org.elasticsearch.index.mapper.IdLoader;

@@ -139,11 +140,17 @@ private SearchHits buildSearchHits(SearchContext context, int[] docIdsToLoad, Pr
     LeafStoredFieldLoader leafStoredFieldLoader;
     SourceLoader.Leaf leafSourceLoader;
     IdLoader.Leaf leafIdLoader;
+    int accumulatedBytesInLeaf;
+    int docsInLeaf;
+    int processedDocs;

     @Override
     protected void setNextReader(LeafReaderContext ctx, int[] docsInLeaf) throws IOException {
         Timer timer = profiler.startNextReader();
         this.ctx = ctx;
+        this.accumulatedBytesInLeaf = 0;
+        this.docsInLeaf = docsInLeaf.length;
+        this.processedDocs = 0;
         this.leafNestedDocuments = nestedDocuments.getLeafNestedDocuments(ctx);
         this.leafStoredFieldLoader = storedFieldLoader.getLoader(ctx, docsInLeaf);
         this.leafSourceLoader = sourceLoader.leaf(ctx.reader(), docsInLeaf);
@@ -162,6 +169,12 @@ protected SearchHit nextDoc(int doc) throws IOException {
     if (context.isCancelled()) {
         throw new TaskCancelledException("cancelled");
     }
+    ++processedDocs;
+    if (context.checkRealMemoryCB(accumulatedBytesInLeaf, processedDocs == docsInLeaf, "fetch source")) {
+        // if we checked the real memory breaker, we restart our local accounting
+        accumulatedBytesInLeaf = 0;
+    }

     HitContext hit = prepareHitContext(
         context,
         requiresSource,

@@ -181,6 +194,11 @@ protected SearchHit nextDoc(int doc) throws IOException {
     for (FetchSubPhaseProcessor processor : processors) {
         processor.process(hit);
     }
+
+    BytesReference sourceRef = hit.hit().getSourceRef();
+    if (sourceRef != null) {
+        this.accumulatedBytesInLeaf += sourceRef.length();
+    }
     success = true;
     return hit.hit();
 } finally {
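Taken together, the two hunks above implement buffered accounting per leaf: source bytes are summed locally after each hit, and the real-memory breaker is consulted only when the local sum crosses the buffer size or the leaf's last doc is reached. A hedged, self-contained sketch of that control flow (`loadSourceBytes` and the wiring are hypothetical stand-ins, not the PR's code):

```java
import org.elasticsearch.common.breaker.CircuitBreaker;

public class LeafAccountingSketch {
    // Buffered memory accounting over one leaf's docs: touch the breaker
    // rarely, but always once more when the leaf is finished.
    static void fetchLeaf(int[] docs, long bufferSize, CircuitBreaker breaker) {
        long accumulated = 0;
        int processed = 0;
        for (int doc : docs) {
            ++processed;
            if (accumulated >= bufferSize || processed == docs.length) {
                // A 0-byte addition still triggers the parent (real memory)
                // check; throws CircuitBreakingException under pressure.
                breaker.addEstimateBytesAndMaybeBreak(0, "fetch source");
                accumulated = 0; // restart local accounting
            }
            accumulated += loadSourceBytes(doc);
        }
    }

    static long loadSourceBytes(int doc) {
        return 1024; // stand-in for the fetched hit's source length
    }
}
```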
@@ -291,7 +309,7 @@ private static HitContext prepareNonNestedHitContext(
         Source source = Source.lazy(lazyStoredSourceLoader(profiler, subReaderContext, subDocId));
         return new HitContext(hit, subReaderContext, subDocId, Map.of(), source, rankDoc);
     } else {
-        SearchHit hit = new SearchHit(docId, id);
+        SearchHit hit = new SearchHit(docId, id, null);
         Source source;
         if (requiresSource) {
             Timer timer = profiler.startLoadingSource();

@@ -369,8 +387,8 @@ private static HitContext prepareNestedHitContext(
     assert nestedIdentity != null;
     Source nestedSource = nestedIdentity.extractSource(rootSource);

-    SearchHit hit = new SearchHit(topDocId, rootId, nestedIdentity);
-    return new HitContext(hit, subReaderContext, nestedInfo.doc(), childFieldLoader.storedFields(), nestedSource, rankDoc);
+    SearchHit nestedHit = new SearchHit(topDocId, rootId, nestedIdentity);
+    return new HitContext(nestedHit, subReaderContext, nestedInfo.doc(), childFieldLoader.storedFields(), nestedSource, rankDoc);
 }

 interface Profiler {
SearchContext.java:

@@ -12,6 +12,7 @@
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
@@ -369,6 +370,33 @@ public Query rewrittenQuery() {
      */
     public abstract Profilers getProfilers();

+    /**
+     * The circuit breaker used to account for the search operation.
+     */
+    public abstract CircuitBreaker circuitBreaker();
+
+    /**
+     * Returns the amount of memory to buffer locally before accounting for it in the breaker.
+     */
+    public abstract long memAccountingBufferSize();
+
+    /**
+     * Checks if the accumulated bytes are greater than the buffer size and, if so, checks the available memory in the
+     * parent breaker (the real memory breaker). It also checks the available memory if the caller indicates that the
+     * local accounting is finished.
+     * @param locallyAccumulatedBytes the number of bytes accumulated locally
+     * @param localAccountingFinished whether the local accounting is finished
+     * @param label the label to use in the breaker
+     * @return true if the real memory breaker was checked, false otherwise
+     */
+    public final boolean checkRealMemoryCB(int locallyAccumulatedBytes, boolean localAccountingFinished, String label) {
+        if (locallyAccumulatedBytes >= memAccountingBufferSize() || localAccountingFinished) {
+            circuitBreaker().addEstimateBytesAndMaybeBreak(0, label);
+            return true;
+        }
+        return false;
+    }
+
     /**
      * Adds a releasable that will be freed when this context is closed.
      */

Review thread on the new SearchContext methods:

Reviewer: Is it necessary to add these two new methods to SearchContext? I am not quite following where they are used; are they needed mostly for testing purposes, or is there more to it?

Reviewer: I guess what I am wondering is whether we could perhaps make them arguments of the following method instead, or something along those lines, just to decrease the blast radius of this change.

Author: We had a version where the parameters were exposed as part of …
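It is worth spelling out why `addEstimateBytesAndMaybeBreak(0, label)` is not a no-op. A hedged illustration, assuming the REQUEST breaker sits under the hierarchy service's real-memory parent breaker (the default when `indices.breaker.total.use_real_memory` is `true`):

```java
import org.elasticsearch.common.breaker.CircuitBreaker;

public class ZeroByteProbe {
    // The child (REQUEST) breaker's estimate is unchanged by a 0-byte
    // addition, but the parent breaker samples real heap usage on every
    // addEstimateBytesAndMaybeBreak call and throws CircuitBreakingException
    // under genuine memory pressure. The local buffer therefore only
    // rate-limits how often this relatively expensive check runs; the fetched
    // source bytes themselves are never added to the breaker's estimate.
    static void probeRealMemory(CircuitBreaker requestBreaker) {
        requestBreaker.addEstimateBytesAndMaybeBreak(0, "fetch source");
    }
}
```

The boolean return of `checkRealMemoryCB` then simply tells the caller whether the probe happened, so it knows to reset its local counter.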
Review comment: This is wrong :)