Skip to content

Commit 276464d

Browse files
committed
Track posting bytes in memory
1 parent fd1bd79 commit 276464d

File tree

6 files changed

+196
-13
lines changed

6 files changed

+196
-13
lines changed

server/src/main/java/org/elasticsearch/common/lucene/Lucene.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -653,15 +653,26 @@ public static Version parseVersionLenient(String toParse, Version defaultValue)
653653
* If no SegmentReader can be extracted an {@link IllegalStateException} is thrown.
654654
*/
655655
public static SegmentReader segmentReader(LeafReader reader) {
656+
SegmentReader segmentReader = tryUnwrapSegmentReader(reader);
657+
if (segmentReader == null) {
658+
throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
659+
}
660+
return segmentReader;
661+
}
662+
663+
/**
664+
* Tries to extract a segment reader from the given index reader. Unlike {@link #segmentReader(LeafReader)} this method returns
665+
* null if no SegmentReader can be unwrapped instead of throwing an exception.
666+
*/
667+
public static SegmentReader tryUnwrapSegmentReader(LeafReader reader) {
656668
if (reader instanceof SegmentReader) {
657669
return (SegmentReader) reader;
658670
} else if (reader instanceof final FilterLeafReader fReader) {
659-
return segmentReader(FilterLeafReader.unwrap(fReader));
671+
return tryUnwrapSegmentReader(FilterLeafReader.unwrap(fReader));
660672
} else if (reader instanceof final FilterCodecReader fReader) {
661-
return segmentReader(FilterCodecReader.unwrap(fReader));
673+
return tryUnwrapSegmentReader(FilterCodecReader.unwrap(fReader));
662674
}
663-
// hard fail - we can't get a SegmentReader
664-
throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
675+
return null;
665676
}
666677

