Skip to content

Commit 0d37677

Browse files
Optimize ES819 doc values address offset calculation (#126732) (#127079)
When writing the doc values addresses, we currently iterate over all the sorted numeric doc values a second time to calculate the addresses. When merging sorted segments, this extra iteration is expensive because it requires performing a merge sort. This patch removes the iteration by calculating the addresses while the values are being written and storing them in a temporary file. Afterwards, the addresses are copied from the temporary file into the merged segment. Relates to #126111
1 parent a33775d commit 0d37677

File tree

3 files changed

+146
-32
lines changed

3 files changed

+146
-32
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,11 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOExcepti
110110
}
111111
};
112112

113-
writeField(field, producer, -1);
113+
writeField(field, producer, -1, null);
114114
}
115115

116-
private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer, long maxOrd) throws IOException {
116+
private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer, long maxOrd, OffsetsAccumulator offsetsAccumulator)
117+
throws IOException {
117118
int numDocsWithValue = 0;
118119
long numValues = 0;
119120

@@ -163,6 +164,9 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
163164
disiAccumulator.addDocId(doc);
164165
}
165166
final int count = values.docValueCount();
167+
if (offsetsAccumulator != null) {
168+
offsetsAccumulator.addDoc(count);
169+
}
166170
for (int i = 0; i < count; ++i) {
167171
buffer[bufferSize++] = values.nextValue();
168172
if (bufferSize == ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) {
@@ -372,7 +376,7 @@ public long cost() {
372376
}
373377
SortedDocValues sorted = valuesProducer.getSorted(field);
374378
int maxOrd = sorted.getValueCount();
375-
writeField(field, producer, maxOrd);
379+
writeField(field, producer, maxOrd, null);
376380
addTermsDict(DocValues.singleton(valuesProducer.getSorted(field)));
377381
}
378382

@@ -528,31 +532,46 @@ private void writeSortedNumericField(FieldInfo field, TsdbDocValuesProducer valu
528532
if (maxOrd > -1) {
529533
meta.writeByte((byte) 1); // multiValued (1 = multiValued)
530534
}
531-
long[] stats = writeField(field, valuesProducer, maxOrd);
532-
int numDocsWithField = Math.toIntExact(stats[0]);
533-
long numValues = stats[1];
534-
assert numValues >= numDocsWithField;
535535

536-
if (numValues > numDocsWithField) {
537-
long start = data.getFilePointer();
538-
meta.writeLong(start);
539-
meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT);
536+
if (valuesProducer.mergeStats.supported()) {
537+
int numDocsWithField = valuesProducer.mergeStats.sumNumDocsWithField();
538+
long numValues = valuesProducer.mergeStats.sumNumValues();
539+
if (numDocsWithField == numValues) {
540+
writeField(field, valuesProducer, maxOrd, null);
541+
} else {
542+
assert numValues > numDocsWithField;
543+
try (var accumulator = new OffsetsAccumulator(dir, context, data, numDocsWithField)) {
544+
writeField(field, valuesProducer, maxOrd, accumulator);
545+
accumulator.build(meta, data);
546+
}
547+
}
548+
} else {
549+
long[] stats = writeField(field, valuesProducer, maxOrd, null);
550+
int numDocsWithField = Math.toIntExact(stats[0]);
551+
long numValues = stats[1];
552+
assert numValues >= numDocsWithField;
540553

541-
final DirectMonotonicWriter addressesWriter = DirectMonotonicWriter.getInstance(
542-
meta,
543-
data,
544-
numDocsWithField + 1L,
545-
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
546-
);
547-
long addr = 0;
548-
addressesWriter.add(addr);
549-
SortedNumericDocValues values = valuesProducer.getSortedNumeric(field);
550-
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
551-
addr += values.docValueCount();
554+
if (numValues > numDocsWithField) {
555+
long start = data.getFilePointer();
556+
meta.writeLong(start);
557+
meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT);
558+
559+
final DirectMonotonicWriter addressesWriter = DirectMonotonicWriter.getInstance(
560+
meta,
561+
data,
562+
numDocsWithField + 1L,
563+
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
564+
);
565+
long addr = 0;
552566
addressesWriter.add(addr);
567+
SortedNumericDocValues values = valuesProducer.getSortedNumeric(field);
568+
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
569+
addr += values.docValueCount();
570+
addressesWriter.add(addr);
571+
}
572+
addressesWriter.finish();
573+
meta.writeLong(data.getFilePointer() - start);
553574
}
554-
addressesWriter.finish();
555-
meta.writeLong(data.getFilePointer() - start);
556575
}
557576
}
558577

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec.tsdb.es819;
11+
12+
import org.apache.lucene.store.ByteBuffersDataOutput;
13+
import org.apache.lucene.store.ByteBuffersIndexOutput;
14+
import org.apache.lucene.store.Directory;
15+
import org.apache.lucene.store.IOContext;
16+
import org.apache.lucene.store.IndexOutput;
17+
import org.apache.lucene.util.IOUtils;
18+
import org.apache.lucene.util.packed.DirectMonotonicWriter;
19+
import org.elasticsearch.core.SuppressForbidden;
20+
21+
import java.io.Closeable;
22+
import java.io.IOException;
23+
24+
/**
25+
* Builds the doc values address offset table iteratively, one document at a time. Useful to avoid a separate docvalues iteration
26+
* to build the address offset table.
27+
*/
28+
final class OffsetsAccumulator implements Closeable {
29+
private final Directory dir;
30+
private final IOContext context;
31+
32+
private final ByteBuffersDataOutput addressMetaBuffer;
33+
private final ByteBuffersIndexOutput addressMetaOutput;
34+
private final IndexOutput addressDataOutput;
35+
private final DirectMonotonicWriter addressesWriter;
36+
37+
private final String addressOffsetsTempFileName;
38+
39+
private long addr = 0;
40+
41+
OffsetsAccumulator(Directory dir, IOContext context, IndexOutput data, long numDocsWithField) throws IOException {
42+
this.dir = dir;
43+
this.context = context;
44+
45+
addressMetaBuffer = new ByteBuffersDataOutput();
46+
addressMetaOutput = new ByteBuffersIndexOutput(addressMetaBuffer, "meta-temp", "meta-temp");
47+
addressDataOutput = dir.createTempOutput(data.getName(), "address-data", context);
48+
addressOffsetsTempFileName = addressDataOutput.getName();
49+
addressesWriter = DirectMonotonicWriter.getInstance(
50+
addressMetaOutput,
51+
addressDataOutput,
52+
numDocsWithField + 1L,
53+
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
54+
);
55+
}
56+
57+
public void addDoc(int docValueCount) throws IOException {
58+
addressesWriter.add(addr);
59+
addr += docValueCount;
60+
}
61+
62+
public void build(IndexOutput meta, IndexOutput data) throws IOException {
63+
addressesWriter.add(addr);
64+
addressesWriter.finish();
65+
long start = data.getFilePointer();
66+
meta.writeLong(start);
67+
meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT);
68+
addressMetaBuffer.copyTo(meta);
69+
addressDataOutput.close();
70+
try (var addressDataInput = dir.openInput(addressOffsetsTempFileName, context)) {
71+
data.copyBytes(addressDataInput, addressDataInput.length());
72+
meta.writeLong(data.getFilePointer() - start);
73+
}
74+
}
75+
76+
@Override
77+
@SuppressForbidden(reason = "require usage of Lucene's IOUtils#deleteFilesIgnoringExceptions(...)")
78+
public void close() throws IOException {
79+
IOUtils.close(addressMetaOutput, addressDataOutput);
80+
if (addressOffsetsTempFileName != null) {
81+
IOUtils.deleteFilesIgnoringExceptions(dir, addressOffsetsTempFileName);
82+
}
83+
}
84+
}

