Skip to content

Commit b972364

Browse files
Optimize ES819 doc values address offset calculation (#126732)
When writing the doc values addresses, we currently iterate over all the sorted numeric doc values a second time just to calculate the addresses. When merging sorted segments, this extra iteration is expensive because it requires performing a merge sort. This patch removes the extra iteration by calculating the addresses while the values are being written and storing them in a temporary file; afterwards, the addresses are copied from the temporary file into the merged segment. The single-pass path is used only when merge stats are supported for the field (so the number of documents with a value is known up front); otherwise the previous two-pass approach is kept. Relates to #126111
1 parent 7b89f4d commit b972364

File tree

3 files changed

+146
-32
lines changed

3 files changed

+146
-32
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,11 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOExcepti
121121
writeSkipIndex(field, producer);
122122
}
123123

124-
writeField(field, producer, -1);
124+
writeField(field, producer, -1, null);
125125
}
126126

127-
private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer, long maxOrd) throws IOException {
127+
private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer, long maxOrd, OffsetsAccumulator offsetsAccumulator)
128+
throws IOException {
128129
int numDocsWithValue = 0;
129130
long numValues = 0;
130131

@@ -174,6 +175,9 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
174175
disiAccumulator.addDocId(doc);
175176
}
176177
final int count = values.docValueCount();
178+
if (offsetsAccumulator != null) {
179+
offsetsAccumulator.addDoc(count);
180+
}
177181
for (int i = 0; i < count; ++i) {
178182
buffer[bufferSize++] = values.nextValue();
179183
if (bufferSize == ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) {
@@ -386,7 +390,7 @@ public long cost() {
386390
}
387391
SortedDocValues sorted = valuesProducer.getSorted(field);
388392
int maxOrd = sorted.getValueCount();
389-
writeField(field, producer, maxOrd);
393+
writeField(field, producer, maxOrd, null);
390394
addTermsDict(DocValues.singleton(valuesProducer.getSorted(field)));
391395
}
392396

@@ -545,31 +549,46 @@ private void writeSortedNumericField(FieldInfo field, TsdbDocValuesProducer valu
545549
if (maxOrd > -1) {
546550
meta.writeByte((byte) 1); // multiValued (1 = multiValued)
547551
}
548-
long[] stats = writeField(field, valuesProducer, maxOrd);
549-
int numDocsWithField = Math.toIntExact(stats[0]);
550-
long numValues = stats[1];
551-
assert numValues >= numDocsWithField;
552552

553-
if (numValues > numDocsWithField) {
554-
long start = data.getFilePointer();
555-
meta.writeLong(start);
556-
meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT);
553+
if (valuesProducer.mergeStats.supported()) {
554+
int numDocsWithField = valuesProducer.mergeStats.sumNumDocsWithField();
555+
long numValues = valuesProducer.mergeStats.sumNumValues();
556+
if (numDocsWithField == numValues) {
557+
writeField(field, valuesProducer, maxOrd, null);
558+
} else {
559+
assert numValues > numDocsWithField;
560+
try (var accumulator = new OffsetsAccumulator(dir, context, data, numDocsWithField)) {
561+
writeField(field, valuesProducer, maxOrd, accumulator);
562+
accumulator.build(meta, data);
563+
}
564+
}
565+
} else {
566+
long[] stats = writeField(field, valuesProducer, maxOrd, null);
567+
int numDocsWithField = Math.toIntExact(stats[0]);
568+
long numValues = stats[1];
569+
assert numValues >= numDocsWithField;
557570

558-
final DirectMonotonicWriter addressesWriter = DirectMonotonicWriter.getInstance(
559-
meta,
560-
data,
561-
numDocsWithField + 1L,
562-
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
563-
);
564-
long addr = 0;
565-
addressesWriter.add(addr);
566-
SortedNumericDocValues values = valuesProducer.getSortedNumeric(field);
567-
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
568-
addr += values.docValueCount();
571+
if (numValues > numDocsWithField) {
572+
long start = data.getFilePointer();
573+
meta.writeLong(start);
574+
meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT);
575+
576+
final DirectMonotonicWriter addressesWriter = DirectMonotonicWriter.getInstance(
577+
meta,
578+
data,
579+
numDocsWithField + 1L,
580+
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
581+
);
582+
long addr = 0;
569583
addressesWriter.add(addr);
584+
SortedNumericDocValues values = valuesProducer.getSortedNumeric(field);
585+
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
586+
addr += values.docValueCount();
587+
addressesWriter.add(addr);
588+
}
589+
addressesWriter.finish();
590+
meta.writeLong(data.getFilePointer() - start);
570591
}
571-
addressesWriter.finish();
572-
meta.writeLong(data.getFilePointer() - start);
573592
}
574593
}
575594

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec.tsdb.es819;
11+
12+
import org.apache.lucene.store.ByteBuffersDataOutput;
13+
import org.apache.lucene.store.ByteBuffersIndexOutput;
14+
import org.apache.lucene.store.Directory;
15+
import org.apache.lucene.store.IOContext;
16+
import org.apache.lucene.store.IndexOutput;
17+
import org.apache.lucene.util.IOUtils;
18+
import org.apache.lucene.util.packed.DirectMonotonicWriter;
19+
import org.elasticsearch.core.SuppressForbidden;
20+
21+
import java.io.Closeable;
22+
import java.io.IOException;
23+
24+
/**
25+
* Builds the doc values address offset table iteratively, one document at a time. Useful to avoid a separate docvalues iteration
26+
* to build the address offset table.
27+
*/
28+
final class OffsetsAccumulator implements Closeable {
29+
private final Directory dir;
30+
private final IOContext context;
31+
32+
private final ByteBuffersDataOutput addressMetaBuffer;
33+
private final ByteBuffersIndexOutput addressMetaOutput;
34+
private final IndexOutput addressDataOutput;
35+
private final DirectMonotonicWriter addressesWriter;
36+
37+
private final String addressOffsetsTempFileName;
38+
39+
private long addr = 0;
40+
41+
OffsetsAccumulator(Directory dir, IOContext context, IndexOutput data, long numDocsWithField) throws IOException {
42+
this.dir = dir;
43+
this.context = context;
44+
45+
addressMetaBuffer = new ByteBuffersDataOutput();
46+
addressMetaOutput = new ByteBuffersIndexOutput(addressMetaBuffer, "meta-temp", "meta-temp");
47+
addressDataOutput = dir.createTempOutput(data.getName(), "address-data", context);
48+
addressOffsetsTempFileName = addressDataOutput.getName();
49+
addressesWriter = DirectMonotonicWriter.getInstance(
50+
addressMetaOutput,
51+
addressDataOutput,
52+
numDocsWithField + 1L,
53+
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
54+
);
55+
}
56+
57+
public void addDoc(int docValueCount) throws IOException {
58+
addressesWriter.add(addr);
59+
addr += docValueCount;
60+
}
61+
62+
public void build(IndexOutput meta, IndexOutput data) throws IOException {
63+
addressesWriter.add(addr);
64+
addressesWriter.finish();
65+
long start = data.getFilePointer();
66+
meta.writeLong(start);
67+
meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT);
68+
addressMetaBuffer.copyTo(meta);
69+
addressDataOutput.close();
70+
try (var addressDataInput = dir.openInput(addressOffsetsTempFileName, context)) {
71+
data.copyBytes(addressDataInput, addressDataInput.length());
72+
meta.writeLong(data.getFilePointer() - start);
73+
}
74+
}
75+
76+
@Override
77+
@SuppressForbidden(reason = "require usage of Lucene's IOUtils#deleteFilesIgnoringExceptions(...)")
78+
public void close() throws IOException {
79+
IOUtils.close(addressMetaOutput, addressDataOutput);
80+
if (addressOffsetsTempFileName != null) {
81+
IOUtils.deleteFilesIgnoringExceptions(dir, addressOffsetsTempFileName);
82+
}
83+
}
84+
}

