Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b7584ad
OTLP: add support for histograms
felixbarny Aug 31, 2025
8492a44
[CI] Auto commit changes from spotless
Sep 1, 2025
b3c2f4b
Apply suggestions from code review
felixbarny Sep 1, 2025
f9e64c6
[CI] Auto commit changes from spotless
Sep 1, 2025
444f614
Merge remote-tracking branch 'origin/main' into otlp-histograms
felixbarny Sep 1, 2025
222f9aa
Add missing import
felixbarny Sep 1, 2025
2ed63ea
Merge remote-tracking branch 'origin/main' into otlp-histograms
felixbarny Sep 1, 2025
95b1bda
Add buildMetricValue method to histogram data points
felixbarny Sep 1, 2025
6842cd4
[CI] Auto commit changes from spotless
Sep 1, 2025
f159188
Simplify boolean expression
felixbarny Sep 1, 2025
cf6a514
Merge remote-tracking branch 'origin/main' into otlp-histograms
felixbarny Sep 1, 2025
ed203a4
Add support for mapping hints
felixbarny Sep 1, 2025
0d588ff
Merge branch 'main' into otlp-histograms
felixbarny Sep 2, 2025
64cd047
Merge remote-tracking branch 'origin/main' into otlp-histograms
felixbarny Sep 2, 2025
d3cfe08
Fix compile error after merge
felixbarny Sep 2, 2025
8c4c621
Merge remote-tracking branch 'origin/main' into otlp-histograms
felixbarny Sep 2, 2025
7f5d421
Fix compile error after merge
felixbarny Sep 2, 2025
b26d645
Merge remote-tracking branch 'origin/main' into otlp-histograms
felixbarny Sep 2, 2025
a8efd23
[CI] Auto commit changes from spotless
Sep 2, 2025
2e5866d
Merge branch 'main' into otlp-histograms
felixbarny Sep 6, 2025
f019162
Add rest tests for histograms
felixbarny Sep 8, 2025
90cbac2
Add comment about converting exponential histograms to TDigest
felixbarny Sep 8, 2025
ed73673
[CI] Auto commit changes from spotless
Sep 8, 2025
18e7f76
Add accuracy tests for histogram converter
felixbarny Sep 8, 2025
06a0039
Merge remote-tracking branch 'felixbarny/otlp-histograms' into otlp-h…
felixbarny Sep 8, 2025
ca49987
[CI] Auto commit changes from spotless
Sep 8, 2025
9cf95b0
Test with more distributions
felixbarny Sep 9, 2025
10d6556
Merge remote-tracking branch 'origin/main' into otlp-histograms
felixbarny Sep 9, 2025
35d4c08
Merge remote-tracking branch 'felixbarny/otlp-histograms' into otlp-h…
felixbarny Sep 9, 2025
2bbdd25
Add comment about error bound
felixbarny Sep 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
package org.elasticsearch.xpack.oteldata.otlp.datapoint;

import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
Expand Down Expand Up @@ -171,6 +174,143 @@ public boolean isValid(Set<String> errors) {
}
}

record ExponentialHistogram(ExponentialHistogramDataPoint dataPoint, Metric metric) implements DataPoint {

@Override
public long getTimestampUnixNano() {
return dataPoint.getTimeUnixNano();
}

@Override
public List<KeyValue> getAttributes() {
return dataPoint.getAttributesList();
}

@Override
public long getStartTimestampUnixNano() {
return dataPoint.getStartTimeUnixNano();
}

@Override
public String getUnit() {
return metric.getUnit();
}

@Override
public String getMetricName() {
return metric.getName();
}

@Override
public void buildMetricValue(MappingHints mappingHints, XContentBuilder builder) throws IOException {
if (mappingHints.aggregateMetricDouble()) {
buildAggregateMetricDouble(builder, dataPoint.getSum(), dataPoint.getCount());
} else {
builder.startObject();
builder.startArray("counts");
HistogramConverter.counts(dataPoint, builder::value);
builder.endArray();
builder.startArray("values");
HistogramConverter.centroidValues(dataPoint, builder::value);
builder.endArray();
builder.endObject();
}
}

@Override
public long getDocCount() {
return dataPoint.getCount();
}

@Override
public String getDynamicTemplate(MappingHints mappingHints) {
if (mappingHints.aggregateMetricDouble()) {
return "summary";
} else {
return "histogram";
}
}

@Override
public boolean isValid(Set<String> errors) {
if (metric.getExponentialHistogram().getAggregationTemporality() != AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA) {
errors.add("cumulative exponential histogram metrics are not supported, ignoring " + metric.getName());
return false;
}
return true;
}
}

