From 00c6fd2ea60eb5878ef3372930d6812741305238 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 13 Mar 2025 12:59:46 +0100 Subject: [PATCH] Change downsample's MetricFieldProducers (#124701) Refactor MetricFieldProducer to use SortedNumericDoubleValues instead of FormattedDocValues, which saves unneeded conversations / casts. --- .../AggregateMetricFieldSerializer.java | 4 +- .../downsample/DownsampleShardIndexer.java | 56 ++++++++++++---- .../xpack/downsample/FieldValueFetcher.java | 7 ++ .../xpack/downsample/MetricFieldProducer.java | 66 +++++++++++-------- .../downsample/MetricFieldProducerTests.java | 40 +++++------ 5 files changed, 110 insertions(+), 63 deletions(-) diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldSerializer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldSerializer.java index 57137ec429978..b3642de376c58 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldSerializer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldSerializer.java @@ -39,9 +39,7 @@ public void write(XContentBuilder builder) throws IOException { if (fieldProducer.isEmpty() == false) { if (fieldProducer instanceof MetricFieldProducer metricFieldProducer) { for (MetricFieldProducer.Metric metric : metricFieldProducer.metrics()) { - if (metric.get() != null) { - builder.field(metric.name(), metric.get()); - } + builder.field(metric.name(), metric.get()); } } else if (fieldProducer instanceof LabelFieldProducer labelFieldProducer) { LabelFieldProducer.Label label = labelFieldProducer.label(); diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java index 629cce9a2f9af..b5ad69777bc80 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.fielddata.FormattedDocValues; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.DocCountFieldMapper; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; @@ -361,14 +362,30 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag docCountProvider.setLeafReaderContext(ctx); // For each field, return a tuple with the downsample field producer and the field value leaf - final AbstractDownsampleFieldProducer[] fieldProducers = new AbstractDownsampleFieldProducer[fieldValueFetchers.size()]; - final FormattedDocValues[] formattedDocValues = new FormattedDocValues[fieldValueFetchers.size()]; - for (int i = 0; i < fieldProducers.length; i++) { - fieldProducers[i] = fieldValueFetchers.get(i).fieldProducer(); - formattedDocValues[i] = fieldValueFetchers.get(i).getLeaf(ctx); + final List nonMetricProducers = new ArrayList<>(); + final List formattedDocValues = new ArrayList<>(); + + final List metricProducers = new ArrayList<>(); + final List numericDocValues = new ArrayList<>(); + for (var fieldValueFetcher : fieldValueFetchers) { + var fieldProducer = fieldValueFetcher.fieldProducer(); + if (fieldProducer instanceof MetricFieldProducer metricFieldProducer) { + metricProducers.add(metricFieldProducer); + numericDocValues.add(fieldValueFetcher.getNumericLeaf(ctx)); + } else { + nonMetricProducers.add(fieldProducer); + formattedDocValues.add(fieldValueFetcher.getLeaf(ctx)); + } } - var leafBucketCollector = new LeafDownsampleCollector(aggCtx, docCountProvider, fieldProducers, formattedDocValues); + var leafBucketCollector = new LeafDownsampleCollector( + aggCtx, + docCountProvider, + nonMetricProducers.toArray(new AbstractDownsampleFieldProducer[0]), + formattedDocValues.toArray(new FormattedDocValues[0]), + metricProducers.toArray(new MetricFieldProducer[0]), + numericDocValues.toArray(new SortedNumericDoubleValues[0]) + ); leafBucketCollectors.add(leafBucketCollector); return leafBucketCollector; } @@ -386,7 +403,10 @@ class LeafDownsampleCollector extends LeafBucketCollector { final AggregationExecutionContext aggCtx; final DocCountProvider docCountProvider; final FormattedDocValues[] formattedDocValues; - final AbstractDownsampleFieldProducer[] fieldProducers; + final AbstractDownsampleFieldProducer[] nonMetricProducers; + + final MetricFieldProducer[] metricProducers; + final SortedNumericDoubleValues[] numericDocValues; // Capture the first timestamp in order to determine which leaf collector's leafBulkCollection() is invoked first. long firstTimeStampForBulkCollection; @@ -396,13 +416,20 @@ class LeafDownsampleCollector extends LeafBucketCollector { LeafDownsampleCollector( AggregationExecutionContext aggCtx, DocCountProvider docCountProvider, - AbstractDownsampleFieldProducer[] fieldProducers, - FormattedDocValues[] formattedDocValues + AbstractDownsampleFieldProducer[] nonMetricProducers, + FormattedDocValues[] formattedDocValues, + MetricFieldProducer[] metricProducers, + SortedNumericDoubleValues[] numericDocValues ) { + assert nonMetricProducers.length == formattedDocValues.length; + assert metricProducers.length == numericDocValues.length; + this.aggCtx = aggCtx; this.docCountProvider = docCountProvider; - this.fieldProducers = fieldProducers; + this.nonMetricProducers = nonMetricProducers; this.formattedDocValues = formattedDocValues; + this.metricProducers = metricProducers; + this.numericDocValues = numericDocValues; } @Override @@ -488,11 +515,16 @@ void leafBulkCollection() throws IOException { downsampleBucketBuilder.collectDocCount(docIdBuffer, docCountProvider); // Iterate over all field values and collect the doc_values for this docId - for (int i = 0; i < fieldProducers.length; i++) { - AbstractDownsampleFieldProducer fieldProducer = fieldProducers[i]; + for (int i = 0; i < nonMetricProducers.length; i++) { + AbstractDownsampleFieldProducer fieldProducer = nonMetricProducers[i]; FormattedDocValues docValues = formattedDocValues[i]; fieldProducer.collect(docValues, docIdBuffer); } + for (int i = 0; i < metricProducers.length; i++) { + MetricFieldProducer metricFieldProducer = metricProducers[i]; + SortedNumericDoubleValues numericDoubleValues = numericDocValues[i]; + metricFieldProducer.collect(numericDoubleValues, docIdBuffer); + } docsProcessed += docIdBuffer.size(); task.setDocsProcessed(docsProcessed); diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java index 811d36ec1075a..8974fd6dc8ad2 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java @@ -10,6 +10,8 @@ import org.apache.lucene.index.LeafReaderContext; import org.elasticsearch.index.fielddata.FormattedDocValues; import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.fielddata.LeafNumericFieldData; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.mapper.flattened.FlattenedFieldMapper; @@ -50,6 +52,11 @@ public FormattedDocValues getLeaf(LeafReaderContext context) { return fieldData.load(context).getFormattedValues(format); } + public SortedNumericDoubleValues getNumericLeaf(LeafReaderContext context) { + LeafNumericFieldData numericFieldData = (LeafNumericFieldData) fieldData.load(context); + return numericFieldData.getDoubleValues(); + } + public AbstractDownsampleFieldProducer fieldProducer() { return fieldProducer; } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java index 0c6fc6a60e981..10e778b4b3eee 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java @@ -9,6 +9,7 @@ import org.apache.lucene.internal.hppc.IntArrayList; import org.elasticsearch.index.fielddata.FormattedDocValues; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.search.aggregations.metrics.CompensatedSum; import org.elasticsearch.xcontent.XContentBuilder; @@ -46,7 +47,7 @@ public Metric[] metrics() { } /** Collect the value of a raw field and compute all downsampled metrics */ - void collect(Number value) { + void collect(double value) { for (MetricFieldProducer.Metric metric : metrics()) { metric.collect(value); } @@ -55,6 +56,11 @@ void collect(Number value) { @Override public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException { + assert false : "MetricFieldProducer does not support formatted doc values"; + throw new UnsupportedOperationException(); + } + + public void collect(SortedNumericDoubleValues docValues, IntArrayList docIdBuffer) throws IOException { for (int i = 0; i < docIdBuffer.size(); i++) { int docId = docIdBuffer.get(i); if (docValues.advanceExact(docId) == false) { @@ -62,7 +68,7 @@ public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) thro } int docValuesCount = docValues.docValueCount(); for (int j = 0; j < docValuesCount; j++) { - Number num = (Number) docValues.nextValue(); + double num = docValues.nextValue(); collect(num); } } @@ -83,9 +89,9 @@ public String name() { return name; } - abstract void collect(Number number); + abstract void collect(double number); - abstract Number get(); + abstract double get(); abstract void reset(); } @@ -94,25 +100,27 @@ public String name() { * Metric implementation that computes the maximum of all values of a field */ static final class Max extends Metric { - private Double max; + private static final double NO_VALUE = -Double.MAX_VALUE; + + private double max = NO_VALUE; Max() { super("max"); } @Override - void collect(Number value) { - this.max = max != null ? Math.max(value.doubleValue(), max) : value.doubleValue(); + void collect(double value) { + this.max = Math.max(value, max); } @Override - Number get() { + double get() { return max; } @Override void reset() { - max = null; + max = NO_VALUE; } } @@ -120,25 +128,27 @@ void reset() { * Metric implementation that computes the minimum of all values of a field */ static final class Min extends Metric { - private Double min; + private static final double NO_VALUE = Double.MAX_VALUE; + + private double min = NO_VALUE; Min() { super("min"); } @Override - void collect(Number value) { - this.min = min != null ? Math.min(value.doubleValue(), min) : value.doubleValue(); + void collect(double value) { + this.min = Math.min(value, min); } @Override - Number get() { + double get() { return min; } @Override void reset() { - min = null; + min = NO_VALUE; } } @@ -157,12 +167,12 @@ static final class Sum extends Metric { } @Override - void collect(Number value) { - kahanSummation.add(value.doubleValue()); + void collect(double value) { + kahanSummation.add(value); } @Override - Number get() { + double get() { return kahanSummation.value(); } @@ -183,12 +193,12 @@ static final class ValueCount extends Metric { } @Override - void collect(Number value) { + void collect(double value) { count++; } @Override - Number get() { + double get() { return count; } @@ -206,27 +216,29 @@ void reset() { * ignoring everything else. */ static final class LastValue extends Metric { - private Number lastValue; + private static final double NO_VALUE = Double.MIN_VALUE; + + private double lastValue = NO_VALUE; LastValue() { super("last_value"); } @Override - void collect(Number value) { - if (lastValue == null) { + void collect(double value) { + if (lastValue == Double.MIN_VALUE) { lastValue = value; } } @Override - Number get() { + double get() { return lastValue; } @Override void reset() { - lastValue = null; + lastValue = NO_VALUE; } } @@ -240,7 +252,7 @@ static final class CounterMetricFieldProducer extends MetricFieldProducer { } @Override - public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException { + public void collect(SortedNumericDoubleValues docValues, IntArrayList docIdBuffer) throws IOException { // Counter producers only collect the last_value. Since documents are // collected by descending timestamp order, the producer should only // process the first value for every tsid. So, it will only collect the @@ -281,9 +293,7 @@ public void write(XContentBuilder builder) throws IOException { if (isEmpty() == false) { builder.startObject(name()); for (MetricFieldProducer.Metric metric : metrics()) { - if (metric.get() != null) { - builder.field(metric.name(), metric.get()); - } + builder.field(metric.name(), metric.get()); } builder.endObject(); } diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java index 34ccbc5c7202d..f197f7560703a 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java @@ -18,36 +18,36 @@ public class MetricFieldProducerTests extends AggregatorTestCase { public void testMinCountMetric() { MetricFieldProducer.Metric metric = new MetricFieldProducer.Min(); - assertNull(metric.get()); + assertEquals(Double.MAX_VALUE, metric.get(), 0); metric.collect(40); metric.collect(5.5); metric.collect(12.2); metric.collect(55); - assertEquals(5.5, metric.get()); + assertEquals(5.5, metric.get(), 0); metric.reset(); - assertNull(metric.get()); + assertEquals(Double.MAX_VALUE, metric.get(), 0); } public void testMaxCountMetric() { MetricFieldProducer.Metric metric = new MetricFieldProducer.Max(); - assertNull(metric.get()); + assertEquals(-Double.MAX_VALUE, metric.get(), 0); metric.collect(5.5); metric.collect(12.2); metric.collect(55); - assertEquals(55d, metric.get()); + assertEquals(55d, metric.get(), 0); metric.reset(); - assertNull(metric.get()); + assertEquals(-Double.MAX_VALUE, metric.get(), 0); } public void testSumCountMetric() { MetricFieldProducer.Metric metric = new MetricFieldProducer.Sum(); - assertEquals(0d, metric.get()); + assertEquals(0d, metric.get(), 0); metric.collect(5.5); metric.collect(12.2); metric.collect(55); - assertEquals(72.7, metric.get()); + assertEquals(72.7, metric.get(), 0); metric.reset(); - assertEquals(0d, metric.get()); + assertEquals(0d, metric.get(), 0); } /** @@ -61,7 +61,7 @@ public void testSummationAccuracy() { for (int i = 0; i < values.length; i++) { metric.collect(values[i]); } - assertEquals(15.3, metric.get().doubleValue(), Double.MIN_NORMAL); + assertEquals(15.3, metric.get(), Double.MIN_NORMAL); // Summing up an array which contains NaN and infinities and expect a result same as naive summation metric.reset(); @@ -74,7 +74,7 @@ public void testSummationAccuracy() { sum += d; metric.collect(d); } - assertEquals(sum, metric.get().doubleValue(), 1e-10); + assertEquals(sum, metric.get(), 1e-10); // Summing up some big double values and expect infinity result metric.reset(); @@ -82,35 +82,35 @@ public void testSummationAccuracy() { for (int i = 0; i < n; i++) { metric.collect(Double.MAX_VALUE); } - assertEquals(Double.POSITIVE_INFINITY, metric.get().doubleValue(), 0d); + assertEquals(Double.POSITIVE_INFINITY, metric.get(), 0d); metric.reset(); for (int i = 0; i < n; i++) { metric.collect(-Double.MAX_VALUE); } - assertEquals(Double.NEGATIVE_INFINITY, metric.get().doubleValue(), 0d); + assertEquals(Double.NEGATIVE_INFINITY, metric.get(), 0d); } public void testValueCountMetric() { MetricFieldProducer.Metric metric = new MetricFieldProducer.ValueCount(); - assertEquals(0L, metric.get()); + assertEquals(0L, metric.get(), 0d); metric.collect(40); metric.collect(30); metric.collect(20); - assertEquals(3L, metric.get()); + assertEquals(3L, metric.get(), 0d); metric.reset(); - assertEquals(0L, metric.get()); + assertEquals(0L, metric.get(), 0d); } public void testLastValueMetric() { MetricFieldProducer.Metric metric = new MetricFieldProducer.LastValue(); - assertNull(metric.get()); + assertEquals(Double.MIN_VALUE, metric.get(), 0); metric.collect(40); metric.collect(30); metric.collect(20); - assertEquals(40, metric.get()); + assertEquals(40, metric.get(), 0); metric.reset(); - assertNull(metric.get()); + assertEquals(Double.MIN_VALUE, metric.get(), 0); } public void testCounterMetricFieldProducer() throws IOException { @@ -145,7 +145,7 @@ public void testGaugeMetricFieldProducer() throws IOException { XContentBuilder builder = JsonXContent.contentBuilder().startObject(); producer.write(builder); builder.endObject(); - assertEquals("{\"field\":{\"min\":5.5,\"max\":55.0,\"sum\":72.7,\"value_count\":3}}", Strings.toString(builder)); + assertEquals("{\"field\":{\"min\":5.5,\"max\":55.0,\"sum\":72.7,\"value_count\":3.0}}", Strings.toString(builder)); assertEquals(field, producer.name()); }