server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public void testForceMergeDenseCase() throws Exception {
6464

6565
int numDocs = 256 + random().nextInt(1024);
6666
int numHosts = numDocs / 20;
67+
6768
for (int i = 0; i < numDocs; i++) {
6869
var d = new Document();
6970

@@ -77,7 +78,12 @@ public void testForceMergeDenseCase() throws Exception {
7778
d.add(new NumericDocValuesField("counter_1", counter1++));
7879
d.add(new SortedNumericDocValuesField("counter_2", counter2++));
7980
d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
80-
d.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length]));
81+
82+
int numGauge2 = 1 + random().nextInt(8);
83+
for (int j = 0; j < numGauge2; j++) {
84+
d.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[(i + j) % gauge2Values.length]));
85+
}
86+
8187
int numTags = 1 + random().nextInt(8);
8288
for (int j = 0; j < numTags; j++) {
8389
d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j])));
@@ -144,9 +150,10 @@ public void testForceMergeDenseCase() throws Exception {
144150
assertTrue("unexpected gauge [" + gaugeOneValue + "]", Arrays.binarySearch(gauge1Values, gaugeOneValue) >= 0);
145151

146152
assertEquals(i, gaugeTwoDV.nextDoc());
147-
assertEquals(1, gaugeTwoDV.docValueCount());
148-
long gaugeTwoValue = gaugeTwoDV.nextValue();
149-
assertTrue("unexpected gauge [" + gaugeTwoValue + "]", Arrays.binarySearch(gauge2Values, gaugeTwoValue) >= 0);
153+
for (int j = 0; j < gaugeTwoDV.docValueCount(); j++) {
154+
long gaugeTwoValue = gaugeTwoDV.nextValue();
155+
assertTrue("unexpected gauge [" + gaugeTwoValue + "]", Arrays.binarySearch(gauge2Values, gaugeTwoValue) >= 0);
156+
}
150157

151158
assertEquals(i, tagsDV.nextDoc());
152159
for (int j = 0; j < tagsDV.docValueCount(); j++) {
@@ -277,7 +284,10 @@ public void testForceMergeSparseCase() throws Exception {
277284
d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
278285
}
279286
if (random().nextBoolean()) {
280-
d.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length]));
287+
int numGauge2 = 1 + random().nextInt(8);
288+
for (int j = 0; j < numGauge2; j++) {
289+
d.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[(i + j) % gauge2Values.length]));
290+
}
281291
}
282292
if (random().nextBoolean()) {
283293
int numTags = 1 + random().nextInt(8);
@@ -356,9 +366,10 @@ public void testForceMergeSparseCase() throws Exception {
356366
}
357367

358368
if (gaugeTwoDV.advanceExact(i)) {
359-
assertEquals(1, gaugeTwoDV.docValueCount());
360-
long gaugeTwoValue = gaugeTwoDV.nextValue();
361-
assertTrue("unexpected gauge [" + gaugeTwoValue + "]", Arrays.binarySearch(gauge2Values, gaugeTwoValue) >= 0);
369+
for (int j = 0; j < gaugeTwoDV.docValueCount(); j++) {
370+
long gaugeTwoValue = gaugeTwoDV.nextValue();
371+
assertTrue("unexpected gauge [" + gaugeTwoValue + "]", Arrays.binarySearch(gauge2Values, gaugeTwoValue) >= 0);
372+
}
362373
}
363374

364375
if (tagsDV.advanceExact(i)) {

0 commit comments

Comments
 (0)