record Histogram(HistogramDataPoint dataPoint, Metric metric) implements DataPoint {
@Override
public long getTimestampUnixNano() {
return dataPoint.getTimeUnixNano();
}

@Override
public List<KeyValue> getAttributes() {
return dataPoint.getAttributesList();
}

@Override
public long getStartTimestampUnixNano() {
return dataPoint.getStartTimeUnixNano();
}

@Override
public String getUnit() {
return metric.getUnit();
}

@Override
public String getMetricName() {
return metric.getName();
}

@Override
public void buildMetricValue(MappingHints mappingHints, XContentBuilder builder) throws IOException {
if (mappingHints.aggregateMetricDouble()) {
buildAggregateMetricDouble(builder, dataPoint.getSum(), dataPoint.getCount());
} else {
builder.startObject();
builder.startArray("counts");
HistogramConverter.counts(dataPoint, builder::value);
builder.endArray();
builder.startArray("values");
HistogramConverter.centroidValues(dataPoint, builder::value);
builder.endArray();
builder.endObject();
}
}

@Override
public long getDocCount() {
return dataPoint.getCount();
}

@Override
public String getDynamicTemplate(MappingHints mappingHints) {
if (mappingHints.aggregateMetricDouble()) {
return "summary";
} else {
return "histogram";
}
}

@Override
public boolean isValid(Set<String> errors) {
if (metric.getHistogram().getAggregationTemporality() != AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA) {
errors.add("cumulative histogram metrics are not supported, ignoring " + metric.getName());
return false;
}
if (dataPoint.getBucketCountsCount() == 1 && dataPoint.getExplicitBoundsCount() == 0) {
errors.add("histogram with a single bucket and no explicit bounds is not supported, ignoring " + metric.getName());
return false;
}
return true;
}
}

