diff --git a/libs/exponential-histogram/build.gradle b/libs/exponential-histogram/build.gradle index 493f7ecfc4886..8b82fcacfd2b9 100644 --- a/libs/exponential-histogram/build.gradle +++ b/libs/exponential-histogram/build.gradle @@ -10,6 +10,7 @@ // TODO: publish this when ready? //apply plugin: 'elasticsearch.publish' apply plugin: 'elasticsearch.build' +apply plugin: 'elasticsearch.internal-test-artifact' dependencies { api project(':libs:core') diff --git a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramTestCase.java b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramTestCase.java index 174646dfb4582..649c116f27d01 100644 --- a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramTestCase.java +++ b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramTestCase.java @@ -70,7 +70,7 @@ ExponentialHistogram createAutoReleasedHistogram(Consumer<…> …) diff --git a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/QuantileAccuracyTests.java b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/QuantileAccuracyTests.java @@ … as described in the UDDSketch paper */ - private static double getMaximumRelativeError(double[] values, int bucketCount) { + public static double getMaximumRelativeError(double[] values, int bucketCount) { HashSet<Long> usedPositiveIndices = new HashSet<>(); HashSet<Long> usedNegativeIndices = new HashSet<>(); int bestPossibleScale = MAX_SCALE; diff --git a/x-pack/plugin/otel-data/build.gradle b/x-pack/plugin/otel-data/build.gradle index 263aa5674c656..3219444c753b9 100644 --- a/x-pack/plugin/otel-data/build.gradle +++ b/x-pack/plugin/otel-data/build.gradle @@ -51,6 +51,8 @@ dependencies { api project(":libs:exponential-histogram") compileOnly project(path: xpackModule('core')) testImplementation(testArtifact(project(xpackModule('core')))) + testImplementation(testArtifact(project(":libs:exponential-histogram"))) + testImplementation('org.apache.commons:commons-math3:3.6.1') clusterModules project(':modules:data-streams') clusterModules project(':modules:ingest-common') clusterModules project(':modules:ingest-geoip') diff --git a/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsIndexingRestIT.java b/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsIndexingRestIT.java index 36a4eda0de785..629b9b66e7fcb 100644 --- a/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsIndexingRestIT.java +++ b/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsIndexingRestIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.oteldata.otlp; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; @@ -15,9 +16,14 @@ import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets; +import io.opentelemetry.sdk.metrics.data.HistogramData; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData; +import
io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; @@ -48,6 +54,7 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey; import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE; import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA; +import static org.elasticsearch.test.rest.ObjectPath.evaluate; import static org.elasticsearch.xpack.oteldata.otlp.OTLPMetricsIndexingRestIT.Monotonicity.MONOTONIC; import static org.elasticsearch.xpack.oteldata.otlp.OTLPMetricsIndexingRestIT.Monotonicity.NON_MONOTONIC; import static org.hamcrest.Matchers.aMapWithSize; @@ -133,12 +140,12 @@ public void testIngestMetricViaMeterProvider() throws Exception { ObjectPath search = search("metrics-generic.otel-default"); assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1)); var source = search.evaluate("hits.hits.0._source"); - assertThat(ObjectPath.evaluate(source, "@timestamp"), isA(String.class)); - assertThat(ObjectPath.evaluate(source, "start_timestamp"), isA(String.class)); - assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class)); + assertThat(evaluate(source, "@timestamp"), isA(String.class)); + assertThat(evaluate(source, "start_timestamp"), isA(String.class)); + assertThat(evaluate(source, "_metric_names_hash"), isA(String.class)); assertThat(ObjectPath.evaluate(source, "metrics.jvm\\.memory\\.total").longValue(), equalTo(totalMemory)); - assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By")); - assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics")); + assertThat(evaluate(source, "unit"), equalTo("By")); + assertThat(evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics")); } public void testIngestMetricDataViaMetricExporter() throws Exception { @@ -150,13 +157,13 @@ public void testIngestMetricDataViaMetricExporter() throws Exception { ObjectPath search = search("metrics-generic.otel-default"); assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1)); var source = search.evaluate("hits.hits.0._source"); - assertThat(ObjectPath.evaluate(source, "@timestamp"), equalTo(timestampAsString(now))); - assertThat(ObjectPath.evaluate(source, "start_timestamp"), equalTo(timestampAsString(now))); - assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class)); + assertThat(evaluate(source, "@timestamp"), equalTo(timestampAsString(now))); + assertThat(evaluate(source, "start_timestamp"), equalTo(timestampAsString(now))); + assertThat(evaluate(source, "_metric_names_hash"), isA(String.class)); assertThat(ObjectPath.evaluate(source, "metrics.jvm\\.memory\\.total").longValue(), equalTo(totalMemory)); - assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By")); - assertThat(ObjectPath.evaluate(source, "resource.attributes.service\\.name"), equalTo("elasticsearch")); - assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics")); + assertThat(evaluate(source, "unit"), equalTo("By")); + assertThat(evaluate(source, "resource.attributes.service\\.name"), equalTo("elasticsearch")); + assertThat(evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics")); } public void testGroupingSameGroup() throws 
Exception { @@ -197,11 +204,11 @@ public void testGauge() throws Exception { createLongGauge(TEST_RESOURCE, Attributes.empty(), "long_gauge", 42, "By", now) ) ); - Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); - assertThat(ObjectPath.evaluate(metrics, "double_gauge.type"), equalTo("double")); - assertThat(ObjectPath.evaluate(metrics, "double_gauge.time_series_metric"), equalTo("gauge")); - assertThat(ObjectPath.evaluate(metrics, "long_gauge.type"), equalTo("long")); - assertThat(ObjectPath.evaluate(metrics, "long_gauge.time_series_metric"), equalTo("gauge")); + Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); + assertThat(evaluate(metrics, "double_gauge.type"), equalTo("double")); + assertThat(evaluate(metrics, "double_gauge.time_series_metric"), equalTo("gauge")); + assertThat(evaluate(metrics, "long_gauge.type"), equalTo("long")); + assertThat(evaluate(metrics, "long_gauge.time_series_metric"), equalTo("gauge")); } public void testCounterTemporality() throws Exception { @@ -213,11 +220,11 @@ public void testCounterTemporality() throws Exception { ) ); - Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); - assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.type"), equalTo("long")); - assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.time_series_metric"), equalTo("counter")); - assertThat(ObjectPath.evaluate(metrics, "delta_counter.type"), equalTo("long")); - assertThat(ObjectPath.evaluate(metrics, "delta_counter.time_series_metric"), equalTo("gauge")); + Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); + assertThat(evaluate(metrics, "cumulative_counter.type"), equalTo("long")); + assertThat(evaluate(metrics, "cumulative_counter.time_series_metric"), equalTo("counter")); + assertThat(evaluate(metrics, "delta_counter.type"), equalTo("long")); + assertThat(evaluate(metrics, "delta_counter.time_series_metric"), equalTo("gauge")); } public void testCounterMonotonicity() throws Exception { @@ -230,11 +237,96 @@ public void testCounterMonotonicity() throws Exception { ) ); - Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); - assertThat(ObjectPath.evaluate(metrics, "up_down_counter.type"), equalTo("long")); - assertThat(ObjectPath.evaluate(metrics, "up_down_counter.time_series_metric"), equalTo("gauge")); - assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.type"), equalTo("long")); - assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.time_series_metric"), equalTo("gauge")); + Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); + assertThat(evaluate(metrics, "up_down_counter.type"), equalTo("long")); + assertThat(evaluate(metrics, "up_down_counter.time_series_metric"), equalTo("gauge")); + assertThat(evaluate(metrics, "up_down_counter_delta.type"), equalTo("long")); + assertThat(evaluate(metrics, "up_down_counter_delta.time_series_metric"), equalTo("gauge")); + } + + public void testExponentialHistograms() throws Exception { + long now = Clock.getDefault().now(); + export(List.of(createExponentialHistogram(now, "exponential_histogram", DELTA, Attributes.empty()))); + + Map<String, Object> mappings = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); + assertThat(evaluate(mappings, "exponential_histogram.type"), equalTo("histogram")); + + // Get document and check values/counts array + ObjectPath search = search("metrics-generic.otel-default"); + assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1)); + var source = search.evaluate("hits.hits.0._source");
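+ // Expected: the negative bucket counts [1, 2] are emitted in reverse order as [2, 1], then the zero bucket (10), + // then the positive buckets [1, 2]. At scale 0, bucket i covers (2^i, 2^(i+1)], so the centroids are ±1.5 and ±3.0 + // around the zero bucket at 0.0.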
"exponential_histogram.type"), equalTo("histogram")); + + // Get document and check values/counts array + ObjectPath search = search("metrics-generic.otel-default"); + assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1)); + var source = search.evaluate("hits.hits.0._source"); + assertThat(evaluate(source, "metrics.exponential_histogram.counts"), equalTo(List.of(2, 1, 10, 1, 2))); + assertThat(evaluate(source, "metrics.exponential_histogram.values"), equalTo(List.of(-3.0, -1.5, 0.0, 1.5, 3.0))); + } + + public void testExponentialHistogramsAsAggregateMetricDouble() throws Exception { + long now = Clock.getDefault().now(); + export( + List.of( + createExponentialHistogram( + now, + "exponential_histogram_summary", + DELTA, + Attributes.of( + AttributeKey.stringArrayKey("elasticsearch.mapping.hints"), + List.of("aggregate_metric_double", "_doc_count") + ) + ) + ) + ); + + Map mappings = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); + assertThat(evaluate(mappings, "exponential_histogram_summary.type"), equalTo("aggregate_metric_double")); + + ObjectPath search = search("metrics-generic.otel-default"); + assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1)); + var source = search.evaluate("hits.hits.0._source"); + assertThat(evaluate(source, "_doc_count"), equalTo(16)); + assertThat(evaluate(source, "metrics.exponential_histogram_summary.value_count"), equalTo(16)); + assertThat(evaluate(source, "metrics.exponential_histogram_summary.sum"), equalTo(10.0)); + } + + public void testHistogram() throws Exception { + long now = Clock.getDefault().now(); + export(List.of(createHistogram(now, "histogram", DELTA, Attributes.empty()))); + + Map metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); + assertThat(evaluate(metrics, "histogram.type"), equalTo("histogram")); + + // Get document and check values/counts array + ObjectPath search = search("metrics-generic.otel-default"); + assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1)); + var source = search.evaluate("hits.hits.0._source"); + assertThat(evaluate(source, "metrics.histogram.counts"), equalTo(List.of(1, 2, 3, 4, 5, 6))); + List values = evaluate(source, "metrics.histogram.values"); + assertThat(values, equalTo(List.of(1.0, 3.0, 5.0, 7.0, 9.0, 10.0))); + } + + public void testHistogramAsAggregateMetricDouble() throws Exception { + long now = Clock.getDefault().now(); + export( + List.of( + createHistogram( + now, + "histogram_summary", + DELTA, + Attributes.of( + AttributeKey.stringArrayKey("elasticsearch.mapping.hints"), + List.of("aggregate_metric_double", "_doc_count") + ) + ) + ) + ); + + Map metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); + assertThat(evaluate(metrics, "histogram_summary.type"), equalTo("aggregate_metric_double")); + + ObjectPath search = search("metrics-generic.otel-default"); + assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1)); + var source = search.evaluate("hits.hits.0._source"); + assertThat(evaluate(source, "_doc_count"), equalTo(21)); + assertThat(evaluate(source, "metrics.histogram_summary.value_count"), equalTo(21)); + assertThat(evaluate(source, "metrics.histogram_summary.sum"), equalTo(10.0)); } public void testTsidForBulkIsSame() throws Exception { @@ -316,7 +408,7 @@ private static Map getMapping(String target) throws IOException Map mappings = 
+ assertThat(evaluate(source, "metrics.histogram.counts"), equalTo(List.of(1, 2, 3, 4, 5, 6))); + List<Double> values = evaluate(source, "metrics.histogram.values"); + assertThat(values, equalTo(List.of(1.0, 3.0, 5.0, 7.0, 9.0, 10.0))); + } + + public void testHistogramAsAggregateMetricDouble() throws Exception { + long now = Clock.getDefault().now(); + export( List.of( createHistogram( now, "histogram_summary", DELTA, Attributes.of( AttributeKey.stringArrayKey("elasticsearch.mapping.hints"), List.of("aggregate_metric_double", "_doc_count") ) ) ) ); + + Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); + assertThat(evaluate(metrics, "histogram_summary.type"), equalTo("aggregate_metric_double")); + + ObjectPath search = search("metrics-generic.otel-default"); + assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1)); + var source = search.evaluate("hits.hits.0._source"); + assertThat(evaluate(source, "_doc_count"), equalTo(21)); + assertThat(evaluate(source, "metrics.histogram_summary.value_count"), equalTo(21)); + assertThat(evaluate(source, "metrics.histogram_summary.sum"), equalTo(10.0)); } public void testTsidForBulkIsSame() throws Exception { @@ -316,7 +408,7 @@ private static Map<String, Object> getMapping(String target) throws IOException { Map<String, Object> mappings = ObjectPath.createFromResponse(client().performRequest(new Request("GET", target + "/_mapping"))) .evaluate(""); assertThat(mappings, aMapWithSize(1)); - Map<String, Object> mapping = ObjectPath.evaluate(mappings.values().iterator().next(), "mappings"); + Map<String, Object> mapping = evaluate(mappings.values().iterator().next(), "mappings"); assertThat(mapping, not(anEmptyMap())); return mapping; } @@ -420,4 +512,66 @@ public boolean isMonotonic() { return monotonic; } } + + private static MetricData createHistogram(long timeEpochNanos, String name, AggregationTemporality temporality, Attributes attributes) { + return ImmutableMetricData.createDoubleHistogram( TEST_RESOURCE, TEST_SCOPE, name, "Histogram Test", "ms", HistogramData.create( temporality, List.of( HistogramPointData.create( timeEpochNanos, timeEpochNanos, attributes, 10, false, 0, false, 0, List.of(2.0, 4.0, 6.0, 8.0, 10.0), List.of(1L, 2L, 3L, 4L, 5L, 6L) ) ) ) ); + } + + private static MetricData createExponentialHistogram( long timeEpochNanos, String name, AggregationTemporality temporality, Attributes attributes ) { + return ImmutableMetricData.createExponentialHistogram( TEST_RESOURCE, TEST_SCOPE, name, "Exponential Histogram Test", "ms", ImmutableExponentialHistogramData.create( temporality, List.of( ImmutableExponentialHistogramPointData.create( 0, 10, 10, false, 0, false, 0, ExponentialHistogramBuckets.create(0, 0, List.of(1L, 2L)), ExponentialHistogramBuckets.create(0, 0, List.of(1L, 2L)), timeEpochNanos, timeEpochNanos, attributes, List.of() ) ) ) ); + } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java index 43c48942f9f14..a3c3c59c6d28e 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java @@ -154,7 +154,7 @@ private static void handlePartialSuccess(ActionListener<MetricsResponse> listener, // the server MUST respond with HTTP 200 OK. // https://opentelemetry.io/docs/specs/otlp/#partial-success-1 MessageLite response = responseWithRejectedDataPoints(context.getIgnoredDataPoints(), context.getIgnoredDataPointsMessage()); - listener.onResponse(new MetricsResponse(RestStatus.BAD_REQUEST, response)); + listener.onResponse(new MetricsResponse(RestStatus.OK, response)); } private static void handlePartialSuccess( diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java index 9dbaed3e3d2c4..6d70f4baf9034 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java @@ -8,6 +8,9 @@ package org.elasticsearch.xpack.oteldata.otlp.datapoint; import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.AggregationTemporality; +import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; import io.opentelemetry.proto.metrics.v1.Metric; import io.opentelemetry.proto.metrics.v1.NumberDataPoint; import io.opentelemetry.proto.metrics.v1.SummaryDataPoint; @@ -171,6 +174,143 @@ public boolean isValid(Set<String> errors) { } } + record ExponentialHistogram(ExponentialHistogramDataPoint dataPoint, Metric metric) implements DataPoint { + + @Override + public long getTimestampUnixNano() { + return dataPoint.getTimeUnixNano(); + } + + @Override + public List<KeyValue> getAttributes() { + return dataPoint.getAttributesList(); + } + + @Override + public long getStartTimestampUnixNano() { + return dataPoint.getStartTimeUnixNano(); + } + + @Override + public String getUnit() { + return metric.getUnit(); + } + + @Override + public String getMetricName() { + return metric.getName(); + } + + @Override + public void buildMetricValue(MappingHints mappingHints, XContentBuilder builder) throws IOException { + if (mappingHints.aggregateMetricDouble()) { + buildAggregateMetricDouble(builder, dataPoint.getSum(), dataPoint.getCount()); + } else { + builder.startObject(); + builder.startArray("counts"); + HistogramConverter.counts(dataPoint, builder::value); + builder.endArray(); + builder.startArray("values"); + HistogramConverter.centroidValues(dataPoint, builder::value); + builder.endArray(); + builder.endObject(); + } + } + + @Override + public long getDocCount() { + return dataPoint.getCount(); + } + + @Override + public String getDynamicTemplate(MappingHints mappingHints) { + if (mappingHints.aggregateMetricDouble()) { + return "summary"; + } else { + return "histogram"; + } + } + + @Override + public boolean isValid(Set<String> errors) { + if (metric.getExponentialHistogram().getAggregationTemporality() != AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA) { + errors.add("cumulative exponential histogram metrics are not supported, ignoring " + metric.getName()); + return false; + } + return true; + } + } + + record Histogram(HistogramDataPoint dataPoint, Metric metric) implements DataPoint { + @Override + public long getTimestampUnixNano() { + return dataPoint.getTimeUnixNano(); + } + + @Override + public List<KeyValue> getAttributes() { + return dataPoint.getAttributesList(); + } + + @Override + public long getStartTimestampUnixNano() { + return dataPoint.getStartTimeUnixNano(); + } + + @Override + public String getUnit() { + return metric.getUnit(); + } + + @Override
public String getMetricName() { + return metric.getName(); + } + + @Override + public void buildMetricValue(MappingHints mappingHints, XContentBuilder builder) throws IOException { + if (mappingHints.aggregateMetricDouble()) { + buildAggregateMetricDouble(builder, dataPoint.getSum(), dataPoint.getCount()); + } else { + builder.startObject(); + builder.startArray("counts"); + HistogramConverter.counts(dataPoint, builder::value); + builder.endArray(); + builder.startArray("values"); + HistogramConverter.centroidValues(dataPoint, builder::value); + builder.endArray(); + builder.endObject(); + } + } + + @Override + public long getDocCount() { + return dataPoint.getCount(); + } + + @Override + public String getDynamicTemplate(MappingHints mappingHints) { + if (mappingHints.aggregateMetricDouble()) { + return "summary"; + } else { + return "histogram"; + } + } + + @Override + public boolean isValid(Set<String> errors) { + if (metric.getHistogram().getAggregationTemporality() != AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA) { + errors.add("cumulative histogram metrics are not supported, ignoring " + metric.getName()); + return false; + } + if (dataPoint.getBucketCountsCount() == 1 && dataPoint.getExplicitBoundsCount() == 0) { + errors.add("histogram with a single bucket and no explicit bounds is not supported, ignoring " + metric.getName()); + return false; + } + return true; + } + } + record Summary(SummaryDataPoint dataPoint, Metric metric) implements DataPoint { @Override diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java index 1a06aa10be693..e20edb7a5092a 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java @@ -69,12 +69,16 @@ public void groupDataPoints(ExportMetricsServiceRequest exportMetricsServiceRequ scopeGroup.addDataPoints(metric, metric.getGauge().getDataPointsList(), DataPoint.Number::new); break; case EXPONENTIAL_HISTOGRAM: - ignoredDataPoints += metric.getExponentialHistogram().getDataPointsCount(); - ignoredDataPointMessages.add("Exponential histogram is not supported yet. Dropping " + metric.getName()); + // for now, we convert exponential histograms to TDigest + // once we have native support for exponential histograms in ES, we'll migrate to that + scopeGroup.addDataPoints( + metric, + metric.getExponentialHistogram().getDataPointsList(), + DataPoint.ExponentialHistogram::new + ); break; case HISTOGRAM: - ignoredDataPoints += metric.getHistogram().getDataPointsCount(); - ignoredDataPointMessages.add("Histogram is not supported yet.
Dropping " + metric.getName()); + scopeGroup.addDataPoints(metric, metric.getHistogram().getDataPointsList(), DataPoint.Histogram::new); break; case SUMMARY: scopeGroup.addDataPoints(metric, metric.getSummary().getDataPointsList(), DataPoint.Summary::new); diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/HistogramConverter.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/HistogramConverter.java new file mode 100644 index 0000000000000..acc9d48eb0755 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/HistogramConverter.java @@ -0,0 +1,135 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; + +import org.elasticsearch.exponentialhistogram.ExponentialScaleUtils; + +/** + * Utility class to convert OpenTelemetry histogram data points into counts and centroid values + * so that we can use it with the {@code histogram} field type. + * This class provides methods to extract counts and centroid values from both + * {@link ExponentialHistogramDataPoint} and {@link HistogramDataPoint}. + * The algorithm is ported over from the OpenTelemetry collector's Elasticsearch exporter. + * @see + * Elasticsearch exporter on GitHub + * + */ +class HistogramConverter { + + static void counts(ExponentialHistogramDataPoint dp, CheckedLongConsumer counts) throws E { + ExponentialHistogramDataPoint.Buckets negative = dp.getNegative(); + + for (int i = negative.getBucketCountsCount() - 1; i >= 0; i--) { + long count = negative.getBucketCounts(i); + if (count != 0) { + counts.accept(count); + } + } + + long zeroCount = dp.getZeroCount(); + if (zeroCount > 0) { + counts.accept(zeroCount); + } + + ExponentialHistogramDataPoint.Buckets positive = dp.getPositive(); + for (int i = 0; i < positive.getBucketCountsCount(); i++) { + long count = positive.getBucketCounts(i); + if (count != 0) { + counts.accept(count); + } + } + } + + /** + * @see + * ToTDigest function + * + */ + static void centroidValues(ExponentialHistogramDataPoint dp, CheckedDoubleConsumer values) throws E { + int scale = dp.getScale(); + ExponentialHistogramDataPoint.Buckets negative = dp.getNegative(); + + int offset = negative.getOffset(); + for (int i = negative.getBucketCountsCount() - 1; i >= 0; i--) { + long count = negative.getBucketCounts(i); + if (count != 0) { + double lb = -ExponentialScaleUtils.getUpperBucketBoundary(offset + i, scale); + double ub = -ExponentialScaleUtils.getLowerBucketBoundary(offset + i, scale); + values.accept(lb + (ub - lb) / 2); + } + } + + long zeroCount = dp.getZeroCount(); + if (zeroCount > 0) { + values.accept(0.0); + } + + ExponentialHistogramDataPoint.Buckets positive = dp.getPositive(); + offset = positive.getOffset(); + for (int i = 0; i < positive.getBucketCountsCount(); i++) { + long count = positive.getBucketCounts(i); + if (count != 0) { + double lb = ExponentialScaleUtils.getLowerBucketBoundary(offset + i, scale); + double ub = ExponentialScaleUtils.getUpperBucketBoundary(offset + i, scale); + values.accept(lb + (ub - lb) / 2); + } + } + } + + static void 
+ static <E extends Exception> void counts(HistogramDataPoint dp, CheckedLongConsumer<E> counts) throws E { + for (int i = 0; i < dp.getBucketCountsCount(); i++) { + long count = dp.getBucketCounts(i); + if (count != 0) { + counts.accept(count); + } + } + } + + /** + * @see + * histogramToValue function + * + */ + static <E extends Exception> void centroidValues(HistogramDataPoint dp, CheckedDoubleConsumer<E> values) throws E { + int size = dp.getBucketCountsCount(); + for (int i = 0; i < size; i++) { + long count = dp.getBucketCounts(i); + if (count != 0) { + double value; + if (i == 0) { + // (-infinity, explicit_bounds[i]] + value = dp.getExplicitBounds(i); + if (value > 0) { + value /= 2; + } + } else if (i == size - 1) { + // (explicit_bounds[i - 1], +infinity) + value = dp.getExplicitBounds(i - 1); + } else { + // (explicit_bounds[i - 1], explicit_bounds[i]] + // Use the midpoint between the boundaries. + value = dp.getExplicitBounds(i - 1) + (dp.getExplicitBounds(i) - dp.getExplicitBounds(i - 1)) / 2.0; + } + values.accept(value); + } + } + } + + interface CheckedLongConsumer<E extends Exception> { + void accept(long value) throws E; + } + + interface CheckedDoubleConsumer<E extends Exception> { + void accept(double value) throws E; + } +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java index e5a16745abd33..51014fbb64a97 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java @@ -13,7 +13,11 @@ import io.opentelemetry.proto.common.v1.KeyValue; import io.opentelemetry.proto.common.v1.KeyValueList; import io.opentelemetry.proto.metrics.v1.AggregationTemporality; +import io.opentelemetry.proto.metrics.v1.ExponentialHistogram; +import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.Histogram; +import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; import io.opentelemetry.proto.metrics.v1.Metric; import io.opentelemetry.proto.metrics.v1.NumberDataPoint; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; @@ -97,6 +101,34 @@ public static Metric createGaugeMetric(String name, String unit, List<NumberDataPoint> dataPoints, … + + public static Metric createExponentialHistogramMetric( + String name, + String unit, + List<ExponentialHistogramDataPoint> dataPoints, + AggregationTemporality temporality + ) { + return Metric.newBuilder() + .setName(name) + .setUnit(unit) + .setExponentialHistogram( + ExponentialHistogram.newBuilder().setAggregationTemporality(temporality).addAllDataPoints(dataPoints).build() + ) + .build(); + } + + public static Metric createHistogramMetric( + String name, + String unit, + List<HistogramDataPoint> dataPoints, + AggregationTemporality temporality + ) { + return Metric.newBuilder() + .setName(name) + .setUnit(unit) + .setHistogram(Histogram.newBuilder().setAggregationTemporality(temporality).addAllDataPoints(dataPoints).build()) + .build(); + } + public static Metric createSumMetric( String name, String unit,
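For orientation, the bucket geometry the converter obtains from ExponentialScaleUtils can be sketched in plain Java. This is only an illustration of the OTLP formula (at scale s, bucket i covers (base^i, base^(i+1)] with base = 2^(2^-s)); the class and method names below are made up for the sketch, and the library implementation avoids the overflow and precision loss this naive version suffers from:

```java
// Illustrative sketch of OTLP base-2 exponential bucketing, not the library code.
class ExponentialBucketsSketch {

    static double lowerBucketBoundary(long index, int scale) {
        double base = Math.pow(2.0, Math.pow(2.0, -scale));
        return Math.pow(base, index);
    }

    static double upperBucketBoundary(long index, int scale) {
        return lowerBucketBoundary(index + 1, scale);
    }

    public static void main(String[] args) {
        // scale 0: bucket 0 is (1, 2], bucket 1 is (2, 4] -> centroids 1.5 and 3.0,
        // matching the expectations in the converter tests below
        for (long i = 0; i < 2; i++) {
            double lb = lowerBucketBoundary(i, 0);
            double ub = upperBucketBoundary(i, 0);
            System.out.println(lb + (ub - lb) / 2);
        }
    }
}
```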
diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointExponentialHistogramTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointExponentialHistogramTests.java new file mode 100644 index 0000000000000..0532c5419ede6 --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointExponentialHistogramTests.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import io.opentelemetry.proto.metrics.v1.ExponentialHistogram; +import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.Metric; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.oteldata.otlp.docbuilder.MappingHints; + +import java.util.HashSet; + +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE; +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.mappingHints; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; + +public class DataPointExponentialHistogramTests extends ESTestCase { + + private final HashSet<String> validationErrors = new HashSet<>(); + + public void testExponentialHistogram() { + DataPoint.ExponentialHistogram histogram = new DataPoint.ExponentialHistogram( ExponentialHistogramDataPoint.newBuilder().build(), Metric.newBuilder() .setExponentialHistogram(ExponentialHistogram.newBuilder().setAggregationTemporality(AGGREGATION_TEMPORALITY_DELTA).build()) .build() ); + assertThat(histogram.getDynamicTemplate(MappingHints.empty()), equalTo("histogram")); + assertThat(histogram.isValid(validationErrors), equalTo(true)); + assertThat(validationErrors, empty()); + } + + public void testExponentialHistogramMappingHint() { + DataPoint.ExponentialHistogram histogram = new DataPoint.ExponentialHistogram( ExponentialHistogramDataPoint.newBuilder().build(), Metric.newBuilder() .setExponentialHistogram(ExponentialHistogram.newBuilder().setAggregationTemporality(AGGREGATION_TEMPORALITY_DELTA).build()) .build() ); + assertThat( histogram.getDynamicTemplate(MappingHints.fromAttributes(mappingHints(MappingHints.AGGREGATE_METRIC_DOUBLE))), equalTo("summary") ); + assertThat(histogram.isValid(validationErrors), equalTo(true)); + assertThat(validationErrors, empty()); + } + + public void testExponentialHistogramUnsupportedTemporality() { + DataPoint.ExponentialHistogram histogram = new DataPoint.ExponentialHistogram( ExponentialHistogramDataPoint.newBuilder().build(), Metric.newBuilder() .setExponentialHistogram( ExponentialHistogram.newBuilder().setAggregationTemporality(AGGREGATION_TEMPORALITY_CUMULATIVE).build() ) .build() ); + assertThat(histogram.getDynamicTemplate(MappingHints.empty()), equalTo("histogram")); + assertThat(histogram.isValid(validationErrors), equalTo(false)); + assertThat(validationErrors, contains(containsString("cumulative exponential histogram metrics are not supported"))); + } +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java index 826a558061415..2b2071acc171a 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java +++
b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java @@ -8,6 +8,8 @@ package org.elasticsearch.xpack.oteldata.otlp.datapoint; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import org.elasticsearch.test.ESTestCase; @@ -18,8 +20,11 @@ import java.util.concurrent.atomic.AtomicInteger; import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE; +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createDoubleDataPoint; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createExponentialHistogramMetric; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createGaugeMetric; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createHistogramMetric; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createLongDataPoint; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createMetricsRequest; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createResourceMetrics; @@ -49,11 +54,25 @@ public void testGroupingSameGroup() throws Exception { true, AGGREGATION_TEMPORALITY_CUMULATIVE ), + createExponentialHistogramMetric( + "http.request.duration", + "", + List.of( + ExponentialHistogramDataPoint.newBuilder().setTimeUnixNano(nowUnixNanos).setStartTimeUnixNano(nowUnixNanos).build() + ), + AGGREGATION_TEMPORALITY_DELTA + ), + createHistogramMetric( + "http.request.size", + "", + List.of(HistogramDataPoint.newBuilder().setTimeUnixNano(nowUnixNanos).setStartTimeUnixNano(nowUnixNanos).build()), + AGGREGATION_TEMPORALITY_DELTA + ), createSummaryMetric("summary", "", List.of(createSummaryDataPoint(nowUnixNanos, List.of()))) ) ); context.groupDataPoints(metricsRequest); - assertEquals(4, context.totalDataPoints()); + assertEquals(6, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); assertEquals("", context.getIgnoredDataPointsMessage()); diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointHistogramTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointHistogramTests.java new file mode 100644 index 0000000000000..a14502448aae1 --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointHistogramTests.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import io.opentelemetry.proto.metrics.v1.Histogram; +import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.Metric; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.oteldata.otlp.docbuilder.MappingHints; + +import java.util.HashSet; + +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE; +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; + +public class DataPointHistogramTests extends ESTestCase { + + private final HashSet<String> validationErrors = new HashSet<>(); + + public void testHistogram() { + DataPoint.Histogram histogram = new DataPoint.Histogram( HistogramDataPoint.newBuilder().build(), Metric.newBuilder() .setHistogram(Histogram.newBuilder().setAggregationTemporality(AGGREGATION_TEMPORALITY_DELTA).build()) .build() ); + assertThat(histogram.getDynamicTemplate(MappingHints.empty()), equalTo("histogram")); + assertThat(histogram.isValid(validationErrors), equalTo(true)); + assertThat(validationErrors, empty()); + } + + public void testHistogramUnsupportedTemporality() { + DataPoint.Histogram histogram = new DataPoint.Histogram( HistogramDataPoint.newBuilder().build(), Metric.newBuilder() .setHistogram(Histogram.newBuilder().setAggregationTemporality(AGGREGATION_TEMPORALITY_CUMULATIVE).build()) .build() ); + assertThat(histogram.getDynamicTemplate(MappingHints.empty()), equalTo("histogram")); + assertThat(histogram.isValid(validationErrors), equalTo(false)); + assertThat(validationErrors, contains(containsString("cumulative histogram metrics are not supported"))); + } + + public void testHistogramInvalidBucketCountWithoutBounds() { + DataPoint.Histogram histogram = new DataPoint.Histogram( HistogramDataPoint.newBuilder().addBucketCounts(1).build(), Metric.newBuilder() .setHistogram(Histogram.newBuilder().setAggregationTemporality(AGGREGATION_TEMPORALITY_DELTA).build()) .build() ); + assertThat(histogram.getDynamicTemplate(MappingHints.empty()), equalTo("histogram")); + assertThat(histogram.isValid(validationErrors), equalTo(false)); + assertThat(validationErrors, contains(containsString("histogram with a single bucket and no explicit bounds is not supported"))); + } +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/ExponentialHistogramConverterAccuracyTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/ExponentialHistogramConverterAccuracyTests.java new file mode 100644 index 0000000000000..5a7e1fd90e90f --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/ExponentialHistogramConverterAccuracyTests.java @@ -0,0 +1,173 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0.
+ */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; + +import org.apache.commons.math3.distribution.BetaDistribution; +import org.apache.commons.math3.distribution.ExponentialDistribution; +import org.apache.commons.math3.distribution.GammaDistribution; +import org.apache.commons.math3.distribution.LogNormalDistribution; +import org.apache.commons.math3.distribution.NormalDistribution; +import org.apache.commons.math3.distribution.RealDistribution; +import org.apache.commons.math3.distribution.UniformRealDistribution; +import org.apache.commons.math3.distribution.WeibullDistribution; +import org.apache.commons.math3.random.Well19937c; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; +import org.elasticsearch.exponentialhistogram.BucketIterator; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramTestCase; +import org.elasticsearch.exponentialhistogram.QuantileAccuracyTests; +import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays; +import org.elasticsearch.tdigest.TDigest; +import org.elasticsearch.tdigest.arrays.TDigestArrays; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.DoubleFunction; + +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class ExponentialHistogramConverterAccuracyTests extends ExponentialHistogramTestCase { + public static final double[] QUANTILES_TO_TEST = { 0, 0.0000001, 0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.999999, 1.0 }; + + private static final TDigestArrays arrays = new MemoryTrackingTDigestArrays(new NoopCircuitBreaker("default-wrapper-tdigest-arrays")); + public static final int TDIGEST_COMPRESSION = 100; + + public void testUniformDistribution() { + testDistributionQuantileAccuracy(new UniformRealDistribution(new Well19937c(randomInt()), 0, 100)); + } + + public void testNormalDistribution() { + testDistributionQuantileAccuracy(new NormalDistribution(new Well19937c(randomInt()), 100, 15)); + } + + public void testExponentialDistribution() { + testDistributionQuantileAccuracy(new ExponentialDistribution(new Well19937c(randomInt()), 10)); + } + + public void testLogNormalDistribution() { + testDistributionQuantileAccuracy(new LogNormalDistribution(new Well19937c(randomInt()), 0, 1)); + } + + public void testGammaDistribution() { + testDistributionQuantileAccuracy(new GammaDistribution(new Well19937c(randomInt()), 2, 5)); + } + + public void testBetaDistribution() { + testDistributionQuantileAccuracy(new BetaDistribution(new Well19937c(randomInt()), 2, 5)); + } + + public void testWeibullDistribution() { + testDistributionQuantileAccuracy(new WeibullDistribution(new Well19937c(randomInt()), 2, 5)); + } + + private void testDistributionQuantileAccuracy(RealDistribution distribution) { + double[] samples = QuantileAccuracyTests.generateSamples(distribution, between(3_000, 10_000)); + int numBuckets = randomIntBetween(50, 100); + ExponentialHistogram exponentialHistogram = createAutoReleasedHistogram(numBuckets, samples); + ExponentialHistogramDataPoint otlpHistogram = convertToOtlpHistogram(exponentialHistogram); + + double rawTDigestMaxError; + try (TDigest rawTDigest = TDigest.createAvlTreeDigest(arrays, TDIGEST_COMPRESSION)) { + for (double sample : samples) { + rawTDigest.add(sample); + } + rawTDigestMaxError = getMaxRelativeError(samples, rawTDigest::quantile); + } + + 
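+ // Convert the OTLP data point back into a TDigest via HistogramConverter and measure its worst-case quantile error.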
double convertedTDigestMaxError; + try (TDigest convertedTDigest = convertToTDigest(otlpHistogram)) { + convertedTDigestMaxError = getMaxRelativeError(samples, convertedTDigest::quantile); + } + double exponentialHistogramMaxError = QuantileAccuracyTests.getMaximumRelativeError(samples, numBuckets); + double combinedRelativeError = rawTDigestMaxError + exponentialHistogramMaxError; + // It's hard to reason about the upper bound of the combined error for this conversion, + // so we just check that it's not worse than twice the sum of the individual errors. + // For a lower number of buckets or samples than the ones we're testing with here, + // the error can be even higher than this. + // The same is true when using a different TDigest implementation that's less accurate (such as hybrid or merging). + assertThat(convertedTDigestMaxError, lessThanOrEqualTo(combinedRelativeError * 2)); + } + + private static TDigest convertToTDigest(ExponentialHistogramDataPoint otlpHistogram) { + TDigest result = TDigest.createAvlTreeDigest(arrays, TDIGEST_COMPRESSION); + List<Double> centroidValues = new ArrayList<>(); + HistogramConverter.centroidValues(otlpHistogram, centroidValues::add); + List<Long> counts = new ArrayList<>(); + HistogramConverter.counts(otlpHistogram, counts::add); + assertEquals(centroidValues.size(), counts.size()); + for (int i = 0; i < centroidValues.size(); i++) { + if (counts.get(i) > 0) { + result.add(centroidValues.get(i), counts.get(i)); + } + } + return result; + } + + private ExponentialHistogramDataPoint convertToOtlpHistogram(ExponentialHistogram histogram) { + ExponentialHistogramDataPoint.Builder builder = ExponentialHistogramDataPoint.newBuilder(); + builder.setScale(histogram.scale()); + builder.setZeroCount(histogram.zeroBucket().count()); + builder.setZeroThreshold(histogram.zeroBucket().zeroThreshold()); + builder.setPositive(convertBuckets(histogram.positiveBuckets())); + builder.setNegative(convertBuckets(histogram.negativeBuckets())); + return builder.build(); + } + + private static ExponentialHistogramDataPoint.Buckets.Builder convertBuckets(ExponentialHistogram.Buckets buckets) { + ExponentialHistogramDataPoint.Buckets.Builder result = ExponentialHistogramDataPoint.Buckets.newBuilder(); + BucketIterator it = buckets.iterator(); + if (it.hasNext() == false) { + return result; + } + result.setOffset((int) it.peekIndex()); + for (long index = it.peekIndex(); it.hasNext(); index++) { + int missingBuckets = (int) (it.peekIndex() - index); + for (int i = 0; i < missingBuckets; i++) { + result.addBucketCounts(0); + index++; + } + result.addBucketCounts(it.peekCount()); + it.advance(); + } + return result; + } + + private double getMaxRelativeError(double[] values, DoubleFunction<Double> quantileFunction) { + Arrays.sort(values); + double maxError = 0; + // Compare histogram quantiles with exact quantiles + for (double q : QUANTILES_TO_TEST) { + double percentileRank = q * (values.length - 1); + int lowerRank = (int) Math.floor(percentileRank); + int upperRank = (int) Math.ceil(percentileRank); + double upperFactor = percentileRank - lowerRank; + + if (values[lowerRank] < 0 && values[upperRank] > 0) { + // the percentile lies directly on a sign change and we interpolate linearly in-between; + // in this case the relative error bound does not hold + continue; + } + double exactValue = values[lowerRank] * (1 - upperFactor) + values[upperRank] * upperFactor; + + double histoValue = quantileFunction.apply(q); + + // Skip comparison if the exact value is close to zero to avoid false-positives due to numerical imprecision
+ if (Math.abs(exactValue) < 1e-100) { + continue; + } + + double relativeError = Math.abs(histoValue - exactValue) / Math.abs(exactValue); + maxError = Math.max(maxError, relativeError); + } + return maxError; + } + +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/ExponentialHistogramConverterTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/ExponentialHistogramConverterTests.java new file mode 100644 index 0000000000000..eceb08b85edcb --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/ExponentialHistogramConverterTests.java @@ -0,0 +1,116 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint.Buckets; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.List; + +import static io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint.newBuilder; + +/** + * @see + * OpenTelemetry Collector Exponential Histogram Tests + * + */ +public class ExponentialHistogramConverterTests extends ESTestCase { + + private final ExponentialHistogramDataPoint dataPoint; + private final List<Long> expectedCounts; + private final List<Double> expectedValues; + + public ExponentialHistogramConverterTests( String name, ExponentialHistogramDataPoint.Builder builder, List<Long> expectedCounts, List<Double> expectedValues ) { + this.dataPoint = builder.build(); + this.expectedCounts = expectedCounts; + this.expectedValues = expectedValues; + } + + public void testExponentialHistograms() { + List<Long> actualCounts = new ArrayList<>(); + HistogramConverter.counts(dataPoint, actualCounts::add); + assertEquals(expectedCounts, actualCounts); + + List<Double> actualValues = new ArrayList<>(); + HistogramConverter.centroidValues(dataPoint, actualValues::add); + assertEquals(expectedValues.size(), actualValues.size()); + for (int i = 0; i < expectedValues.size(); i++) { + assertEquals(expectedValues.get(i), actualValues.get(i), 1e-10); + } + } + + @ParametersFactory(argumentFormatting = "%1$s") + public static List<Object[]> testCases() { + return List.of( + new Object[] { "empty", newBuilder(), List.of(), List.of() }, + new Object[] { "empty, scale=1", newBuilder().setScale(1), List.of(), List.of() }, + new Object[] { "empty, scale=-1", newBuilder().setScale(-1), List.of(), List.of() }, + new Object[] { "zeros", newBuilder().setZeroCount(1), List.of(1L), List.of(0.0) }, + new Object[] { "scale=0", newBuilder().setZeroCount(1) .setPositive(Buckets.newBuilder().setOffset(0).addAllBucketCounts(List.of(1L, 1L))) .setNegative(Buckets.newBuilder().setOffset(0).addAllBucketCounts(List.of(1L, 1L))), List.of(1L, 1L, 1L, 1L, 1L), List.of(-3.0, -1.5, 0.0, 1.5, 3.0) }, + new Object[] { "scale=0, no zeros", newBuilder().setPositive(Buckets.newBuilder().setOffset(0).addAllBucketCounts(List.of(1L, 1L))) .setNegative(Buckets.newBuilder().setOffset(0).addAllBucketCounts(List.of(1L, 1L))), List.of(1L, 1L, 1L, 1L), List.of(-3.0, -1.5, 1.5, 3.0) }, + new Object[] {
"scale=0, offset=1", + newBuilder().setZeroCount(1) + .setPositive(Buckets.newBuilder().setOffset(1).addAllBucketCounts(List.of(1L, 1L))) + .setNegative(Buckets.newBuilder().setOffset(1).addAllBucketCounts(List.of(1L, 1L))), + List.of(1L, 1L, 1L, 1L, 1L), + List.of(-6.0, -3.0, 0.0, 3.0, 6.0) }, + new Object[] { + "scale=0, offset=-1", + newBuilder().setZeroCount(1) + .setPositive(Buckets.newBuilder().setOffset(-1).addAllBucketCounts(List.of(1L, 1L))) + .setNegative(Buckets.newBuilder().setOffset(-1).addAllBucketCounts(List.of(1L, 1L))), + List.of(1L, 1L, 1L, 1L, 1L), + List.of(-1.5, -0.75, 0.0, 0.75, 1.5) }, + new Object[] { + "scale=0, different offsets", + newBuilder().setZeroCount(1) + .setPositive(Buckets.newBuilder().setOffset(-1).addAllBucketCounts(List.of(1L, 1L))) + .setNegative(Buckets.newBuilder().setOffset(1).addAllBucketCounts(List.of(1L, 1L))), + List.of(1L, 1L, 1L, 1L, 1L), + List.of(-6.0, -3.0, 0.0, 0.75, 1.5) }, + new Object[] { + "scale=-1", + newBuilder().setScale(-1) + .setZeroCount(1) + .setPositive(Buckets.newBuilder().setOffset(0).addAllBucketCounts(List.of(1L, 1L))) + .setNegative(Buckets.newBuilder().setOffset(0).addAllBucketCounts(List.of(1L, 1L))), + List.of(1L, 1L, 1L, 1L, 1L), + List.of(-10.0, -2.5, 0.0, 2.5, 10.0) }, + new Object[] { + "scale=1", + newBuilder().setScale(1) + .setZeroCount(1) + .setPositive(Buckets.newBuilder().setOffset(0).addAllBucketCounts(List.of(1L, 1L))) + .setNegative(Buckets.newBuilder().setOffset(0).addAllBucketCounts(List.of(1L, 1L))), + List.of(1L, 1L, 1L, 1L, 1L), + List.of(-1.7071067811865475, -1.2071067811865475, 0.0, 1.2071067811865475, 1.7071067811865475) } + ); + } +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/HistogramConverterTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/HistogramConverterTests.java new file mode 100644 index 0000000000000..bc940dfd6d492 --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/HistogramConverterTests.java @@ -0,0 +1,130 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import io.opentelemetry.proto.metrics.v1.AggregationTemporality; +import io.opentelemetry.proto.metrics.v1.Histogram; +import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.Metric; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + +public class HistogramConverterTests extends ESTestCase { + + @SuppressWarnings("unused") + private final String name; + private final HistogramDataPoint dataPoint; + private final List<Long> expectedCounts; + private final List<Double> expectedValues; + private final boolean valid; + + public HistogramConverterTests( String name, HistogramDataPoint dataPoint, List<Long> expectedCounts, List<Double> expectedValues, boolean valid ) { + this.name = name; + this.dataPoint = dataPoint; + this.expectedCounts = expectedCounts; + this.expectedValues = expectedValues; + this.valid = valid; + } + + public void testHistograms() throws Exception { + DataPoint.Histogram histogram = new DataPoint.Histogram( dataPoint, Metric.newBuilder() .setHistogram( Histogram.newBuilder() .setAggregationTemporality(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA) .addDataPoints(dataPoint) ) .build() ); + assertThat(histogram.isValid(new HashSet<>()), equalTo(valid)); + if (valid == false) { + return; + } + List<Long> actualCounts = new ArrayList<>(); + HistogramConverter.counts(dataPoint, actualCounts::add); + List<Double> actualValues = new ArrayList<>(); + HistogramConverter.centroidValues(dataPoint, actualValues::add); + + assertEquals(expectedCounts, actualCounts); + assertEquals(expectedValues.size(), actualValues.size()); + for (int i = 0; i < expectedValues.size(); i++) { + assertEquals(expectedValues.get(i), actualValues.get(i), 1e-10); + } + } + + @ParametersFactory(argumentFormatting = "%1$s") + public static List<Object[]> testCases() { + return List.of( + new Object[] { "empty", HistogramDataPoint.newBuilder().build(), List.of(), List.of(), true }, + new Object[] { "single bucket", HistogramDataPoint.newBuilder().addBucketCounts(10L).addExplicitBounds(5.0).build(), List.of(10L), List.of(2.5), true }, + new Object[] { "single count", HistogramDataPoint.newBuilder().addBucketCounts(10L).build(), List.of(10L), List.of(), false }, + new Object[] { "two buckets", HistogramDataPoint.newBuilder().addAllBucketCounts(List.of(5L, 10L)).addExplicitBounds(5.0).build(), List.of(5L, 10L), List.of(2.5, 5.0), true }, + new Object[] { "three buckets", HistogramDataPoint.newBuilder().addAllBucketCounts(List.of(5L, 10L, 15L)).addAllExplicitBounds(List.of(5.0, 10.0)).build(), List.of(5L, 10L, 15L), List.of(2.5, 7.5, 10.0), true }, + new Object[] { "zero count buckets", HistogramDataPoint.newBuilder().addAllBucketCounts(List.of(5L, 0L, 15L)).addAllExplicitBounds(List.of(5.0, 10.0)).build(), List.of(5L, 15L), List.of(2.5, 10.0), true }, + new Object[] { "negative bounds", HistogramDataPoint.newBuilder() .addAllBucketCounts(List.of(5L, 10L, 15L)) .addAllExplicitBounds(List.of(-10.0, 10.0)) .build(), List.of(5L, 10L, 15L), List.of(-10.0, 0.0, 10.0), true }, + new Object[] { "all negative bounds", HistogramDataPoint.newBuilder().addAllBucketCounts(List.of(5L, 10L)).addExplicitBounds(-5.0).build(), List.of(5L, 10L), List.of(-5.0, -5.0),
true }, + new Object[] { "multiple buckets with varying distances", HistogramDataPoint.newBuilder() .addAllBucketCounts(List.of(5L, 10L, 15L, 20L)) .addAllExplicitBounds(List.of(1.0, 5.0, 20.0)) .build(), List.of(5L, 10L, 15L, 20L), List.of(0.5, 3.0, 12.5, 20.0), true } ); + } +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilderTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilderTests.java index 01fa605d321cd..ff0fbce9979f1 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilderTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilderTests.java @@ -10,6 +10,8 @@ import io.opentelemetry.proto.common.v1.InstrumentationScope; import io.opentelemetry.proto.common.v1.KeyValue; import io.opentelemetry.proto.metrics.v1.AggregationTemporality; +import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; import io.opentelemetry.proto.metrics.v1.SummaryDataPoint; import io.opentelemetry.proto.resource.v1.Resource; @@ -34,8 +36,11 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createDoubleDataPoint; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createExponentialHistogramMetric; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createGaugeMetric; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createHistogramMetric; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createLongDataPoint; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSumMetric; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSummaryMetric; @@ -200,6 +205,149 @@ public void testEmptyFields() throws IOException { assertThat(doc.evaluate("unit"), is(nullValue())); } + + public void testExponentialHistogram() throws Exception { + Resource resource = Resource.newBuilder().build(); + InstrumentationScope scope = InstrumentationScope.newBuilder().build(); + + DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup( resource, null, scope, null, List.of(), "", TargetIndex.defaultMetrics() ); + dataPointGroup.addDataPoint( Set.of(), new DataPoint.ExponentialHistogram( ExponentialHistogramDataPoint.newBuilder() .setTimeUnixNano(timestamp) .setStartTimeUnixNano(startTimestamp) .setZeroCount(1) .setPositive(ExponentialHistogramDataPoint.Buckets.newBuilder().setOffset(0).addAllBucketCounts(List.of(1L, 1L))) .setNegative(ExponentialHistogramDataPoint.Buckets.newBuilder().setOffset(0).addAllBucketCounts(List.of(1L, 1L))) .build(), createExponentialHistogramMetric("exponential_histogram", "", List.of(), AGGREGATION_TEMPORALITY_DELTA) ) ); + + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + HashMap<String, String> dynamicTemplates = documentBuilder.buildMetricDocument(builder, dataPointGroup); + + ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder));
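+ // scale 0 with zeroCount=1 and bucket counts [1, 1] on both signs: the expected values are the bucket centroids + // [-3.0, -1.5, 0.0, 1.5, 3.0], each carrying a count of 1.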
assertThat(doc.evaluate("metrics.exponential_histogram.counts"), equalTo(List.of(1, 1, 1, 1, 1))); + assertThat(dynamicTemplates, hasEntry("metrics.exponential_histogram", "histogram")); + } + + public void testExponentialHistogramAsAggregateMetricDouble() throws Exception { + Resource resource = Resource.newBuilder().build(); + InstrumentationScope scope = InstrumentationScope.newBuilder().build(); + + DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup( + resource, + null, + scope, + null, + List.of(), + "", + TargetIndex.defaultMetrics() + ); + dataPointGroup.addDataPoint( + Set.of(), + new DataPoint.ExponentialHistogram( + ExponentialHistogramDataPoint.newBuilder() + .setTimeUnixNano(timestamp) + .setStartTimeUnixNano(startTimestamp) + .setSum(42) + .setCount(1L) + .addAllAttributes(mappingHints(MappingHints.AGGREGATE_METRIC_DOUBLE)) + .build(), + createExponentialHistogramMetric("histogram", "", List.of(), AGGREGATION_TEMPORALITY_DELTA) + ) + ); + + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + HashMap dynamicTemplates = documentBuilder.buildMetricDocument(builder, dataPointGroup); + + ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); + assertThat(doc.evaluate("metrics.histogram.sum"), equalTo(42.0)); + assertThat(doc.evaluate("metrics.histogram.value_count"), equalTo(1)); + assertThat(dynamicTemplates, hasEntry("metrics.histogram", "summary")); + } + + public void testHistogram() throws Exception { + Resource resource = Resource.newBuilder().build(); + InstrumentationScope scope = InstrumentationScope.newBuilder().build(); + + DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup( + resource, + null, + scope, + null, + List.of(), + "", + TargetIndex.defaultMetrics() + ); + dataPointGroup.addDataPoint( + Set.of(), + new DataPoint.Histogram( + HistogramDataPoint.newBuilder() + .setTimeUnixNano(timestamp) + .setStartTimeUnixNano(startTimestamp) + .addBucketCounts(10L) + .addExplicitBounds(5.0) + .build(), + createHistogramMetric("histogram", "", List.of(), AGGREGATION_TEMPORALITY_DELTA) + ) + ); + + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + HashMap dynamicTemplates = documentBuilder.buildMetricDocument(builder, dataPointGroup); + + ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); + assertThat(doc.evaluate("metrics.histogram.values"), equalTo(List.of(2.5))); + assertThat(doc.evaluate("metrics.histogram.counts"), equalTo(List.of(10))); + assertThat(dynamicTemplates, hasEntry("metrics.histogram", "histogram")); + } + + public void testHistogramAsAggregateMetricDouble() throws Exception { + Resource resource = Resource.newBuilder().build(); + InstrumentationScope scope = InstrumentationScope.newBuilder().build(); + + DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup( + resource, + null, + scope, + null, + List.of(), + "", + TargetIndex.defaultMetrics() + ); + dataPointGroup.addDataPoint( + Set.of(), + new DataPoint.Histogram( + HistogramDataPoint.newBuilder() + .setTimeUnixNano(timestamp) + .setStartTimeUnixNano(startTimestamp) + .setSum(42) + .setCount(1L) + .addAllAttributes(mappingHints(MappingHints.AGGREGATE_METRIC_DOUBLE)) + .build(), + createHistogramMetric("histogram", "", List.of(), AGGREGATION_TEMPORALITY_DELTA) + ) + ); + + XContentBuilder builder = 
+    public void testHistogramAsAggregateMetricDouble() throws Exception {
+        Resource resource = Resource.newBuilder().build();
+        InstrumentationScope scope = InstrumentationScope.newBuilder().build();
+
+        DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup(
+            resource,
+            null,
+            scope,
+            null,
+            List.of(),
+            "",
+            TargetIndex.defaultMetrics()
+        );
+        dataPointGroup.addDataPoint(
+            Set.of(),
+            new DataPoint.Histogram(
+                HistogramDataPoint.newBuilder()
+                    .setTimeUnixNano(timestamp)
+                    .setStartTimeUnixNano(startTimestamp)
+                    .setSum(42)
+                    .setCount(1L)
+                    .addAllAttributes(mappingHints(MappingHints.AGGREGATE_METRIC_DOUBLE))
+                    .build(),
+                createHistogramMetric("histogram", "", List.of(), AGGREGATION_TEMPORALITY_DELTA)
+            )
+        );
+
+        XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
+        HashMap<String, String> dynamicTemplates = documentBuilder.buildMetricDocument(builder, dataPointGroup);
+
+        ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder));
+        assertThat(doc.evaluate("metrics.histogram.sum"), equalTo(42.0));
+        assertThat(doc.evaluate("metrics.histogram.value_count"), equalTo(1));
+        assertThat(dynamicTemplates, hasEntry("metrics.histogram", "summary"));
+    }
+
     public void testSummary() throws Exception {
         Resource resource = Resource.newBuilder().build();
         InstrumentationScope scope = InstrumentationScope.newBuilder().build();