Skip to content

Commit 63840f1

Browse files
authored
OTLP: add support for histograms (#133902)
1 parent 3d2d765 commit 63840f1

File tree

17 files changed

+1226
-36
lines changed

17 files changed

+1226
-36
lines changed

libs/exponential-histogram/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
// TODO: publish this when ready?
1111
//apply plugin: 'elasticsearch.publish'
1212
apply plugin: 'elasticsearch.build'
13+
apply plugin: 'elasticsearch.internal-test-artifact'
1314

1415
dependencies {
1516
api project(':libs:core')

libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ ExponentialHistogram createAutoReleasedHistogram(Consumer<ExponentialHistogramBu
7070
return result;
7171
}
7272

73-
ExponentialHistogram createAutoReleasedHistogram(int numBuckets, double... values) {
73+
protected ExponentialHistogram createAutoReleasedHistogram(int numBuckets, double... values) {
7474
ReleasableExponentialHistogram result = ExponentialHistogram.create(numBuckets, breaker(), values);
7575
releaseBeforeEnd.add(result);
7676
return result;

libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/QuantileAccuracyTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ private void testDistributionQuantileAccuracy(RealDistribution distribution) {
221221
testQuantileAccuracy(values, bucketCount);
222222
}
223223

224-
private static double[] generateSamples(RealDistribution distribution, int sampleSize) {
224+
public static double[] generateSamples(RealDistribution distribution, int sampleSize) {
225225
double[] values = new double[sampleSize];
226226
for (int i = 0; i < sampleSize; i++) {
227227
values[i] = distribution.sample();
@@ -276,7 +276,7 @@ private double testQuantileAccuracy(double[] values, int bucketCount) {
276276
* The error depends on the raw values put into the histogram and the number of buckets allowed.
277277
* This is an implementation of the error bound computation proven by Theorem 3 in the <a href="https://arxiv.org/pdf/2004.08604">UDDSketch paper</a>
278278
*/
279-
private static double getMaximumRelativeError(double[] values, int bucketCount) {
279+
public static double getMaximumRelativeError(double[] values, int bucketCount) {
280280
HashSet<Long> usedPositiveIndices = new HashSet<>();
281281
HashSet<Long> usedNegativeIndices = new HashSet<>();
282282
int bestPossibleScale = MAX_SCALE;

x-pack/plugin/otel-data/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ dependencies {
5151
api project(":libs:exponential-histogram")
5252
compileOnly project(path: xpackModule('core'))
5353
testImplementation(testArtifact(project(xpackModule('core'))))
54+
testImplementation(testArtifact(project(":libs:exponential-histogram")))
55+
testImplementation('org.apache.commons:commons-math3:3.6.1')
5456
clusterModules project(':modules:data-streams')
5557
clusterModules project(':modules:ingest-common')
5658
clusterModules project(':modules:ingest-geoip')

x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsIndexingRestIT.java

Lines changed: 181 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.oteldata.otlp;
99

10+
import io.opentelemetry.api.common.AttributeKey;
1011
import io.opentelemetry.api.common.Attributes;
1112
import io.opentelemetry.api.metrics.Meter;
1213
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
@@ -15,9 +16,14 @@
1516
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
1617
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
1718
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
19+
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets;
20+
import io.opentelemetry.sdk.metrics.data.HistogramData;
21+
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
1822
import io.opentelemetry.sdk.metrics.data.MetricData;
1923
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
2024
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData;
25+
import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData;
26+
import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData;
2127
import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
2228
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
2329
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
@@ -48,6 +54,7 @@
4854
import static io.opentelemetry.api.common.AttributeKey.stringKey;
4955
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE;
5056
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA;
57+
import static org.elasticsearch.test.rest.ObjectPath.evaluate;
5158
import static org.elasticsearch.xpack.oteldata.otlp.OTLPMetricsIndexingRestIT.Monotonicity.MONOTONIC;
5259
import static org.elasticsearch.xpack.oteldata.otlp.OTLPMetricsIndexingRestIT.Monotonicity.NON_MONOTONIC;
5360
import static org.hamcrest.Matchers.aMapWithSize;
@@ -133,12 +140,12 @@ public void testIngestMetricViaMeterProvider() throws Exception {
133140
ObjectPath search = search("metrics-generic.otel-default");
134141
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
135142
var source = search.evaluate("hits.hits.0._source");
136-
assertThat(ObjectPath.evaluate(source, "@timestamp"), isA(String.class));
137-
assertThat(ObjectPath.evaluate(source, "start_timestamp"), isA(String.class));
138-
assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class));
143+
assertThat(evaluate(source, "@timestamp"), isA(String.class));
144+
assertThat(evaluate(source, "start_timestamp"), isA(String.class));
145+
assertThat(evaluate(source, "_metric_names_hash"), isA(String.class));
139146
assertThat(ObjectPath.<Number>evaluate(source, "metrics.jvm\\.memory\\.total").longValue(), equalTo(totalMemory));
140-
assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By"));
141-
assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics"));
147+
assertThat(evaluate(source, "unit"), equalTo("By"));
148+
assertThat(evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics"));
142149
}
143150

144151
public void testIngestMetricDataViaMetricExporter() throws Exception {
@@ -150,13 +157,13 @@ public void testIngestMetricDataViaMetricExporter() throws Exception {
150157
ObjectPath search = search("metrics-generic.otel-default");
151158
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
152159
var source = search.evaluate("hits.hits.0._source");
153-
assertThat(ObjectPath.evaluate(source, "@timestamp"), equalTo(timestampAsString(now)));
154-
assertThat(ObjectPath.evaluate(source, "start_timestamp"), equalTo(timestampAsString(now)));
155-
assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class));
160+
assertThat(evaluate(source, "@timestamp"), equalTo(timestampAsString(now)));
161+
assertThat(evaluate(source, "start_timestamp"), equalTo(timestampAsString(now)));
162+
assertThat(evaluate(source, "_metric_names_hash"), isA(String.class));
156163
assertThat(ObjectPath.<Number>evaluate(source, "metrics.jvm\\.memory\\.total").longValue(), equalTo(totalMemory));
157-
assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By"));
158-
assertThat(ObjectPath.evaluate(source, "resource.attributes.service\\.name"), equalTo("elasticsearch"));
159-
assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics"));
164+
assertThat(evaluate(source, "unit"), equalTo("By"));
165+
assertThat(evaluate(source, "resource.attributes.service\\.name"), equalTo("elasticsearch"));
166+
assertThat(evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics"));
160167
}
161168

162169
public void testGroupingSameGroup() throws Exception {
@@ -197,11 +204,11 @@ public void testGauge() throws Exception {
197204
createLongGauge(TEST_RESOURCE, Attributes.empty(), "long_gauge", 42, "By", now)
198205
)
199206
);
200-
Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
201-
assertThat(ObjectPath.evaluate(metrics, "double_gauge.type"), equalTo("double"));
202-
assertThat(ObjectPath.evaluate(metrics, "double_gauge.time_series_metric"), equalTo("gauge"));
203-
assertThat(ObjectPath.evaluate(metrics, "long_gauge.type"), equalTo("long"));
204-
assertThat(ObjectPath.evaluate(metrics, "long_gauge.time_series_metric"), equalTo("gauge"));
207+
Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
208+
assertThat(evaluate(metrics, "double_gauge.type"), equalTo("double"));
209+
assertThat(evaluate(metrics, "double_gauge.time_series_metric"), equalTo("gauge"));
210+
assertThat(evaluate(metrics, "long_gauge.type"), equalTo("long"));
211+
assertThat(evaluate(metrics, "long_gauge.time_series_metric"), equalTo("gauge"));
205212
}
206213

207214
public void testCounterTemporality() throws Exception {
@@ -213,11 +220,11 @@ public void testCounterTemporality() throws Exception {
213220
)
214221
);
215222

216-
Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
217-
assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.type"), equalTo("long"));
218-
assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.time_series_metric"), equalTo("counter"));
219-
assertThat(ObjectPath.evaluate(metrics, "delta_counter.type"), equalTo("long"));
220-
assertThat(ObjectPath.evaluate(metrics, "delta_counter.time_series_metric"), equalTo("gauge"));
223+
Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
224+
assertThat(evaluate(metrics, "cumulative_counter.type"), equalTo("long"));
225+
assertThat(evaluate(metrics, "cumulative_counter.time_series_metric"), equalTo("counter"));
226+
assertThat(evaluate(metrics, "delta_counter.type"), equalTo("long"));
227+
assertThat(evaluate(metrics, "delta_counter.time_series_metric"), equalTo("gauge"));
221228
}
222229

223230
public void testCounterMonotonicity() throws Exception {
@@ -230,11 +237,96 @@ public void testCounterMonotonicity() throws Exception {
230237
)
231238
);
232239

233-
Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
234-
assertThat(ObjectPath.evaluate(metrics, "up_down_counter.type"), equalTo("long"));
235-
assertThat(ObjectPath.evaluate(metrics, "up_down_counter.time_series_metric"), equalTo("gauge"));
236-
assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.type"), equalTo("long"));
237-
assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.time_series_metric"), equalTo("gauge"));
240+
Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
241+
assertThat(evaluate(metrics, "up_down_counter.type"), equalTo("long"));
242+
assertThat(evaluate(metrics, "up_down_counter.time_series_metric"), equalTo("gauge"));
243+
assertThat(evaluate(metrics, "up_down_counter_delta.type"), equalTo("long"));
244+
assertThat(evaluate(metrics, "up_down_counter_delta.time_series_metric"), equalTo("gauge"));
245+
}
246+
247+
public void testExponentialHistograms() throws Exception {
248+
long now = Clock.getDefault().now();
249+
export(List.of(createExponentialHistogram(now, "exponential_histogram", DELTA, Attributes.empty())));
250+
251+
Map<String, Object> mappings = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
252+
assertThat(evaluate(mappings, "exponential_histogram.type"), equalTo("histogram"));
253+
254+
// Get document and check values/counts array
255+
ObjectPath search = search("metrics-generic.otel-default");
256+
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
257+
var source = search.evaluate("hits.hits.0._source");
258+
assertThat(evaluate(source, "metrics.exponential_histogram.counts"), equalTo(List.of(2, 1, 10, 1, 2)));
259+
assertThat(evaluate(source, "metrics.exponential_histogram.values"), equalTo(List.of(-3.0, -1.5, 0.0, 1.5, 3.0)));
260+
}
261+
262+
public void testExponentialHistogramsAsAggregateMetricDouble() throws Exception {
263+
long now = Clock.getDefault().now();
264+
export(
265+
List.of(
266+
createExponentialHistogram(
267+
now,
268+
"exponential_histogram_summary",
269+
DELTA,
270+
Attributes.of(
271+
AttributeKey.stringArrayKey("elasticsearch.mapping.hints"),
272+
List.of("aggregate_metric_double", "_doc_count")
273+
)
274+
)
275+
)
276+
);
277+
278+
Map<String, Object> mappings = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
279+
assertThat(evaluate(mappings, "exponential_histogram_summary.type"), equalTo("aggregate_metric_double"));
280+
281+
ObjectPath search = search("metrics-generic.otel-default");
282+
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
283+
var source = search.evaluate("hits.hits.0._source");
284+
assertThat(evaluate(source, "_doc_count"), equalTo(16));
285+
assertThat(evaluate(source, "metrics.exponential_histogram_summary.value_count"), equalTo(16));
286+
assertThat(evaluate(source, "metrics.exponential_histogram_summary.sum"), equalTo(10.0));
287+
}
288+
289+
public void testHistogram() throws Exception {
290+
long now = Clock.getDefault().now();
291+
export(List.of(createHistogram(now, "histogram", DELTA, Attributes.empty())));
292+
293+
Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
294+
assertThat(evaluate(metrics, "histogram.type"), equalTo("histogram"));
295+
296+
// Get document and check values/counts array
297+
ObjectPath search = search("metrics-generic.otel-default");
298+
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
299+
var source = search.evaluate("hits.hits.0._source");
300+
assertThat(evaluate(source, "metrics.histogram.counts"), equalTo(List.of(1, 2, 3, 4, 5, 6)));
301+
List<Double> values = evaluate(source, "metrics.histogram.values");
302+
assertThat(values, equalTo(List.of(1.0, 3.0, 5.0, 7.0, 9.0, 10.0)));
303+
}
304+
305+
public void testHistogramAsAggregateMetricDouble() throws Exception {
306+
long now = Clock.getDefault().now();
307+
export(
308+
List.of(
309+
createHistogram(
310+
now,
311+
"histogram_summary",
312+
DELTA,
313+
Attributes.of(
314+
AttributeKey.stringArrayKey("elasticsearch.mapping.hints"),
315+
List.of("aggregate_metric_double", "_doc_count")
316+
)
317+
)
318+
)
319+
);
320+
321+
Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
322+
assertThat(evaluate(metrics, "histogram_summary.type"), equalTo("aggregate_metric_double"));
323+
324+
ObjectPath search = search("metrics-generic.otel-default");
325+
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
326+
var source = search.evaluate("hits.hits.0._source");
327+
assertThat(evaluate(source, "_doc_count"), equalTo(21));
328+
assertThat(evaluate(source, "metrics.histogram_summary.value_count"), equalTo(21));
329+
assertThat(evaluate(source, "metrics.histogram_summary.sum"), equalTo(10.0));
238330
}
239331

240332
public void testTsidForBulkIsSame() throws Exception {
@@ -316,7 +408,7 @@ private static Map<String, Object> getMapping(String target) throws IOException
316408
Map<String, Object> mappings = ObjectPath.createFromResponse(client().performRequest(new Request("GET", target + "/_mapping")))
317409
.evaluate("");
318410
assertThat(mappings, aMapWithSize(1));
319-
Map<String, Object> mapping = ObjectPath.evaluate(mappings.values().iterator().next(), "mappings");
411+
Map<String, Object> mapping = evaluate(mappings.values().iterator().next(), "mappings");
320412
assertThat(mapping, not(anEmptyMap()));
321413
return mapping;
322414
}
@@ -420,4 +512,66 @@ public boolean isMonotonic() {
420512
return monotonic;
421513
}
422514
}
515+
516+
private static MetricData createHistogram(long timeEpochNanos, String name, AggregationTemporality temporality, Attributes attributes) {
517+
return ImmutableMetricData.createDoubleHistogram(
518+
TEST_RESOURCE,
519+
TEST_SCOPE,
520+
name,
521+
"Histogram Test",
522+
"ms",
523+
HistogramData.create(
524+
temporality,
525+
List.of(
526+
HistogramPointData.create(
527+
timeEpochNanos,
528+
timeEpochNanos,
529+
attributes,
530+
10,
531+
false,
532+
0,
533+
false,
534+
0,
535+
List.of(2.0, 4.0, 6.0, 8.0, 10.0),
536+
List.of(1L, 2L, 3L, 4L, 5L, 6L)
537+
)
538+
)
539+
)
540+
);
541+
}
542+
543+
private static MetricData createExponentialHistogram(
544+
long timeEpochNanos,
545+
String name,
546+
AggregationTemporality temporality,
547+
Attributes attributes
548+
) {
549+
return ImmutableMetricData.createExponentialHistogram(
550+
TEST_RESOURCE,
551+
TEST_SCOPE,
552+
name,
553+
"Exponential Histogram Test",
554+
"ms",
555+
ImmutableExponentialHistogramData.create(
556+
temporality,
557+
List.of(
558+
ImmutableExponentialHistogramPointData.create(
559+
0,
560+
10,
561+
10,
562+
false,
563+
0,
564+
false,
565+
0,
566+
ExponentialHistogramBuckets.create(0, 0, List.of(1L, 2L)),
567+
ExponentialHistogramBuckets.create(0, 0, List.of(1L, 2L)),
568+
timeEpochNanos,
569+
timeEpochNanos,
570+
attributes,
571+
List.of()
572+
)
573+
)
574+
)
575+
);
576+
}
423577
}

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private static void handlePartialSuccess(ActionListener<MetricsResponse> listene
154154
// the server MUST respond with HTTP 200 OK.
155155
// https://opentelemetry.io/docs/specs/otlp/#partial-success-1
156156
MessageLite response = responseWithRejectedDataPoints(context.getIgnoredDataPoints(), context.getIgnoredDataPointsMessage());
157-
listener.onResponse(new MetricsResponse(RestStatus.OK, response));
157+
listener.onResponse(new MetricsResponse(RestStatus.BAD_REQUEST, response));
158158
}
159159

160160
private static void handlePartialSuccess(

0 commit comments

Comments
 (0)