Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b7584ad
OTLP: add support for histograms
felixbarny Aug 31, 2025
8492a44
[CI] Auto commit changes from spotless
Sep 1, 2025
b3c2f4b
Apply suggestions from code review
felixbarny Sep 1, 2025
f9e64c6
[CI] Auto commit changes from spotless
Sep 1, 2025
444f614
Merge remote-tracking branch 'origin/main' into otlp-histograms
felixbarny Sep 1, 2025
222f9aa
Add missing import
felixbarny Sep 1, 2025
2ed63ea
Merge remote-tracking branch 'origin/main' into otlp-histograms
felixbarny Sep 1, 2025
95b1bda
Add buildMetricValue method to histogram data points
felixbarny Sep 1, 2025
6842cd4
[CI] Auto commit changes from spotless
Sep 1, 2025
f159188
Simplify boolean expression
felixbarny Sep 1, 2025
cf6a514
Merge remote-tracking branch 'origin/main' into otlp-histograms
felixbarny Sep 1, 2025
ed203a4
Add support for mapping hints
felixbarny Sep 1, 2025
0d588ff
Merge branch 'main' into otlp-histograms
felixbarny Sep 2, 2025
64cd047
Merge remote-tracking branch 'origin/main' into otlp-histograms
felixbarny Sep 2, 2025
d3cfe08
Fix compile error after merge
felixbarny Sep 2, 2025
8c4c621
Merge remote-tracking branch 'origin/main' into otlp-histograms
felixbarny Sep 2, 2025
7f5d421
Fix compile error after merge
felixbarny Sep 2, 2025
b26d645
Merge remote-tracking branch 'origin/main' into otlp-histograms
felixbarny Sep 2, 2025
a8efd23
[CI] Auto commit changes from spotless
Sep 2, 2025
2e5866d
Merge branch 'main' into otlp-histograms
felixbarny Sep 6, 2025
f019162
Add rest tests for histograms
felixbarny Sep 8, 2025
90cbac2
Add comment about converting exponential histograms to TDigest
felixbarny Sep 8, 2025
ed73673
[CI] Auto commit changes from spotless
Sep 8, 2025
18e7f76
Add accuracy tests for histogram converter
felixbarny Sep 8, 2025
06a0039
Merge remote-tracking branch 'felixbarny/otlp-histograms' into otlp-h…
felixbarny Sep 8, 2025
ca49987
[CI] Auto commit changes from spotless
Sep 8, 2025
9cf95b0
Test with more distributions
felixbarny Sep 9, 2025
10d6556
Merge remote-tracking branch 'origin/main' into otlp-histograms
felixbarny Sep 9, 2025
35d4c08
Merge remote-tracking branch 'felixbarny/otlp-histograms' into otlp-h…
felixbarny Sep 9, 2025
2bbdd25
Add comment about error bound
felixbarny Sep 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libs/exponential-histogram/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// TODO: publish this when ready?
//apply plugin: 'elasticsearch.publish'
apply plugin: 'elasticsearch.build'
apply plugin: 'elasticsearch.internal-test-artifact'

