5 changes: 5 additions & 0 deletions docs/changelog/124477.yaml
@@ -0,0 +1,5 @@
pr: 124477
summary: Improve downsample performance by buffering docids and doing bulk processing
area: Downsampling
type: enhancement
issues: []
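The changelog names the technique without showing it, so here is a minimal, self-contained sketch of the buffer-and-flush pattern this PR applies. All names (`BufferingCollector`, `processBulk`) are hypothetical and not Elasticsearch APIs; only `IntArrayList` and the buffer size mirror the actual change.

    import org.apache.lucene.internal.hppc.IntArrayList;

    // Hypothetical sketch: instead of pushing one docId at a time through every
    // per-field consumer, doc ids are buffered and handed over in batches, which
    // amortizes per-call overhead across up to DOCID_BUFFER_SIZE documents.
    class BufferingCollector {
        private static final int DOCID_BUFFER_SIZE = 8096; // same constant as the PR
        private final IntArrayList docIdBuffer = new IntArrayList(DOCID_BUFFER_SIZE);

        void collect(int docId) {
            docIdBuffer.add(docId);
            if (docIdBuffer.size() == DOCID_BUFFER_SIZE) {
                flush(); // process a full batch eagerly
            }
        }

        void flush() { // also called once at the end of collection
            if (docIdBuffer.isEmpty() == false) {
                processBulk(docIdBuffer);
                docIdBuffer.clear();
            }
        }

        void processBulk(IntArrayList docIds) {
            // a real implementation iterates the batch once per field producer
        }
    }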
@@ -7,6 +7,7 @@

package org.elasticsearch.xpack.downsample;

import org.apache.lucene.internal.hppc.IntArrayList;
import org.elasticsearch.index.fielddata.FormattedDocValues;

import java.io.IOException;
@@ -43,5 +44,5 @@ public boolean isEmpty() {
return isEmpty;
}

public abstract void collect(FormattedDocValues docValues, int docId) throws IOException;
public abstract void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException;
}
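To make the new bulk contract concrete, here is a hypothetical subclass against the changed signature. It mirrors the first-value-wins pattern of the DimensionFieldProducer change below; the class name is illustrative, the name-taking constructor and the `isEmpty` field are assumed from the base class shown in this diff, and the remaining abstract members are omitted by keeping the sketch abstract.

    import java.io.IOException;

    import org.apache.lucene.internal.hppc.IntArrayList;
    import org.elasticsearch.index.fielddata.FormattedDocValues;

    // Hypothetical producer: the index is sorted by _tsid ascending and
    // @timestamp descending, so the first buffered document that has a value
    // holds the most recent ("last") value for this bucket.
    abstract class FirstValueWinsProducer extends AbstractDownsampleFieldProducer {
        private Object lastValue;

        FirstValueWinsProducer(String name) {
            super(name); // assumed constructor of the base class
        }

        @Override
        public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException {
            if (isEmpty() == false) {
                return; // the newest value was already captured for this bucket
            }
            for (int i = 0; i < docIdBuffer.size(); i++) {
                if (docValues.advanceExact(docIdBuffer.get(i)) == false) {
                    continue; // this document has no value for the field
                }
                lastValue = docValues.nextValue(); // newest document wins
                isEmpty = false; // isEmpty field from the base class diff above
                return;
            }
        }
    }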
@@ -7,6 +7,7 @@

package org.elasticsearch.xpack.downsample;

import org.apache.lucene.internal.hppc.IntArrayList;
import org.elasticsearch.index.fielddata.FormattedDocValues;
import org.elasticsearch.xcontent.XContentBuilder;

@@ -55,13 +56,16 @@ void collectOnce(final Object value) {
This is an expensive check that slows down downsampling significantly.
Given that the index is sorted by tsid as the primary key, this shouldn't really happen.
*/
boolean validate(FormattedDocValues docValues, int docId) throws IOException {
if (docValues.advanceExact(docId)) {
int docValueCount = docValues.docValueCount();
for (int i = 0; i < docValueCount; i++) {
var value = docValues.nextValue();
if (value.equals(this.value) == false) {
assert false : "Dimension value changed without tsid change [" + value + "] != [" + this.value + "]";
boolean validate(FormattedDocValues docValues, IntArrayList buffer) throws IOException {
for (int i = 0; i < buffer.size(); i++) {
int docId = buffer.get(i);
if (docValues.advanceExact(docId)) {
int docValueCount = docValues.docValueCount();
for (int j = 0; j < docValueCount; j++) {
var value = docValues.nextValue();
if (value.equals(this.value) == false) {
assert false : "Dimension value changed without tsid change [" + value + "] != [" + this.value + "]";
}
}
}
}
@@ -81,19 +85,25 @@ public boolean isEmpty() {
}

@Override
public void collect(FormattedDocValues docValues, int docId) throws IOException {
public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException {
if (dimension.isEmpty == false) {
assert dimension.validate(docValues, docId);
assert dimension.validate(docValues, docIdBuffer);
return;
}

if (docValues.advanceExact(docId) == false) {
for (int i = 0; i < docIdBuffer.size(); i++) {
int docId = docIdBuffer.get(i);
if (docValues.advanceExact(docId) == false) {
continue;
}
int docValueCount = docValues.docValueCount();
for (int j = 0; j < docValueCount; j++) {
this.dimension.collectOnce(docValues.nextValue());
}
// Only need to record one dimension value from one document; within the same tsid-and-time-interval bucket the values
// are the same.
return;
}
int docValueCount = docValues.docValueCount();
for (int i = 0; i < docValueCount; i++) {
this.dimension.collectOnce(docValues.nextValue());
}
}

@Override
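Note how validate() is consumed above: `assert dimension.validate(docValues, docIdBuffer);`. Because the call sits behind an assert and the method always returns true, the whole per-document loop is elided unless the JVM runs with assertions enabled (as in tests), which keeps the "expensive check" out of production. A stripped-down, hypothetical illustration of the idiom:

    // Hypothetical sketch of the assert-only validation idiom: an expensive
    // boolean-returning check that callers invoke as `assert validate(...)`,
    // so it costs nothing when assertions are disabled.
    class SingleValueBucket {
        private Object value;

        boolean validate(Object candidate) {
            if (candidate.equals(value) == false) {
                assert false : "value changed within a bucket: [" + candidate + "] != [" + value + "]";
            }
            return true; // always true: failures surface via the inner assert
        }
    }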
@@ -10,6 +10,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.internal.hppc.IntArrayList;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
@@ -80,6 +81,7 @@
class DownsampleShardIndexer {

private static final Logger logger = LogManager.getLogger(DownsampleShardIndexer.class);
private static final int DOCID_BUFFER_SIZE = 8096;
public static final int DOWNSAMPLE_BULK_ACTIONS = 10000;
public static final ByteSizeValue DOWNSAMPLE_BULK_SIZE = ByteSizeValue.of(1, ByteSizeUnit.MB);
public static final ByteSizeValue DOWNSAMPLE_MAX_BYTES_IN_FLIGHT = ByteSizeValue.of(50, ByteSizeUnit.MB);
@@ -338,6 +340,7 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
private class TimeSeriesBucketCollector extends BucketCollector {
private final BulkProcessor2 bulkProcessor;
private final DownsampleBucketBuilder downsampleBucketBuilder;
private final List<LeafDownsampleCollector> leafBucketCollectors = new ArrayList<>();
private long docsProcessed;
private long bucketsCreated;
long lastTimestamp = Long.MAX_VALUE;
@@ -365,83 +368,138 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag
formattedDocValues[i] = fieldValueFetchers.get(i).getLeaf(ctx);
}

return new LeafBucketCollector() {
@Override
public void collect(int docId, long owningBucketOrd) throws IOException {
task.addNumReceived(1);
final BytesRef tsidHash = aggCtx.getTsidHash();
assert tsidHash != null : "Document without [" + TimeSeriesIdFieldMapper.NAME + "] field was found.";
final int tsidHashOrd = aggCtx.getTsidHashOrd();
final long timestamp = timestampField.resolution().roundDownToMillis(aggCtx.getTimestamp());

boolean tsidChanged = tsidHashOrd != downsampleBucketBuilder.tsidOrd();
if (tsidChanged || timestamp < lastHistoTimestamp) {
lastHistoTimestamp = Math.max(
rounding.round(timestamp),
searchExecutionContext.getIndexSettings().getTimestampBounds().startTime()
);
}
task.setLastSourceTimestamp(timestamp);
task.setLastTargetTimestamp(lastHistoTimestamp);

if (logger.isTraceEnabled()) {
logger.trace(
"Doc: [{}] - _tsid: [{}], @timestamp: [{}] -> downsample bucket ts: [{}]",
docId,
DocValueFormat.TIME_SERIES_ID.format(tsidHash),
timestampFormat.format(timestamp),
timestampFormat.format(lastHistoTimestamp)
);
}
var leafBucketCollector = new LeafDownsampleCollector(aggCtx, docCountProvider, fieldProducers, formattedDocValues);
leafBucketCollectors.add(leafBucketCollector);
return leafBucketCollector;
}

void bulkCollection() throws IOException {
// The leaf bucket collectors with newer timestamps go first, to correctly capture the last value for counters and labels.
leafBucketCollectors.sort((o1, o2) -> -Long.compare(o1.firstTimeStampForBulkCollection, o2.firstTimeStampForBulkCollection));
for (LeafDownsampleCollector leafBucketCollector : leafBucketCollectors) {
leafBucketCollector.leafBulkCollection();
}
}

/*
* Sanity checks to ensure that we receive documents in the correct order
* - _tsid must be sorted in ascending order
* - @timestamp must be sorted in descending order within the same _tsid
*/
BytesRef lastTsid = downsampleBucketBuilder.tsid();
assert lastTsid == null || lastTsid.compareTo(tsidHash) <= 0
: "_tsid is not sorted in ascending order: ["
+ DocValueFormat.TIME_SERIES_ID.format(lastTsid)
+ "] -> ["
+ DocValueFormat.TIME_SERIES_ID.format(tsidHash)
+ "]";
assert tsidHash.equals(lastTsid) == false || lastTimestamp >= timestamp
: "@timestamp is not sorted in descending order: ["
+ timestampFormat.format(lastTimestamp)
+ "] -> ["
+ timestampFormat.format(timestamp)
+ "]";
lastTimestamp = timestamp;

if (tsidChanged || downsampleBucketBuilder.timestamp() != lastHistoTimestamp) {
// Flush downsample doc if not empty
if (downsampleBucketBuilder.isEmpty() == false) {
XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument();
indexBucket(doc);
}

// Create new downsample bucket
if (tsidChanged) {
downsampleBucketBuilder.resetTsid(tsidHash, tsidHashOrd, lastHistoTimestamp);
} else {
downsampleBucketBuilder.resetTimestamp(lastHistoTimestamp);
}
bucketsCreated++;
class LeafDownsampleCollector extends LeafBucketCollector {

final AggregationExecutionContext aggCtx;
final DocCountProvider docCountProvider;
final FormattedDocValues[] formattedDocValues;
final AbstractDownsampleFieldProducer[] fieldProducers;

// Capture the first timestamp in order to determine which leaf collector's leafBulkCollection() is invoked first.
long firstTimeStampForBulkCollection;
final IntArrayList docIdBuffer = new IntArrayList(DOCID_BUFFER_SIZE);
final long timestampBoundStartTime = searchExecutionContext.getIndexSettings().getTimestampBounds().startTime();

LeafDownsampleCollector(
AggregationExecutionContext aggCtx,
DocCountProvider docCountProvider,
AbstractDownsampleFieldProducer[] fieldProducers,
FormattedDocValues[] formattedDocValues
) {
this.aggCtx = aggCtx;
this.docCountProvider = docCountProvider;
this.fieldProducers = fieldProducers;
this.formattedDocValues = formattedDocValues;
}

@Override
public void collect(int docId, long owningBucketOrd) throws IOException {
task.addNumReceived(1);
final BytesRef tsidHash = aggCtx.getTsidHash();
assert tsidHash != null : "Document without [" + TimeSeriesIdFieldMapper.NAME + "] field was found.";
final int tsidHashOrd = aggCtx.getTsidHashOrd();
final long timestamp = timestampField.resolution().roundDownToMillis(aggCtx.getTimestamp());

boolean tsidChanged = tsidHashOrd != downsampleBucketBuilder.tsidOrd();
if (tsidChanged || timestamp < lastHistoTimestamp) {
lastHistoTimestamp = Math.max(rounding.round(timestamp), timestampBoundStartTime);
}
task.setLastSourceTimestamp(timestamp);
task.setLastTargetTimestamp(lastHistoTimestamp);

if (logger.isTraceEnabled()) {
logger.trace(
"Doc: [{}] - _tsid: [{}], @timestamp: [{}] -> downsample bucket ts: [{}]",
docId,
DocValueFormat.TIME_SERIES_ID.format(tsidHash),
timestampFormat.format(timestamp),
timestampFormat.format(lastHistoTimestamp)
);
}

/*
* Sanity checks to ensure that we receive documents in the correct order
* - _tsid must be sorted in ascending order
* - @timestamp must be sorted in descending order within the same _tsid
*/
BytesRef lastTsid = downsampleBucketBuilder.tsid();
assert lastTsid == null || lastTsid.compareTo(tsidHash) <= 0
: "_tsid is not sorted in ascending order: ["
+ DocValueFormat.TIME_SERIES_ID.format(lastTsid)
+ "] -> ["
+ DocValueFormat.TIME_SERIES_ID.format(tsidHash)
+ "]";
assert tsidHash.equals(lastTsid) == false || lastTimestamp >= timestamp
: "@timestamp is not sorted in descending order: ["
+ timestampFormat.format(lastTimestamp)
+ "] -> ["
+ timestampFormat.format(timestamp)
+ "]";
lastTimestamp = timestamp;

if (tsidChanged || downsampleBucketBuilder.timestamp() != lastHistoTimestamp) {
bulkCollection();
// Flush downsample doc if not empty
if (downsampleBucketBuilder.isEmpty() == false) {
XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument();
indexBucket(doc);
}

final int docCount = docCountProvider.getDocCount(docId);
downsampleBucketBuilder.collectDocCount(docCount);
// Iterate over all field values and collect the doc_values for this docId
for (int i = 0; i < fieldProducers.length; i++) {
AbstractDownsampleFieldProducer fieldProducer = fieldProducers[i];
FormattedDocValues docValues = formattedDocValues[i];
fieldProducer.collect(docValues, docId);
// Create new downsample bucket
if (tsidChanged) {
downsampleBucketBuilder.resetTsid(tsidHash, tsidHashOrd, lastHistoTimestamp);
} else {
downsampleBucketBuilder.resetTimestamp(lastHistoTimestamp);
}
docsProcessed++;
task.setDocsProcessed(docsProcessed);
bucketsCreated++;
}
};

if (docIdBuffer.isEmpty()) {
firstTimeStampForBulkCollection = aggCtx.getTimestamp();
}
// buffer.add() always delegates to System.arraycopy() and checks the buffer size for resizing. The buffer is
// preallocated at DOCID_BUFFER_SIZE and flushed once full, so it is safe to write into the backing array directly:
docIdBuffer.buffer[docIdBuffer.elementsCount++] = docId;
if (docIdBuffer.size() == DOCID_BUFFER_SIZE) {
bulkCollection();
}
}

void leafBulkCollection() throws IOException {
if (docIdBuffer.isEmpty()) {
return;
}

if (logger.isDebugEnabled()) {
logger.debug("buffered {} docids", docIdBuffer.size());
}

downsampleBucketBuilder.collectDocCount(docIdBuffer, docCountProvider);
// Iterate over all field producers and collect the doc_values for the buffered docIds
for (int i = 0; i < fieldProducers.length; i++) {
AbstractDownsampleFieldProducer fieldProducer = fieldProducers[i];
FormattedDocValues docValues = formattedDocValues[i];
fieldProducer.collect(docValues, docIdBuffer);
}

docsProcessed += docIdBuffer.size();
task.setDocsProcessed(docsProcessed);

// buffer.clear() also overwrites all slots with zeros, so just reset the element count instead:
docIdBuffer.elementsCount = 0;
}
}
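The descending sort in bulkCollection() above is the subtle part of the change: buffering makes leaf collectors drain out of global timestamp order, so leaves whose buffered documents start at a newer timestamp must be processed first for first-value-wins producers to still capture the most recent value. A hypothetical demonstration of the comparator's effect (segment names and timestamps are made up):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical demonstration of the flush ordering in bulkCollection():
    // leaves whose buffered docs start at a newer timestamp are drained first.
    class FlushOrder {
        record Leaf(String name, long firstTimeStampForBulkCollection) {}

        public static void main(String[] args) {
            List<Leaf> leaves = new ArrayList<>(List.of(
                new Leaf("segment-a", 60_000L),    // newest buffered doc 60s into the range
                new Leaf("segment-b", 120_000L))); // newest buffered doc 120s into the range

            // Same comparator shape as the PR: descending by first timestamp.
            leaves.sort((o1, o2) -> -Long.compare(
                o1.firstTimeStampForBulkCollection(), o2.firstTimeStampForBulkCollection()));

            System.out.println(leaves); // segment-b comes first, so it is drained first
        }
    }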

private void indexBucket(XContentBuilder doc) {
@@ -464,6 +522,7 @@ public void preCollection() {
@Override
public void postCollection() throws IOException {
// Flush downsample doc if not empty
bulkCollection();
if (downsampleBucketBuilder.isEmpty() == false) {
XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument();
indexBucket(doc);
@@ -545,8 +604,15 @@ public void resetTimestamp(long timestamp) {
}
}

public void collectDocCount(int docCount) {
this.docCount += docCount;
public void collectDocCount(IntArrayList buffer, DocCountProvider docCountProvider) throws IOException {
if (docCountProvider.alwaysOne()) {
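// Fast path: no buffered document carries a _doc_count field, so each doc counts as exactly one.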
this.docCount += buffer.size();
} else {
for (int i = 0; i < buffer.size(); i++) {
int docId = buffer.get(i);
this.docCount += docCountProvider.getDocCount(docId);
}
}
}

public XContentBuilder buildDownsampleDocument() throws IOException {