Skip to content

Commit e36a586

Browse files
authored
Add option to generate exponential histogram fields to the OTLP intake (elastic#139119)
1 parent cf84571 commit e36a586

File tree

18 files changed

+479
-122
lines changed

18 files changed

+479
-122
lines changed

x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/mapper/ParsedHistogramConverterTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ private ExponentialHistogramDataPoint.Buckets toOtelProtoBuckets(ExponentialHist
211211

212212
private HistogramParser.ParsedHistogram toParsed(DataPoint.ExponentialHistogram point) {
213213
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
214-
point.buildMetricValue(MappingHints.empty(), builder);
214+
point.buildMetricValue(MappingHints.DEFAULT_TDIGEST, builder, null);
215215
String json = Strings.toString(builder);
216216
try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json)) {
217217
parser.nextToken();

x-pack/plugin/otel-data/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ dependencies {
7878
clusterModules project(xpackModule('analytics'))
7979
clusterModules project(xpackModule('ilm'))
8080
clusterModules project(xpackModule('mapper-aggregate-metric'))
81+
clusterModules project(xpackModule('mapper-exponential-histogram'))
8182
clusterModules project(xpackModule('mapper-constant-keyword'))
8283
clusterModules project(xpackModule('mapper-counted-keyword'))
8384
clusterModules project(xpackModule('stack'))

x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsIndexingRestIT.java

Lines changed: 81 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,21 @@ private static String createApiKey() throws IOException {
132132
return createApiKeyResponse.evaluate("encoded");
133133
}
134134

135+
/**
136+
* Sets the xpack.otel_data.histogram_field_type cluster setting to the provided value.
137+
*/
138+
private static void setHistogramFieldTypeClusterSetting(String value) throws IOException {
139+
Request request = new Request("PUT", "/_cluster/settings");
140+
request.setJsonEntity("""
141+
{
142+
"persistent": {
143+
"xpack.otel_data.histogram_field_type": "$setting"
144+
}
145+
}
146+
""".replace("$setting", value));
147+
assertOK(client().performRequest(request));
148+
}
149+
135150
@Override
136151
public void tearDown() throws Exception {
137152
meterProvider.close();
@@ -261,12 +276,13 @@ public void testCounterMonotonicity() throws Exception {
261276
assertThat(evaluate(metrics, "up_down_counter_delta.time_series_metric"), equalTo("gauge"));
262277
}
263278

264-
public void testExponentialHistograms() throws Exception {
279+
public void testExponentialHistogramsAsTDigest() throws Exception {
265280
long now = Clock.getDefault().now();
266281
export(List.of(createExponentialHistogram(now, "exponential_histogram", DELTA, Attributes.empty())));
267282

268283
Map<String, Object> mappings = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
269284
assertThat(evaluate(mappings, "exponential_histogram.type"), equalTo("histogram"));
285+
assertThat(evaluate(mappings, "exponential_histogram.time_series_metric"), equalTo("histogram"));
270286

271287
// Get document and check values/counts array
272288
ObjectPath search = search("metrics-generic.otel-default");
@@ -276,6 +292,31 @@ public void testExponentialHistograms() throws Exception {
276292
assertThat(evaluate(source, "metrics.exponential_histogram.values"), equalTo(List.of(-3.0, -1.5, 0.0, 1.5, 3.0)));
277293
}
278294

295+
public void testExponentialHistogramsAsExponentialHistogram() throws Exception {
296+
setHistogramFieldTypeClusterSetting("exponential_histogram");
297+
298+
long now = Clock.getDefault().now();
299+
export(List.of(createExponentialHistogram(now, "exponential_histogram", DELTA, Attributes.empty())));
300+
301+
Map<String, Object> mappings = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
302+
assertThat(evaluate(mappings, "exponential_histogram.type"), equalTo("exponential_histogram"));
303+
assertThat(evaluate(mappings, "exponential_histogram.time_series_metric"), equalTo("histogram"));
304+
305+
// Get document and check values/counts array
306+
ObjectPath search = search("metrics-generic.otel-default");
307+
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
308+
var source = search.evaluate("hits.hits.0._source");
309+
assertThat(evaluate(source, "metrics.exponential_histogram.scale"), equalTo(0));
310+
assertThat(evaluate(source, "metrics.exponential_histogram.zero.count"), equalTo(10));
311+
assertThat(evaluate(source, "metrics.exponential_histogram.positive.indices"), equalTo(List.of(0, 1)));
312+
assertThat(evaluate(source, "metrics.exponential_histogram.positive.counts"), equalTo(List.of(1, 2)));
313+
assertThat(evaluate(source, "metrics.exponential_histogram.negative.indices"), equalTo(List.of(0, 1)));
314+
assertThat(evaluate(source, "metrics.exponential_histogram.negative.counts"), equalTo(List.of(1, 2)));
315+
assertThat(evaluate(source, "metrics.exponential_histogram.sum"), equalTo(10.0));
316+
assertThat(evaluate(source, "metrics.exponential_histogram.min"), equalTo(-2.5));
317+
assertThat(evaluate(source, "metrics.exponential_histogram.max"), equalTo(2.5));
318+
}
319+
279320
public void testExponentialHistogramsAsAggregateMetricDouble() throws Exception {
280321
long now = Clock.getDefault().now();
281322
export(
@@ -303,12 +344,13 @@ public void testExponentialHistogramsAsAggregateMetricDouble() throws Exception
303344
assertThat(evaluate(source, "metrics.exponential_histogram_summary.sum"), equalTo(10.0));
304345
}
305346

306-
public void testHistogram() throws Exception {
347+
public void testHistogramAsTDigest() throws Exception {
307348
long now = Clock.getDefault().now();
308349
export(List.of(createHistogram(now, "histogram", DELTA, Attributes.empty())));
309350

310-
Map<String, Object> metrics = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
311-
assertThat(evaluate(metrics, "histogram.type"), equalTo("histogram"));
351+
Map<String, Object> mappings = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
352+
assertThat(evaluate(mappings, "histogram.type"), equalTo("histogram"));
353+
assertThat(evaluate(mappings, "histogram.time_series_metric"), equalTo("histogram"));
312354

313355
// Get document and check values/counts array
314356
ObjectPath search = search("metrics-generic.otel-default");
@@ -319,6 +361,33 @@ public void testHistogram() throws Exception {
319361
assertThat(values, equalTo(List.of(1.0, 3.0, 5.0, 7.0, 9.0, 10.0)));
320362
}
321363

364+
public void testHistogramsAsExponentialHistogram() throws Exception {
365+
setHistogramFieldTypeClusterSetting("exponential_histogram");
366+
367+
long now = Clock.getDefault().now();
368+
export(List.of(createHistogram(now, "histogram", DELTA, Attributes.empty())));
369+
370+
Map<String, Object> mappings = evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
371+
assertThat(evaluate(mappings, "histogram.type"), equalTo("exponential_histogram"));
372+
assertThat(evaluate(mappings, "histogram.time_series_metric"), equalTo("histogram"));
373+
374+
// Get document and check values/counts array
375+
ObjectPath search = search("metrics-generic.otel-default");
376+
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
377+
var source = search.evaluate("hits.hits.0._source");
378+
assertThat(evaluate(source, "metrics.histogram.scale"), equalTo(38)); // ExponentialHistogram.MAX_SCALE
379+
assertThat(evaluate(source, "metrics.histogram.zero"), equalTo(null));
380+
assertThat(
381+
evaluate(source, "metrics.histogram.positive.indices"),
382+
equalTo(List.of(-274877906945L, 435671174782L, 638246734797L, 771679845024L, 871342349565L, 913124641741L, 1234711177419L))
383+
);
384+
assertThat(evaluate(source, "metrics.histogram.positive.counts"), equalTo(List.of(1, 2, 3, 4, 5, 5, 1)));
385+
assertThat(evaluate(source, "metrics.histogram.negative"), equalTo(null));
386+
assertThat(evaluate(source, "metrics.histogram.sum"), equalTo(10.0));
387+
assertThat(evaluate(source, "metrics.histogram.min"), equalTo(0.5));
388+
assertThat(evaluate(source, "metrics.histogram.max"), equalTo(22.5));
389+
}
390+
322391
public void testHistogramAsAggregateMetricDouble() throws Exception {
323392
long now = Clock.getDefault().now();
324393
export(
@@ -541,10 +610,10 @@ private static MetricData createHistogram(long timeEpochNanos, String name, Aggr
541610
timeEpochNanos,
542611
attributes,
543612
10,
544-
false,
545-
0,
546-
false,
547-
0,
613+
true,
614+
0.5,
615+
true,
616+
22.5,
548617
List.of(2.0, 4.0, 6.0, 8.0, 10.0),
549618
List.of(1L, 2L, 3L, 4L, 5L, 6L)
550619
)
@@ -572,10 +641,10 @@ private static MetricData createExponentialHistogram(
572641
0,
573642
10,
574643
10,
575-
false,
576-
0,
577-
false,
578-
0,
644+
true,
645+
-2.5,
646+
true,
647+
2.5,
579648
ExponentialHistogramBuckets.create(0, 0, List.of(1L, 2L)),
580649
ExponentialHistogramBuckets.create(0, 0, List.of(1L, 2L)),
581650
timeEpochNanos,

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/OTelPlugin.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,19 @@ public class OTelPlugin extends Plugin implements ActionPlugin {
4545
Setting.Property.Dynamic
4646
);
4747

48+
public enum HistogramMappingSettingValues {
49+
HISTOGRAM,
50+
EXPONENTIAL_HISTOGRAM
51+
};
52+
53+
public static final Setting<HistogramMappingSettingValues> USE_EXPONENTIAL_HISTOGRAM_FIELD_TYPE = Setting.enumSetting(
54+
HistogramMappingSettingValues.class,
55+
"xpack.otel_data.histogram_field_type",
56+
HistogramMappingSettingValues.HISTOGRAM,
57+
Setting.Property.NodeScope,
58+
Setting.Property.Dynamic
59+
);
60+
4861
private static final Logger logger = LogManager.getLogger(OTelPlugin.class);
4962

5063
private final SetOnce<OTelIndexTemplateRegistry> registry = new SetOnce<>();
@@ -92,7 +105,7 @@ public void close() {
92105

93106
@Override
94107
public List<Setting<?>> getSettings() {
95-
return List.of(OTEL_DATA_REGISTRY_ENABLED);
108+
return List.of(OTEL_DATA_REGISTRY_ENABLED, USE_EXPONENTIAL_HISTOGRAM_FIELD_TYPE);
96109
}
97110

98111
@Override

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@
2929
import org.elasticsearch.action.support.ActionFilters;
3030
import org.elasticsearch.action.support.HandledTransportAction;
3131
import org.elasticsearch.client.internal.Client;
32+
import org.elasticsearch.cluster.service.ClusterService;
3233
import org.elasticsearch.common.bytes.BytesArray;
3334
import org.elasticsearch.common.bytes.BytesReference;
3435
import org.elasticsearch.common.io.stream.BytesStreamOutput;
3536
import org.elasticsearch.common.io.stream.StreamInput;
3637
import org.elasticsearch.common.io.stream.StreamOutput;
38+
import org.elasticsearch.common.settings.ClusterSettings;
3739
import org.elasticsearch.common.util.Maps;
3840
import org.elasticsearch.injection.guice.Inject;
3941
import org.elasticsearch.rest.RestStatus;
@@ -42,7 +44,9 @@
4244
import org.elasticsearch.transport.TransportService;
4345
import org.elasticsearch.xcontent.XContentBuilder;
4446
import org.elasticsearch.xcontent.XContentFactory;
47+
import org.elasticsearch.xpack.oteldata.OTelPlugin;
4548
import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPointGroupingContext;
49+
import org.elasticsearch.xpack.oteldata.otlp.docbuilder.MappingHints;
4650
import org.elasticsearch.xpack.oteldata.otlp.docbuilder.MetricDocumentBuilder;
4751
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;
4852

@@ -71,15 +75,24 @@ public class OTLPMetricsTransportAction extends HandledTransportAction<
7175
public static final int IGNORED_DATA_POINTS_MESSAGE_LIMIT = 10;
7276
private final Client client;
7377

78+
// visible for testing
79+
volatile MappingHints defaultMappingHints;
80+
7481
@Inject
7582
public OTLPMetricsTransportAction(
7683
TransportService transportService,
7784
ActionFilters actionFilters,
7885
ThreadPool threadPool,
79-
Client client
86+
Client client,
87+
ClusterService clusterService
8088
) {
8189
super(NAME, transportService, actionFilters, MetricsRequest::new, threadPool.executor(ThreadPool.Names.WRITE));
8290
this.client = client;
91+
ClusterSettings clusterSettings = clusterService.getClusterSettings();
92+
defaultMappingHints = MappingHints.fromSettings(clusterSettings.get(OTelPlugin.USE_EXPONENTIAL_HISTOGRAM_FIELD_TYPE));
93+
clusterSettings.addSettingsUpdateConsumer(OTelPlugin.USE_EXPONENTIAL_HISTOGRAM_FIELD_TYPE, histogramFieldTypeSetting -> {
94+
defaultMappingHints = MappingHints.fromSettings(histogramFieldTypeSetting);
95+
});
8396
}
8497

8598
@Override
@@ -94,7 +107,7 @@ protected void doExecute(Task task, MetricsRequest request, ActionListener<Metri
94107
return;
95108
}
96109
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
97-
MetricDocumentBuilder metricDocumentBuilder = new MetricDocumentBuilder(byteStringAccessor);
110+
MetricDocumentBuilder metricDocumentBuilder = new MetricDocumentBuilder(byteStringAccessor, defaultMappingHints);
98111
context.consume(dataPointGroup -> addIndexRequest(bulkRequestBuilder, metricDocumentBuilder, dataPointGroup));
99112
if (bulkRequestBuilder.numberOfActions() == 0) {
100113
// all data points were ignored

0 commit comments

Comments
 (0)