Skip to content

Commit f4e111b

Browse files
committed
Use the existing refCounted field in SearchHit
1 parent 6ffec2d commit f4e111b

File tree

4 files changed

+26
-56
lines changed

4 files changed

+26
-56
lines changed

server/src/main/java/org/elasticsearch/common/MemoryAccountingBytesRefCounted.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,30 +10,30 @@
1010
package org.elasticsearch.common;
1111

1212
import org.elasticsearch.common.breaker.CircuitBreaker;
13-
import org.elasticsearch.common.bytes.BytesReference;
1413
import org.elasticsearch.core.AbstractRefCounted;
1514

1615
public final class MemoryAccountingBytesRefCounted extends AbstractRefCounted {
1716

18-
private BytesReference bytes;
19-
private CircuitBreaker breaker;
17+
private long bytes;
18+
private final CircuitBreaker breaker;
2019

21-
private MemoryAccountingBytesRefCounted(BytesReference bytes, CircuitBreaker breaker) {
20+
private MemoryAccountingBytesRefCounted(long bytes, CircuitBreaker breaker) {
2221
this.bytes = bytes;
2322
this.breaker = breaker;
2423
}
2524

26-
public static MemoryAccountingBytesRefCounted createAndAccountForBytes(
27-
BytesReference bytes,
28-
CircuitBreaker breaker,
29-
String memAccountingLabel
30-
) {
31-
breaker.addEstimateBytesAndMaybeBreak(bytes.length(), memAccountingLabel);
32-
return new MemoryAccountingBytesRefCounted(bytes, breaker);
25+
public static MemoryAccountingBytesRefCounted create(CircuitBreaker breaker) {
26+
return new MemoryAccountingBytesRefCounted(0L, breaker);
27+
}
28+
29+
public void account(long bytes, String label) {
30+
breaker.addEstimateBytesAndMaybeBreak(bytes, label);
31+
this.bytes += bytes;
3332
}
3433

3534
@Override
3635
protected void closeInternal() {
37-
breaker.addWithoutBreaking(-bytes.length());
36+
System.err.println("REMOVING " + bytes);
37+
breaker.addWithoutBreaking(-bytes);
3838
}
3939
}

server/src/main/java/org/elasticsearch/search/SearchHit.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@ public final class SearchHit implements Writeable, ToXContentObject, RefCounted
8383
private long primaryTerm;
8484

8585
private BytesReference source;
86-
@Nullable
87-
private RefCounted unfilteredSource;
8886

8987
private final Map<String, DocumentField> documentFields;
9088
private final Map<String, DocumentField> metaFields;
@@ -125,7 +123,7 @@ public SearchHit(int nestedTopDocId, String id, NestedIdentity nestedIdentity) {
125123
this(nestedTopDocId, id, nestedIdentity, null);
126124
}
127125

128-
private SearchHit(int nestedTopDocId, String id, NestedIdentity nestedIdentity, @Nullable RefCounted refCounted) {
126+
public SearchHit(int nestedTopDocId, String id, NestedIdentity nestedIdentity, @Nullable RefCounted refCounted) {
129127
this(
130128
nestedTopDocId,
131129
DEFAULT_SCORE,
@@ -449,17 +447,6 @@ public SearchHit sourceRef(BytesReference source) {
449447
return this;
450448
}
451449

452-
/**
453-
* We track the unfiltered, entire, source so we can release the entire size from the
454-
* circuit breakers when the hit is released.
455-
* The regular source might be a subset of the unfiltered source due to either
456-
* source filtering, field collapsing or inner hits.
457-
*/
458-
public SearchHit unfilteredSource(RefCounted source) {
459-
this.unfilteredSource = source;
460-
return this;
461-
}
462-
463450
/**
464451
* Is the source available or not. A source with no fields will return true. This will return false if {@code fields} doesn't contain
465452
* {@code _source} or if source is disabled in the mapping.

server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.search.rank.RankDoc;
4141
import org.elasticsearch.search.rank.RankDocShardInfo;
4242
import org.elasticsearch.tasks.TaskCancelledException;
43+
import org.elasticsearch.transport.LeakTracker;
4344
import org.elasticsearch.xcontent.XContentType;
4445

4546
import java.io.IOException;
@@ -302,23 +303,20 @@ private static HitContext prepareNonNestedHitContext(
302303

303304
String id = idLoader.getId(subDocId);
304305
if (id == null) {
305-
SearchHit hit = new SearchHit(docId);
306+
MemoryAccountingBytesRefCounted memAccountingRefCounted = MemoryAccountingBytesRefCounted.create(circuitBreaker);
307+
SearchHit hit = new SearchHit(docId, null, null, LeakTracker.wrap(memAccountingRefCounted));
306308
// TODO: can we use real pooled buffers here as well?
307-
Source source = Source.lazy(lazyStoredSourceLoader(profiler, subReaderContext, subDocId, hit, circuitBreaker));
309+
Source source = Source.lazy(lazyStoredSourceLoader(profiler, subReaderContext, subDocId, memAccountingRefCounted));
308310
return new HitContext(hit, subReaderContext, subDocId, Map.of(), source, rankDoc);
309311
} else {
310-
SearchHit hit = new SearchHit(docId, id);
312+
MemoryAccountingBytesRefCounted memAccountingRefCounted = MemoryAccountingBytesRefCounted.create(circuitBreaker);
313+
SearchHit hit = new SearchHit(docId, id, null, LeakTracker.wrap(memAccountingRefCounted));
311314
Source source;
312315
if (requiresSource) {
313316
Timer timer = profiler.startLoadingSource();
314317
try {
315318
source = sourceLoader.source(leafStoredFieldLoader, subDocId);
316-
MemoryAccountingBytesRefCounted sourceRef = MemoryAccountingBytesRefCounted.createAndAccountForBytes(
317-
source.internalSourceRef(),
318-
circuitBreaker,
319-
"fetch phase source loader"
320-
);
321-
hit.unfilteredSource(sourceRef);
319+
memAccountingRefCounted.account(source.internalSourceRef().length(), "fetch phase source loader");
322320
} catch (CircuitBreakingException e) {
323321
hit.decRef();
324322
throw e;
@@ -328,7 +326,7 @@ private static HitContext prepareNonNestedHitContext(
328326
}
329327
}
330328
} else {
331-
source = Source.lazy(lazyStoredSourceLoader(profiler, subReaderContext, subDocId, hit, circuitBreaker));
329+
source = Source.lazy(lazyStoredSourceLoader(profiler, subReaderContext, subDocId, memAccountingRefCounted));
332330
}
333331
return new HitContext(hit, subReaderContext, subDocId, leafStoredFieldLoader.storedFields(), source, rankDoc);
334332
}
@@ -338,28 +336,15 @@ private static Supplier<Source> lazyStoredSourceLoader(
338336
Profiler profiler,
339337
LeafReaderContext ctx,
340338
int doc,
341-
SearchHit hit,
342-
CircuitBreaker circuitBreaker
339+
MemoryAccountingBytesRefCounted memAccountingRefCounted
343340
) {
344341
return () -> {
345342
StoredFieldLoader rootLoader = profiler.storedFields(StoredFieldLoader.create(true, Collections.emptySet()));
346343
try {
347344
LeafStoredFieldLoader leafRootLoader = rootLoader.getLoader(ctx, null);
348345
leafRootLoader.advanceTo(doc);
349346
BytesReference source = leafRootLoader.source();
350-
MemoryAccountingBytesRefCounted memAccountingSourceRef = MemoryAccountingBytesRefCounted.createAndAccountForBytes(
351-
source,
352-
circuitBreaker,
353-
"fetch phase source loader"
354-
);
355-
// Saving the entire source we loaded in the hit, so that we can release it entirely when the hit is released
356-
// This is important for the circuit breaker accounting - note that this lazy loader can be triggered in the case of
357-
// inner hits even though the top hit source is not requested (see {@link FetchPhase#prepareNestedHitContext} when
358-
// the `nestedSource` is created), so we need to save the entire source on the hit - we account
359-
// for the top level source via the {@link SearchHit#unfilteredSourceRef(BytesReference)} method because the
360-
// {@link SearchHit#source()} method can be null when the top level source is not requested.
361-
// NB all of the above also applies for source filtering and field collapsing
362-
hit.unfilteredSource(memAccountingSourceRef);
347+
memAccountingRefCounted.account(source.length(), "fetch phase source loader");
363348
return Source.fromBytes(source);
364349
} catch (IOException e) {
365350
throw new UncheckedIOException(e);
@@ -404,7 +389,6 @@ private static HitContext prepareNestedHitContext(
404389
if (requiresSource) {
405390
BytesReference source = leafRootLoader.source();
406391
if (source != null) {
407-
NOOP_CIRCUIT_BREAKER.addEstimateBytesAndMaybeBreak(source.length(), "fetch phase nested hit source loader");
408392
rootSource = Source.fromBytes(source);
409393
}
410394
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchAction.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -263,12 +263,11 @@ protected MultiSearchResponse shardOperation(Request request, ShardId shardId) t
263263
}
264264
return context.getFieldType(field);
265265
});
266-
final SearchHit hit = new SearchHit(scoreDoc.doc, visitor.id());
266+
MemoryAccountingBytesRefCounted memAccountingRefCounted = MemoryAccountingBytesRefCounted.create(breaker);
267+
final SearchHit hit = new SearchHit(scoreDoc.doc, visitor.id(), null, memAccountingRefCounted);
267268
try {
268269
BytesReference sourceBytesRef = visitor.source();
269-
MemoryAccountingBytesRefCounted memAccountingSourceRef = MemoryAccountingBytesRefCounted
270-
.createAndAccountForBytes(sourceBytesRef, breaker, "enrich source");
271-
hit.unfilteredSource(memAccountingSourceRef);
270+
memAccountingRefCounted.account(sourceBytesRef.length(), "enrich source");
272271
hit.sourceRef(filterSource(fetchSourceContext, sourceBytesRef));
273272
hits[j] = hit;
274273
} catch (CircuitBreakingException e) {

0 commit comments

Comments
 (0)