dependencies {
api project(':libs:core')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ ExponentialHistogram createAutoReleasedHistogram(Consumer<ExponentialHistogramBu
return result;
}

ExponentialHistogram createAutoReleasedHistogram(int numBuckets, double... values) {
protected ExponentialHistogram createAutoReleasedHistogram(int numBuckets, double... values) {
ReleasableExponentialHistogram result = ExponentialHistogram.create(numBuckets, breaker(), values);
releaseBeforeEnd.add(result);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private void testDistributionQuantileAccuracy(RealDistribution distribution) {
testQuantileAccuracy(values, bucketCount);
}

private static double[] generateSamples(RealDistribution distribution, int sampleSize) {
public static double[] generateSamples(RealDistribution distribution, int sampleSize) {
double[] values = new double[sampleSize];
for (int i = 0; i < sampleSize; i++) {
values[i] = distribution.sample();
Expand Down Expand Up @@ -276,7 +276,7 @@ private double testQuantileAccuracy(double[] values, int bucketCount) {
* The error depends on the raw values put into the histogram and the number of buckets allowed.
* This is an implementation of the error bound computation proven by Theorem 3 in the <a href="https://arxiv.org/pdf/2004.08604">UDDSketch paper</a>
*/
private static double getMaximumRelativeError(double[] values, int bucketCount) {
public static double getMaximumRelativeError(double[] values, int bucketCount) {
HashSet<Long> usedPositiveIndices = new HashSet<>();
HashSet<Long> usedNegativeIndices = new HashSet<>();
int bestPossibleScale = MAX_SCALE;
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugin/otel-data/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ dependencies {
api project(":libs:exponential-histogram")
compileOnly project(path: xpackModule('core'))
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation(testArtifact(project(":libs:exponential-histogram")))
testImplementation('org.apache.commons:commons-math3:3.6.1')
clusterModules project(':modules:data-streams')
clusterModules project(':modules:ingest-common')
clusterModules project(':modules:ingest-geoip')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.oteldata.otlp;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
Expand All @@ -15,9 +16,14 @@
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets;
import io.opentelemetry.sdk.metrics.data.HistogramData;
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
Expand Down Expand Up @@ -48,6 +54,7 @@
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE;
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA;
import static org.elasticsearch.test.rest.ObjectPath.evaluate;
import static org.elasticsearch.xpack.oteldata.otlp.OTLPMetricsIndexingRestIT.Monotonicity.MONOTONIC;
import static org.elasticsearch.xpack.oteldata.otlp.OTLPMetricsIndexingRestIT.Monotonicity.NON_MONOTONIC;
import static org.hamcrest.Matchers.aMapWithSize;
Expand Down Expand Up @@ -133,12 +140,12 @@ public void testIngestMetricViaMeterProvider() throws Exception {
ObjectPath search = search("metrics-generic.otel-default");
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
var source = search.evaluate("hits.hits.0._source");
assertThat(ObjectPath.evaluate(source, "@timestamp"), isA(String.class));
assertThat(ObjectPath.evaluate(source, "start_timestamp"), isA(String.class));
assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class));
assertThat(evaluate(source, "@timestamp"), isA(String.class));
assertThat(evaluate(source, "start_timestamp"), isA(String.class));
assertThat(evaluate(source, "_metric_names_hash"), isA(String.class));
assertThat(ObjectPath.<Number>evaluate(source, "metrics.jvm\\.memory\\.total").longValue(), equalTo(totalMemory));
assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By"));
assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics"));
assertThat(evaluate(source, "unit"), equalTo("By"));
assertThat(evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics"));
}

public void testIngestMetricDataViaMetricExporter() throws Exception {
Expand All @@ -150,13 +157,13 @@ public void testIngestMetricDataViaMetricExporter() throws Exception {
ObjectPath search = search("metrics-generic.otel-default");
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
var source = search.evaluate("hits.hits.0._source");
assertThat(ObjectPath.evaluate(source, "@timestamp"), equalTo(timestampAsString(now)));
assertThat(ObjectPath.evaluate(source, "start_timestamp"), equalTo(timestampAsString(now)));
assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class));
assertThat(evaluate(source, "@timestamp"), equalTo(timestampAsString(now)));
assertThat(evaluate(source, "start_timestamp"), equalTo(timestampAsString(now)));
assertThat(evaluate(source, "_metric_names_hash"), isA(String.class));
assertThat(ObjectPath.<Number>evaluate(source, "metrics.jvm\\.memory\\.total").longValue(), equalTo(totalMemory));
assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By"));
assertThat(ObjectPath.evaluate(source, "resource.attributes.service\\.name"), equalTo("elasticsearch"));
assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics"));
assertThat(evaluate(source, "unit"), equalTo("By"));
assertThat(evaluate(source, "resource.attributes.service\\.name"), equalTo("elasticsearch"));
assertThat(evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics"));
}

public void testGroupingSameGroup() throws Exception {
Expand Down Expand Up @@ -197,11 +204,11 @@ public void testGauge() throws Exception {
createLongGauge(TEST_RESOURCE, Attributes.empty(), "long_gauge", 42, "By", now)
)
);
Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
assertThat(ObjectPath.evaluate(metrics, "double_gauge.type"), equalTo("double"));
assertThat(ObjectPath.evaluate(metrics, "double_gauge.time_series_metric"), equalTo("gauge"));
assertThat(ObjectPath.evaluate(metrics, "long_gauge.type"), equalTo("long"));
assertThat(ObjectPath.evaluate(metrics, "long_gauge.time_series_metric"), equalTo("gauge"));
Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
assertThat(evaluate(metrics, "double_gauge.type"), equalTo("double"));
assertThat(evaluate(metrics, "double_gauge.time_series_metric"), equalTo("gauge"));
assertThat(evaluate(metrics, "long_gauge.type"), equalTo("long"));
assertThat(evaluate(metrics, "long_gauge.time_series_metric"), equalTo("gauge"));
}

public void testCounterTemporality() throws Exception {
Expand All @@ -213,11 +220,11 @@ public void testCounterTemporality() throws Exception {
)
);

Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.type"), equalTo("long"));
assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.time_series_metric"), equalTo("counter"));
assertThat(ObjectPath.evaluate(metrics, "delta_counter.type"), equalTo("long"));
assertThat(ObjectPath.evaluate(metrics, "delta_counter.time_series_metric"), equalTo("gauge"));
Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
assertThat(evaluate(metrics, "cumulative_counter.type"), equalTo("long"));
assertThat(evaluate(metrics, "cumulative_counter.time_series_metric"), equalTo("counter"));
assertThat(evaluate(metrics, "delta_counter.type"), equalTo("long"));
assertThat(evaluate(metrics, "delta_counter.time_series_metric"), equalTo("gauge"));
}

public void testCounterMonotonicity() throws Exception {
Expand All @@ -230,11 +237,96 @@ public void testCounterMonotonicity() throws Exception {
)
);

Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
assertThat(ObjectPath.evaluate(metrics, "up_down_counter.type"), equalTo("long"));
assertThat(ObjectPath.evaluate(metrics, "up_down_counter.time_series_metric"), equalTo("gauge"));
assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.type"), equalTo("long"));
assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.time_series_metric"), equalTo("gauge"));
Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
assertThat(evaluate(metrics, "up_down_counter.type"), equalTo("long"));
assertThat(evaluate(metrics, "up_down_counter.time_series_metric"), equalTo("gauge"));
assertThat(evaluate(metrics, "up_down_counter_delta.type"), equalTo("long"));
assertThat(evaluate(metrics, "up_down_counter_delta.time_series_metric"), equalTo("gauge"));
}

public void testExponentialHistograms() throws Exception {
long now = Clock.getDefault().now();
export(List.of(createExponentialHistogram(now, "exponential_histogram", DELTA, Attributes.empty())));

Map<String, Object> mappings = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
assertThat(evaluate(mappings, "exponential_histogram.type"), equalTo("histogram"));

// Get document and check values/counts array
ObjectPath search = search("metrics-generic.otel-default");
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
var source = search.evaluate("hits.hits.0._source");
assertThat(evaluate(source, "metrics.exponential_histogram.counts"), equalTo(List.of(2, 1, 10, 1, 2)));
assertThat(evaluate(source, "metrics.exponential_histogram.values"), equalTo(List.of(-3.0, -1.5, 0.0, 1.5, 3.0)));
}

public void testExponentialHistogramsAsAggregateMetricDouble() throws Exception {
long now = Clock.getDefault().now();
export(
List.of(
createExponentialHistogram(
now,
"exponential_histogram_summary",
DELTA,
Attributes.of(
AttributeKey.stringArrayKey("elasticsearch.mapping.hints"),
List.of("aggregate_metric_double", "_doc_count")
)
)
)
);

Map<String, Object> mappings = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
assertThat(evaluate(mappings, "exponential_histogram_summary.type"), equalTo("aggregate_metric_double"));

ObjectPath search = search("metrics-generic.otel-default");
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
var source = search.evaluate("hits.hits.0._source");
assertThat(evaluate(source, "_doc_count"), equalTo(16));
assertThat(evaluate(source, "metrics.exponential_histogram_summary.value_count"), equalTo(16));
assertThat(evaluate(source, "metrics.exponential_histogram_summary.sum"), equalTo(10.0));
}

public void testHistogram() throws Exception {
long now = Clock.getDefault().now();
export(List.of(createHistogram(now, "histogram", DELTA, Attributes.empty())));

Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
assertThat(evaluate(metrics, "histogram.type"), equalTo("histogram"));

// Get document and check values/counts array
ObjectPath search = search("metrics-generic.otel-default");
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
var source = search.evaluate("hits.hits.0._source");
assertThat(evaluate(source, "metrics.histogram.counts"), equalTo(List.of(1, 2, 3, 4, 5, 6)));
List<Double> values = evaluate(source, "metrics.histogram.values");
assertThat(values, equalTo(List.of(1.0, 3.0, 5.0, 7.0, 9.0, 10.0)));
}

public void testHistogramAsAggregateMetricDouble() throws Exception {
long now = Clock.getDefault().now();
export(
List.of(
createHistogram(
now,
"histogram_summary",
DELTA,
Attributes.of(
AttributeKey.stringArrayKey("elasticsearch.mapping.hints"),
List.of("aggregate_metric_double", "_doc_count")
)
)
)
);

Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
assertThat(evaluate(metrics, "histogram_summary.type"), equalTo("aggregate_metric_double"));

ObjectPath search = search("metrics-generic.otel-default");
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
var source = search.evaluate("hits.hits.0._source");
assertThat(evaluate(source, "_doc_count"), equalTo(21));
assertThat(evaluate(source, "metrics.histogram_summary.value_count"), equalTo(21));
assertThat(evaluate(source, "metrics.histogram_summary.sum"), equalTo(10.0));
}

public void testTsidForBulkIsSame() throws Exception {
Expand Down Expand Up @@ -316,7 +408,7 @@ private static Map<String, Object> getMapping(String target) throws IOException
Map<String, Object> mappings = ObjectPath.createFromResponse(client().performRequest(new Request("GET", target + "/_mapping")))
.evaluate("");
assertThat(mappings, aMapWithSize(1));
Map<String, Object> mapping = ObjectPath.evaluate(mappings.values().iterator().next(), "mappings");
Map<String, Object> mapping = evaluate(mappings.values().iterator().next(), "mappings");
assertThat(mapping, not(anEmptyMap()));
return mapping;
}
Expand Down Expand Up @@ -420,4 +512,66 @@ public boolean isMonotonic() {
return monotonic;
}
}

private static MetricData createHistogram(long timeEpochNanos, String name, AggregationTemporality temporality, Attributes attributes) {
return ImmutableMetricData.createDoubleHistogram(
TEST_RESOURCE,
TEST_SCOPE,
name,
"Histogram Test",
"ms",
HistogramData.create(
temporality,
List.of(
HistogramPointData.create(
timeEpochNanos,
timeEpochNanos,
attributes,
10,
false,
0,
false,
0,
List.of(2.0, 4.0, 6.0, 8.0, 10.0),
List.of(1L, 2L, 3L, 4L, 5L, 6L)
)
)
)
);
}

private static MetricData createExponentialHistogram(
long timeEpochNanos,
String name,
AggregationTemporality temporality,
Attributes attributes
) {
return ImmutableMetricData.createExponentialHistogram(
TEST_RESOURCE,
TEST_SCOPE,
name,
"Exponential Histogram Test",
"ms",
ImmutableExponentialHistogramData.create(
temporality,
List.of(
ImmutableExponentialHistogramPointData.create(
0,
10,
10,
false,
0,
false,
0,
ExponentialHistogramBuckets.create(0, 0, List.of(1L, 2L)),
ExponentialHistogramBuckets.create(0, 0, List.of(1L, 2L)),
timeEpochNanos,
timeEpochNanos,
attributes,
List.of()
)
)
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private static void handlePartialSuccess(ActionListener<MetricsResponse> listene
// the server MUST respond with HTTP 200 OK.
// https://opentelemetry.io/docs/specs/otlp/#partial-success-1
MessageLite response = responseWithRejectedDataPoints(context.getIgnoredDataPoints(), context.getIgnoredDataPointsMessage());
listener.onResponse(new MetricsResponse(RestStatus.OK, response));
listener.onResponse(new MetricsResponse(RestStatus.BAD_REQUEST, response));
}

private static void handlePartialSuccess(
Expand Down
Loading