record Summary(SummaryDataPoint dataPoint, Metric metric) implements DataPoint {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ public void groupDataPoints(ExportMetricsServiceRequest exportMetricsServiceRequ
scopeGroup.addDataPoints(metric, metric.getGauge().getDataPointsList(), DataPoint.Number::new);
break;
case EXPONENTIAL_HISTOGRAM:
ignoredDataPoints += metric.getExponentialHistogram().getDataPointsCount();
ignoredDataPointMessages.add("Exponential histogram is not supported yet. Dropping " + metric.getName());
scopeGroup.addDataPoints(
metric,
metric.getExponentialHistogram().getDataPointsList(),
DataPoint.ExponentialHistogram::new
);
break;
case HISTOGRAM:
ignoredDataPoints += metric.getHistogram().getDataPointsCount();
ignoredDataPointMessages.add("Histogram is not supported yet. Dropping " + metric.getName());
scopeGroup.addDataPoints(metric, metric.getHistogram().getDataPointsList(), DataPoint.Histogram::new);
break;
case SUMMARY:
scopeGroup.addDataPoints(metric, metric.getSummary().getDataPointsList(), DataPoint.Summary::new);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.oteldata.otlp.datapoint;

import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;

import org.elasticsearch.exponentialhistogram.ExponentialScaleUtils;

/**
* Utility class to convert OpenTelemetry histogram data points into counts and centroid values
* so that we can use it with the {@code histogram} field type.
* This class provides methods to extract counts and centroid values from both
* {@link ExponentialHistogramDataPoint} and {@link HistogramDataPoint}.
* The algorithm is ported over from the OpenTelemetry collector's Elasticsearch exporter.
* @see <a href="https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.132.0/exporter/elasticsearchexporter">
* Elasticsearch exporter on GitHub
* </a>
*/
class HistogramConverter {

static <E extends Exception> void counts(ExponentialHistogramDataPoint dp, CheckedLongConsumer<E> counts) throws E {
ExponentialHistogramDataPoint.Buckets negative = dp.getNegative();

for (int i = negative.getBucketCountsCount() - 1; i >= 0; i--) {
long count = negative.getBucketCounts(i);
if (count != 0) {
counts.accept(count);
}
}

long zeroCount = dp.getZeroCount();
if (zeroCount > 0) {
counts.accept(zeroCount);
}

ExponentialHistogramDataPoint.Buckets positive = dp.getPositive();
for (int i = 0; i < positive.getBucketCountsCount(); i++) {
long count = positive.getBucketCounts(i);
if (count != 0) {
counts.accept(count);
}
}
}

/**
* @see <a
* href="https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.132.0/exporter/elasticsearchexporter/internal/exphistogram/exphistogram.go">
* <code>ToTDigest</code> function
* </a>
*/
static <E extends Exception> void centroidValues(ExponentialHistogramDataPoint dp, CheckedDoubleConsumer<E> values) throws E {
int scale = dp.getScale();
ExponentialHistogramDataPoint.Buckets negative = dp.getNegative();

int offset = negative.getOffset();
for (int i = negative.getBucketCountsCount() - 1; i >= 0; i--) {
long count = negative.getBucketCounts(i);
if (count != 0) {
double lb = -ExponentialScaleUtils.getUpperBucketBoundary(offset + i, scale);
double ub = -ExponentialScaleUtils.getLowerBucketBoundary(offset + i, scale);
values.accept(lb + (ub - lb) / 2);
}
}

long zeroCount = dp.getZeroCount();
if (zeroCount > 0) {
values.accept(0.0);
}

ExponentialHistogramDataPoint.Buckets positive = dp.getPositive();
offset = positive.getOffset();
for (int i = 0; i < positive.getBucketCountsCount(); i++) {
long count = positive.getBucketCounts(i);
if (count != 0) {
double lb = ExponentialScaleUtils.getLowerBucketBoundary(offset + i, scale);
double ub = ExponentialScaleUtils.getUpperBucketBoundary(offset + i, scale);
values.accept(lb + (ub - lb) / 2);
}
}
}

static <E extends Exception> void counts(HistogramDataPoint dp, CheckedLongConsumer<E> counts) throws E {
for (int i = 0; i < dp.getBucketCountsCount(); i++) {
long count = dp.getBucketCounts(i);
if (count != 0) {
counts.accept(count);
}
}
}

/**
* @see <a
* href="https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.132.0/exporter/elasticsearchexporter/internal/datapoints/histogram.go">
* <code>histogramToValue</code> function
* </a>
*/
static <E extends Exception> void centroidValues(HistogramDataPoint dp, CheckedDoubleConsumer<E> values) throws E {
int size = dp.getBucketCountsCount();
for (int i = 0; i < size; i++) {
long count = dp.getBucketCounts(i);
if (count != 0) {
double value;
if (i == 0) {
// (-infinity, explicit_bounds[i]]
value = dp.getExplicitBounds(i);
if (value > 0) {
value /= 2;
}
} else if (i == size - 1) {
// (explicit_bounds[i], +infinity)
value = dp.getExplicitBounds(i - 1);
} else {
// [explicit_bounds[i-1], explicit_bounds[i])
// Use the midpoint between the boundaries.
value = dp.getExplicitBounds(i - 1) + (dp.getExplicitBounds(i) - dp.getExplicitBounds(i - 1)) / 2.0;
}
values.accept(value);
}
}
}

interface CheckedLongConsumer<E extends Exception> {
void accept(long value) throws E;
}

interface CheckedDoubleConsumer<E extends Exception> {
void accept(double value) throws E;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.common.v1.KeyValueList;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
import io.opentelemetry.proto.metrics.v1.ExponentialHistogram;
import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.Gauge;
import io.opentelemetry.proto.metrics.v1.Histogram;
import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
Expand Down Expand Up @@ -97,6 +101,34 @@ public static Metric createGaugeMetric(String name, String unit, List<NumberData
return Metric.newBuilder().setName(name).setUnit(unit).setGauge(Gauge.newBuilder().addAllDataPoints(dataPoints).build()).build();
}

public static Metric createExponentialHistogramMetric(
String name,
String unit,
List<ExponentialHistogramDataPoint> dataPoints,
AggregationTemporality temporality
) {
return Metric.newBuilder()
.setName(name)
.setUnit(unit)
.setExponentialHistogram(
ExponentialHistogram.newBuilder().setAggregationTemporality(temporality).addAllDataPoints(dataPoints).build()
)
.build();
}

public static Metric createHistogramMetric(
String name,
String unit,
List<HistogramDataPoint> dataPoints,
AggregationTemporality temporality
) {
return Metric.newBuilder()
.setName(name)
.setUnit(unit)
.setHistogram(Histogram.newBuilder().setAggregationTemporality(temporality).addAllDataPoints(dataPoints).build())
.build();
}

public static Metric createSumMetric(
String name,
String unit,
Expand Down
Loading