server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public void testForceMergeDenseCase() throws Exception {
6464

6565
int numDocs = 256 + random().nextInt(1024);
6666
int numHosts = numDocs / 20;
67+
6768
for (int i = 0; i < numDocs; i++) {
6869
var d = new Document();
6970

@@ -77,7 +78,12 @@ public void testForceMergeDenseCase() throws Exception {
7778
d.add(new NumericDocValuesField("counter_1", counter1++));
7879
d.add(new SortedNumericDocValuesField("counter_2", counter2++));
7980
d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
80-
d.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length]));
81+
82+
int numGauge2 = 1 + random().nextInt(8);
83+
for (int j = 0; j < numGauge2; j++) {
84+
d.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[(i + j) % gauge2Values.length]));
85+
}
86+
8187
int numTags = 1 + random().nextInt(8);
8288
for (int j = 0; j < numTags; j++) {
8389
d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j])));
@@ -144,9 +150,10 @@ public void testForceMergeDenseCase() throws Exception {
144150
assertTrue("unexpected gauge [" + gaugeOneValue + "]", Arrays.binarySearch(gauge1Values, gaugeOneValue) >= 0);
145151

146152
assertEquals(i, gaugeTwoDV.nextDoc());
147-
assertEquals(1, gaugeTwoDV.docValueCount());
148-
long gaugeTwoValue = gaugeTwoDV.nextValue();
149-
assertTrue("unexpected gauge [" + gaugeTwoValue + "]", Arrays.binarySearch(gauge2Values, gaugeTwoValue) >= 0);
153+
for (int j = 0; j < gaugeTwoDV.docValueCount(); j++) {
154+
long gaugeTwoValue = gaugeTwoDV.nextValue();
155+
assertTrue("unexpected gauge [" + gaugeTwoValue + "]", Arrays.binarySearch(gauge2Values, gaugeTwoValue) >= 0);
156+
}
150157

151158
assertEquals(i, tagsDV.nextDoc());
152159
for (int j = 0; j < tagsDV.docValueCount(); j++) {
@@ -277,7 +284,10 @@ public void testForceMergeSparseCase() throws Exception {
277284
d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
278285
}
279286
if (random().nextBoolean()) {
280-
d.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length]));
287+
int numGauge2 = 1 + random().nextInt(8);
288+
for (int j = 0; j < numGauge2; j++) {
289+
d.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[(i + j) % gauge2Values.length]));
290+
}
281291
}
282292
if (random().nextBoolean()) {
283293
int numTags = 1 + random().nextInt(8);
@@ -356,9 +366,10 @@ public void testForceMergeSparseCase() throws Exception {
356366
}
357367

358368
if (gaugeTwoDV.advanceExact(i)) {
359-
assertEquals(1, gaugeTwoDV.docValueCount());
360-
long gaugeTwoValue = gaugeTwoDV.nextValue();
361-
assertTrue("unexpected gauge [" + gaugeTwoValue + "]", Arrays.binarySearch(gauge2Values, gaugeTwoValue) >= 0);
369+
for (int j = 0; j < gaugeTwoDV.docValueCount(); j++) {
370+
long gaugeTwoValue = gaugeTwoDV.nextValue();
371+
assertTrue("unexpected gauge [" + gaugeTwoValue + "]", Arrays.binarySearch(gauge2Values, gaugeTwoValue) >= 0);
372+
}
362373
}
363374

364375
if (tagsDV.advanceExact(i)) {

0 commit comments

Comments
 (0)