667678
@SuppressForbidden(reason = "Version#parseLeniently() used in a central place")
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec;
11+
12+
import org.apache.lucene.codecs.Codec;
13+
import org.apache.lucene.codecs.FieldsConsumer;
14+
import org.apache.lucene.codecs.FieldsProducer;
15+
import org.apache.lucene.codecs.FilterCodec;
16+
import org.apache.lucene.codecs.NormsProducer;
17+
import org.apache.lucene.codecs.PostingsFormat;
18+
import org.apache.lucene.codecs.SegmentInfoFormat;
19+
import org.apache.lucene.index.Fields;
20+
import org.apache.lucene.index.FilterLeafReader;
21+
import org.apache.lucene.index.SegmentInfo;
22+
import org.apache.lucene.index.SegmentReadState;
23+
import org.apache.lucene.index.SegmentWriteState;
24+
import org.apache.lucene.index.Terms;
25+
import org.apache.lucene.index.TermsEnum;
26+
import org.apache.lucene.store.Directory;
27+
import org.apache.lucene.store.IOContext;
28+
import org.apache.lucene.util.BytesRef;
29+
30+
import java.io.IOException;
31+
import java.util.HashMap;
32+
import java.util.Map;
33+
import java.util.function.IntConsumer;
34+
35+
public final class TrackingInMemoryPostingsBytesCodec extends FilterCodec {
36+
public static final String IN_MEMORY_POSTINGS_BYTES_KEY = "es.postings.in_memory_bytes";
37+
38+
public TrackingInMemoryPostingsBytesCodec(Codec delegate) {
39+
super(delegate.getName(), delegate);
40+
}
41+
42+
@Override
43+
public SegmentInfoFormat segmentInfoFormat() {
44+
SegmentInfoFormat format = super.segmentInfoFormat();
45+
return new SegmentInfoFormat() {
46+
@Override
47+
public SegmentInfo read(Directory directory, String segmentName, byte[] segmentID, IOContext context) throws IOException {
48+
return format.read(directory, segmentName, segmentID, context);
49+
}
50+
51+
@Override
52+
public void write(Directory dir, SegmentInfo info, IOContext ioContext) throws IOException {
53+
format.write(dir, info, ioContext);
54+
}
55+
};
56+
}
57+
58+
@Override
59+
public PostingsFormat postingsFormat() {
60+
PostingsFormat format = super.postingsFormat();
61+
return new PostingsFormat(format.getName()) {
62+
@Override
63+
public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
64+
FieldsConsumer consumer = format.fieldsConsumer(state);
65+
return new TrackingLengthFieldsConsumer(state, consumer);
66+
}
67+
68+
@Override
69+
public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException {
70+
return format.fieldsProducer(state);
71+
}
72+
};
73+
}
74+
75+
static final class TrackingLengthFieldsConsumer extends FieldsConsumer {
76+
final SegmentWriteState state;
77+
final FieldsConsumer in;
78+
final Map<String, Integer> maxLengths = new HashMap<>();
79+
80+
TrackingLengthFieldsConsumer(SegmentWriteState state, FieldsConsumer in) {
81+
this.state = state;
82+
this.in = in;
83+
}
84+
85+
@Override
86+
public void write(Fields fields, NormsProducer norms) throws IOException {
87+
in.write(new TrackingLengthFields(fields, maxLengths), norms);
88+
long totalLength = 0;
89+
for (int len : maxLengths.values()) {
90+
totalLength += len; // minTerm
91+
totalLength += len; // maxTerm
92+
}
93+
state.segmentInfo.putAttribute(IN_MEMORY_POSTINGS_BYTES_KEY, Long.toString(totalLength));
94+
}
95+
96+
@Override
97+
public void close() throws IOException {
98+
in.close();
99+
}
100+
}
101+
102+
static final class TrackingLengthFields extends FilterLeafReader.FilterFields {
103+
final Map<String, Integer> maxLengths;
104+
105+
TrackingLengthFields(Fields in, Map<String, Integer> maxLengths) {
106+
super(in);
107+
this.maxLengths = maxLengths;
108+
}
109+
110+
@Override
111+
public Terms terms(String field) throws IOException {
112+
Terms terms = super.terms(field);
113+
if (terms == null) {
114+
return null;
115+
}
116+
return new TrackingLengthTerms(terms, len -> maxLengths.compute(field, (k, v) -> v == null ? len : Math.max(v, len)));
117+
}
118+
}
119+
120+
static final class TrackingLengthTerms extends FilterLeafReader.FilterTerms {
121+
final IntConsumer onFinish;
122+
123+
TrackingLengthTerms(Terms in, IntConsumer onFinish) {
124+
super(in);
125+
this.onFinish = onFinish;
126+
}
127+
128+
@Override
129+
public TermsEnum iterator() throws IOException {
130+
return new TrackingLengthTermsEnum(super.iterator(), onFinish);
131+
}
132+
}
133+
134+
static final class TrackingLengthTermsEnum extends FilterLeafReader.FilterTermsEnum {
135+
int maxTermLength = 0;
136+
final IntConsumer onFinish;
137+
138+
TrackingLengthTermsEnum(TermsEnum in, IntConsumer onFinish) {
139+
super(in);
140+
this.onFinish = onFinish;
141+
}
142+
143+
@Override
144+
public BytesRef next() throws IOException {
145+
final BytesRef term = super.next();
146+
if (term != null) {
147+
maxTermLength = Math.max(maxTermLength, term.length);
148+
} else {
149+
onFinish.accept(maxTermLength);
150+
}
151+
return term;
152+
}
153+
}
154+
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.elasticsearch.index.IndexVersions;
8080
import org.elasticsearch.index.VersionType;
8181
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
82+
import org.elasticsearch.index.codec.TrackingInMemoryPostingsBytesCodec;
8283
import org.elasticsearch.index.mapper.DocumentParser;
8384
import org.elasticsearch.index.mapper.IdFieldMapper;
8485
import org.elasticsearch.index.mapper.LuceneDocument;
@@ -2734,7 +2735,7 @@ private IndexWriterConfig getIndexWriterConfig() {
27342735
iwc.setMaxFullFlushMergeWaitMillis(-1);
27352736
iwc.setSimilarity(engineConfig.getSimilarity());
27362737
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
2737-
iwc.setCodec(engineConfig.getCodec());
2738+
iwc.setCodec(new TrackingInMemoryPostingsBytesCodec(engineConfig.getCodec()));
27382739
boolean useCompoundFile = engineConfig.getUseCompoundFile();
27392740
iwc.setUseCompoundFile(useCompoundFile);
27402741
if (useCompoundFile == false) {

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.lucene.analysis.Analyzer;
1414
import org.apache.lucene.analysis.DelegatingAnalyzerWrapper;
15+
import org.apache.lucene.codecs.FieldsProducer;
1516
import org.apache.lucene.index.CheckIndex;
1617
import org.apache.lucene.index.DirectoryReader;
1718
import org.apache.lucene.index.FieldInfos;
@@ -20,6 +21,7 @@
2021
import org.apache.lucene.index.LeafReader;
2122
import org.apache.lucene.index.LeafReaderContext;
2223
import org.apache.lucene.index.SegmentInfos;
24+
import org.apache.lucene.index.SegmentReader;
2325
import org.apache.lucene.search.QueryCachingPolicy;
2426
import org.apache.lucene.search.ReferenceManager;
2527
import org.apache.lucene.search.Sort;
@@ -85,6 +87,7 @@
8587
import org.elasticsearch.index.cache.request.ShardRequestCache;
8688
import org.elasticsearch.index.codec.CodecService;
8789
import org.elasticsearch.index.codec.FieldInfosWithUsages;
90+
import org.elasticsearch.index.codec.TrackingInMemoryPostingsBytesCodec;
8891
import org.elasticsearch.index.engine.CommitStats;
8992
import org.elasticsearch.index.engine.Engine;
9093
import org.elasticsearch.index.engine.Engine.GetResult;
@@ -4145,6 +4148,7 @@ public void afterRefresh(boolean didRefresh) {
41454148
int numSegments = 0;
41464149
int totalFields = 0;
41474150
long usages = 0;
4151+
long totalPostingBytes = 0;
41484152
for (LeafReaderContext leaf : searcher.getLeafContexts()) {
41494153
numSegments++;
41504154
var fieldInfos = leaf.reader().getFieldInfos();
@@ -4156,8 +4160,18 @@ public void afterRefresh(boolean didRefresh) {
41564160
} else {
41574161
usages = -1;
41584162
}
4163+
SegmentReader segmentReader = Lucene.tryUnwrapSegmentReader(leaf.reader());
4164+
if (segmentReader != null) {
4165+
FieldsProducer postingsReader = segmentReader.getPostingsReader();
4166+
String postingBytes = segmentReader.getSegmentInfo().info.getAttribute(
4167+
TrackingInMemoryPostingsBytesCodec.IN_MEMORY_POSTINGS_BYTES_KEY
4168+
);
4169+
if (postingBytes != null) {
4170+
totalPostingBytes += Long.parseLong(postingBytes);
4171+
}
4172+
}
41594173
}
4160-
shardFieldStats = new ShardFieldStats(numSegments, totalFields, usages);
4174+
shardFieldStats = new ShardFieldStats(numSegments, totalFields, usages, totalPostingBytes);
41614175
} catch (AlreadyClosedException ignored) {
41624176

41634177
}

server/src/main/java/org/elasticsearch/index/shard/ShardFieldStats.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@
/**
 * Per-shard stats, including the number of segments and the total fields across those segments.
 * These stats should be recomputed whenever the shard is refreshed.
 *
 * @param numSegments           the number of segments
 * @param totalFields           the total number of fields across the segments
 * @param fieldUsages           the number of usages for segment-level fields (e.g., doc_values, postings, norms, points)
 *                              -1 if unavailable
 * @param postingsInMemoryBytes the total bytes in memory used for postings across all fields
 */
public record ShardFieldStats(int numSegments, int totalFields, long fieldUsages, long postingsInMemoryBytes) {

}

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1877,7 +1877,7 @@ public void testShardFieldStats() throws IOException {
18771877
assertThat(stats.totalFields(), equalTo(0));
18781878
assertThat(stats.fieldUsages(), equalTo(0L));
18791879
// index some documents
1880-
int numDocs = between(1, 10);
1880+
int numDocs = between(2, 10);
18811881
for (int i = 0; i < numDocs; i++) {
18821882
indexDoc(shard, "_doc", "first_" + i, """
18831883
{
@@ -1895,6 +1895,7 @@ public void testShardFieldStats() throws IOException {
18951895
// _id(term), _source(0), _version(dv), _primary_term(dv), _seq_no(point,dv), f1(postings,norms),
18961896
// f1.keyword(term,dv), f2(postings,norms), f2.keyword(term,dv),
18971897
assertThat(stats.fieldUsages(), equalTo(13L));
1898+
assertThat(stats.postingsInMemoryBytes(), equalTo(40L));
18981899
// don't re-compute on refresh without change
18991900
if (randomBoolean()) {
19001901
shard.refresh("test");
@@ -1912,7 +1913,7 @@ public void testShardFieldStats() throws IOException {
19121913
}
19131914
assertThat(shard.getShardFieldStats(), sameInstance(stats));
19141915
// index more docs
1915-
numDocs = between(1, 10);
1916+
numDocs = between(2, 10);
19161917
for (int i = 0; i < numDocs; i++) {
19171918
indexDoc(shard, "_doc", "first_" + i, """
19181919
{
@@ -1949,6 +1950,7 @@ public void testShardFieldStats() throws IOException {
19491950
// _id(term), _source(0), _version(dv), _primary_term(dv), _seq_no(point,dv), f1(postings,norms),
19501951
// f1.keyword(term,dv), f2(postings,norms), f2.keyword(term,dv), f3(postings,norms), f3.keyword(term,dv), __soft_deletes
19511952
assertThat(stats.fieldUsages(), equalTo(18L));
1953+
assertThat(stats.postingsInMemoryBytes(), equalTo(64L));
19521954
closeShards(shard);
19531955
}
19541956

0 commit comments

Comments
 (0)