Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ public void write(XContentBuilder builder) throws IOException {
if (fieldProducer.isEmpty() == false) {
if (fieldProducer instanceof MetricFieldProducer metricFieldProducer) {
for (MetricFieldProducer.Metric metric : metricFieldProducer.metrics()) {
if (metric.get() != null) {
builder.field(metric.name(), metric.get());
}
builder.field(metric.name(), metric.get());
}
} else if (fieldProducer instanceof LabelFieldProducer labelFieldProducer) {
LabelFieldProducer.Label label = labelFieldProducer.label();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.FormattedDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.DocCountFieldMapper;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
Expand Down Expand Up @@ -361,14 +362,30 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag
docCountProvider.setLeafReaderContext(ctx);

// For each field, return a tuple with the downsample field producer and the field value leaf
final AbstractDownsampleFieldProducer[] fieldProducers = new AbstractDownsampleFieldProducer[fieldValueFetchers.size()];
final FormattedDocValues[] formattedDocValues = new FormattedDocValues[fieldValueFetchers.size()];
for (int i = 0; i < fieldProducers.length; i++) {
fieldProducers[i] = fieldValueFetchers.get(i).fieldProducer();
formattedDocValues[i] = fieldValueFetchers.get(i).getLeaf(ctx);
final List<AbstractDownsampleFieldProducer> nonMetricProducers = new ArrayList<>();
final List<FormattedDocValues> formattedDocValues = new ArrayList<>();

final List<MetricFieldProducer> metricProducers = new ArrayList<>();
final List<SortedNumericDoubleValues> numericDocValues = new ArrayList<>();
for (var fieldValueFetcher : fieldValueFetchers) {
var fieldProducer = fieldValueFetcher.fieldProducer();
if (fieldProducer instanceof MetricFieldProducer metricFieldProducer) {
metricProducers.add(metricFieldProducer);
numericDocValues.add(fieldValueFetcher.getNumericLeaf(ctx));
} else {
nonMetricProducers.add(fieldProducer);
formattedDocValues.add(fieldValueFetcher.getLeaf(ctx));
}
}

var leafBucketCollector = new LeafDownsampleCollector(aggCtx, docCountProvider, fieldProducers, formattedDocValues);
var leafBucketCollector = new LeafDownsampleCollector(
aggCtx,
docCountProvider,
nonMetricProducers.toArray(new AbstractDownsampleFieldProducer[0]),
formattedDocValues.toArray(new FormattedDocValues[0]),
metricProducers.toArray(new MetricFieldProducer[0]),
numericDocValues.toArray(new SortedNumericDoubleValues[0])
);
leafBucketCollectors.add(leafBucketCollector);
return leafBucketCollector;
}
Expand All @@ -386,7 +403,10 @@ class LeafDownsampleCollector extends LeafBucketCollector {
final AggregationExecutionContext aggCtx;
final DocCountProvider docCountProvider;
final FormattedDocValues[] formattedDocValues;
final AbstractDownsampleFieldProducer[] fieldProducers;
final AbstractDownsampleFieldProducer[] nonMetricProducers;

final MetricFieldProducer[] metricProducers;
final SortedNumericDoubleValues[] numericDocValues;

// Capture the first timestamp in order to determine which leaf collector's leafBulkCollection() is invoked first.
long firstTimeStampForBulkCollection;
Expand All @@ -396,13 +416,20 @@ class LeafDownsampleCollector extends LeafBucketCollector {
LeafDownsampleCollector(
AggregationExecutionContext aggCtx,
DocCountProvider docCountProvider,
AbstractDownsampleFieldProducer[] fieldProducers,
FormattedDocValues[] formattedDocValues
AbstractDownsampleFieldProducer[] nonMetricProducers,
FormattedDocValues[] formattedDocValues,
MetricFieldProducer[] metricProducers,
SortedNumericDoubleValues[] numericDocValues
) {
assert nonMetricProducers.length == formattedDocValues.length;
assert metricProducers.length == numericDocValues.length;

this.aggCtx = aggCtx;
this.docCountProvider = docCountProvider;
this.fieldProducers = fieldProducers;
this.nonMetricProducers = nonMetricProducers;
this.formattedDocValues = formattedDocValues;
this.metricProducers = metricProducers;
this.numericDocValues = numericDocValues;
}

@Override
Expand Down Expand Up @@ -488,11 +515,16 @@ void leafBulkCollection() throws IOException {

downsampleBucketBuilder.collectDocCount(docIdBuffer, docCountProvider);
// Iterate over all field values and collect the doc_values for this docId
for (int i = 0; i < fieldProducers.length; i++) {
AbstractDownsampleFieldProducer fieldProducer = fieldProducers[i];
for (int i = 0; i < nonMetricProducers.length; i++) {
AbstractDownsampleFieldProducer fieldProducer = nonMetricProducers[i];
FormattedDocValues docValues = formattedDocValues[i];
fieldProducer.collect(docValues, docIdBuffer);
}
for (int i = 0; i < metricProducers.length; i++) {
MetricFieldProducer metricFieldProducer = metricProducers[i];
SortedNumericDoubleValues numericDoubleValues = numericDocValues[i];
metricFieldProducer.collect(numericDoubleValues, docIdBuffer);
}

docsProcessed += docIdBuffer.size();
task.setDocsProcessed(docsProcessed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.index.fielddata.FormattedDocValues;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.LeafNumericFieldData;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.flattened.FlattenedFieldMapper;
Expand Down Expand Up @@ -50,6 +52,11 @@ public FormattedDocValues getLeaf(LeafReaderContext context) {
return fieldData.load(context).getFormattedValues(format);
}

public SortedNumericDoubleValues getNumericLeaf(LeafReaderContext context) {
LeafNumericFieldData numericFieldData = (LeafNumericFieldData) fieldData.load(context);
return numericFieldData.getDoubleValues();
}

public AbstractDownsampleFieldProducer fieldProducer() {
return fieldProducer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.apache.lucene.internal.hppc.IntArrayList;
import org.elasticsearch.index.fielddata.FormattedDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.xcontent.XContentBuilder;

Expand Down Expand Up @@ -46,7 +47,7 @@ public Metric[] metrics() {
}

/** Collect the value of a raw field and compute all downsampled metrics */
void collect(Number value) {
void collect(double value) {
for (MetricFieldProducer.Metric metric : metrics()) {
metric.collect(value);
}
Expand All @@ -55,14 +56,19 @@ void collect(Number value) {

@Override
public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException {
assert false : "MetricFieldProducer does not support formatted doc values";
throw new UnsupportedOperationException();
}

public void collect(SortedNumericDoubleValues docValues, IntArrayList docIdBuffer) throws IOException {
for (int i = 0; i < docIdBuffer.size(); i++) {
int docId = docIdBuffer.get(i);
if (docValues.advanceExact(docId) == false) {
continue;
}
int docValuesCount = docValues.docValueCount();
for (int j = 0; j < docValuesCount; j++) {
Number num = (Number) docValues.nextValue();
double num = docValues.nextValue();
collect(num);
}
}
Expand All @@ -83,9 +89,9 @@ public String name() {
return name;
}

abstract void collect(Number number);
abstract void collect(double number);

abstract Number get();
abstract double get();

abstract void reset();
}
Expand All @@ -94,51 +100,55 @@ public String name() {
* Metric implementation that computes the maximum of all values of a field
*/
static final class Max extends Metric {
private Double max;
private static final double NO_VALUE = -Double.MAX_VALUE;

private double max = NO_VALUE;

Max() {
super("max");
}

@Override
void collect(Number value) {
this.max = max != null ? Math.max(value.doubleValue(), max) : value.doubleValue();
void collect(double value) {
this.max = Math.max(value, max);
}

@Override
Number get() {
double get() {
return max;
}

@Override
void reset() {
max = null;
max = NO_VALUE;
}
}

/**
* Metric implementation that computes the minimum of all values of a field
*/
static final class Min extends Metric {
private Double min;
private static final double NO_VALUE = Double.MAX_VALUE;

private double min = NO_VALUE;

Min() {
super("min");
}

@Override
void collect(Number value) {
this.min = min != null ? Math.min(value.doubleValue(), min) : value.doubleValue();
void collect(double value) {
this.min = Math.min(value, min);
}

@Override
Number get() {
double get() {
return min;
}

@Override
void reset() {
min = null;
min = NO_VALUE;
}
}

Expand All @@ -157,12 +167,12 @@ static final class Sum extends Metric {
}

@Override
void collect(Number value) {
kahanSummation.add(value.doubleValue());
void collect(double value) {
kahanSummation.add(value);
}

@Override
Number get() {
double get() {
return kahanSummation.value();
}

Expand All @@ -183,12 +193,12 @@ static final class ValueCount extends Metric {
}

@Override
void collect(Number value) {
void collect(double value) {
count++;
}

@Override
Number get() {
double get() {
return count;
}

Expand All @@ -206,27 +216,29 @@ void reset() {
* ignoring everything else.
*/
static final class LastValue extends Metric {
private Number lastValue;
private static final double NO_VALUE = Double.MIN_VALUE;

private double lastValue = NO_VALUE;

LastValue() {
super("last_value");
}

@Override
void collect(Number value) {
if (lastValue == null) {
void collect(double value) {
if (lastValue == Double.MIN_VALUE) {
lastValue = value;
}
}

@Override
Number get() {
double get() {
return lastValue;
}

@Override
void reset() {
lastValue = null;
lastValue = NO_VALUE;
}
}

Expand All @@ -240,7 +252,7 @@ static final class CounterMetricFieldProducer extends MetricFieldProducer {
}

@Override
public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException {
public void collect(SortedNumericDoubleValues docValues, IntArrayList docIdBuffer) throws IOException {
// Counter producers only collect the last_value. Since documents are
// collected by descending timestamp order, the producer should only
// process the first value for every tsid. So, it will only collect the
Expand Down Expand Up @@ -281,9 +293,7 @@ public void write(XContentBuilder builder) throws IOException {
if (isEmpty() == false) {
builder.startObject(name());
for (MetricFieldProducer.Metric metric : metrics()) {
if (metric.get() != null) {
builder.field(metric.name(), metric.get());
}
builder.field(metric.name(), metric.get());
}
builder.endObject();
}
Expand Down
Loading