Skip to content

Commit 1426264

Browse files
committed
merging consumer, do full re-sample
1 parent b4710ab commit 1426264

File tree

2 files changed

+48
-14
lines changed

2 files changed

+48
-14
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -285,9 +285,20 @@ public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) th
285285
final int minLength = tsdbValuesProducer.mergeStats.minLength();
286286
final int maxLength = tsdbValuesProducer.mergeStats.maxLength();
287287

288+
BinaryDocValues values = valuesProducer.getBinary(field);
289+
var sampler = new ReservoirSampler();
290+
291+
// Iteration 1: minLength, maxLength, numDocs, sample
292+
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
293+
BytesRef v = values.binaryValue();
294+
sampler.processLine(v.bytes, v.offset, v.length);
295+
}
296+
297+
// Build encoder from sample
298+
FSST.SymbolTable symbolTable = FSST.SymbolTable.buildSymbolTable(sampler.getSample());
299+
288300
assert numDocsWithField <= maxDoc;
289301

290-
BinaryDocValues values = valuesProducer.getBinary(field);
291302
long start = data.getFilePointer();
292303
meta.writeLong(start); // dataOffset
293304

@@ -297,22 +308,24 @@ public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) th
297308
if (numDocsWithField > 0 && numDocsWithField < maxDoc) {
298309
disiAccumulator = new DISIAccumulator(dir, context, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
299310
}
300-
301311
assert maxLength >= minLength;
302-
if (maxLength > minLength) {
303-
offsetsAccumulator = new OffsetsAccumulator(dir, context, data, numDocsWithField);
304-
}
312+
offsetsAccumulator = new OffsetsAccumulator(dir, context, data, numDocsWithField);
313+
CompressedOffsetWriter offsetWriter = new CompressedOffsetWriter(offsetsAccumulator);
305314

306-
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
307-
BytesRef v = values.binaryValue();
308-
data.writeBytes(v.bytes, v.offset, v.length);
309-
if (disiAccumulator != null) {
310-
disiAccumulator.addDocId(doc);
311-
}
312-
if (offsetsAccumulator != null) {
313-
offsetsAccumulator.addDoc(v.length);
315+
values = valuesProducer.getBinary(field);
316+
try (var bulkCompressor = new BulkCompressBufferer(data, symbolTable, offsetWriter)) {
317+
// Iteration 2: compress lines
318+
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
319+
BytesRef v = values.binaryValue();
320+
bulkCompressor.addLine(v.bytes, v.offset, v.length);
321+
if (disiAccumulator != null) {
322+
disiAccumulator.addDocId(doc);
323+
}
314324
}
315325
}
326+
327+
// Write metadata
328+
assert numDocsWithField <= maxDoc;
316329
meta.writeLong(data.getFilePointer() - start); // dataLength
317330

318331
if (numDocsWithField == 0) {
@@ -337,8 +350,23 @@ public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) th
337350
meta.writeInt(numDocsWithField);
338351
meta.writeInt(minLength);
339352
meta.writeInt(maxLength);
340-
if (offsetsAccumulator != null) {
353+
354+
int minCompressedLength = offsetWriter.minCompressedLength;
355+
int maxCompressedLength = offsetWriter.maxCompressedLength;
356+
357+
// add compression fields
358+
meta.writeInt(minCompressedLength);
359+
meta.writeInt(maxCompressedLength);
360+
byte[] compressedSymbolTable = symbolTable.exportToBytes();
361+
meta.writeBytes(compressedSymbolTable, compressedSymbolTable.length);
362+
363+
if (maxCompressedLength > minCompressedLength) {
364+
start = data.getFilePointer();
365+
meta.writeLong(start);
366+
meta.writeVInt(DIRECT_MONOTONIC_BLOCK_SHIFT);
367+
// copy
341368
offsetsAccumulator.build(meta, data);
369+
meta.writeLong(data.getFilePointer() - start);
342370
}
343371
} finally {
344372
IOUtils.close(disiAccumulator, offsetsAccumulator);

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.lucene.util.compress.LZ4;
4242
import org.apache.lucene.util.packed.DirectMonotonicReader;
4343
import org.apache.lucene.util.packed.PackedInts;
44+
import org.elasticsearch.common.compress.fsst.FSST;
4445
import org.elasticsearch.core.IOUtils;
4546
import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder;
4647

@@ -1458,6 +1459,11 @@ static class BinaryEntry {
14581459
long addressesOffset;
14591460
long addressesLength;
14601461
DirectMonotonicReader.Meta addressesMeta;
1462+
1463+
// FSST
1464+
int minCompressedLength;
1465+
int maxCompressedLength;
1466+
FSST.Decoder decoder;
14611467
}
14621468

14631469
static class SortedNumericEntry extends NumericEntry {

0 commit comments

Comments
 (0)