Skip to content

Commit f130e98

Browse files
authored
Add queryDSL histogram aggregation for exponential_histogram fields (#136888)
1 parent 70ba2af commit f130e98

File tree

12 files changed

+435
-10
lines changed

12 files changed

+435
-10
lines changed

x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramMapperPlugin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import org.elasticsearch.plugins.SearchPlugin;
1414
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
1515
import org.elasticsearch.xpack.analytics.mapper.ExponentialHistogramParser;
16-
import org.elasticsearch.xpack.exponentialhistogram.aggregations.metrics.ExponentialHistogramAggregatorsRegistrar;
16+
import org.elasticsearch.xpack.exponentialhistogram.aggregations.ExponentialHistogramAggregatorsRegistrar;
1717

1818
import java.util.Collections;
1919
import java.util.LinkedHashMap;
@@ -40,7 +40,8 @@ public List<Consumer<ValuesSourceRegistry.Builder>> getAggregationExtentions() {
4040
return List.of(
4141
ExponentialHistogramAggregatorsRegistrar::registerValueCountAggregator,
4242
ExponentialHistogramAggregatorsRegistrar::registerSumAggregator,
43-
ExponentialHistogramAggregatorsRegistrar::registerAvgAggregator
43+
ExponentialHistogramAggregatorsRegistrar::registerAvgAggregator,
44+
ExponentialHistogramAggregatorsRegistrar::registerHistogramAggregator
4445
);
4546
}
4647
return Collections.emptyList();
Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,17 @@
44
* 2.0; you may not use this file except in compliance with the Elastic License
55
* 2.0.
66
*/
7-
package org.elasticsearch.xpack.exponentialhistogram.aggregations.metrics;
7+
package org.elasticsearch.xpack.exponentialhistogram.aggregations;
88

9+
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
910
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
1011
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
1112
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
1213
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
14+
import org.elasticsearch.xpack.exponentialhistogram.aggregations.bucket.histogram.ExponentialHistogramBackedHistogramAggregator;
15+
import org.elasticsearch.xpack.exponentialhistogram.aggregations.metrics.ExponentialHistogramAvgAggregator;
16+
import org.elasticsearch.xpack.exponentialhistogram.aggregations.metrics.ExponentialHistogramSumAggregator;
17+
import org.elasticsearch.xpack.exponentialhistogram.aggregations.metrics.ExponentialHistogramValueCountAggregator;
1318
import org.elasticsearch.xpack.exponentialhistogram.aggregations.support.ExponentialHistogramValuesSourceType;
1419

1520
/**
@@ -44,4 +49,12 @@ public static void registerAvgAggregator(ValuesSourceRegistry.Builder builder) {
4449
);
4550
}
4651

52+
public static void registerHistogramAggregator(ValuesSourceRegistry.Builder builder) {
53+
builder.register(
54+
HistogramAggregationBuilder.REGISTRY_KEY,
55+
ExponentialHistogramValuesSourceType.EXPONENTIAL_HISTOGRAM,
56+
ExponentialHistogramBackedHistogramAggregator::new,
57+
true
58+
);
59+
}
4760
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.exponentialhistogram.aggregations.bucket.histogram;
9+
10+
import org.elasticsearch.exponentialhistogram.BucketIterator;
11+
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
12+
import org.elasticsearch.exponentialhistogram.ExponentialScaleUtils;
13+
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
14+
import org.elasticsearch.search.aggregations.Aggregator;
15+
import org.elasticsearch.search.aggregations.AggregatorFactories;
16+
import org.elasticsearch.search.aggregations.BucketOrder;
17+
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
18+
import org.elasticsearch.search.aggregations.LeafBucketCollector;
19+
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
20+
import org.elasticsearch.search.aggregations.bucket.histogram.AbstractHistogramAggregator;
21+
import org.elasticsearch.search.aggregations.bucket.histogram.DoubleBounds;
22+
import org.elasticsearch.search.aggregations.support.AggregationContext;
23+
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
24+
import org.elasticsearch.xpack.exponentialhistogram.aggregations.support.ExponentialHistogramValuesSource;
25+
import org.elasticsearch.xpack.exponentialhistogram.fielddata.ExponentialHistogramValuesReader;
26+
27+
import java.io.IOException;
28+
import java.util.Map;
29+
30+
public final class ExponentialHistogramBackedHistogramAggregator extends AbstractHistogramAggregator {
31+
32+
private final ExponentialHistogramValuesSource.ExponentialHistogram valuesSource;
33+
34+
public ExponentialHistogramBackedHistogramAggregator(
35+
String name,
36+
AggregatorFactories factories,
37+
double interval,
38+
double offset,
39+
BucketOrder order,
40+
boolean keyed,
41+
long minDocCount,
42+
DoubleBounds extendedBounds,
43+
DoubleBounds hardBounds,
44+
ValuesSourceConfig valuesSourceConfig,
45+
AggregationContext context,
46+
Aggregator parent,
47+
CardinalityUpperBound cardinalityUpperBound,
48+
Map<String, Object> metadata
49+
) throws IOException {
50+
super(
51+
name,
52+
factories,
53+
interval,
54+
offset,
55+
order,
56+
keyed,
57+
minDocCount,
58+
extendedBounds,
59+
hardBounds,
60+
valuesSourceConfig.format(),
61+
context,
62+
parent,
63+
cardinalityUpperBound,
64+
metadata
65+
);
66+
67+
this.valuesSource = (ExponentialHistogramValuesSource.ExponentialHistogram) valuesSourceConfig.getValuesSource();
68+
69+
// Sub aggregations are not allowed when running histogram agg over histograms
70+
if (subAggregators().length > 0) {
71+
throw new IllegalArgumentException("Histogram aggregation on histogram fields does not support sub-aggregations");
72+
}
73+
}
74+
75+
@Override
76+
public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException {
77+
ExponentialHistogramValuesReader values = valuesSource.getHistogramValues(aggCtx.getLeafReaderContext());
78+
return new LeafBucketCollectorBase(sub, values) {
79+
@Override
80+
public void collect(int doc, long owningBucketOrd) throws IOException {
81+
if (values.advanceExact(doc)) {
82+
ExponentialHistogram histo = values.histogramValue();
83+
forEachBucketCenter(histo, (center, count) -> {
84+
double key = Math.floor((center - offset) / interval);
85+
if (hardBounds == null || hardBounds.contain(key * interval)) {
86+
long bucketOrd = bucketOrds.add(owningBucketOrd, Double.doubleToLongBits(key));
87+
if (bucketOrd < 0) { // already seen
88+
bucketOrd = -1 - bucketOrd;
89+
collectExistingBucket(sub, doc, bucketOrd);
90+
} else {
91+
collectBucket(sub, doc, bucketOrd);
92+
}
93+
// We have added the document already and we have incremented bucket doc_count
94+
// by _doc_count times. To compensate for this, we should increment doc_count by
95+
// (count - _doc_count) so that we have added it count times.
96+
incrementBucketDocCount(bucketOrd, count - docCountProvider.getDocCount(doc));
97+
}
98+
});
99+
}
100+
}
101+
};
102+
}
103+
104+
@FunctionalInterface
105+
private interface BucketCenterConsumer {
106+
void accept(double bucketCenter, long count) throws IOException;
107+
}
108+
109+
private static void forEachBucketCenter(ExponentialHistogram histo, BucketCenterConsumer consumer) throws IOException {
110+
BucketIterator negIt = histo.negativeBuckets().iterator();
111+
while (negIt.hasNext()) {
112+
double center = -ExponentialScaleUtils.getPointOfLeastRelativeError(negIt.peekIndex(), negIt.scale());
113+
center = Math.clamp(center, histo.min(), histo.max());
114+
consumer.accept(center, negIt.peekCount());
115+
negIt.advance();
116+
}
117+
if (histo.zeroBucket().count() > 0) {
118+
consumer.accept(0.0, histo.zeroBucket().count());
119+
}
120+
BucketIterator posIt = histo.positiveBuckets().iterator();
121+
while (posIt.hasNext()) {
122+
double center = ExponentialScaleUtils.getPointOfLeastRelativeError(posIt.peekIndex(), posIt.scale());
123+
center = Math.clamp(center, histo.min(), histo.max());
124+
consumer.accept(center, posIt.peekCount());
125+
posIt.advance();
126+
}
127+
}
128+
129+
}

x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/aggregations/metrics/ExponentialHistogramAvgAggregator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import java.io.IOException;
2929
import java.util.Map;
3030

31-
final class ExponentialHistogramAvgAggregator extends NumericMetricsAggregator.SingleValue {
31+
public final class ExponentialHistogramAvgAggregator extends NumericMetricsAggregator.SingleValue {
3232

3333
private final ExponentialHistogramValuesSource.ExponentialHistogram valuesSource;
3434
private final DocValueFormat format;
@@ -37,7 +37,7 @@ final class ExponentialHistogramAvgAggregator extends NumericMetricsAggregator.S
3737
private DoubleArray sums;
3838
private DoubleArray compensations;
3939

40-
ExponentialHistogramAvgAggregator(
40+
public ExponentialHistogramAvgAggregator(
4141
String name,
4242
ValuesSourceConfig valuesSourceConfig,
4343
AggregationContext context,

x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/aggregations/metrics/ExponentialHistogramSumAggregator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@
2727
import java.io.IOException;
2828
import java.util.Map;
2929

30-
final class ExponentialHistogramSumAggregator extends NumericMetricsAggregator.SingleValue {
30+
public final class ExponentialHistogramSumAggregator extends NumericMetricsAggregator.SingleValue {
3131

3232
private final ExponentialHistogramValuesSource.ExponentialHistogram valuesSource;
3333
private final DocValueFormat format;
3434

3535
private DoubleArray sums;
3636
private DoubleArray compensations;
3737

38-
ExponentialHistogramSumAggregator(
38+
public ExponentialHistogramSumAggregator(
3939
String name,
4040
ValuesSourceConfig valuesSourceConfig,
4141
AggregationContext context,

x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/aggregations/metrics/ExponentialHistogramValueCountAggregator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@
2424
import java.io.IOException;
2525
import java.util.Map;
2626

27-
class ExponentialHistogramValueCountAggregator extends NumericMetricsAggregator.SingleValue {
27+
public final class ExponentialHistogramValueCountAggregator extends NumericMetricsAggregator.SingleValue {
2828

2929
private final ExponentialHistogramValuesSource.ExponentialHistogram valuesSource;
3030

3131
// a count per bucket
3232
private LongArray counts;
3333

34-
ExponentialHistogramValueCountAggregator(
34+
public ExponentialHistogramValueCountAggregator(
3535
String name,
3636
ValuesSourceConfig valuesSourceConfig,
3737
AggregationContext aggregationContext,
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* 2.0.
66
*/
77

8-
package org.elasticsearch.xpack.exponentialhistogram.aggregations.metrics;
8+
package org.elasticsearch.xpack.exponentialhistogram.aggregations;
99

1010
import org.apache.lucene.index.IndexableField;
1111
import org.apache.lucene.tests.index.RandomIndexWriter;

0 commit comments

Comments
 (0)