Skip to content

Commit 559941a

Browse files
committed
Improve downsample performance by buffering docids and bulk processing.
1 parent def4c89 commit 559941a

File tree

7 files changed

+101
-47
lines changed

7 files changed

+101
-47
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcher.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ protected boolean lessThan(LeafWalker a, LeafWalker b) {
143143
walker.collectCurrent();
144144
if (walker.nextDoc() == DocIdSetIterator.NO_MORE_DOCS || walker.shouldPop()) {
145145
queue.pop();
146+
walker.collector.finish();
146147
} else {
147148
queue.updateTop();
148149
}
@@ -168,6 +169,7 @@ private boolean populateQueue(List<LeafWalker> leafWalkers, PriorityQueue<LeafWa
168169
// If a walker is exhausted then we can remove it from consideration
169170
// entirely
170171
it.remove();
172+
leafWalker.collector.finish();
171173
continue;
172174
}
173175
BytesRef tsid = leafWalker.getTsid();

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AbstractDownsampleFieldProducer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.downsample;
99

10+
import org.apache.lucene.internal.hppc.IntArrayList;
1011
import org.elasticsearch.index.fielddata.FormattedDocValues;
1112

1213
import java.io.IOException;
@@ -43,5 +44,5 @@ public boolean isEmpty() {
4344
return isEmpty;
4445
}
4546

46-
public abstract void collect(FormattedDocValues docValues, int docId) throws IOException;
47+
public abstract void collect(FormattedDocValues docValues, IntArrayList buffer) throws IOException;
4748
}

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldProducer.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.downsample;
99

10+
import org.apache.lucene.internal.hppc.IntArrayList;
1011
import org.elasticsearch.index.fielddata.FormattedDocValues;
1112
import org.elasticsearch.xcontent.XContentBuilder;
1213

@@ -81,19 +82,24 @@ public boolean isEmpty() {
8182
}
8283

