Skip to content

Commit f1fc12f

Browse files
committed
Fork Lucene90BlockTreeTermsReader and friends in order to avoid loading large minTerm and maxTerm into the JVM heap.
1 parent 2559e28 commit f1fc12f

File tree

11 files changed

+3912
-2
lines changed

11 files changed

+3912
-2
lines changed

server/src/main/java/org/elasticsearch/index/codec/postings/ES812PostingsFormat.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.lucene.codecs.PostingsFormat;
2828
import org.apache.lucene.codecs.PostingsReaderBase;
2929
import org.apache.lucene.codecs.PostingsWriterBase;
30-
import org.apache.lucene.codecs.lucene90.blocktree.Lucene90BlockTreeTermsReader;
3130
import org.apache.lucene.codecs.lucene90.blocktree.Lucene90BlockTreeTermsWriter;
3231
import org.apache.lucene.index.IndexOptions;
3332
import org.apache.lucene.index.SegmentReadState;
@@ -37,6 +36,8 @@
3736
import org.apache.lucene.util.packed.PackedInts;
3837
import org.elasticsearch.core.IOUtils;
3938
import org.elasticsearch.index.codec.ForUtil;
39+
import org.elasticsearch.index.codec.postings.terms.Lucene90BlockTreeTermsReader;
40+
import org.elasticsearch.index.codec.postings.terms.NonForkedHelper;
4041

4142
import java.io.IOException;
4243

