Merged
Changes from 38 commits
Commits
70 commits
8a37ca9
Account for the SearchHit source in circuit breaker
andreidan Feb 6, 2025
9b524a3
Add source memory accounting for enrich source
andreidan Feb 6, 2025
64a696a
spotless
andreidan Feb 6, 2025
19592ef
test compile
andreidan Feb 6, 2025
40fc178
Test compilation
andreidan Feb 6, 2025
72d2c0e
test compile
andreidan Feb 6, 2025
55f40ef
no sysout
andreidan Feb 6, 2025
8b1f8c1
DecRef for newly created hit on circuit breaking exception
andreidan Feb 7, 2025
edaeebe
FetchPhase doc iterator lets CircuitBreakingException bubble up
andreidan Feb 7, 2025
4e3d55f
Merge branch 'main' into source-mem-accounting
andreidan Feb 7, 2025
ff6ab24
DecRef after hit creation
andreidan Feb 7, 2025
c103451
Purge hits on CBE
andreidan Feb 8, 2025
d4b22e1
Enrich purge hits on CBE
andreidan Feb 8, 2025
667873d
Merge branch 'main' into source-mem-accounting
andreidan Feb 8, 2025
86dccd6
Merge branch 'main' into source-mem-accounting
elasticmachine Feb 10, 2025
fcf7711
Merge branch 'main' into source-mem-accounting
andreidan Feb 11, 2025
ffe9053
Use a mem accountign ref counted for the parent, unfiltered, source
andreidan Feb 11, 2025
4083d64
Use unfiltered parent source ref counted
andreidan Feb 11, 2025
8831a33
Compile
andreidan Feb 11, 2025
6ffec2d
Compile
andreidan Feb 11, 2025
f4e111b
Use the existing refCounted field in SearchHit
andreidan Feb 11, 2025
b64bed5
Leaktracker.wrap
andreidan Feb 11, 2025
69644ba
compile
andreidan Feb 11, 2025
9aef36b
remove sout
andreidan Feb 11, 2025
87308c9
Only call the breaker if bytes != 0
andreidan Feb 12, 2025
e5c366a
Merge branch 'main' into source-mem-accounting
andreidan Feb 12, 2025
5a9c770
Local accounting up to 32kb
andreidan Feb 18, 2025
e3ea850
Merge branch 'main' into source-mem-accounting
elasticmachine Feb 18, 2025
558d4af
Cosmetics and enhance test
andreidan Feb 18, 2025
b793459
Revert enrich modifications
andreidan Feb 18, 2025
1062778
Revert enrich stuff
andreidan Feb 18, 2025
fbc4bb7
Release things before asserting in finally block
andreidan Feb 18, 2025
c89d103
Remove unused field
andreidan Feb 18, 2025
35e8dce
Remove needless changes
andreidan Feb 18, 2025
d926d33
Update docs/changelog/121920.yaml
andreidan Feb 18, 2025
3c57ced
Assert only if last ref in finally block
andreidan Feb 18, 2025
d3d2205
Update assert to account for fetch phase tripping the cb
andreidan Feb 18, 2025
23f3794
Merge branch 'main' into source-mem-accounting
andreidan Feb 19, 2025
ee6d444
Default accounting buffer to 1M, drop int boxing.
andreidan Feb 19, 2025
a8e842a
Unify the in consumer and supplier in one interface
andreidan Feb 19, 2025
64959b2
use lambda
andreidan Feb 19, 2025
6c3a59d
Line length
andreidan Feb 19, 2025
f366e7f
Use volatile int in `MemoryAccountingBytesRefCounted`
andreidan Feb 19, 2025
49df2a7
Remove field volatile and add some docs as to why
andreidan Feb 19, 2025
c0bdf15
Merge branch 'main' into source-mem-accounting
elasticmachine Feb 20, 2025
86da7fd
Some docs for the functional interface
andreidan Feb 20, 2025
ebcba60
Move the mem accounting buffer size in aggregationcontext
andreidan Feb 20, 2025
83e37e6
AssertBusy to allow for the decRefs to be dec-ed to 0
andreidan Feb 20, 2025
a8cf2c5
Merge branch 'main' into source-mem-accounting
andreidan Feb 21, 2025
4e47ef6
Merge branch 'main' into source-mem-accounting
andreidan Feb 21, 2025
9d8c238
Merge branch 'main' into source-mem-accounting
andreidan Feb 21, 2025
31694f6
Merge branch 'main' into source-mem-accounting
andreidan Feb 21, 2025
de7492d
Use the real memory circuit breaker in the fetch phase
andreidan Feb 22, 2025
e7f2a3c
Compile
andreidan Feb 22, 2025
1b0ca70
static
andreidan Feb 22, 2025
979203a
Drop bool
andreidan Feb 22, 2025
403720c
Fix mocks
andreidan Feb 23, 2025
06dcb68
Have a checkRealMemoryCB method on the SearchContext
andreidan Feb 23, 2025
41b74cb
Revert use of fetch phase source loader trip
andreidan Feb 23, 2025
6e60c6e
Add unit test for the tracking memory in the fetch phase
andreidan Feb 23, 2025
1469c33
[CI] Auto commit changes from spotless
Feb 23, 2025
39a0cc0
Unit test the search context checkRealMemoryCB method
andreidan Feb 23, 2025
abacdb1
Only check the breaker at the end of the segment if the source is req…
andreidan Feb 23, 2025
acf2f63
Merge branch 'main' into source-mem-accounting
andreidan Feb 23, 2025
fab8b9b
Account per fetch phase and add min value for the local acocunting bu…
andreidan Feb 24, 2025
01f4c90
Spotless
andreidan Feb 24, 2025
0be02e5
Drop the flag on finalizing local accounting
andreidan Feb 24, 2025
cff3ad8
Drop leftover
andreidan Feb 24, 2025
47aac2a
Merge branch 'main' into source-mem-accounting
andreidan Feb 24, 2025
13715a5
Renamings
andreidan Feb 24, 2025
6 changes: 6 additions & 0 deletions docs/changelog/121920.yaml
@@ -0,0 +1,6 @@
pr: 121920
summary: Account for the `SearchHit` source in circuit breaker
area: Search
type: enhancement
issues:
- 89656
@@ -78,6 +78,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;

public class TransportSearchIT extends ESIntegTestCase {
@@ -487,7 +488,10 @@ public void onFailure(Exception e) {
Exception.class,
client.prepareSearch("test").addAggregation(new TestAggregationBuilder("test"))
);
assertThat(exc.getCause().getMessage(), containsString("<reduce_aggs>"));
assertThat(
exc.getCause().getMessage(),
either(containsString("<reduce_aggs>")).or(containsString("fetch phase source loader"))
);
});

final AtomicArray<Exception> exceptions = new AtomicArray<>(10);
@@ -514,7 +518,10 @@ public void onFailure(Exception exc) {
latch.await();
assertThat(exceptions.asList().size(), equalTo(10));
for (Exception exc : exceptions.asList()) {
assertThat(exc.getCause().getMessage(), containsString("<reduce_aggs>"));
assertThat(
exc.getCause().getMessage(),
either(containsString("<reduce_aggs>")).or(containsString("fetch phase source loader"))
);
}
assertBusy(() -> assertThat(requestBreakerUsed(), equalTo(0L)));
} finally {
@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common;

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.core.AbstractRefCounted;

/**
* A ref counted object that accounts for memory usage in bytes and releases the
* accounted memory from the circuit breaker when the reference count reaches zero.
*/
public final class MemoryAccountingBytesRefCounted extends AbstractRefCounted {

private int bytes;
private final CircuitBreaker breaker;

private MemoryAccountingBytesRefCounted(int bytes, CircuitBreaker breaker) {
this.bytes = bytes;
this.breaker = breaker;
}

public static MemoryAccountingBytesRefCounted create(CircuitBreaker breaker) {
return new MemoryAccountingBytesRefCounted(0, breaker);
}

public void account(int bytes, String label) {
breaker.addEstimateBytesAndMaybeBreak(bytes, label);
this.bytes += bytes;
}

@Override
protected void closeInternal() {
breaker.addWithoutBreaking(-bytes);
}
}
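A minimal standalone sketch of how this class is meant to be used: charge the breaker while the source is referenced, and release the accounted bytes once the last reference drops. The toy breaker and ref-counting classes below are illustrative stand-ins, not the real `CircuitBreaker`/`AbstractRefCounted` API:

```java
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for the circuit breaker: tracks an estimate and throws past a limit.
final class ToyBreaker {
    private final AtomicLong used = new AtomicLong();
    private final long limit;

    ToyBreaker(long limit) { this.limit = limit; }

    void addEstimateBytesAndMaybeBreak(long bytes, String label) {
        long now = used.addAndGet(bytes);
        if (now > limit) {
            used.addAndGet(-bytes); // roll back the failed reservation
            throw new IllegalStateException("circuit breaking: " + label);
        }
    }

    void addWithoutBreaking(long bytes) { used.addAndGet(bytes); }

    long used() { return used.get(); }
}

// Mirrors the idea of MemoryAccountingBytesRefCounted: bytes stay accounted while
// referenced and are returned to the breaker when the last reference is dropped.
final class ToyMemoryAccountingRefCounted {
    private long bytes;
    private int refCount = 1;
    private final ToyBreaker breaker;

    ToyMemoryAccountingRefCounted(ToyBreaker breaker) { this.breaker = breaker; }

    void account(long bytes, String label) {
        breaker.addEstimateBytesAndMaybeBreak(bytes, label);
        this.bytes += bytes;
    }

    void incRef() { refCount++; }

    void decRef() {
        if (--refCount == 0) {
            breaker.addWithoutBreaking(-bytes); // release accounted memory
        }
    }
}

public class MemoryAccountingSketch {
    public static long demo() {
        ToyBreaker breaker = new ToyBreaker(1024);
        ToyMemoryAccountingRefCounted rc = new ToyMemoryAccountingRefCounted(breaker);
        rc.account(512, "search hit source");
        rc.incRef();                        // a second holder, e.g. a fetched hit
        rc.decRef();                        // first holder done; bytes still accounted
        long whileHeld = breaker.used();
        rc.decRef();                        // last reference: bytes go back to the breaker
        return whileHeld - breaker.used();  // bytes released on close
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```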
@@ -535,6 +535,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchModule.SCRIPTED_METRICS_AGG_ALLOWED_STORED_SCRIPTS,
SearchService.SEARCH_WORKER_THREADS_ENABLED,
SearchService.QUERY_PHASE_PARALLEL_COLLECTION_ENABLED,
SearchService.MEMORY_ACCOUNTING_BUFFER_SIZE,
ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING,
ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING,
ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING,
4 changes: 2 additions & 2 deletions server/src/main/java/org/elasticsearch/search/SearchHit.java
@@ -111,7 +111,6 @@ public final class SearchHit implements Writeable, ToXContentObject, RefCounted

private final RefCounted refCounted;

// used only in tests
Comment (author):
This is wrong :)

public SearchHit(int docId) {
this(docId, null);
}
@@ -124,7 +123,7 @@ public SearchHit(int nestedTopDocId, String id, NestedIdentity nestedIdentity) {
this(nestedTopDocId, id, nestedIdentity, null);
}

private SearchHit(int nestedTopDocId, String id, NestedIdentity nestedIdentity, @Nullable RefCounted refCounted) {
public SearchHit(int nestedTopDocId, String id, NestedIdentity nestedIdentity, @Nullable RefCounted refCounted) {
this(
nestedTopDocId,
DEFAULT_SCORE,
@@ -293,6 +292,7 @@ public static SearchHit unpooled(int docId, String id) {
}

public static SearchHit unpooled(int nestedTopDocId, String id, NestedIdentity nestedIdentity) {
// always referenced search hits do NOT call #deallocate
Comment (author):
This might seem like noise, but it helped me when making sense of the pool/unpool stuff, so I thought it might be useful to future spelunkers.

return new SearchHit(nestedTopDocId, id, nestedIdentity, ALWAYS_REFERENCED);
}

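The `ALWAYS_REFERENCED` comment above is the crux of the pooled/unpooled split: unpooled hits share a no-op ref count, so their cleanup path is never invoked, while pooled hits deallocate when the count reaches zero. A toy model of that behaviour (illustrative names, not the real `SearchHit` API):

```java
// Toy model of the pooled vs unpooled SearchHit split noted in the review
// comment above. Names are illustrative, not the real Elasticsearch API.
interface ToyRefCounted {
    void incRef();
    boolean decRef(); // true when this call dropped the last reference
}

final class ToyHit {
    // Unpooled hits share a no-op ref count: decRef never reaches zero,
    // so #deallocate-style cleanup is never invoked.
    static final ToyRefCounted ALWAYS_REFERENCED = new ToyRefCounted() {
        @Override public void incRef() {}
        @Override public boolean decRef() { return false; }
    };

    private final ToyRefCounted refCounted;
    boolean deallocated;

    ToyHit(ToyRefCounted refCounted) { this.refCounted = refCounted; }

    static ToyHit unpooled() { return new ToyHit(ALWAYS_REFERENCED); }

    static ToyHit pooled() {
        return new ToyHit(new ToyRefCounted() {
            private int refs = 1;
            @Override public void incRef() { refs++; }
            @Override public boolean decRef() { return --refs == 0; }
        });
    }

    void release() {
        if (refCounted.decRef()) {
            deallocated = true; // pooled hits would free buffers / breaker bytes here
        }
    }
}

public class RefCountSketch {
    public static boolean demo() {
        ToyHit pooled = ToyHit.pooled();
        pooled.release();   // last ref dropped: deallocates
        ToyHit unpooled = ToyHit.unpooled();
        unpooled.release(); // no-op for always-referenced hits
        return pooled.deallocated && !unpooled.deallocated;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // expected: true
    }
}
```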
41 changes: 33 additions & 8 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
@@ -274,6 +274,18 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

/**
 * The size of the buffer used for memory accounting.
 * This buffer is used to locally track the memory accumulated during the execution of
 * a search request before submitting the accumulated value to the circuit breaker.
 */
public static final Setting<ByteSizeValue> MEMORY_ACCOUNTING_BUFFER_SIZE = Setting.byteSizeSetting(
Comment (reviewer):
Does it make sense to make this configurable? We're adding a lot of surface area for something that is supposed to be a stopgap solution and in any case, this one feels like something we should be able to set to a useful value based on heap size and search thread-count?
Also, the value seems to me should be far higher than 32kb? If I had to guess, I'd instinctively go for something like ~1M?
So for e.g. a 8 core => 12 threads node (which I think would have 4 or 8G heap in most of our setups?) we'd move in chunks of 0.25% of total heap or less. That seems reasonable and makes the inherent fairness issue of a breaker based solution less bad?

Comment (author):
Also, the value seems to me should be far higher than 32kb? If I had to guess, I'd instinctively go for something like ~1M?

I think a setting makes sense precisely to accommodate for these kind of guesses in production for various workloads / hardware configurations.

I'm happy to go with 1M default (the higher the value the less granularity we have when accounting memory, but 1M doesn't sound too bad)
@javanna any strong opinions here?

Comment (reviewer):
Sure, if we don't feel comfortable making a guess here, let's make it configurable (though I do have a hard time seeing the practicality, I must say). Also, this isn't really a per-cluster thing? It's more of a node setting since it depends on the size of a node?

As far as granularity goes, you want as little granularity as you can get away with. Take my example from a couple days ago:

Say you have a budget of 2M but can survive allocating 2.5M and 3 threads each wanting to load 1M during fetch, all of them running in parallel with at least some overlap.
If your granularity is 1b, then it is almost guaranteed that all of them will get to somewhere around 2/3 of what they're doing and they all fail. Under pressure we degrade into a completely broken state and waste lots of memory only to eventually throw 429.
If your granularity is set to 0.5M then you actually never exceed what you can survive and at least 1 thread will be able to finish its task.
But also, if you go to 1M you might OOM :)

=> seems to me the math here is rather easy. We need to leave a considerable safety margin anyway because the accounting is super approximate, and we should leverage the fact that it is so approximate because that's an unchangeable fact today anyway?

Comment (author):
As far as granularity goes, you want as little granularity as you can get away with.

That's exactly right, which is why I think having it configurable is important.

If your granularity is set to 0.5M then you actually never exceed what you can survive and at least 1 thread will be able to finish its task.
But also, if you go to 1M you might OOM :)

This kind of change is what I envision if we get a hardware profile + workload where the margin of 1MiB is not good enough.

Also, this isn't really a per-cluster thing?

This is a good question. Having it as a dynamic setting helps with operations (avoiding involved node restarts whilst maintaining HA), whilst local values can still be configured in elasticsearch.yml to help with cases where the clusters are non-homogeneous.

Comment (reviewer):
I think we should give this a lower bound of 1M actually and maybe an upper bound of 32M, and default it to the G1GC region size. If you think about it, checking at a finer granularity than the TLAB size really makes no sense whatsoever. Likewise, going way above the granularity makes little sense as well.
We can't really default the setting to the TLAB size cleanly I think if we make it a cluster setting, so I think 1M might be an ok default since that's often our region size?
But since OpenJDK doesn't allow a lower value for the region size we probably shouldn't either.

"search.memory_accounting_buffer_size",
ByteSizeValue.of(32, ByteSizeUnit.KB),
Property.Dynamic,
Property.NodeScope
);
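The buffered accounting this thread discusses (track bytes locally, only charge the shared breaker once the local tally crosses `search.memory_accounting_buffer_size`) can be sketched as follows. The class, the `AtomicLong` stand-in for the breaker, and the 1 MiB buffer value are illustrative, not the real Elasticsearch implementation:

```java
import java.util.concurrent.atomic.AtomicLong;

public class BufferedAccountingSketch {
    // Stand-in for the shared circuit-breaker estimate.
    static final AtomicLong BREAKER = new AtomicLong();

    private final long bufferSize;
    private long local; // bytes accumulated since the last flush

    BufferedAccountingSketch(long bufferSize) { this.bufferSize = bufferSize; }

    /** Returns true if this call flushed the local tally to the shared breaker. */
    boolean account(long bytes) {
        local += bytes;
        if (local >= bufferSize) {
            BREAKER.addAndGet(local); // one coarse-grained charge instead of many tiny ones
            local = 0;
            return true;
        }
        return false;
    }

    /** Flush whatever is left, e.g. at the end of the fetch phase. */
    void finish() {
        BREAKER.addAndGet(local);
        local = 0;
    }

    public static long demo() {
        BufferedAccountingSketch acc = new BufferedAccountingSketch(1 << 20); // 1 MiB buffer
        for (int i = 0; i < 10; i++) {
            acc.account(300 * 1024); // ten ~300 KiB hit sources
        }
        acc.finish();
        return BREAKER.get(); // everything is accounted, just in coarser chunks
    }

    public static void main(String[] args) { System.out.println(demo()); }
}
```

With a 1 MiB buffer the breaker is touched three times instead of ten, which is the contention/granularity trade-off the reviewers debate above.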

public static final int DEFAULT_SIZE = 10;
public static final int DEFAULT_FROM = 0;
private static final StackTraceElement[] EMPTY_STACK_TRACE_ARRAY = new StackTraceElement[0];
@@ -291,6 +303,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final BigArrays bigArrays;

private final FetchPhase fetchPhase;
private final CircuitBreaker circuitBreaker;
private volatile Executor searchExecutor;
private volatile boolean enableQueryPhaseParallelCollection;

Expand All @@ -310,6 +323,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private volatile boolean enableRewriteAggsToFilterByFilter;

private volatile ByteSizeValue memoryAccountingBufferSize;

private final Cancellable keepAliveReaper;

private final AtomicLong idGenerator = new AtomicLong();
@@ -341,11 +356,8 @@ public SearchService(
this.scriptService = scriptService;
this.bigArrays = bigArrays;
this.fetchPhase = fetchPhase;
this.multiBucketConsumerService = new MultiBucketConsumerService(
clusterService,
settings,
circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)
);
circuitBreaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);
this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreaker);
this.executorSelector = executorSelector;
this.tracer = tracer;

@@ -390,6 +402,9 @@ public SearchService(
enableQueryPhaseParallelCollection = QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED, this::setEnableQueryPhaseParallelCollection);

memoryAccountingBufferSize = MEMORY_ACCOUNTING_BUFFER_SIZE.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MEMORY_ACCOUNTING_BUFFER_SIZE, this::setMemoryAccountingBufferSize);
}

private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) {
@@ -404,6 +419,10 @@ private void setEnableQueryPhaseParallelCollection(boolean enableQueryPhaseParal
this.enableQueryPhaseParallelCollection = enableQueryPhaseParallelCollection;
}

private void setMemoryAccountingBufferSize(ByteSizeValue memoryAccountingBufferSize) {
this.memoryAccountingBufferSize = memoryAccountingBufferSize;
}

private static void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
if (defaultKeepAlive.millis() > maxKeepAlive.millis()) {
throw new IllegalArgumentException(
@@ -792,7 +811,7 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard
return searchContext.rankFeatureResult();
}
RankFeatureShardPhase.prepareForFetch(searchContext, request);
fetchPhase.execute(searchContext, docIds, null);
fetchPhase.execute(searchContext, docIds, null, circuitBreaker, memoryAccountingBufferSize.getBytes());
RankFeatureShardPhase.processFetch(searchContext);
var rankFeatureResult = searchContext.rankFeatureResult();
rankFeatureResult.incRef();
@@ -810,7 +829,7 @@ private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchCon
Releasable scope = tracer.withScope(context.getTask());
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)
) {
fetchPhase.execute(context, shortcutDocIdsToLoad(context), null);
fetchPhase.execute(context, shortcutDocIdsToLoad(context), null, circuitBreaker, memoryAccountingBufferSize.getBytes());
if (reader.singleSession()) {
freeReaderContext(reader.id());
}
@@ -976,7 +995,13 @@ public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, A
System.nanoTime()
)
) {
fetchPhase.execute(searchContext, request.docIds(), request.getRankDocks());
fetchPhase.execute(
searchContext,
request.docIds(),
request.getRankDocks(),
circuitBreaker,
memoryAccountingBufferSize.getBytes()
);
if (readerContext.singleSession()) {
freeReaderContext(request.contextId());
}
@@ -25,6 +25,7 @@
import org.apache.lucene.search.TopScoreDocCollectorManager;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.MaxScoreCollector;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.common.util.BigArrays;
@@ -34,6 +35,7 @@
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -55,6 +57,8 @@

class TopHitsAggregator extends MetricsAggregator {

private final long memAccountingBufferSize;

private static class Collectors {
public final TopDocsCollector<?> topDocsCollector;
public final MaxScoreCollector maxScoreCollector;
@@ -86,6 +90,7 @@ private static class Collectors {
this.subSearchContext = subSearchContext;
this.topDocsCollectors = new LongObjectPagedHashMap<>(1, bigArrays);
this.fetchProfiles = context.profiling() ? new ArrayList<>() : null;
this.memAccountingBufferSize = context.getClusterSettings().get(SearchService.MEMORY_ACCOUNTING_BUFFER_SIZE).getBytes();
Comment (reviewer):
Another point about the settings: parsing is costly :) If we go for a setting, let's store the value outright instead of parsing settings on the hot path?

Comment (author, Feb 19, 2025):
Hm, is this on the hot path - not familiar with aggs but I'd imagine this is at most once / search request? Happy to pull it up somewhere if this is called more often than I thought 🤔

Comment (author):
Moved this to the AggregationContext

}

@Override
@@ -196,7 +201,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
docIdsToLoad[i] = topDocs.scoreDocs[i].doc;
}
FetchSearchResult fetchResult = runFetchPhase(subSearchContext, docIdsToLoad);
FetchSearchResult fetchResult = runFetchPhase(subSearchContext, docIdsToLoad, context.breaker(), memAccountingBufferSize);
if (fetchProfiles != null) {
fetchProfiles.add(fetchResult.profileResult());
}
@@ -220,7 +225,12 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
);
}

private static FetchSearchResult runFetchPhase(SubSearchContext subSearchContext, int[] docIdsToLoad) {
private static FetchSearchResult runFetchPhase(
SubSearchContext subSearchContext,
int[] docIdsToLoad,
CircuitBreaker breaker,
long memAccountingBufferSize
) {
// Fork the search execution context for each slice, because the fetch phase does not support concurrent execution yet.
SearchExecutionContext searchExecutionContext = new SearchExecutionContext(subSearchContext.getSearchExecutionContext());
SubSearchContext fetchSubSearchContext = new SubSearchContext(subSearchContext) {
@@ -229,7 +239,7 @@ public SearchExecutionContext getSearchExecutionContext() {
return searchExecutionContext;
}
};
fetchSubSearchContext.fetchPhase().execute(fetchSubSearchContext, docIdsToLoad, null);
fetchSubSearchContext.fetchPhase().execute(fetchSubSearchContext, docIdsToLoad, null, breaker, memAccountingBufferSize);
return fetchSubSearchContext.fetchResult();
}