8384
@Override
84-
public void collect(FormattedDocValues docValues, int docId) throws IOException {
85+
public void collect(FormattedDocValues docValues, IntArrayList buffer) throws IOException {
8586
if (dimension.isEmpty == false) {
86-
assert dimension.validate(docValues, docId);
8787
return;
8888
}
8989

90-
if (docValues.advanceExact(docId) == false) {
90+
for (int i = 0; i < buffer.size(); i++) {
91+
int docId = buffer.get(i);
92+
if (docValues.advanceExact(docId) == false) {
93+
continue;
94+
}
95+
int docValueCount = docValues.docValueCount();
96+
for (int j = 0; j < docValueCount; j++) {
97+
this.dimension.collectOnce(docValues.nextValue());
98+
}
99+
// Only need to record one dimension value from one document; within the same tsid-and-time-interval bucket the values are the
100+
// same.
91101
return;
92102
}
93-
int docValueCount = docValues.docValueCount();
94-
for (int i = 0; i < docValueCount; i++) {
95-
this.dimension.collectOnce(docValues.nextValue());
96-
}
97103
}
98104

99105
@Override

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.logging.log4j.Logger;
1111
import org.apache.lucene.document.SortedSetDocValuesField;
1212
import org.apache.lucene.index.LeafReaderContext;
13+
import org.apache.lucene.internal.hppc.IntArrayList;
1314
import org.apache.lucene.search.MatchAllDocsQuery;
1415
import org.apache.lucene.search.MatchNoDocsQuery;
1516
import org.apache.lucene.search.Query;
@@ -80,6 +81,7 @@
8081
class DownsampleShardIndexer {
8182

8283
private static final Logger logger = LogManager.getLogger(DownsampleShardIndexer.class);
84+
private static final int DOCID_BUFFER_SIZE = 8069;
8385
public static final int DOWNSAMPLE_BULK_ACTIONS = 10000;
8486
public static final ByteSizeValue DOWNSAMPLE_BULK_SIZE = ByteSizeValue.of(1, ByteSizeUnit.MB);
8587
public static final ByteSizeValue DOWNSAMPLE_MAX_BYTES_IN_FLIGHT = ByteSizeValue.of(50, ByteSizeUnit.MB);
@@ -365,7 +367,11 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag
365367
formattedDocValues[i] = fieldValueFetchers.get(i).getLeaf(ctx);
366368
}
367369

370+
long timestampBoundStartTime = searchExecutionContext.getIndexSettings().getTimestampBounds().startTime();
368371
return new LeafBucketCollector() {
372+
373+
final IntArrayList buffer = new IntArrayList(DOCID_BUFFER_SIZE);
374+
369375
@Override
370376
public void collect(int docId, long owningBucketOrd) throws IOException {
371377
task.addNumReceived(1);
@@ -376,10 +382,7 @@ public void collect(int docId, long owningBucketOrd) throws IOException {
376382

377383
boolean tsidChanged = tsidHashOrd != downsampleBucketBuilder.tsidOrd();
378384
if (tsidChanged || timestamp < lastHistoTimestamp) {
379-
lastHistoTimestamp = Math.max(
380-
rounding.round(timestamp),
381-
searchExecutionContext.getIndexSettings().getTimestampBounds().startTime()
382-
);
385+
lastHistoTimestamp = Math.max(rounding.round(timestamp), timestampBoundStartTime);
383386
}
384387
task.setLastSourceTimestamp(timestamp);
385388
task.setLastTargetTimestamp(lastHistoTimestamp);
@@ -415,6 +418,7 @@ public void collect(int docId, long owningBucketOrd) throws IOException {
415418
lastTimestamp = timestamp;
416419

417420
if (tsidChanged || downsampleBucketBuilder.timestamp() != lastHistoTimestamp) {
421+
bulkCollection();
418422
// Flush downsample doc if not empty
419423
if (downsampleBucketBuilder.isEmpty() == false) {
420424
XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument();
@@ -429,17 +433,39 @@ public void collect(int docId, long owningBucketOrd) throws IOException {
429433
}
430434
bucketsCreated++;
431435
}
436+
if (buffer.size() == DOCID_BUFFER_SIZE) {
437+
bulkCollection();
438+
}
439+
440+
buffer.buffer[buffer.elementsCount++] = docId;
441+
}
442+
443+
@Override
444+
public void finish() throws IOException {
445+
bulkCollection();
446+
}
432447

433-
final int docCount = docCountProvider.getDocCount(docId);
434-
downsampleBucketBuilder.collectDocCount(docCount);
448+
void bulkCollection() throws IOException {
449+
if (buffer.isEmpty()) {
450+
return;
451+
}
452+
453+
if (logger.isDebugEnabled()) {
454+
logger.debug("buffered {} docids", buffer.size());
455+
}
456+
457+
downsampleBucketBuilder.collectDocCount(buffer, docCountProvider);
435458
// Iterate over all field producers and collect the doc_values for the buffered docIds
436-
for (int i = 0; i < fieldProducers.length; i++) {
437-
AbstractDownsampleFieldProducer fieldProducer = fieldProducers[i];
438-
FormattedDocValues docValues = formattedDocValues[i];
439-
fieldProducer.collect(docValues, docId);
459+
for (int j = 0; j < fieldProducers.length; j++) {
460+
AbstractDownsampleFieldProducer fieldProducer = fieldProducers[j];
461+
FormattedDocValues docValues = formattedDocValues[j];
462+
fieldProducer.collect(docValues, buffer);
440463
}
441-
docsProcessed++;
464+
465+
docsProcessed += buffer.size();
442466
task.setDocsProcessed(docsProcessed);
467+
468+
buffer.elementsCount = 0;
443469
}
444470
};
445471
}
@@ -545,8 +571,15 @@ public void resetTimestamp(long timestamp) {
545571
}
546572
}
547573

548-
public void collectDocCount(int docCount) {
549-
this.docCount += docCount;
574+
public void collectDocCount(IntArrayList buffer, DocCountProvider docCountProvider) throws IOException {
575+
if (docCountProvider.alwaysOne()) {
576+
this.docCount += buffer.size();
577+
} else {
578+
for (int i = 0; i < buffer.size(); i++) {
579+
int docId = buffer.get(i);
580+
this.docCount += docCountProvider.getDocCount(docId);
581+
}
582+
}
550583
}
551584

552585
public XContentBuilder buildDownsampleDocument() throws IOException {

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.downsample;
99

10+
import org.apache.lucene.internal.hppc.IntArrayList;
1011
import org.apache.lucene.util.BytesRef;
1112
import org.elasticsearch.index.fielddata.FormattedDocValues;
1213
import org.elasticsearch.index.fielddata.HistogramValue;
@@ -114,25 +115,31 @@ public void write(XContentBuilder builder) throws IOException {
114115
}
115116

116117
@Override
117-
public void collect(FormattedDocValues docValues, int docId) throws IOException {
118+
public void collect(FormattedDocValues docValues, IntArrayList buffer) throws IOException {
118119
if (isEmpty() == false) {
119120
return;
120121
}
121-
if (docValues.advanceExact(docId) == false) {
122-
return;
123-
}
124122

125-
int docValuesCount = docValues.docValueCount();
126-
assert docValuesCount > 0;
127-
isEmpty = false;
128-
if (docValuesCount == 1) {
129-
label.collect(docValues.nextValue());
130-
} else {
131-
Object[] values = new Object[docValuesCount];
132-
for (int i = 0; i < docValuesCount; i++) {
133-
values[i] = docValues.nextValue();
123+
for (int i = 0; i < buffer.size(); i++) {
124+
int docId = buffer.get(i);
125+
if (docValues.advanceExact(docId) == false) {
126+
continue;
127+
}
128+
int docValuesCount = docValues.docValueCount();
129+
assert docValuesCount > 0;
130+
isEmpty = false;
131+
if (docValuesCount == 1) {
132+
label.collect(docValues.nextValue());
133+
} else {
134+
Object[] values = new Object[docValuesCount];
135+
for (int j = 0; j < docValuesCount; j++) {
136+
values[j] = docValues.nextValue();
137+
}
138+
label.collect(values);
134139
}
135-
label.collect(values);
140+
// Only need to record one label value from one document; within the same tsid-and-time-interval we only keep the first
141+
// with downsampling.
142+
return;
136143
}
137144
}
138145

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.downsample;
99

10+
import org.apache.lucene.internal.hppc.IntArrayList;
1011
import org.elasticsearch.index.fielddata.FormattedDocValues;
1112
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
1213
import org.elasticsearch.xcontent.XContentBuilder;
@@ -53,14 +54,17 @@ void collect(Number value) {
5354
}
5455

5556
@Override
56-
public void collect(FormattedDocValues docValues, int docId) throws IOException {
57-
if (docValues.advanceExact(docId) == false) {
58-
return;
59-
}
60-
int docValuesCount = docValues.docValueCount();
61-
for (int i = 0; i < docValuesCount; i++) {
62-
Number num = (Number) docValues.nextValue();
63-
collect(num);
57+
public void collect(FormattedDocValues docValues, IntArrayList buffer) throws IOException {
58+
for (int i = 0; i < buffer.size(); i++) {
59+
int docId = buffer.get(i);
60+
if (docValues.advanceExact(docId) == false) {
61+
continue;
62+
}
63+
int docValuesCount = docValues.docValueCount();
64+
for (int j = 0; j < docValuesCount; j++) {
65+
Number num = (Number) docValues.nextValue();
66+
collect(num);
67+
}
6468
}
6569
}
6670

@@ -236,13 +240,13 @@ static final class CounterMetricFieldProducer extends MetricFieldProducer {
236240
}
237241

238242
@Override
239-
public void collect(FormattedDocValues docValues, int docId) throws IOException {
243+
public void collect(FormattedDocValues docValues, IntArrayList buffer) throws IOException {
240244
// Counter producers only collect the last_value. Since documents are
241245
// collected by descending timestamp order, the producer should only
242246
// process the first value for every tsid. So, it will only collect the
243247
// field if no value has been set before.
244248
if (isEmpty()) {
245-
super.collect(docValues, docId);
249+
super.collect(docValues, buffer);
246250
}
247251
}
248252

x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/LabelFieldProducerTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.downsample;
99

10+
import org.apache.lucene.internal.hppc.IntArrayList;
1011
import org.elasticsearch.common.Strings;
1112
import org.elasticsearch.index.fielddata.FormattedDocValues;
1213
import org.elasticsearch.search.aggregations.AggregatorTestCase;
@@ -93,7 +94,7 @@ public Object nextValue() {
9394
return "aaaa";
9495
}
9596
};
96-
producer.collect(docValues, 1);
97+
producer.collect(docValues, IntArrayList.from(1));
9798
// producer.collect("dummy", "aaaa");
9899
assertFalse(producer.isEmpty());
99100
assertEquals("aaaa", producer.label().get());
@@ -129,7 +130,7 @@ public Object nextValue() {
129130
}
130131
};
131132

132-
producer.collect(docValues, 1);
133+
producer.collect(docValues, IntArrayList.from(1));
133134
assertFalse(producer.isEmpty());
134135
assertEquals("a\0value_a", (((Object[]) producer.label().get())[0]).toString());
135136
assertEquals("b\0value_b", (((Object[]) producer.label().get())[1]).toString());

0 commit comments

Comments
 (0)