Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
8a37ca9
Account for the SearchHit source in circuit breaker
andreidan Feb 6, 2025
9b524a3
Add source memory accounting for enrich source
andreidan Feb 6, 2025
64a696a
spotless
andreidan Feb 6, 2025
19592ef
test compile
andreidan Feb 6, 2025
40fc178
Test compilation
andreidan Feb 6, 2025
72d2c0e
test compile
andreidan Feb 6, 2025
55f40ef
no sysout
andreidan Feb 6, 2025
8b1f8c1
DecRef for newly created hit on circuit breaking exception
andreidan Feb 7, 2025
edaeebe
FetchPhase doc iterator lets CircuitBreakingException bubble up
andreidan Feb 7, 2025
4e3d55f
Merge branch 'main' into source-mem-accounting
andreidan Feb 7, 2025
ff6ab24
DecRef after hit creation
andreidan Feb 7, 2025
c103451
Purge hits on CBE
andreidan Feb 8, 2025
d4b22e1
Enrich purge hits on CBE
andreidan Feb 8, 2025
667873d
Merge branch 'main' into source-mem-accounting
andreidan Feb 8, 2025
86dccd6
Merge branch 'main' into source-mem-accounting
elasticmachine Feb 10, 2025
fcf7711
Merge branch 'main' into source-mem-accounting
andreidan Feb 11, 2025
ffe9053
Use a mem accountign ref counted for the parent, unfiltered, source
andreidan Feb 11, 2025
4083d64
Use unfiltered parent source ref counted
andreidan Feb 11, 2025
8831a33
Compile
andreidan Feb 11, 2025
6ffec2d
Compile
andreidan Feb 11, 2025
f4e111b
Use the existing refCounted field in SearchHit
andreidan Feb 11, 2025
b64bed5
Leaktracker.wrap
andreidan Feb 11, 2025
69644ba
compile
andreidan Feb 11, 2025
9aef36b
remove sout
andreidan Feb 11, 2025
87308c9
Only call the breaker if bytes != 0
andreidan Feb 12, 2025
e5c366a
Merge branch 'main' into source-mem-accounting
andreidan Feb 12, 2025
5a9c770
Local accounting up to 32kb
andreidan Feb 18, 2025
e3ea850
Merge branch 'main' into source-mem-accounting
elasticmachine Feb 18, 2025
558d4af
Cosmetics and enhance test
andreidan Feb 18, 2025
b793459
Revert enrich modifications
andreidan Feb 18, 2025
1062778
Revert enrich stuff
andreidan Feb 18, 2025
fbc4bb7
Release things before asserting in finally block
andreidan Feb 18, 2025
c89d103
Remove unused field
andreidan Feb 18, 2025
35e8dce
Remove needless changes
andreidan Feb 18, 2025
d926d33
Update docs/changelog/121920.yaml
andreidan Feb 18, 2025
3c57ced
Assert only if last ref in finally block
andreidan Feb 18, 2025
d3d2205
Update assert to account for fetch phase tripping the cb
andreidan Feb 18, 2025
23f3794
Merge branch 'main' into source-mem-accounting
andreidan Feb 19, 2025
ee6d444
Default accounting buffer to 1M, drop int boxing.
andreidan Feb 19, 2025
a8e842a
Unify the in consumer and supplier in one interface
andreidan Feb 19, 2025
64959b2
use lambda
andreidan Feb 19, 2025
6c3a59d
Line length
andreidan Feb 19, 2025
f366e7f
Use volatile int in `MemoryAccountingBytesRefCounted`
andreidan Feb 19, 2025
49df2a7
Remove field volatile and add some docs as to why
andreidan Feb 19, 2025
c0bdf15
Merge branch 'main' into source-mem-accounting
elasticmachine Feb 20, 2025
86da7fd
Some docs for the functional interface
andreidan Feb 20, 2025
ebcba60
Move the mem accounting buffer size in aggregationcontext
andreidan Feb 20, 2025
83e37e6
AssertBusy to allow for the decRefs to be dec-ed to 0
andreidan Feb 20, 2025
a8cf2c5
Merge branch 'main' into source-mem-accounting
andreidan Feb 21, 2025
4e47ef6
Merge branch 'main' into source-mem-accounting
andreidan Feb 21, 2025
9d8c238
Merge branch 'main' into source-mem-accounting
andreidan Feb 21, 2025
31694f6
Merge branch 'main' into source-mem-accounting
andreidan Feb 21, 2025
de7492d
Use the real memory circuit breaker in the fetch phase
andreidan Feb 22, 2025
e7f2a3c
Compile
andreidan Feb 22, 2025
1b0ca70
static
andreidan Feb 22, 2025
979203a
Drop bool
andreidan Feb 22, 2025
403720c
Fix mocks
andreidan Feb 23, 2025
06dcb68
Have a checkRealMemoryCB method on the SearchContext
andreidan Feb 23, 2025
41b74cb
Revert use of fetch phase source loader trip
andreidan Feb 23, 2025
6e60c6e
Add unit test for the tracking memory in the fetch phase
andreidan Feb 23, 2025
1469c33
[CI] Auto commit changes from spotless
Feb 23, 2025
39a0cc0
Unit test the search context checkRealMemoryCB method
andreidan Feb 23, 2025
abacdb1
Only check the breaker at the end of the segment if the source is req…
andreidan Feb 23, 2025
acf2f63
Merge branch 'main' into source-mem-accounting
andreidan Feb 23, 2025
fab8b9b
Account per fetch phase and add min value for the local acocunting bu…
andreidan Feb 24, 2025
01f4c90
Spotless
andreidan Feb 24, 2025
0be02e5
Drop the flag on finalizing local accounting
andreidan Feb 24, 2025
cff3ad8
Drop leftover
andreidan Feb 24, 2025
47aac2a
Merge branch 'main' into source-mem-accounting
andreidan Feb 24, 2025
13715a5
Renamings
andreidan Feb 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 46 additions & 13 deletions server/src/main/java/org/elasticsearch/search/SearchHit.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
Expand Down Expand Up @@ -83,6 +85,8 @@ public final class SearchHit implements Writeable, ToXContentObject, RefCounted
private long primaryTerm;

