Skip to content

Commit 5a9c770

Browse files
committed
Local accounting up to 32kb
1 parent e5c366a commit 5a9c770

File tree

9 files changed

+262
-34
lines changed

9 files changed

+262
-34
lines changed

server/src/main/java/org/elasticsearch/common/MemoryAccountingBytesRefCounted.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,31 @@
1212
import org.elasticsearch.common.breaker.CircuitBreaker;
1313
import org.elasticsearch.core.AbstractRefCounted;
1414

15+
/**
16+
* A ref counted object that accounts for memory usage in bytes and releases the
17+
* accounted memory from the circuit breaker when the reference count reaches zero.
18+
*/
1519
public final class MemoryAccountingBytesRefCounted extends AbstractRefCounted {
1620

17-
private long bytes;
21+
private int bytes;
1822
private final CircuitBreaker breaker;
1923

20-
private MemoryAccountingBytesRefCounted(long bytes, CircuitBreaker breaker) {
24+
private MemoryAccountingBytesRefCounted(int bytes, CircuitBreaker breaker) {
2125
this.bytes = bytes;
2226
this.breaker = breaker;
2327
}
2428

2529
public static MemoryAccountingBytesRefCounted create(CircuitBreaker breaker) {
26-
return new MemoryAccountingBytesRefCounted(0L, breaker);
30+
return new MemoryAccountingBytesRefCounted(0, breaker);
2731
}
2832

29-
public void account(long bytes, String label) {
33+
public void account(int bytes, String label) {
3034
breaker.addEstimateBytesAndMaybeBreak(bytes, label);
3135
this.bytes += bytes;
3236
}
3337

3438
@Override
3539
protected void closeInternal() {
36-
if (bytes != 0L) {
37-
breaker.addWithoutBreaking(-bytes);
38-
}
40+
breaker.addWithoutBreaking(-bytes);
3941
}
4042
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,7 @@ public void apply(Settings value, Settings current, Settings previous) {
535535
SearchModule.SCRIPTED_METRICS_AGG_ALLOWED_STORED_SCRIPTS,
536536
SearchService.SEARCH_WORKER_THREADS_ENABLED,
537537
SearchService.QUERY_PHASE_PARALLEL_COLLECTION_ENABLED,
538+
SearchService.MEMORY_ACCOUNTING_BUFFER_SIZE,
538539
ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING,
539540
ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING,
540541
ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING,

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,18 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
274274
Property.NodeScope
275275
);
276276

277+
/**
278+
* The size of the buffer used for memory accounting.
279+
* This buffer is used to locally track the memory accummulate during the executiong of
280+
* a search request before submitting the accumulated value to the circuit breaker.
281+
*/
282+
public static final Setting<ByteSizeValue> MEMORY_ACCOUNTING_BUFFER_SIZE = Setting.byteSizeSetting(
283+
"search.memory_accounting_buffer_size",
284+
ByteSizeValue.of(32, ByteSizeUnit.KB),
285+
Property.Dynamic,
286+
Property.NodeScope
287+
);
288+
277289
public static final int DEFAULT_SIZE = 10;
278290
public static final int DEFAULT_FROM = 0;
279291
private static final StackTraceElement[] EMPTY_STACK_TRACE_ARRAY = new StackTraceElement[0];
@@ -311,6 +323,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
311323

312324
private volatile boolean enableRewriteAggsToFilterByFilter;
313325

326+
private volatile ByteSizeValue memoryAccountingBufferSize;
327+
314328
private final Cancellable keepAliveReaper;
315329

316330
private final AtomicLong idGenerator = new AtomicLong();
@@ -388,6 +402,9 @@ public SearchService(
388402
enableQueryPhaseParallelCollection = QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.get(settings);
389403
clusterService.getClusterSettings()
390404
.addSettingsUpdateConsumer(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED, this::setEnableQueryPhaseParallelCollection);
405+
406+
memoryAccountingBufferSize = MEMORY_ACCOUNTING_BUFFER_SIZE.get(settings);
407+
clusterService.getClusterSettings().addSettingsUpdateConsumer(MEMORY_ACCOUNTING_BUFFER_SIZE, this::setMemoryAccountingBufferSize);
391408
}
392409

393410
private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) {
@@ -402,6 +419,10 @@ private void setEnableQueryPhaseParallelCollection(boolean enableQueryPhaseParal
402419
this.enableQueryPhaseParallelCollection = enableQueryPhaseParallelCollection;
403420
}
404421

422+
private void setMemoryAccountingBufferSize(ByteSizeValue memoryAccountingBufferSize) {
423+
this.memoryAccountingBufferSize = memoryAccountingBufferSize;
424+
}
425+
405426
private static void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
406427
if (defaultKeepAlive.millis() > maxKeepAlive.millis()) {
407428
throw new IllegalArgumentException(
@@ -790,7 +811,7 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard
790811
return searchContext.rankFeatureResult();
791812
}
792813
RankFeatureShardPhase.prepareForFetch(searchContext, request);
793-
fetchPhase.execute(searchContext, docIds, null, circuitBreaker);
814+
fetchPhase.execute(searchContext, docIds, null, circuitBreaker, memoryAccountingBufferSize.getBytes());
794815
RankFeatureShardPhase.processFetch(searchContext);
795816
var rankFeatureResult = searchContext.rankFeatureResult();
796817
rankFeatureResult.incRef();
@@ -808,7 +829,7 @@ private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchCon
808829
Releasable scope = tracer.withScope(context.getTask());
809830
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)
810831
) {
811-
fetchPhase.execute(context, shortcutDocIdsToLoad(context), null, circuitBreaker);
832+
fetchPhase.execute(context, shortcutDocIdsToLoad(context), null, circuitBreaker, memoryAccountingBufferSize.getBytes());
812833
if (reader.singleSession()) {
813834
freeReaderContext(reader.id());
814835
}
@@ -974,7 +995,13 @@ public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, A
974995
System.nanoTime()
975996
)
976997
) {
977-
fetchPhase.execute(searchContext, request.docIds(), request.getRankDocks(), circuitBreaker);
998+
fetchPhase.execute(
999+
searchContext,
1000+
request.docIds(),
1001+
request.getRankDocks(),
1002+
circuitBreaker,
1003+
memoryAccountingBufferSize.getBytes()
1004+
);
9781005
if (readerContext.singleSession()) {
9791006
freeReaderContext(request.contextId());
9801007
}

server/src/main/java/org/elasticsearch/search/aggregations/metrics/TopHitsAggregator.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.index.query.SearchExecutionContext;
3636
import org.elasticsearch.search.SearchHit;
3737
import org.elasticsearch.search.SearchHits;
38+
import org.elasticsearch.search.SearchService;
3839
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
3940
import org.elasticsearch.search.aggregations.Aggregator;
4041
import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -56,6 +57,8 @@
5657

5758
class TopHitsAggregator extends MetricsAggregator {
5859

60+
private final long memAccountingBufferSize;
61+
5962
private static class Collectors {
6063
public final TopDocsCollector<?> topDocsCollector;
6164
public final MaxScoreCollector maxScoreCollector;
@@ -87,6 +90,7 @@ private static class Collectors {
8790
this.subSearchContext = subSearchContext;
8891
this.topDocsCollectors = new LongObjectPagedHashMap<>(1, bigArrays);
8992
this.fetchProfiles = context.profiling() ? new ArrayList<>() : null;
93+
this.memAccountingBufferSize = context.getClusterSettings().get(SearchService.MEMORY_ACCOUNTING_BUFFER_SIZE).getBytes();
9094
}
9195

9296
@Override
@@ -197,7 +201,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
197201
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
198202
docIdsToLoad[i] = topDocs.scoreDocs[i].doc;
199203
}
200-
FetchSearchResult fetchResult = runFetchPhase(subSearchContext, docIdsToLoad, context.breaker());
204+
FetchSearchResult fetchResult = runFetchPhase(subSearchContext, docIdsToLoad, context.breaker(), memAccountingBufferSize);
201205
if (fetchProfiles != null) {
202206
fetchProfiles.add(fetchResult.profileResult());
203207
}
@@ -221,7 +225,12 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
221225
);
222226
}
223227

224-
private static FetchSearchResult runFetchPhase(SubSearchContext subSearchContext, int[] docIdsToLoad, CircuitBreaker breaker) {
228+
private static FetchSearchResult runFetchPhase(
229+
SubSearchContext subSearchContext,
230+
int[] docIdsToLoad,
231+
CircuitBreaker breaker,
232+
long memAccountingBufferSize
233+
) {
225234
// Fork the search execution context for each slice, because the fetch phase does not support concurrent execution yet.
226235
SearchExecutionContext searchExecutionContext = new SearchExecutionContext(subSearchContext.getSearchExecutionContext());
227236
SubSearchContext fetchSubSearchContext = new SubSearchContext(subSearchContext) {
@@ -230,7 +239,7 @@ public SearchExecutionContext getSearchExecutionContext() {
230239
return searchExecutionContext;
231240
}
232241
};
233-
fetchSubSearchContext.fetchPhase().execute(fetchSubSearchContext, docIdsToLoad, null, breaker);
242+
fetchSubSearchContext.fetchPhase().execute(fetchSubSearchContext, docIdsToLoad, null, breaker, memAccountingBufferSize);
234243
return fetchSubSearchContext.fetchResult();
235244
}
236245

0 commit comments

Comments
 (0)