
Commit 29ccfb8

Track bytes used by in-memory postings (#129969)
This patch adds a postingsInMemoryBytes field to the ShardFieldStats record that tracks the memory usage of the min and max terms, which the postings FieldReader keeps in memory. The postingsInMemoryBytes value is then used by the serverless autoscaler to better estimate memory requirements. Most of this was already done by @dnhatn in #121476, but was never merged.
1 parent b52c6f7 commit 29ccfb8
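In short: a codec wrapper records the summed min/max term lengths as a segment attribute at flush time, the engine reads that attribute back per segment, and ShardFieldStats exposes the total. A minimal end-to-end sketch of that flow using plain Lucene — a sketch only, assuming lucene-core and the Elasticsearch server classes from this commit are on the classpath; the Demo class, field name, and values are illustrative:

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.elasticsearch.index.codec.TrackingPostingsInMemoryBytesCodec;

public class Demo {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new ByteBuffersDirectory()) {
            IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
            // Wrap the default codec, as InternalEngine does behind the feature flag.
            iwc.setCodec(new TrackingPostingsInMemoryBytesCodec(Codec.getDefault()));
            try (IndexWriter writer = new IndexWriter(dir, iwc)) {
                Document doc = new Document();
                doc.add(new TextField("f1", "foo bar", Field.Store.NO));
                writer.addDocument(doc);
            } // closing the writer commits and flushes the segment
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                SegmentReader leaf = (SegmentReader) reader.leaves().get(0).reader();
                // f1's min term is "bar" (3 bytes) and max term is "foo" (3 bytes),
                // so the attribute written at flush time should read "6".
                String bytes = leaf.getSegmentInfo().info.getAttribute(
                    TrackingPostingsInMemoryBytesCodec.IN_MEMORY_POSTINGS_BYTES_KEY
                );
                System.out.println("in-memory postings bytes: " + bytes);
            }
        }
    }
}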

File tree: 6 files changed (+225, -10 lines)


server/src/main/java/org/elasticsearch/common/lucene/Lucene.java

Lines changed: 15 additions & 4 deletions
@@ -739,15 +739,26 @@ public static Version parseVersionLenient(String toParse, Version defaultValue)
      * If no SegmentReader can be extracted an {@link IllegalStateException} is thrown.
      */
     public static SegmentReader segmentReader(LeafReader reader) {
+        SegmentReader segmentReader = tryUnwrapSegmentReader(reader);
+        if (segmentReader == null) {
+            throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
+        }
+        return segmentReader;
+    }
+
+    /**
+     * Tries to extract a segment reader from the given index reader. Unlike {@link #segmentReader(LeafReader)} this method returns
+     * null if no SegmentReader can be unwrapped instead of throwing an exception.
+     */
+    public static SegmentReader tryUnwrapSegmentReader(LeafReader reader) {
         if (reader instanceof SegmentReader) {
             return (SegmentReader) reader;
         } else if (reader instanceof final FilterLeafReader fReader) {
-            return segmentReader(FilterLeafReader.unwrap(fReader));
+            return tryUnwrapSegmentReader(FilterLeafReader.unwrap(fReader));
         } else if (reader instanceof final FilterCodecReader fReader) {
-            return segmentReader(FilterCodecReader.unwrap(fReader));
+            return tryUnwrapSegmentReader(FilterCodecReader.unwrap(fReader));
         }
-        // hard fail - we can't get a SegmentReader
-        throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
+        return null;
     }
 
     @SuppressForbidden(reason = "Version#parseLeniently() used in a central place")
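Call sites that merely probe for a SegmentReader can now branch on null instead of catching the IllegalStateException. A small sketch of the intended usage pattern, mirroring how Engine.java below uses it; the helper name segmentAttribute is hypothetical:

import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SegmentReader;
import org.elasticsearch.common.lucene.Lucene;

// Hypothetical helper: fetch a segment attribute only when the leaf is segment-backed.
static String segmentAttribute(LeafReader reader, String key) {
    SegmentReader segmentReader = Lucene.tryUnwrapSegmentReader(reader);
    return segmentReader == null ? null : segmentReader.getSegmentInfo().info.getAttribute(key);
}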
server/src/main/java/org/elasticsearch/index/codec/TrackingPostingsInMemoryBytesCodec.java

Lines changed: 160 additions & 0 deletions (new file)

@@ -0,0 +1,160 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.index.codec;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.FieldsConsumer;
import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.NormsProducer;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.internal.hppc.IntIntHashMap;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.FeatureFlag;

import java.io.IOException;
import java.util.function.IntConsumer;

/**
 * A codec that tracks the length of the min and max written terms. Used to improve memory usage estimates in serverless, since
 * {@link org.apache.lucene.codecs.lucene90.blocktree.FieldReader} keeps an in-memory reference to the min and max term.
 */
public class TrackingPostingsInMemoryBytesCodec extends FilterCodec {
    public static final FeatureFlag TRACK_POSTINGS_IN_MEMORY_BYTES = new FeatureFlag("track_postings_in_memory_bytes");

    public static final String IN_MEMORY_POSTINGS_BYTES_KEY = "es.postings.in_memory_bytes";

    public TrackingPostingsInMemoryBytesCodec(Codec delegate) {
        super(delegate.getName(), delegate);
    }

    @Override
    public PostingsFormat postingsFormat() {
        PostingsFormat format = super.postingsFormat();

        return new PostingsFormat(format.getName()) {
            @Override
            public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
                FieldsConsumer consumer = format.fieldsConsumer(state);
                return new TrackingLengthFieldsConsumer(state, consumer);
            }

            @Override
            public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException {
                return format.fieldsProducer(state);
            }
        };
    }

    static final class TrackingLengthFieldsConsumer extends FieldsConsumer {
        final SegmentWriteState state;
        final FieldsConsumer in;
        final IntIntHashMap termsBytesPerField;

        TrackingLengthFieldsConsumer(SegmentWriteState state, FieldsConsumer in) {
            this.state = state;
            this.in = in;
            this.termsBytesPerField = new IntIntHashMap(state.fieldInfos.size());
        }

        @Override
        public void write(Fields fields, NormsProducer norms) throws IOException {
            in.write(new TrackingLengthFields(fields, termsBytesPerField, state.fieldInfos), norms);
            long totalBytes = 0;
            for (int bytes : termsBytesPerField.values) {
                totalBytes += bytes;
            }
            state.segmentInfo.putAttribute(IN_MEMORY_POSTINGS_BYTES_KEY, Long.toString(totalBytes));
        }

        @Override
        public void close() throws IOException {
            in.close();
        }
    }

    static final class TrackingLengthFields extends FilterLeafReader.FilterFields {
        final IntIntHashMap termsBytesPerField;
        final FieldInfos fieldInfos;

        TrackingLengthFields(Fields in, IntIntHashMap termsBytesPerField, FieldInfos fieldInfos) {
            super(in);
            this.termsBytesPerField = termsBytesPerField;
            this.fieldInfos = fieldInfos;
        }

        @Override
        public Terms terms(String field) throws IOException {
            Terms terms = super.terms(field);
            if (terms == null) {
                return null;
            }
            int fieldNum = fieldInfos.fieldInfo(field).number;
            return new TrackingLengthTerms(
                terms,
                bytes -> termsBytesPerField.put(fieldNum, Math.max(termsBytesPerField.getOrDefault(fieldNum, 0), bytes))
            );
        }
    }

    static final class TrackingLengthTerms extends FilterLeafReader.FilterTerms {
        final IntConsumer onFinish;

        TrackingLengthTerms(Terms in, IntConsumer onFinish) {
            super(in);
            this.onFinish = onFinish;
        }

        @Override
        public TermsEnum iterator() throws IOException {
            return new TrackingLengthTermsEnum(super.iterator(), onFinish);
        }
    }

    static final class TrackingLengthTermsEnum extends FilterLeafReader.FilterTermsEnum {
        int maxTermLength = 0;
        int minTermLength = 0;
        int termCount = 0;
        final IntConsumer onFinish;

        TrackingLengthTermsEnum(TermsEnum in, IntConsumer onFinish) {
            super(in);
            this.onFinish = onFinish;
        }

        @Override
        public BytesRef next() throws IOException {
            final BytesRef term = super.next();
            if (term != null) {
                if (termCount == 0) {
                    minTermLength = term.length;
                }
                maxTermLength = term.length;
                termCount++;
            } else {
                if (termCount == 1) {
                    // If the minTerm and maxTerm are the same, only one instance is kept on the heap.
                    assert minTermLength == maxTermLength;
                    onFinish.accept(maxTermLength);
                } else {
                    onFinish.accept(maxTermLength + minTermLength);
                }
            }
            return term;
        }
    }
}
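What the codec is approximating: the block-tree terms reader pins each field's smallest and largest term on the heap, and plain Lucene exposes those same BytesRefs via Terms#getMin and Terms#getMax. A hedged sketch of that quantity from the read side — the helper name minMaxTermBytes is illustrative, and the single-count case mirrors the termCount == 1 branch above:

import java.io.IOException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.Terms;
import org.apache.lucene.util.BytesRef;

// Illustrative only: recompute a field's min+max term footprint from an open reader.
static int minMaxTermBytes(LeafReader reader, String field) throws IOException {
    Terms terms = reader.terms(field);
    if (terms == null) {
        return 0;
    }
    BytesRef min = terms.getMin();
    BytesRef max = terms.getMax();
    if (min == null || max == null) {
        return 0;
    }
    // A single-term field keeps one BytesRef on the heap, so count it once.
    return min.equals(max) ? max.length : min.length + max.length;
}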

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 14 additions & 1 deletion
@@ -62,6 +62,7 @@
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.codec.FieldInfosWithUsages;
+import org.elasticsearch.index.codec.TrackingPostingsInMemoryBytesCodec;
 import org.elasticsearch.index.codec.vectors.reflect.OffHeapByteSizeUtils;
 import org.elasticsearch.index.mapper.DocumentParser;
 import org.elasticsearch.index.mapper.LuceneDocument;
@@ -275,6 +276,7 @@ protected static ShardFieldStats shardFieldStats(List<LeafReaderContext> leaves)
         int numSegments = 0;
         int totalFields = 0;
         long usages = 0;
+        long totalPostingBytes = 0;
         for (LeafReaderContext leaf : leaves) {
             numSegments++;
             var fieldInfos = leaf.reader().getFieldInfos();
@@ -286,8 +288,19 @@ protected static ShardFieldStats shardFieldStats(List<LeafReaderContext> leaves)
             } else {
                 usages = -1;
             }
+            if (TrackingPostingsInMemoryBytesCodec.TRACK_POSTINGS_IN_MEMORY_BYTES.isEnabled()) {
+                SegmentReader segmentReader = Lucene.tryUnwrapSegmentReader(leaf.reader());
+                if (segmentReader != null) {
+                    String postingBytes = segmentReader.getSegmentInfo().info.getAttribute(
+                        TrackingPostingsInMemoryBytesCodec.IN_MEMORY_POSTINGS_BYTES_KEY
+                    );
+                    if (postingBytes != null) {
+                        totalPostingBytes += Long.parseLong(postingBytes);
+                    }
+                }
+            }
         }
-        return new ShardFieldStats(numSegments, totalFields, usages);
+        return new ShardFieldStats(numSegments, totalFields, usages, totalPostingBytes);
     }
 
     /**

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 9 additions & 1 deletion
@@ -10,6 +10,7 @@
 package org.elasticsearch.index.engine;
 
 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
@@ -79,6 +80,7 @@
 import org.elasticsearch.index.IndexVersions;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
+import org.elasticsearch.index.codec.TrackingPostingsInMemoryBytesCodec;
 import org.elasticsearch.index.mapper.DocumentParser;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.LuceneDocument;
@@ -2778,7 +2780,13 @@ private IndexWriterConfig getIndexWriterConfig() {
         iwc.setMaxFullFlushMergeWaitMillis(-1);
         iwc.setSimilarity(engineConfig.getSimilarity());
         iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
-        iwc.setCodec(engineConfig.getCodec());
+
+        Codec codec = engineConfig.getCodec();
+        if (TrackingPostingsInMemoryBytesCodec.TRACK_POSTINGS_IN_MEMORY_BYTES.isEnabled()) {
+            codec = new TrackingPostingsInMemoryBytesCodec(codec);
+        }
+        iwc.setCodec(codec);
+
         boolean useCompoundFile = engineConfig.getUseCompoundFile();
         iwc.setUseCompoundFile(useCompoundFile);
         if (useCompoundFile == false) {

server/src/main/java/org/elasticsearch/index/shard/ShardFieldStats.java

Lines changed: 2 additions & 1 deletion
@@ -17,7 +17,8 @@
  * @param totalFields the total number of fields across the segments
  * @param fieldUsages the number of usages for segment-level fields (e.g., doc_values, postings, norms, points)
  *                    -1 if unavailable
+ * @param postingsInMemoryBytes the total bytes in memory used for postings across all fields
  */
-public record ShardFieldStats(int numSegments, int totalFields, long fieldUsages) {
+public record ShardFieldStats(int numSegments, int totalFields, long fieldUsages, long postingsInMemoryBytes) {
 
 }
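Since ShardFieldStats is a record, the new component is positional and every construction site must pass it explicitly, which is exactly what the Engine change above and the test change below do. A trivial sketch with made-up values:

// Values are illustrative; see the test expectations below for real ones.
ShardFieldStats stats = new ShardFieldStats(1, 12, 18L, 66L);
assert stats.postingsInMemoryBytes() == 66L;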

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 25 additions & 3 deletions
@@ -78,6 +78,7 @@
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.codec.CodecService;
+import org.elasticsearch.index.codec.TrackingPostingsInMemoryBytesCodec;
 import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
 import org.elasticsearch.index.engine.Engine;
@@ -1882,8 +1883,12 @@ public void testShardFieldStats() throws IOException {
         assertThat(stats.numSegments(), equalTo(0));
         assertThat(stats.totalFields(), equalTo(0));
         assertThat(stats.fieldUsages(), equalTo(0L));
+        assertThat(stats.postingsInMemoryBytes(), equalTo(0L));
+
+        boolean postingsBytesTrackingEnabled = TrackingPostingsInMemoryBytesCodec.TRACK_POSTINGS_IN_MEMORY_BYTES.isEnabled();
+
         // index some documents
-        int numDocs = between(1, 10);
+        int numDocs = between(2, 10);
         for (int i = 0; i < numDocs; i++) {
             indexDoc(shard, "_doc", "first_" + i, """
                 {
@@ -1901,6 +1906,9 @@ public void testShardFieldStats() throws IOException {
         // _id(term), _source(0), _version(dv), _primary_term(dv), _seq_no(point,dv), f1(postings,norms),
         // f1.keyword(term,dv), f2(postings,norms), f2.keyword(term,dv),
         assertThat(stats.fieldUsages(), equalTo(13L));
+        // _id: (5,8), f1: 3, f1.keyword: 3, f2: 3, f2.keyword: 3
+        // 5 + 8 + 3 + 3 + 3 + 3 = 25
+        assertThat(stats.postingsInMemoryBytes(), equalTo(postingsBytesTrackingEnabled ? 25L : 0L));
         // don't re-compute on refresh without change
         if (randomBoolean()) {
             shard.refresh("test");
@@ -1919,11 +1927,18 @@ public void testShardFieldStats() throws IOException {
         assertThat(shard.getShardFieldStats(), sameInstance(stats));
         // index more docs
         numDocs = between(1, 10);
+        indexDoc(shard, "_doc", "first_0", """
+            {
+                "f1": "lorem",
+                "f2": "bar",
+                "f3": "sit amet"
+            }
+            """);
         for (int i = 0; i < numDocs; i++) {
-            indexDoc(shard, "_doc", "first_" + i, """
+            indexDoc(shard, "_doc", "first_" + i + 1, """
                 {
                     "f1": "foo",
-                    "f2": "bar",
+                    "f2": "ipsum",
                     "f3": "foobar"
                 }
                 """);
@@ -1948,13 +1963,20 @@ public void testShardFieldStats() throws IOException {
         assertThat(stats.totalFields(), equalTo(21));
         // first segment: 13, second segment: 13 + f3(postings,norms) + f3.keyword(term,dv), and __soft_deletes to previous segment
         assertThat(stats.fieldUsages(), equalTo(31L));
+        // segment 1: 25 (see above)
+        // segment 2: _id: (5,6), f1: (3,5), f1.keyword: (3,5), f2: (3,5), f2.keyword: (3,5), f3: (4,3), f3.keyword: (6,8)
+        // (5+6) + (3+5) + (3+5) + (3+5) + (3+5) + (4+3) + (6+8) = 64
+        // 25 + 64 = 89
+        assertThat(stats.postingsInMemoryBytes(), equalTo(postingsBytesTrackingEnabled ? 89L : 0L));
         shard.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(true));
         stats = shard.getShardFieldStats();
         assertThat(stats.numSegments(), equalTo(1));
         assertThat(stats.totalFields(), equalTo(12));
         // _id(term), _source(0), _version(dv), _primary_term(dv), _seq_no(point,dv), f1(postings,norms),
         // f1.keyword(term,dv), f2(postings,norms), f2.keyword(term,dv), f3(postings,norms), f3.keyword(term,dv), __soft_deletes
         assertThat(stats.fieldUsages(), equalTo(18L));
+        // _id: (5,8), f1: (3,5), f1.keyword: (3,5), f2: (3,5), f2.keyword: (3,5), f3: (4,3), f3.keyword: (6,8)
+        assertThat(stats.postingsInMemoryBytes(), equalTo(postingsBytesTrackingEnabled ? 66L : 0L));
         closeShards(shard);
     }
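The 25L/89L/66L expectations in the test are plain per-field sums of min-term plus max-term length, with a field counted once when its min and max terms coincide. Spelling out the first segment's 25, under the same (min,max) byte counts the test comments assume:

// _id: distinct min/max terms of 5 and 8 bytes; f1, f1.keyword, f2, f2.keyword
// each hold a single 3-byte term, which is counted exactly once.
long expectedSegmentOneBytes = (5 + 8) + 3 + 3 + 3 + 3; // = 25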