Skip to content

Commit 08c6a72

Browse files
JonasKunz authored and elasticsearchmachine committed
Add support for value_count for exponential_histogram field (elastic#136163)
--------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent 17f4353 commit 08c6a72

File tree

10 files changed

+511
-39
lines changed

10 files changed

+511
-39
lines changed

x-pack/plugin/mapper-exponential-histogram/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ base {
1818
archivesName = 'x-pack-exponential-histogram'
1919
}
2020

21+
restResources {
22+
restApi {
23+
include 'bulk', 'search'
24+
}
25+
}
2126
dependencies {
2227
api project(":libs:exponential-histogram")
2328
compileOnly project(path: xpackModule('core'))

x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramFieldMapper.java

Lines changed: 107 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
package org.elasticsearch.xpack.exponentialhistogram;
99

1010
import org.apache.lucene.document.BinaryDocValuesField;
11-
import org.apache.lucene.document.Field;
1211
import org.apache.lucene.document.NumericDocValuesField;
1312
import org.apache.lucene.index.BinaryDocValues;
13+
import org.apache.lucene.index.IndexableField;
1414
import org.apache.lucene.index.LeafReader;
1515
import org.apache.lucene.index.LeafReaderContext;
1616
import org.apache.lucene.index.NumericDocValues;
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.index.mapper.FieldMapper;
3737
import org.elasticsearch.index.mapper.IgnoreMalformedStoredValues;
3838
import org.elasticsearch.index.mapper.IndexType;
39+
import org.elasticsearch.index.mapper.LuceneDocument;
3940
import org.elasticsearch.index.mapper.MappedFieldType;
4041
import org.elasticsearch.index.mapper.MapperBuilderContext;
4142
import org.elasticsearch.index.mapper.SourceLoader;
@@ -238,9 +239,10 @@ protected void parseCreateField(DocumentParserContext context) {
238239
throw new UnsupportedOperationException("Parsing is implemented in parse(), this method should NEVER be called");
239240
}
240241

241-
static class ExponentialHistogramFieldType extends MappedFieldType {
242+
public static final class ExponentialHistogramFieldType extends MappedFieldType {
242243

243-
ExponentialHistogramFieldType(String name, Map<String, String> meta) {
244+
// Visible for testing
245+
public ExponentialHistogramFieldType(String name, Map<String, String> meta) {
244246
super(name, IndexType.docValuesOnly(), false, meta);
245247
}
246248

@@ -445,37 +447,18 @@ public void parse(DocumentParserContext context) throws IOException {
445447
min = validateOrEstimateMin(min, zeroBucket, scale, negativeBuckets, positiveBuckets, totalValueCount, subParser);
446448
max = validateOrEstimateMax(max, zeroBucket, scale, negativeBuckets, positiveBuckets, totalValueCount, subParser);
447449

448-
BytesStreamOutput histogramBytesOutput = new BytesStreamOutput();
449-
CompressedExponentialHistogram.writeHistogramBytes(histogramBytesOutput, scale, negativeBuckets, positiveBuckets);
450-
BytesRef histoBytes = histogramBytesOutput.bytes().toBytesRef();
451-
452-
Field histoField = new BinaryDocValuesField(fullPath(), histoBytes);
453-
long thresholdAsLong = NumericUtils.doubleToSortableLong(zeroBucket.threshold());
454-
NumericDocValuesField zeroThresholdField = new NumericDocValuesField(zeroThresholdSubFieldName(fullPath()), thresholdAsLong);
455-
NumericDocValuesField valuesCountField = new NumericDocValuesField(valuesCountSubFieldName(fullPath()), totalValueCount);
456-
NumericDocValuesField sumField = new NumericDocValuesField(
457-
valuesSumSubFieldName(fullPath()),
458-
NumericUtils.doubleToSortableLong(sum)
450+
HistogramDocValueFields docValues = buildDocValueFields(
451+
fullPath(),
452+
scale,
453+
negativeBuckets,
454+
positiveBuckets,
455+
zeroBucket.threshold(),
456+
totalValueCount,
457+
sum,
458+
min,
459+
max
459460
);
460-
461-
context.doc().addWithKey(fieldType().name(), histoField);
462-
context.doc().add(zeroThresholdField);
463-
context.doc().add(valuesCountField);
464-
context.doc().add(sumField);
465-
if (min != null) {
466-
NumericDocValuesField minField = new NumericDocValuesField(
467-
valuesMinSubFieldName(fullPath()),
468-
NumericUtils.doubleToSortableLong(min)
469-
);
470-
context.doc().add(minField);
471-
}
472-
if (max != null) {
473-
NumericDocValuesField maxField = new NumericDocValuesField(
474-
valuesMaxSubFieldName(fullPath()),
475-
NumericUtils.doubleToSortableLong(max)
476-
);
477-
context.doc().add(maxField);
478-
}
461+
docValues.addToDoc(context.doc());
479462

480463
} catch (Exception ex) {
481464
if (ignoreMalformed.value() == false) {
@@ -505,6 +488,88 @@ public void parse(DocumentParserContext context) throws IOException {
505488
context.path().remove();
506489
}
507490

491+
// Visible for testing, to construct realistic doc values in tests
492+
public static HistogramDocValueFields buildDocValueFields(
493+
String fieldName,
494+
int scale,
495+
List<IndexWithCount> negativeBuckets,
496+
List<IndexWithCount> positiveBuckets,
497+
double zeroThreshold,
498+
long totalValueCount,
499+
double sum,
500+
@Nullable Double min,
501+
@Nullable Double max
502+
) throws IOException {
503+
BytesStreamOutput histogramBytesOutput = new BytesStreamOutput();
504+
CompressedExponentialHistogram.writeHistogramBytes(histogramBytesOutput, scale, negativeBuckets, positiveBuckets);
505+
BytesRef histoBytes = histogramBytesOutput.bytes().toBytesRef();
506+
507+
BinaryDocValuesField histoField = new BinaryDocValuesField(fieldName, histoBytes);
508+
long thresholdAsLong = NumericUtils.doubleToSortableLong(zeroThreshold);
509+
NumericDocValuesField zeroThresholdField = new NumericDocValuesField(zeroThresholdSubFieldName(fieldName), thresholdAsLong);
510+
NumericDocValuesField valuesCountField = new NumericDocValuesField(valuesCountSubFieldName(fieldName), totalValueCount);
511+
NumericDocValuesField sumField = new NumericDocValuesField(
512+
valuesSumSubFieldName(fieldName),
513+
NumericUtils.doubleToSortableLong(sum)
514+
);
515+
NumericDocValuesField minField = null;
516+
if (min != null) {
517+
minField = new NumericDocValuesField(valuesMinSubFieldName(fieldName), NumericUtils.doubleToSortableLong(min));
518+
}
519+
NumericDocValuesField maxField = null;
520+
if (max != null) {
521+
maxField = new NumericDocValuesField(valuesMaxSubFieldName(fieldName), NumericUtils.doubleToSortableLong(max));
522+
}
523+
HistogramDocValueFields docValues = new HistogramDocValueFields(
524+
histoField,
525+
zeroThresholdField,
526+
valuesCountField,
527+
sumField,
528+
minField,
529+
maxField
530+
);
531+
return docValues;
532+
}
533+
534+
// Visible for testing
535+
public record HistogramDocValueFields(
536+
BinaryDocValuesField histo,
537+
NumericDocValuesField zeroThreshold,
538+
NumericDocValuesField valuesCount,
539+
NumericDocValuesField sumField,
540+
@Nullable NumericDocValuesField minField,
541+
@Nullable NumericDocValuesField maxField
542+
) {
543+
544+
public void addToDoc(LuceneDocument doc) {
545+
doc.addWithKey(histo.name(), histo);
546+
doc.add(zeroThreshold);
547+
doc.add(valuesCount);
548+
doc.add(sumField);
549+
if (minField != null) {
550+
doc.add(minField);
551+
}
552+
if (maxField != null) {
553+
doc.add(maxField);
554+
}
555+
}
556+
557+
public List<IndexableField> fieldsAsList() {
558+
List<IndexableField> fields = new ArrayList<>();
559+
fields.add(histo);
560+
fields.add(zeroThreshold);
561+
fields.add(valuesCount);
562+
fields.add(sumField);
563+
if (minField != null) {
564+
fields.add(minField);
565+
}
566+
if (maxField != null) {
567+
fields.add(maxField);
568+
}
569+
return fields;
570+
}
571+
}
572+
508573
private Double validateOrEstimateSum(
509574
Double sum,
510575
Integer scale,
@@ -819,12 +884,12 @@ private static class DocValuesReader implements ExponentialHistogramValuesReader
819884
}
820885

821886
boolean hasAnyValues() {
822-
return histoDocValues != null;
887+
return valueCounts != null;
823888
}
824889

825890
@Override
826891
public boolean advanceExact(int docId) throws IOException {
827-
boolean isPresent = histoDocValues != null && histoDocValues.advanceExact(docId);
892+
boolean isPresent = valueCounts != null && valueCounts.advanceExact(docId);
828893
currentDocId = isPresent ? docId : -1;
829894
return isPresent;
830895
}
@@ -834,10 +899,10 @@ public ExponentialHistogram histogramValue() throws IOException {
834899
if (currentDocId == -1) {
835900
throw new IllegalStateException("No histogram present for current document");
836901
}
902+
boolean histoPresent = histoDocValues.advanceExact(currentDocId);
837903
boolean zeroThresholdPresent = zeroThresholds.advanceExact(currentDocId);
838-
boolean valueCountsPresent = valueCounts.advanceExact(currentDocId);
839904
boolean valueSumsPresent = valueSums.advanceExact(currentDocId);
840-
assert zeroThresholdPresent && valueCountsPresent && valueSumsPresent;
905+
assert zeroThresholdPresent && histoPresent && valueSumsPresent;
841906

842907
BytesRef encodedHistogram = histoDocValues.binaryValue();
843908
double zeroThreshold = NumericUtils.sortableLongToDouble(zeroThresholds.longValue());
@@ -858,6 +923,11 @@ public ExponentialHistogram histogramValue() throws IOException {
858923
tempHistogram.reset(zeroThreshold, valueCount, valueSum, valueMin, valueMax, encodedHistogram);
859924
return tempHistogram;
860925
}
926+
927+
@Override
928+
public long valuesCountValue() throws IOException {
929+
return valueCounts.longValue();
930+
}
861931
}
862932

863933
private class ExponentialHistogramSyntheticFieldLoader implements CompositeSyntheticFieldLoader.DocValuesLayer {

x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramMapperPlugin.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,20 @@
1010
import org.elasticsearch.index.mapper.Mapper;
1111
import org.elasticsearch.plugins.MapperPlugin;
1212
import org.elasticsearch.plugins.Plugin;
13+
import org.elasticsearch.plugins.SearchPlugin;
14+
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
15+
import org.elasticsearch.xpack.exponentialhistogram.aggregations.metrics.ExponentialHistogramAggregatorsRegistrar;
1316

1417
import java.util.Collections;
1518
import java.util.LinkedHashMap;
19+
import java.util.List;
1620
import java.util.Map;
21+
import java.util.function.Consumer;
1722

1823
/**
1924
* Plugin adding support for exponential histogram field types.
2025
*/
21-
public class ExponentialHistogramMapperPlugin extends Plugin implements MapperPlugin {
26+
public class ExponentialHistogramMapperPlugin extends Plugin implements MapperPlugin, SearchPlugin {
2227
@Override
2328
public Map<String, Mapper.TypeParser> getMappers() {
2429
Map<String, Mapper.TypeParser> mappers = new LinkedHashMap<>();
@@ -27,4 +32,12 @@ public Map<String, Mapper.TypeParser> getMappers() {
2732
}
2833
return Collections.unmodifiableMap(mappers);
2934
}
35+
36+
@Override
37+
public List<Consumer<ValuesSourceRegistry.Builder>> getAggregationExtentions() {
38+
if (ExponentialHistogramFieldMapper.EXPONENTIAL_HISTOGRAM_FEATURE.isEnabled()) {
39+
return List.of(ExponentialHistogramAggregatorsRegistrar::registerValueCountAggregator);
40+
}
41+
return Collections.emptyList();
42+
}
3043
}

x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/IndexWithCount.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.exponentialhistogram.CopyableBucketIterator;
1111
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
1212

13+
import java.util.ArrayList;
1314
import java.util.List;
1415
import java.util.OptionalLong;
1516

@@ -42,6 +43,15 @@ public long valueCount() {
4243
};
4344
}
4445

46+
public static List<IndexWithCount> fromIterator(CopyableBucketIterator iterator) {
47+
List<IndexWithCount> result = new ArrayList<>();
48+
while (iterator.hasNext()) {
49+
result.add(new IndexWithCount(iterator.peekIndex(), iterator.peekCount()));
50+
iterator.advance();
51+
}
52+
return result;
53+
}
54+
4555
private static class Iterator implements CopyableBucketIterator {
4656
private final List<IndexWithCount> buckets;
4757
private final int scale;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.exponentialhistogram.aggregations.metrics;
8+
9+
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
10+
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
11+
import org.elasticsearch.xpack.exponentialhistogram.aggregations.support.ExponentialHistogramValuesSourceType;
12+
13+
/**
14+
* Utility class providing static methods to register aggregators for the aggregate_metric values source
15+
*/
16+
public class ExponentialHistogramAggregatorsRegistrar {
17+
18+
public static void registerValueCountAggregator(ValuesSourceRegistry.Builder builder) {
19+
builder.register(
20+
ValueCountAggregationBuilder.REGISTRY_KEY,
21+
ExponentialHistogramValuesSourceType.EXPONENTIAL_HISTOGRAM,
22+
ExponentialHistogramValueCountAggregator::new,
23+
true
24+
);
25+
}
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.exponentialhistogram.aggregations.metrics;
8+
9+
import org.elasticsearch.common.util.BigArrays;
10+
import org.elasticsearch.common.util.LongArray;
11+
import org.elasticsearch.core.Releasables;
12+
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
13+
import org.elasticsearch.search.aggregations.Aggregator;
14+
import org.elasticsearch.search.aggregations.InternalAggregation;
15+
import org.elasticsearch.search.aggregations.LeafBucketCollector;
16+
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
17+
import org.elasticsearch.search.aggregations.metrics.InternalValueCount;
18+
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
19+
import org.elasticsearch.search.aggregations.support.AggregationContext;
20+
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
21+
import org.elasticsearch.xpack.exponentialhistogram.aggregations.support.ExponentialHistogramValuesSource;
22+
import org.elasticsearch.xpack.exponentialhistogram.fielddata.ExponentialHistogramValuesReader;
23+
24+
import java.io.IOException;
25+
import java.util.Map;
26+
27+
class ExponentialHistogramValueCountAggregator extends NumericMetricsAggregator.SingleValue {
28+
29+
private final ExponentialHistogramValuesSource.ExponentialHistogram valuesSource;
30+
31+
// a count per bucket
32+
private LongArray counts;
33+
34+
ExponentialHistogramValueCountAggregator(
35+
String name,
36+
ValuesSourceConfig valuesSourceConfig,
37+
AggregationContext aggregationContext,
38+
Aggregator parent,
39+
Map<String, Object> metadata
40+
) throws IOException {
41+
super(name, aggregationContext, parent, metadata);
42+
assert valuesSourceConfig.hasValues();
43+
this.valuesSource = (ExponentialHistogramValuesSource.ExponentialHistogram) valuesSourceConfig.getValuesSource();
44+
counts = bigArrays().newLongArray(1, true);
45+
}
46+
47+
@Override
48+
public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException {
49+
BigArrays bigArrays = bigArrays();
50+
ExponentialHistogramValuesReader values = valuesSource.getHistogramValues(aggCtx.getLeafReaderContext());
51+
52+
return new LeafBucketCollectorBase(sub, values) {
53+
@Override
54+
public void collect(int doc, long bucket) throws IOException {
55+
counts = bigArrays.grow(counts, bucket + 1);
56+
if (values.advanceExact(doc)) {
57+
counts.increment(bucket, values.valuesCountValue());
58+
}
59+
}
60+
};
61+
}
62+
63+
@Override
64+
public double metric(long owningBucketOrd) {
65+
return owningBucketOrd >= counts.size() ? 0 : counts.get(owningBucketOrd);
66+
}
67+
68+
@Override
69+
public InternalAggregation buildAggregation(long bucket) {
70+
if (bucket >= counts.size()) {
71+
return buildEmptyAggregation();
72+
}
73+
return new InternalValueCount(name, counts.get(bucket), metadata());
74+
}
75+
76+
@Override
77+
public InternalAggregation buildEmptyAggregation() {
78+
return InternalValueCount.empty(name, metadata());
79+
}
80+
81+
@Override
82+
public void doClose() {
83+
Releasables.close(counts);
84+
}
85+
86+
}

0 commit comments

Comments
 (0)