private BytesReference source;
@Nullable
private BytesReference unfilteredSourceRef;

private final Map<String, DocumentField> documentFields;
private final Map<String, DocumentField> metaFields;
Expand Down Expand Up @@ -110,21 +114,27 @@ public final class SearchHit implements Writeable, ToXContentObject, RefCounted
private Map<String, SearchHits> innerHits;

private final RefCounted refCounted;
private final CircuitBreaker circuitBreaker;

// used only in tests
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wrong :)

public SearchHit(int docId) {
this(docId, null);
public SearchHit(int docId, CircuitBreaker circuitBreaker) {
this(docId, null, circuitBreaker);
}

public SearchHit(int docId, String id) {
this(docId, id, null);
public SearchHit(int docId, String id, CircuitBreaker circuitBreaker) {
this(docId, id, null, circuitBreaker);
}

public SearchHit(int nestedTopDocId, String id, NestedIdentity nestedIdentity) {
this(nestedTopDocId, id, nestedIdentity, null);
public SearchHit(int nestedTopDocId, String id, NestedIdentity nestedIdentity, CircuitBreaker circuitBreaker) {
this(nestedTopDocId, id, nestedIdentity, null, circuitBreaker);
}

private SearchHit(int nestedTopDocId, String id, NestedIdentity nestedIdentity, @Nullable RefCounted refCounted) {
private SearchHit(
int nestedTopDocId,
String id,
NestedIdentity nestedIdentity,
@Nullable RefCounted refCounted,
@Nullable CircuitBreaker circuitBreaker
) {
this(
nestedTopDocId,
DEFAULT_SCORE,
Expand All @@ -145,7 +155,8 @@ private SearchHit(int nestedTopDocId, String id, NestedIdentity nestedIdentity,
null,
new HashMap<>(),
new HashMap<>(),
refCounted
refCounted,
circuitBreaker
);
}

Expand All @@ -169,7 +180,8 @@ public SearchHit(
Map<String, SearchHits> innerHits,
Map<String, DocumentField> documentFields,
Map<String, DocumentField> metaFields,
@Nullable RefCounted refCounted
@Nullable RefCounted refCounted,
@Nullable CircuitBreaker circuitBreaker
) {
this.docId = docId;
this.score = score;
Expand All @@ -191,6 +203,7 @@ public SearchHit(
this.documentFields = documentFields;
this.metaFields = metaFields;
this.refCounted = refCounted == null ? LeakTracker.wrap(new SimpleRefCounted()) : refCounted;
this.circuitBreaker = circuitBreaker;
}

public static SearchHit readFrom(StreamInput in, boolean pooled) throws IOException {
Expand Down Expand Up @@ -280,7 +293,8 @@ public static SearchHit readFrom(StreamInput in, boolean pooled) throws IOExcept
innerHits,
documentFields,
metaFields,
isPooled ? null : ALWAYS_REFERENCED
isPooled ? null : ALWAYS_REFERENCED,
null
);
}

Expand All @@ -293,7 +307,8 @@ public static SearchHit unpooled(int docId, String id) {
}

public static SearchHit unpooled(int nestedTopDocId, String id, NestedIdentity nestedIdentity) {
return new SearchHit(nestedTopDocId, id, nestedIdentity, ALWAYS_REFERENCED);
// always referenced search hits do NOT call #deallocate
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might seem as noise but it helped me when making sense of the pool/unpool stuff so thought maybe it's useful to future spelunkers.

return new SearchHit(nestedTopDocId, id, nestedIdentity, ALWAYS_REFERENCED, null);
}

private static final Text SINGLE_MAPPING_TYPE = new Text(MapperService.SINGLE_MAPPING_NAME);
Expand Down Expand Up @@ -447,6 +462,17 @@ public SearchHit sourceRef(BytesReference source) {
return this;
}

/**
* We track the unfiltered, entire, source so we can release the entire size from the
* circuit breakers when the hit is released.
* The regular source might be a subset of the unfiltered source due to either
* source filtering, field collapsing or inner hits.
*/
public SearchHit unfilteredSourceRef(BytesReference source) {
this.unfilteredSourceRef = source;
return this;
}

/**
* Is the source available or not. A source with no fields will return true. This will return false if {@code fields} doesn't contain
* {@code _source} or if source is disabled in the mapping.
Expand Down Expand Up @@ -724,6 +750,12 @@ private void deallocate() {
r.decRef();
}
SearchHit.this.source = null;

if (unfilteredSourceRef != null && circuitBreaker != null) {
System.out.println(" removing source loaded by inner hit " + unfilteredSourceRef.length());
circuitBreaker.addWithoutBreaking(-unfilteredSourceRef.length());
}
this.unfilteredSourceRef = null;
}

@Override
Expand Down Expand Up @@ -758,7 +790,8 @@ public SearchHit asUnpooled() {
: innerHits.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().asUnpooled())),
documentFields,
metaFields,
ALWAYS_REFERENCED
ALWAYS_REFERENCED,
null
);
}

Expand Down
14 changes: 6 additions & 8 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final BigArrays bigArrays;

private final FetchPhase fetchPhase;
private final CircuitBreaker circuitBreaker;
private volatile Executor searchExecutor;
private volatile boolean enableQueryPhaseParallelCollection;

Expand Down Expand Up @@ -340,11 +341,8 @@ public SearchService(
this.scriptService = scriptService;
this.bigArrays = bigArrays;
this.fetchPhase = fetchPhase;
this.multiBucketConsumerService = new MultiBucketConsumerService(
clusterService,
settings,
circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)
);
circuitBreaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);
this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreaker);
this.executorSelector = executorSelector;
this.tracer = tracer;

Expand Down Expand Up @@ -788,7 +786,7 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard
return searchContext.rankFeatureResult();
}
RankFeatureShardPhase.prepareForFetch(searchContext, request);
fetchPhase.execute(searchContext, docIds, null);
fetchPhase.execute(searchContext, docIds, null, circuitBreaker);
RankFeatureShardPhase.processFetch(searchContext);
var rankFeatureResult = searchContext.rankFeatureResult();
rankFeatureResult.incRef();
Expand All @@ -806,7 +804,7 @@ private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchCon
Releasable scope = tracer.withScope(context.getTask());
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)
) {
fetchPhase.execute(context, shortcutDocIdsToLoad(context), null);
fetchPhase.execute(context, shortcutDocIdsToLoad(context), null, circuitBreaker);
if (reader.singleSession()) {
freeReaderContext(reader.id());
}
Expand Down Expand Up @@ -972,7 +970,7 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
System.nanoTime()
)
) {
fetchPhase.execute(searchContext, request.docIds(), request.getRankDocks());
fetchPhase.execute(searchContext, request.docIds(), request.getRankDocks(), circuitBreaker);
if (readerContext.singleSession()) {
freeReaderContext(request.contextId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.search.TopScoreDocCollectorManager;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.MaxScoreCollector;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.common.util.BigArrays;
Expand Down Expand Up @@ -196,7 +197,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
docIdsToLoad[i] = topDocs.scoreDocs[i].doc;
}
FetchSearchResult fetchResult = runFetchPhase(subSearchContext, docIdsToLoad);
FetchSearchResult fetchResult = runFetchPhase(subSearchContext, docIdsToLoad, context.breaker());
if (fetchProfiles != null) {
fetchProfiles.add(fetchResult.profileResult());
}
Expand All @@ -220,7 +221,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
);
}

private static FetchSearchResult runFetchPhase(SubSearchContext subSearchContext, int[] docIdsToLoad) {
private static FetchSearchResult runFetchPhase(SubSearchContext subSearchContext, int[] docIdsToLoad, CircuitBreaker breaker) {
// Fork the search execution context for each slice, because the fetch phase does not support concurrent execution yet.
SearchExecutionContext searchExecutionContext = new SearchExecutionContext(subSearchContext.getSearchExecutionContext());
SubSearchContext fetchSubSearchContext = new SubSearchContext(subSearchContext) {
Expand All @@ -229,7 +230,7 @@ public SearchExecutionContext getSearchExecutionContext() {
return searchExecutionContext;
}
};
fetchSubSearchContext.fetchPhase().execute(fetchSubSearchContext, docIdsToLoad, null);
fetchSubSearchContext.fetchPhase().execute(fetchSubSearchContext, docIdsToLoad, null, breaker);
return fetchSubSearchContext.fetchResult();
}

Expand Down
Loading