5 changes: 5 additions & 0 deletions x-pack/plugin/mapper-exponential-histogram/build.gradle
@@ -18,6 +18,11 @@ base {
archivesName = 'x-pack-exponential-histogram'
}

restResources {
restApi {
include 'bulk', 'search'
}
}
dependencies {
api project(":libs:exponential-histogram")
compileOnly project(path: xpackModule('core'))
@@ -8,9 +8,9 @@
package org.elasticsearch.xpack.exponentialhistogram;

import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
@@ -35,6 +35,7 @@
import org.elasticsearch.index.mapper.DocumentParsingException;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.IgnoreMalformedStoredValues;
import org.elasticsearch.index.mapper.LuceneDocument;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperBuilderContext;
import org.elasticsearch.index.mapper.SourceLoader;
@@ -237,9 +238,10 @@ protected void parseCreateField(DocumentParserContext context) {
throw new UnsupportedOperationException("Parsing is implemented in parse(), this method should NEVER be called");
}

static class ExponentialHistogramFieldType extends MappedFieldType {
public static class ExponentialHistogramFieldType extends MappedFieldType {

ExponentialHistogramFieldType(String name, Map<String, String> meta) {
// Visible for testing
public ExponentialHistogramFieldType(String name, Map<String, String> meta) {
super(name, false, false, true, meta);
}

@@ -439,37 +441,18 @@ public void parse(DocumentParserContext context) throws IOException {
min = validateOrEstimateMin(min, zeroBucket, scale, negativeBuckets, positiveBuckets, totalValueCount, subParser);
max = validateOrEstimateMax(max, zeroBucket, scale, negativeBuckets, positiveBuckets, totalValueCount, subParser);

BytesStreamOutput histogramBytesOutput = new BytesStreamOutput();
CompressedExponentialHistogram.writeHistogramBytes(histogramBytesOutput, scale, negativeBuckets, positiveBuckets);
BytesRef histoBytes = histogramBytesOutput.bytes().toBytesRef();

Field histoField = new BinaryDocValuesField(fullPath(), histoBytes);
long thresholdAsLong = NumericUtils.doubleToSortableLong(zeroBucket.threshold());
NumericDocValuesField zeroThresholdField = new NumericDocValuesField(zeroThresholdSubFieldName(fullPath()), thresholdAsLong);
NumericDocValuesField valuesCountField = new NumericDocValuesField(valuesCountSubFieldName(fullPath()), totalValueCount);
NumericDocValuesField sumField = new NumericDocValuesField(
valuesSumSubFieldName(fullPath()),
NumericUtils.doubleToSortableLong(sum)
HistogramDocValueFields docValues = buildDocValueFields(
fullPath(),
scale,
negativeBuckets,
positiveBuckets,
zeroBucket.threshold(),
totalValueCount,
sum,
min,
max
);

context.doc().addWithKey(fieldType().name(), histoField);
context.doc().add(zeroThresholdField);
context.doc().add(valuesCountField);
context.doc().add(sumField);
if (min != null) {
NumericDocValuesField minField = new NumericDocValuesField(
valuesMinSubFieldName(fullPath()),
NumericUtils.doubleToSortableLong(min)
);
context.doc().add(minField);
}
if (max != null) {
NumericDocValuesField maxField = new NumericDocValuesField(
valuesMaxSubFieldName(fullPath()),
NumericUtils.doubleToSortableLong(max)
);
context.doc().add(maxField);
}
docValues.addToDoc(context.doc());

} catch (Exception ex) {
if (ignoreMalformed.value() == false) {
@@ -499,6 +482,88 @@ public void parse(DocumentParserContext context) throws IOException {
context.path().remove();
}

// Visible for testing, to construct realistic doc values in tests
public static HistogramDocValueFields buildDocValueFields(
String fieldName,
int scale,
List<IndexWithCount> negativeBuckets,
List<IndexWithCount> positiveBuckets,
double zeroThreshold,
long totalValueCount,
double sum,
@Nullable Double min,
@Nullable Double max
) throws IOException {
BytesStreamOutput histogramBytesOutput = new BytesStreamOutput();
CompressedExponentialHistogram.writeHistogramBytes(histogramBytesOutput, scale, negativeBuckets, positiveBuckets);
BytesRef histoBytes = histogramBytesOutput.bytes().toBytesRef();

BinaryDocValuesField histoField = new BinaryDocValuesField(fieldName, histoBytes);
long thresholdAsLong = NumericUtils.doubleToSortableLong(zeroThreshold);
NumericDocValuesField zeroThresholdField = new NumericDocValuesField(zeroThresholdSubFieldName(fieldName), thresholdAsLong);
NumericDocValuesField valuesCountField = new NumericDocValuesField(valuesCountSubFieldName(fieldName), totalValueCount);
NumericDocValuesField sumField = new NumericDocValuesField(
valuesSumSubFieldName(fieldName),
NumericUtils.doubleToSortableLong(sum)
);
NumericDocValuesField minField = null;
if (min != null) {
minField = new NumericDocValuesField(valuesMinSubFieldName(fieldName), NumericUtils.doubleToSortableLong(min));
}
NumericDocValuesField maxField = null;
if (max != null) {
maxField = new NumericDocValuesField(valuesMaxSubFieldName(fieldName), NumericUtils.doubleToSortableLong(max));
}
HistogramDocValueFields docValues = new HistogramDocValueFields(
histoField,
zeroThresholdField,
valuesCountField,
sumField,
minField,
maxField
);
return docValues;
}

// Visible for testing
public record HistogramDocValueFields(
BinaryDocValuesField histo,
NumericDocValuesField zeroThreshold,
NumericDocValuesField valuesCount,
NumericDocValuesField sumField,
@Nullable NumericDocValuesField minField,
@Nullable NumericDocValuesField maxField
) {

public void addToDoc(LuceneDocument doc) {
doc.addWithKey(histo.name(), histo);
doc.add(zeroThreshold);
doc.add(valuesCount);
doc.add(sumField);
if (minField != null) {
doc.add(minField);
}
if (maxField != null) {
doc.add(maxField);
}
}

public List<IndexableField> fieldsAsList() {
List<IndexableField> fields = new ArrayList<>();
fields.add(histo);
fields.add(zeroThreshold);
fields.add(valuesCount);
fields.add(sumField);
if (minField != null) {
fields.add(minField);
}
if (maxField != null) {
fields.add(maxField);
}
return fields;
}
}

private Double validateOrEstimateSum(
Double sum,
Integer scale,
@@ -813,12 +878,12 @@ private static class DocValuesReader implements ExponentialHistogramValuesReader
}

boolean hasAnyValues() {
return histoDocValues != null;
return valueCounts != null;
}

@Override
public boolean advanceExact(int docId) throws IOException {
boolean isPresent = histoDocValues != null && histoDocValues.advanceExact(docId);
boolean isPresent = valueCounts != null && valueCounts.advanceExact(docId);
currentDocId = isPresent ? docId : -1;
return isPresent;
}
@@ -828,10 +893,10 @@ public ExponentialHistogram histogramValue() throws IOException {
if (currentDocId == -1) {
throw new IllegalStateException("No histogram present for current document");
}
boolean histoPresent = histoDocValues.advanceExact(currentDocId);
boolean zeroThresholdPresent = zeroThresholds.advanceExact(currentDocId);
boolean valueCountsPresent = valueCounts.advanceExact(currentDocId);
boolean valueSumsPresent = valueSums.advanceExact(currentDocId);
assert zeroThresholdPresent && valueCountsPresent && valueSumsPresent;
assert zeroThresholdPresent && histoPresent && valueSumsPresent;

BytesRef encodedHistogram = histoDocValues.binaryValue();
double zeroThreshold = NumericUtils.sortableLongToDouble(zeroThresholds.longValue());
@@ -852,6 +917,11 @@ public ExponentialHistogram histogramValue() throws IOException {
tempHistogram.reset(zeroThreshold, valueCount, valueSum, valueMin, valueMax, encodedHistogram);
return tempHistogram;
}

@Override
public long valuesCountValue() throws IOException {
return valueCounts.longValue();
}
}

private class ExponentialHistogramSyntheticFieldLoader implements CompositeSyntheticFieldLoader.DocValuesLayer {
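Note: with buildDocValueFields and HistogramDocValueFields now public, tests can construct exactly the doc-values fields the mapper would index. A minimal sketch, assuming a test helper that is allowed to throw IOException; the field name and bucket values are illustrative:

// Sketch only: build and add the doc-values fields for a tiny histogram
// with a single positive bucket, the same way parse() now does internally.
static void addExampleHistogram(LuceneDocument doc) throws IOException {
    List<IndexWithCount> positiveBuckets = List.of(new IndexWithCount(0, 3));
    ExponentialHistogramFieldMapper.HistogramDocValueFields fields =
        ExponentialHistogramFieldMapper.buildDocValueFields(
            "latency_histo", // field name (illustrative)
            0,               // scale
            List.of(),       // no negative buckets
            positiveBuckets,
            0.0,             // zero threshold
            3,               // total value count
            6.0,             // sum
            1.0,             // min (nullable)
            3.0              // max (nullable)
        );
    fields.addToDoc(doc);
    // Alternatively, fields.fieldsAsList() can feed an IndexWriter directly.
}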
@@ -10,15 +10,20 @@
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.xpack.exponentialhistogram.aggregations.metrics.ExponentialHistogramAggregatorsRegistrar;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
* Plugin adding support for exponential histogram field types.
*/
public class ExponentialHistogramMapperPlugin extends Plugin implements MapperPlugin {
public class ExponentialHistogramMapperPlugin extends Plugin implements MapperPlugin, SearchPlugin {
@Override
public Map<String, Mapper.TypeParser> getMappers() {
Map<String, Mapper.TypeParser> mappers = new LinkedHashMap<>();
@@ -27,4 +32,12 @@ public Map<String, Mapper.TypeParser> getMappers() {
}
return Collections.unmodifiableMap(mappers);
}

@Override
public List<Consumer<ValuesSourceRegistry.Builder>> getAggregationExtentions() {
if (ExponentialHistogramFieldMapper.EXPONENTIAL_HISTOGRAM_FEATURE.isEnabled()) {
return List.of(ExponentialHistogramAggregatorsRegistrar::registerValueCountAggregator);
}
return Collections.emptyList();
}
}
@@ -10,6 +10,7 @@
import org.elasticsearch.exponentialhistogram.CopyableBucketIterator;
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;

import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

@@ -42,6 +43,15 @@ public long valueCount() {
};
}

public static List<IndexWithCount> fromIterator(CopyableBucketIterator iterator) {
List<IndexWithCount> result = new ArrayList<>();
while (iterator.hasNext()) {
result.add(new IndexWithCount(iterator.peekIndex(), iterator.peekCount()));
iterator.advance();
}
return result;
}

private static class Iterator implements CopyableBucketIterator {
private final List<IndexWithCount> buckets;
private final int scale;
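Note: fromIterator is the eager counterpart of the lazy iterator view this class already provides. A minimal sketch of its use; the positiveBuckets().iterator() accessor on ExponentialHistogram is an assumption here, and any source of a CopyableBucketIterator works the same way:

// Sketch only: drain a bucket iterator into a List<IndexWithCount>.
static List<IndexWithCount> positiveBucketsOf(ExponentialHistogram histogram) {
    // Assumed accessor; substitute however the caller obtains its iterator.
    CopyableBucketIterator buckets = histogram.positiveBuckets().iterator();
    // fromIterator advances the iterator to its end, so copy it first
    // (it is a CopyableBucketIterator) if the caller still needs it.
    return IndexWithCount.fromIterator(buckets);
}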
@@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.exponentialhistogram.aggregations.metrics;

import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.xpack.exponentialhistogram.aggregations.support.ExponentialHistogramValuesSourceType;

/**
* Utility class providing static methods to register aggregators for the exponential_histogram values source
*/
public class ExponentialHistogramAggregatorsRegistrar {

public static void registerValueCountAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(
ValueCountAggregationBuilder.REGISTRY_KEY,
ExponentialHistogramValuesSourceType.EXPONENTIAL_HISTOGRAM,
ExponentialHistogramValueCountAggregator::new,
true
);
}
}
@@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.exponentialhistogram.aggregations.metrics;

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.InternalValueCount;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.xpack.exponentialhistogram.aggregations.support.ExponentialHistogramValuesSource;
import org.elasticsearch.xpack.exponentialhistogram.fielddata.ExponentialHistogramValuesReader;

import java.io.IOException;
import java.util.Map;

class ExponentialHistogramValueCountAggregator extends NumericMetricsAggregator.SingleValue {

private final ExponentialHistogramValuesSource.ExponentialHistogram valuesSource;

// a count per bucket
private LongArray counts;

ExponentialHistogramValueCountAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
AggregationContext aggregationContext,
Aggregator parent,
Map<String, Object> metadata
) throws IOException {
super(name, aggregationContext, parent, metadata);
assert valuesSourceConfig.hasValues();
this.valuesSource = (ExponentialHistogramValuesSource.ExponentialHistogram) valuesSourceConfig.getValuesSource();
counts = bigArrays().newLongArray(1, true);
}

@Override
public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException {
BigArrays bigArrays = bigArrays();
ExponentialHistogramValuesReader values = valuesSource.getHistogramValues(aggCtx.getLeafReaderContext());

return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
counts = bigArrays.grow(counts, bucket + 1);
if (values.advanceExact(doc)) {
counts.increment(bucket, values.valuesCountValue());
}
}
};
}

@Override
public double metric(long owningBucketOrd) {
return owningBucketOrd >= counts.size() ? 0 : counts.get(owningBucketOrd);
}

@Override
public InternalAggregation buildAggregation(long bucket) {
if (bucket >= counts.size()) {
return buildEmptyAggregation();
}
return new InternalValueCount(name, counts.get(bucket), metadata());
}

@Override
public InternalAggregation buildEmptyAggregation() {
return InternalValueCount.empty(name, metadata());
}

@Override
public void doClose() {
Releasables.close(counts);
}

}
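Note: with the registrar wired up through the plugin, a plain value_count aggregation over an exponential_histogram field now resolves to this aggregator, so the result is the sum of each matching document's embedded value count rather than the number of documents. A minimal request sketch; index and field names are illustrative:

import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

// Sketch only: per document, the collector adds valuesCountValue() to the
// owning bucket, so "total_values" reports the total number of raw values
// recorded across all matching histograms.
static SearchSourceBuilder valueCountRequest() {
    return new SearchSourceBuilder()
        .size(0)
        .aggregation(AggregationBuilders.count("total_values").field("latency_histo"));
}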