@@ -414,7 +415,12 @@ public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException
414415
PostingsReaderBase postingsReader = new ES812PostingsReader(state);
415416
boolean success = false;
416417
try {
417-
FieldsProducer ret = new Lucene90BlockTreeTermsReader(postingsReader, state);
418+
FieldsProducer ret;
419+
if (NonForkedHelper.USE_FORKED_TERMS_READER.isEnabled()) {
420+
ret = new Lucene90BlockTreeTermsReader(postingsReader, state);
421+
} else {
422+
ret = new org.apache.lucene.codecs.lucene90.blocktree.Lucene90BlockTreeTermsReader(postingsReader, state);
423+
}
418424
success = true;
419425
return ret;
420426
} finally {
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
package org.elasticsearch.index.codec.postings.terms;
10+
11+
import org.apache.lucene.store.DataInput;
12+
import org.apache.lucene.util.compress.LowercaseAsciiCompression;
13+
14+
import java.io.IOException;
15+
16+
/** Compression algorithm used for suffixes of a block of terms. */
17+
enum CompressionAlgorithm {
18+
NO_COMPRESSION(0x00) {
19+
20+
@Override
21+
void read(DataInput in, byte[] out, int len) throws IOException {
22+
in.readBytes(out, 0, len);
23+
}
24+
},
25+
26+
LOWERCASE_ASCII(0x01) {
27+
28+
@Override
29+
void read(DataInput in, byte[] out, int len) throws IOException {
30+
LowercaseAsciiCompression.decompress(in, out, len);
31+
}
32+
},
33+
34+
LZ4(0x02) {
35+
36+
@Override
37+
void read(DataInput in, byte[] out, int len) throws IOException {
38+
org.apache.lucene.util.compress.LZ4.decompress(in, len, out, 0);
39+
}
40+
};
41+
42+
private static final CompressionAlgorithm[] BY_CODE = new CompressionAlgorithm[3];
43+
44+
static {
45+
for (CompressionAlgorithm alg : CompressionAlgorithm.values()) {
46+
BY_CODE[alg.code] = alg;
47+
}
48+
}
49+
50+
/** Look up a {@link CompressionAlgorithm} by its {@link CompressionAlgorithm#code}. */
51+
static CompressionAlgorithm byCode(int code) {
52+
if (code < 0 || code >= BY_CODE.length) {
53+
throw new IllegalArgumentException("Illegal code for a compression algorithm: " + code);
54+
}
55+
return BY_CODE[code];
56+
}
57+
58+
public final int code;
59+
60+
CompressionAlgorithm(int code) {
61+
this.code = code;
62+
}
63+
64+
abstract void read(DataInput in, byte[] out, int len) throws IOException;
65+
}
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
package org.elasticsearch.index.codec.postings.terms;
10+
11+
import org.apache.lucene.codecs.lucene90.blocktree.Lucene90BlockTreeTermsWriter;
12+
import org.apache.lucene.index.FieldInfo;
13+
import org.apache.lucene.index.IndexOptions;
14+
import org.apache.lucene.index.Terms;
15+
import org.apache.lucene.index.TermsEnum;
16+
import org.apache.lucene.store.ByteArrayDataInput;
17+
import org.apache.lucene.store.DataInput;
18+
import org.apache.lucene.store.IndexInput;
19+
import org.apache.lucene.util.BytesRef;
20+
import org.apache.lucene.util.automaton.CompiledAutomaton;
21+
import org.apache.lucene.util.fst.ByteSequenceOutputs;
22+
import org.apache.lucene.util.fst.FST;
23+
import org.apache.lucene.util.fst.OffHeapFSTStore;
24+
25+
import java.io.IOException;
26+
27+
import static org.elasticsearch.index.codec.postings.terms.Lucene90BlockTreeTermsReader.VERSION_MSB_VLONG_OUTPUT;
28+
29+
/**
 * BlockTree's implementation of {@link Terms}.
 *
 * <p>This class is forked from Lucene's {@code org.apache.lucene.codecs.lucene90.blocktree.FieldReader};
 * the fork-specific additions are the {@code getMinTerm()}/{@code getMaxTerm()} accessors marked by the
 * {@code CHANGES} comments at the bottom. The rest of the class deliberately tracks upstream Lucene.
 */
public final class FieldReader extends Terms {

    // private final boolean DEBUG = BlockTreeTermsWriter.DEBUG;

    // Per-field statistics read from the terms dictionary metadata.
    final long numTerms;
    final FieldInfo fieldInfo;
    final long sumTotalTermFreq;
    final long sumDocFreq;
    final int docCount;
    // File pointer of the root block, decoded from rootCode's vlong output.
    final long rootBlockFP;
    final BytesRef rootCode;
    // Smallest/largest term for this field; may be null for older indices (see getMin/getMax).
    final BytesRef minTerm;
    final BytesRef maxTerm;
    // Owning terms reader; provides the format version and segment name.
    final Lucene90BlockTreeTermsReader parent;

    // Terms index FST, always read off-heap (backed by indexIn).
    final FST<BytesRef> index;

    // private boolean DEBUG;

    /**
     * Creates a reader for one field's terms, decoding the root block pointer from
     * {@code rootCode} and opening the terms index FST off-heap from {@code indexIn}
     * starting at {@code indexStartFP}.
     *
     * @throws IOException if reading the FST metadata or index fails
     */
    FieldReader(
        Lucene90BlockTreeTermsReader parent,
        FieldInfo fieldInfo,
        long numTerms,
        BytesRef rootCode,
        long sumTotalTermFreq,
        long sumDocFreq,
        int docCount,
        long indexStartFP,
        IndexInput metaIn,
        IndexInput indexIn,
        BytesRef minTerm,
        BytesRef maxTerm
    ) throws IOException {
        assert numTerms > 0;
        this.fieldInfo = fieldInfo;
        // DEBUG = BlockTreeTermsReader.DEBUG && fieldInfo.name.equals("id");
        this.parent = parent;
        this.numTerms = numTerms;
        this.sumTotalTermFreq = sumTotalTermFreq;
        this.sumDocFreq = sumDocFreq;
        this.docCount = docCount;
        this.minTerm = minTerm;
        this.maxTerm = maxTerm;
        // if (DEBUG) {
        // System.out.println("BTTR: seg=" + segment + " field=" + fieldInfo.name + " rootBlockCode="
        // + rootCode + " divisor=" + indexDivisor);
        // }
        // The low OUTPUT_FLAGS_NUM_BITS bits of the root output carry flags; the rest is the FP.
        rootBlockFP = readVLongOutput(new ByteArrayDataInput(rootCode.bytes, rootCode.offset, rootCode.length))
            >>> Lucene90BlockTreeTermsReader.OUTPUT_FLAGS_NUM_BITS;
        // Initialize FST always off-heap.
        var metadata = FST.readMetadata(metaIn, ByteSequenceOutputs.getSingleton());
        index = FST.fromFSTReader(metadata, new OffHeapFSTStore(indexIn, indexStartFP, metadata));
        /*
        if (false) {
        final String dotFileName = segment + "_" + fieldInfo.name + ".dot";
        Writer w = new OutputStreamWriter(new FileOutputStream(dotFileName));
        Util.toDot(index, w, false, false);
        System.out.println("FST INDEX: SAVED to " + dotFileName);
        w.close();
        }
        */
        BytesRef emptyOutput = metadata.getEmptyOutput();
        if (rootCode.equals(emptyOutput) == false) {
            // TODO: this branch is never taken
            assert false;
            this.rootCode = rootCode;
        } else {
            this.rootCode = emptyOutput;
        }
    }

    /**
     * Reads a vlong FST output, using MSB-first encoding for index versions that wrote it that
     * way and classic LSB-first {@code readVLong} otherwise.
     */
    long readVLongOutput(DataInput in) throws IOException {
        if (parent.version >= VERSION_MSB_VLONG_OUTPUT) {
            return readMSBVLong(in);
        } else {
            return in.readVLong();
        }
    }

    /**
     * Decodes a variable length byte[] in MSB order back to long, as written by {@link
     * Lucene90BlockTreeTermsWriter#writeMSBVLong}.
     *
     * <p>Package private for testing.
     */
    static long readMSBVLong(DataInput in) throws IOException {
        long l = 0L;
        while (true) {
            byte b = in.readByte();
            // 7 payload bits per byte; the high bit set means another byte follows.
            l = (l << 7) | (b & 0x7FL);
            if ((b & 0x80) == 0) {
                break;
            }
        }
        return l;
    }

    /** Returns the stored minimum term, or falls back to a scan for older indices. */
    @Override
    public BytesRef getMin() throws IOException {
        if (minTerm == null) {
            // Older index that didn't store min/maxTerm
            return super.getMin();
        } else {
            return minTerm;
        }
    }

    /** Returns the stored maximum term, or falls back to a scan for older indices. */
    @Override
    public BytesRef getMax() throws IOException {
        if (maxTerm == null) {
            // Older index that didn't store min/maxTerm
            return super.getMax();
        } else {
            return maxTerm;
        }
    }

    /** For debugging -- used by CheckIndex too */
    @Override
    public Stats getStats() throws IOException {
        return new SegmentTermsEnum(this).computeBlockStats();
    }

    @Override
    public boolean hasFreqs() {
        return fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS) >= 0;
    }

    @Override
    public boolean hasOffsets() {
        return fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0;
    }

    @Override
    public boolean hasPositions() {
        return fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0;
    }

    @Override
    public boolean hasPayloads() {
        return fieldInfo.hasPayloads();
    }

    /** Returns a new enum over all terms of this field. */
    @Override
    public TermsEnum iterator() throws IOException {
        return new SegmentTermsEnum(this);
    }

    @Override
    public long size() {
        return numTerms;
    }

    @Override
    public long getSumTotalTermFreq() {
        return sumTotalTermFreq;
    }

    @Override
    public long getSumDocFreq() {
        return sumDocFreq;
    }

    @Override
    public int getDocCount() {
        return docCount;
    }

    /**
     * Returns an enum over terms accepted by {@code compiled}, optionally starting after
     * {@code startTerm}. Only {@code NORMAL} automata are supported here.
     *
     * @throws IllegalArgumentException if the automaton is not of type {@code NORMAL}
     */
    @Override
    public TermsEnum intersect(CompiledAutomaton compiled, BytesRef startTerm) throws IOException {
        // if (DEBUG) System.out.println("  FieldReader.intersect startTerm=" +
        // ToStringUtils.bytesRefToString(startTerm));
        // System.out.println("intersect: " + compiled.type + " a=" + compiled.automaton);
        // TODO: we could push "it's a range" or "it's a prefix" down into IntersectTermsEnum?
        // can we optimize knowing that...?
        if (compiled.type != CompiledAutomaton.AUTOMATON_TYPE.NORMAL) {
            throw new IllegalArgumentException("please use CompiledAutomaton.getTermsEnum instead");
        }
        return new IntersectTermsEnum(
            this,
            compiled.getTransitionAccessor(),
            compiled.getByteRunnable(),
            compiled.commonSuffixRef,
            startTerm
        );
    }

    @Override
    public String toString() {
        return "BlockTreeTerms(seg="
            + parent.segment
            + " terms="
            + numTerms
            + ",postings="
            + sumDocFreq
            + ",positions="
            + sumTotalTermFreq
            + ",docs="
            + docCount
            + ")";
    }

    // CHANGES:

    /** Fork-specific accessor: raw stored minimum term (may be null for older indices). */
    public BytesRef getMinTerm() {
        return minTerm;
    }

    /** Fork-specific accessor: raw stored maximum term (may be null for older indices). */
    public BytesRef getMaxTerm() {
        return maxTerm;
    }

    // END CHANGES:
}

0 commit comments

Comments
 (0)