Periodically check the available memory when fetching search hits source #121920
@@ -0,0 +1,6 @@
pr: 121920
summary: Account for the `SearchHit` source in circuit breaker
area: Search
type: enhancement
issues:
 - 89656
@@ -111,7 +111,6 @@ public final class SearchHit implements Writeable, ToXContentObject, RefCounted

    private final RefCounted refCounted;

    // used only in tests
    public SearchHit(int docId) {
        this(docId, null);
    }

@@ -293,6 +292,7 @@ public static SearchHit unpooled(int docId, String id) {
    }

    public static SearchHit unpooled(int nestedTopDocId, String id, NestedIdentity nestedIdentity) {
        // always referenced search hits do NOT call #deallocate
        return new SearchHit(nestedTopDocId, id, nestedIdentity, ALWAYS_REFERENCED);
    }

Review comment: This might seem like noise, but it helped me when making sense of the pool/unpool stuff, so I thought it might be useful to future spelunkers.
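To expand on that pool/unpool note with a hedged sketch (an illustration of the idea, not the actual SearchHit internals; the class name AlwaysReferenced is made up): an "always referenced" RefCounted never reports releasing its last reference, so a SearchHit built with it never runs its deallocation logic and its source simply stays on the heap for the GC to reclaim.

```java
import org.elasticsearch.core.RefCounted;

// Hypothetical illustration of the ALWAYS_REFERENCED idea: the reference count never
// reaches zero, so decRef() never reports "last reference released" and no
// deallocation hook ever runs for unpooled hits.
final class AlwaysReferenced implements RefCounted {
    @Override
    public void incRef() {}

    @Override
    public boolean tryIncRef() {
        return true;
    }

    @Override
    public boolean decRef() {
        return false; // never the last reference, so nothing gets deallocated
    }

    @Override
    public boolean hasReferences() {
        return true;
    }
}
```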
@@ -274,6 +274,20 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
        Property.NodeScope
    );
    /**
     * The size of the buffer used for memory accounting.
     * This buffer is used to locally track the memory accumulated during the execution of
     * a search request before submitting the accumulated value to the circuit breaker.
     */
    public static final Setting<ByteSizeValue> MEMORY_ACCOUNTING_BUFFER_SIZE = Setting.byteSizeSetting(
        "search.memory_accounting_buffer_size",
        ByteSizeValue.of(1, ByteSizeUnit.MB),
        ByteSizeValue.of(1, ByteSizeUnit.MB),
        ByteSizeValue.ofBytes(Long.MAX_VALUE),
        Property.Dynamic,
        Property.NodeScope
    );

Review comment: Does it make sense to make this configurable? We're adding a lot of surface area for something that is supposed to be a stopgap solution, and in any case this one feels like something we should be able to set to a useful value based on heap size and search thread count.

Reply: I think a setting makes sense precisely to accommodate these kinds of guesses in production for various workloads / hardware configurations. I'm happy to go with a 1M default (the higher the value, the less granularity we have when accounting memory, but 1M doesn't sound too bad).

Reply: Sure, if we don't feel comfortable with making a guess here, let's make it configurable (though I do have a hard time seeing the practicality, I must say). Also, this isn't really a per-cluster thing? It's more of a node setting, since it depends on the size of a node. As far as granularity goes, you want as little granularity as you can get away with. Take my example from a couple of days ago: say you have a budget of 2M but can survive allocating 2.5M, and 3 threads each want to load 1M during fetch, all of them running in parallel with at least some overlap. The math here seems rather easy. We need to leave a considerable safety margin anyway because the accounting is super, super approximate, and we should leverage the fact that it is so approximate, because that's an unchangeable fact today anyway.

Reply: That's exactly right, which is why I think having it configurable is important. This kind of change is what I envision if we get a hardware profile + workload where the margin of 1MiB is not good enough. This is a good question. Having it as a dynamic setting helps with operations (avoiding involved node restarts whilst maintaining HA), whilst local values can still be configured in …

Reply: I think we should give this a lower bound of 1M actually, and maybe an upper bound of 32M, and default it to the G1GC region size. If you think about it, checking at a finer granularity than the TLAB size really makes no sense whatsoever. Likewise, going way above the granularity makes little sense as well.
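To make the granularity trade-off in the thread above concrete, here is a back-of-the-envelope sketch (not code from the PR; the class and method names are made up): each search thread can buffer up to just under the configured buffer size locally before it consults the breaker, so the worst-case memory the breaker has not yet seen scales with the number of concurrently fetching threads.

```java
// Hypothetical helper illustrating the reviewer's 2M-budget / 3-threads example.
public final class BufferGranularityExample {

    // Each thread may accumulate up to (bufferSizeBytes - 1) bytes before calling the breaker.
    static long worstCaseUnaccountedBytes(int concurrentFetchThreads, long bufferSizeBytes) {
        return concurrentFetchThreads * (bufferSizeBytes - 1);
    }

    public static void main(String[] args) {
        long oneMiB = 1024L * 1024L;
        // 3 fetch threads with the default 1 MiB buffer leave roughly 3 MiB unaccounted for
        // in the worst case, which is why buffer size, heap headroom and search thread count
        // need to be considered together.
        System.out.println(worstCaseUnaccountedBytes(3, oneMiB));
    }
}
```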
    public static final int DEFAULT_SIZE = 10;
    public static final int DEFAULT_FROM = 0;
    private static final StackTraceElement[] EMPTY_STACK_TRACE_ARRAY = new StackTraceElement[0];

@@ -291,6 +305,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
    private final BigArrays bigArrays;

    private final FetchPhase fetchPhase;
    private final CircuitBreaker circuitBreaker;
    private volatile Executor searchExecutor;
    private volatile boolean enableQueryPhaseParallelCollection;

@@ -310,6 +325,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
    private volatile boolean enableRewriteAggsToFilterByFilter;

    private volatile long memoryAccountingBufferSize;

    private final Cancellable keepAliveReaper;

    private final AtomicLong idGenerator = new AtomicLong();
@@ -341,11 +358,8 @@ public SearchService(
         this.scriptService = scriptService;
         this.bigArrays = bigArrays;
         this.fetchPhase = fetchPhase;
-        this.multiBucketConsumerService = new MultiBucketConsumerService(
-            clusterService,
-            settings,
-            circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)
-        );
+        circuitBreaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);
+        this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreaker);
         this.executorSelector = executorSelector;
         this.tracer = tracer;
@@ -390,6 +404,10 @@ public SearchService(
        enableQueryPhaseParallelCollection = QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.get(settings);
        clusterService.getClusterSettings()
            .addSettingsUpdateConsumer(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED, this::setEnableQueryPhaseParallelCollection);

        memoryAccountingBufferSize = MEMORY_ACCOUNTING_BUFFER_SIZE.get(settings).getBytes();
        clusterService.getClusterSettings()
            .addSettingsUpdateConsumer(MEMORY_ACCOUNTING_BUFFER_SIZE, newValue -> this.memoryAccountingBufferSize = newValue.getBytes());
    }

    private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) {
@@ -1195,7 +1213,8 @@ private DefaultSearchContext createSearchContext(
             searchExecutor,
             resultsType,
             enableQueryPhaseParallelCollection,
-            minimumDocsPerSlice
+            minimumDocsPerSlice,
+            memoryAccountingBufferSize
         );
         // we clone the query shard context here just for rewriting otherwise we
         // might end up with incorrect state since we are using now() or script services
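For readers tracing where the new constructor argument ends up: presumably the concrete search context keeps the breaker and the buffer size it was built with and surfaces them through the new SearchContext accessors shown in the next file. The sketch below is an assumption about that wiring (the class name MemoryAccountingState is made up), not the PR's actual DefaultSearchContext code.

```java
import org.elasticsearch.common.breaker.CircuitBreaker;

// Illustrative only: a minimal holder mirroring how the concrete search context is
// assumed to expose the breaker and buffer size to the accounting helper.
final class MemoryAccountingState {
    private final CircuitBreaker circuitBreaker;
    private final long memoryAccountingBufferSize;

    MemoryAccountingState(CircuitBreaker circuitBreaker, long memoryAccountingBufferSize) {
        this.circuitBreaker = circuitBreaker;
        this.memoryAccountingBufferSize = memoryAccountingBufferSize;
    }

    CircuitBreaker circuitBreaker() {
        return circuitBreaker;
    }

    long memAccountingBufferSize() {
        return memoryAccountingBufferSize;
    }
}
```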
@@ -12,6 +12,7 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;

@@ -369,6 +370,31 @@ public Query rewrittenQuery() {
     */
    public abstract Profilers getProfilers();

    /**
     * The circuit breaker used to account for the search operation.
     */
    public abstract CircuitBreaker circuitBreaker();

    /**
     * Return the amount of memory to buffer locally before accounting for it in the breaker.
     */
    public abstract long memAccountingBufferSize();
Review comment: Is it necessary to add these two new methods to SearchContext? I am not quite following where they are used; are they needed mostly for testing purposes, or is there more to it? I guess what I am wondering is whether we could perhaps make them arguments of the following method instead, or something along those lines, just to decrease the blast radius of this change.

Reply: We had a version where the parameters were exposed as part of …
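As a rough illustration of the reviewer's alternative (an assumption about what was meant, not code from the PR): the breaker and buffer size could be passed as plain parameters instead of new abstract methods, shrinking the change's surface on SearchContext.

```java
// Hypothetical variant of the suggestion: take the breaker and buffer size as arguments.
public static boolean checkRealMemoryCB(int locallyAccumulatedBytes, String label, long bufferSize, CircuitBreaker breaker) {
    if (locallyAccumulatedBytes >= bufferSize) {
        // Adding 0 bytes still forces the parent (real memory) breaker to re-check the heap.
        breaker.addEstimateBytesAndMaybeBreak(0, label);
        return true;
    }
    return false;
}
```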
    /**
     * Checks if the accumulated bytes are greater than the buffer size and if so, checks the available memory in the parent breaker
     * (the real memory breaker).
     * @param locallyAccumulatedBytes the number of bytes accumulated locally
     * @param label the label to use in the breaker
     * @return true if the real memory breaker is called and false otherwise
     */
    public final boolean checkRealMemoryCB(int locallyAccumulatedBytes, String label) {
        if (locallyAccumulatedBytes >= memAccountingBufferSize()) {
            circuitBreaker().addEstimateBytesAndMaybeBreak(0, label);
            return true;
        }
        return false;
    }
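To make the intended call pattern concrete, here is a hedged usage sketch: the method name accountForFetchedSources and the "fetch_source" label are illustrative assumptions, not the PR's actual fetch-phase code. The caller keeps a local byte tally and, whenever checkRealMemoryCB reports that the breaker was consulted, resets the tally and carries on.

```java
// Illustrative only: accumulate each hit's source size locally and let the real-memory
// breaker re-check the heap once the local tally crosses the configured buffer size.
static void accountForFetchedSources(SearchContext context, Iterable<SearchHit> hits) {
    int locallyAccumulatedBytes = 0;
    for (SearchHit hit : hits) {
        if (hit.getSourceRef() != null) {
            locallyAccumulatedBytes += hit.getSourceRef().length();
        }
        if (context.checkRealMemoryCB(locallyAccumulatedBytes, "fetch_source")) {
            // The breaker was consulted (and did not trip), so start a fresh local tally.
            locallyAccumulatedBytes = 0;
        }
    }
}
```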
    /**
     * Adds a releasable that will be freed when this context is closed.
     */
Review comment: This is wrong :)