
Commit 3fbfa0b

Fork Lucene90BlockTreeTermsReader and friends in order to avoid loading large minTerm and maxTerm into jvm heap.
1 parent: 2559e28 · commit: 3fbfa0b

File tree

11 files changed: +3917 −2 lines

server/src/main/java/org/elasticsearch/index/codec/postings/ES812PostingsFormat.java

Lines changed: 8 additions & 2 deletions
@@ -27,7 +27,6 @@
 import org.apache.lucene.codecs.PostingsFormat;
 import org.apache.lucene.codecs.PostingsReaderBase;
 import org.apache.lucene.codecs.PostingsWriterBase;
-import org.apache.lucene.codecs.lucene90.blocktree.Lucene90BlockTreeTermsReader;
 import org.apache.lucene.codecs.lucene90.blocktree.Lucene90BlockTreeTermsWriter;
 import org.apache.lucene.index.IndexOptions;
 import org.apache.lucene.index.SegmentReadState;
@@ -37,6 +36,8 @@
 import org.apache.lucene.util.packed.PackedInts;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.index.codec.ForUtil;
+import org.elasticsearch.index.codec.postings.terms.Lucene90BlockTreeTermsReader;
+import org.elasticsearch.index.codec.postings.terms.NonForkedHelper;

 import java.io.IOException;

@@ -414,7 +415,12 @@ public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException
         PostingsReaderBase postingsReader = new ES812PostingsReader(state);
         boolean success = false;
         try {
-            FieldsProducer ret = new Lucene90BlockTreeTermsReader(postingsReader, state);
+            FieldsProducer ret;
+            if (NonForkedHelper.USE_FORKED_TERMS_READER.isEnabled()) {
+                ret = new Lucene90BlockTreeTermsReader(postingsReader, state);
+            } else {
+                ret = new org.apache.lucene.codecs.lucene90.blocktree.Lucene90BlockTreeTermsReader(postingsReader, state);
+            }
             success = true;
             return ret;
         } finally {
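
NonForkedHelper itself is not among the files shown in this excerpt, so the following is only a plausible sketch of the toggle the diff above relies on, assuming it wraps Elasticsearch's org.elasticsearch.common.util.FeatureFlag utility; the class body and the flag name "forked_terms_reader" are illustrative, not taken from the commit:

package org.elasticsearch.index.codec.postings.terms;

import org.elasticsearch.common.util.FeatureFlag;

// Hypothetical sketch, not the actual file from this commit: a holder for the
// feature flag that ES812PostingsFormat checks before choosing between the
// forked terms reader and the upstream Lucene one.
public final class NonForkedHelper {

    // FeatureFlag is toggled via a JVM system property in non-release builds;
    // the flag name here is an assumption for illustration.
    public static final FeatureFlag USE_FORKED_TERMS_READER = new FeatureFlag("forked_terms_reader");

    private NonForkedHelper() {}
}

Gating the fork behind a flag like this lets the upstream reader be swapped back in at runtime without a code change, which is a common pattern while a fork is being validated.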
server/src/main/java/org/elasticsearch/index/codec/postings/terms/CompressionAlgorithm.java

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+package org.elasticsearch.index.codec.postings.terms;
+
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.util.compress.LowercaseAsciiCompression;
+
+import java.io.IOException;
+
+/** Compression algorithm used for suffixes of a block of terms. */
+enum CompressionAlgorithm {
+    NO_COMPRESSION(0x00) {
+
+        @Override
+        void read(DataInput in, byte[] out, int len) throws IOException {
+            in.readBytes(out, 0, len);
+        }
+    },
+
+    LOWERCASE_ASCII(0x01) {
+
+        @Override
+        void read(DataInput in, byte[] out, int len) throws IOException {
+            LowercaseAsciiCompression.decompress(in, out, len);
+        }
+    },
+
+    LZ4(0x02) {
+
+        @Override
+        void read(DataInput in, byte[] out, int len) throws IOException {
+            org.apache.lucene.util.compress.LZ4.decompress(in, len, out, 0);
+        }
+    };
+
+    private static final CompressionAlgorithm[] BY_CODE = new CompressionAlgorithm[3];
+
+    static {
+        for (CompressionAlgorithm alg : CompressionAlgorithm.values()) {
+            BY_CODE[alg.code] = alg;
+        }
+    }
+
+    /** Look up a {@link CompressionAlgorithm} by its {@link CompressionAlgorithm#code}. */
+    static CompressionAlgorithm byCode(int code) {
+        if (code < 0 || code >= BY_CODE.length) {
+            throw new IllegalArgumentException("Illegal code for a compression algorithm: " + code);
+        }
+        return BY_CODE[code];
+    }
+
+    public final int code;
+
+    CompressionAlgorithm(int code) {
+        this.code = code;
+    }
+
+    abstract void read(DataInput in, byte[] out, int len) throws IOException;
+}
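
For context, the read hooks above are dispatched via byCode: the writer records a small integer code for each block of term suffixes, and the reader looks the algorithm up by that code and decompresses into a caller-supplied buffer. A minimal usage sketch follows; the helper class and its names are hypothetical (it must sit in the same package, since both byCode and read are package-private), and the DataInput is assumed to be positioned at the block's compressed bytes:

package org.elasticsearch.index.codec.postings.terms;

import org.apache.lucene.store.DataInput;

import java.io.IOException;

// Hypothetical helper for illustration only; not part of this commit.
final class SuffixBlockDecoder {

    // Decompress one block of term suffixes, given the code the writer stored
    // for the block and the expected decompressed length.
    static byte[] readSuffixes(DataInput in, int code, int decompressedLen) throws IOException {
        byte[] suffixes = new byte[decompressedLen];
        // byCode throws IllegalArgumentException for anything outside 0x00..0x02
        CompressionAlgorithm.byCode(code).read(in, suffixes, decompressedLen);
        return suffixes;
    }

    private SuffixBlockDecoder() {}
}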
server/src/main/java/org/elasticsearch/index/codec/postings/terms/FieldReader.java

Lines changed: 247 additions & 0 deletions
@@ -0,0 +1,247 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+package org.elasticsearch.index.codec.postings.terms;
+
+import org.apache.lucene.codecs.lucene90.blocktree.Lucene90BlockTreeTermsWriter;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.store.ByteArrayDataInput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.automaton.CompiledAutomaton;
+import org.apache.lucene.util.fst.ByteSequenceOutputs;
+import org.apache.lucene.util.fst.FST;
+import org.apache.lucene.util.fst.OffHeapFSTStore;
+
+import java.io.IOException;
+
+import static org.elasticsearch.index.codec.postings.terms.Lucene90BlockTreeTermsReader.VERSION_MSB_VLONG_OUTPUT;
+
+/**
+ * BlockTree's implementation of {@link Terms}.
+ *
+ * @lucene.internal
+ */
+public final class FieldReader extends Terms {
+
+    // private final boolean DEBUG = BlockTreeTermsWriter.DEBUG;
+
+    final long numTerms;
+    final FieldInfo fieldInfo;
+    final long sumTotalTermFreq;
+    final long sumDocFreq;
+    final int docCount;
+    final long rootBlockFP;
+    final BytesRef rootCode;
+    final BytesRef minTerm;
+    final BytesRef maxTerm;
+    final Lucene90BlockTreeTermsReader parent;
+
+    final FST<BytesRef> index;
+
+    // private boolean DEBUG;
+
+    FieldReader(
+        Lucene90BlockTreeTermsReader parent,
+        FieldInfo fieldInfo,
+        long numTerms,
+        BytesRef rootCode,
+        long sumTotalTermFreq,
+        long sumDocFreq,
+        int docCount,
+        long indexStartFP,
+        IndexInput metaIn,
+        IndexInput indexIn,
+        BytesRef minTerm,
+        BytesRef maxTerm
+    ) throws IOException {
+        assert numTerms > 0;
+        this.fieldInfo = fieldInfo;
+        // DEBUG = BlockTreeTermsReader.DEBUG && fieldInfo.name.equals("id");
+        this.parent = parent;
+        this.numTerms = numTerms;
+        this.sumTotalTermFreq = sumTotalTermFreq;
+        this.sumDocFreq = sumDocFreq;
+        this.docCount = docCount;
+        this.minTerm = minTerm;
+        this.maxTerm = maxTerm;
+        // if (DEBUG) {
+        //     System.out.println("BTTR: seg=" + segment + " field=" + fieldInfo.name + " rootBlockCode="
+        //         + rootCode + " divisor=" + indexDivisor);
+        // }
+        rootBlockFP = readVLongOutput(new ByteArrayDataInput(rootCode.bytes, rootCode.offset, rootCode.length))
+            >>> Lucene90BlockTreeTermsReader.OUTPUT_FLAGS_NUM_BITS;
+        // Initialize FST always off-heap.
+        var metadata = FST.readMetadata(metaIn, ByteSequenceOutputs.getSingleton());
+        index = FST.fromFSTReader(metadata, new OffHeapFSTStore(indexIn, indexStartFP, metadata));
+        /*
+        if (false) {
+            final String dotFileName = segment + "_" + fieldInfo.name + ".dot";
+            Writer w = new OutputStreamWriter(new FileOutputStream(dotFileName));
+            Util.toDot(index, w, false, false);
+            System.out.println("FST INDEX: SAVED to " + dotFileName);
+            w.close();
+        }
+        */
+        BytesRef emptyOutput = metadata.getEmptyOutput();
+        if (rootCode.equals(emptyOutput) == false) {
+            // TODO: this branch is never taken
+            assert false;
+            this.rootCode = rootCode;
+        } else {
+            this.rootCode = emptyOutput;
+        }
+    }
+
+    long readVLongOutput(DataInput in) throws IOException {
+        if (parent.version >= VERSION_MSB_VLONG_OUTPUT) {
+            return readMSBVLong(in);
+        } else {
+            return in.readVLong();
+        }
+    }
+
+    /**
+     * Decodes a variable length byte[] in MSB order back to long, as written by {@link
+     * Lucene90BlockTreeTermsWriter#writeMSBVLong}.
+     *
+     * <p>Package private for testing.
+     */
+    static long readMSBVLong(DataInput in) throws IOException {
+        long l = 0L;
+        while (true) {
+            byte b = in.readByte();
+            l = (l << 7) | (b & 0x7FL);
+            if ((b & 0x80) == 0) {
+                break;
+            }
+        }
+        return l;
+    }
+
+    @Override
+    public BytesRef getMin() throws IOException {
+        if (minTerm == null) {
+            // Older index that didn't store min/maxTerm
+            return super.getMin();
+        } else {
+            return minTerm;
+        }
+    }
+
+    @Override
+    public BytesRef getMax() throws IOException {
+        if (maxTerm == null) {
+            // Older index that didn't store min/maxTerm
+            return super.getMax();
+        } else {
+            return maxTerm;
+        }
+    }
+
+    /** For debugging -- used by CheckIndex too */
+    @Override
+    public Stats getStats() throws IOException {
+        return new SegmentTermsEnum(this).computeBlockStats();
+    }
+
+    @Override
+    public boolean hasFreqs() {
+        return fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS) >= 0;
+    }
+
+    @Override
+    public boolean hasOffsets() {
+        return fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0;
+    }
+
+    @Override
+    public boolean hasPositions() {
+        return fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0;
+    }
+
+    @Override
+    public boolean hasPayloads() {
+        return fieldInfo.hasPayloads();
+    }
+
+    @Override
+    public TermsEnum iterator() throws IOException {
+        return new SegmentTermsEnum(this);
+    }
+
+    @Override
+    public long size() {
+        return numTerms;
+    }
+
+    @Override
+    public long getSumTotalTermFreq() {
+        return sumTotalTermFreq;
+    }
+
+    @Override
+    public long getSumDocFreq() {
+        return sumDocFreq;
+    }
+
+    @Override
+    public int getDocCount() {
+        return docCount;
+    }
+
+    @Override
+    public TermsEnum intersect(CompiledAutomaton compiled, BytesRef startTerm) throws IOException {
+        // if (DEBUG) System.out.println("  FieldReader.intersect startTerm=" +
+        //     ToStringUtils.bytesRefToString(startTerm));
+        // System.out.println("intersect: " + compiled.type + " a=" + compiled.automaton);
+        // TODO: we could push "it's a range" or "it's a prefix" down into IntersectTermsEnum?
+        // can we optimize knowing that...?
+        if (compiled.type != CompiledAutomaton.AUTOMATON_TYPE.NORMAL) {
+            throw new IllegalArgumentException("please use CompiledAutomaton.getTermsEnum instead");
+        }
+        return new IntersectTermsEnum(
+            this,
+            compiled.getTransitionAccessor(),
+            compiled.getByteRunnable(),
+            compiled.commonSuffixRef,
+            startTerm
+        );
+    }
+
+    @Override
+    public String toString() {
+        return "BlockTreeTerms(seg="
+            + parent.segment
+            + " terms="
+            + numTerms
+            + ",postings="
+            + sumDocFreq
+            + ",positions="
+            + sumTotalTermFreq
+            + ",docs="
+            + docCount
+            + ")";
+    }
+
+    // CHANGES:
+
+    public BytesRef getMinTerm() {
+        return minTerm;
+    }
+
+    public BytesRef getMaxTerm() {
+        return maxTerm;
+    }
+
+    // END CHANGES:
+}
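
readMSBVLong above consumes the most significant 7-bit group first, shifting previously read bits left and stopping at the first byte whose 0x80 continuation bit is clear. A small worked example of the decoding (the demo class is hypothetical and assumed to live in the same package, since readMSBVLong is package-private "for testing"):

package org.elasticsearch.index.codec.postings.terms;

import org.apache.lucene.store.ByteArrayDataInput;

// Hypothetical demo, not part of this commit.
final class MSBVLongDemo {
    public static void main(String[] args) throws Exception {
        // 300 = 0b100101100; split MSB-first into 7-bit groups: 0b0000010, 0b0101100.
        // Every byte except the last carries the 0x80 continuation bit: 0x82, then 0x2C.
        byte[] encoded = { (byte) 0x82, 0x2C };
        long decoded = FieldReader.readMSBVLong(new ByteArrayDataInput(encoded));
        System.out.println(decoded); // prints 300
    }
}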
