From 1ab3d89c4cba9cc5b52e158c1a0e45104abbfbfd Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Thu, 28 Aug 2025 10:31:46 +0200 Subject: [PATCH 01/11] OTLP: add support for counters and gauges --- .../cluster/routing/TsidBuilder.java | 14 +- .../otlp/OTLPMetricsIndexingRestIT.java | 220 +++++++++++- .../oteldata/otlp/OTLPMetricsRestAction.java | 12 +- .../otlp/OTLPMetricsTransportAction.java | 125 ++++++- .../oteldata/otlp/datapoint/DataPoint.java | 156 +++++++++ .../datapoint/DataPointGroupingContext.java | 316 ++++++++++++++++++ .../docbuilder/MetricDocumentBuilder.java | 139 ++++++++ .../proto/BufferedByteStringAccessor.java | 64 ++++ .../otlp/tsid/AttributeListTsidFunnel.java | 67 ++++ .../tsid/DataPointDimensionsTsidFunnel.java | 32 ++ .../otlp/tsid/DataPointTsidFunnel.java | 33 ++ .../otlp/tsid/ResourceTsidFunnel.java | 39 +++ .../oteldata/otlp/tsid/ScopeTsidFunnel.java | 41 +++ 13 files changed, 1240 insertions(+), 18 deletions(-) create mode 100644 x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java create mode 100644 x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java create mode 100644 x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java create mode 100644 x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessor.java create mode 100644 x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnel.java create mode 100644 x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointDimensionsTsidFunnel.java create mode 100644 x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java create mode 100644 x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ResourceTsidFunnel.java create mode 100644 x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java b/server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java index d29dced2adb28..3a3c47d8c5bb8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java @@ -34,7 +34,15 @@ public class TsidBuilder { private static final int MAX_TSID_VALUE_FIELDS = 16; private final BufferedMurmur3Hasher murmur3Hasher = new BufferedMurmur3Hasher(0L); - private final List dimensions = new ArrayList<>(); + private final List dimensions; + + public TsidBuilder() { + this.dimensions = new ArrayList<>(); + } + + public TsidBuilder(int size) { + this.dimensions = new ArrayList<>(size); + } public static TsidBuilder newBuilder() { return new TsidBuilder(); @@ -281,6 +289,10 @@ private static int writeHash128(MurmurHash3.Hash128 hash128, byte[] buffer, int return index; } + public int size() { + return dimensions.size(); + } + /** * A functional interface that describes how objects of a complex type are added to a TSID. * diff --git a/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/action/otlp/OTLPMetricsIndexingRestIT.java b/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/action/otlp/OTLPMetricsIndexingRestIT.java index 12b30e743194f..236c2bcfd2179 100644 --- a/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/action/otlp/OTLPMetricsIndexingRestIT.java +++ b/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/action/otlp/OTLPMetricsIndexingRestIT.java @@ -8,17 +8,20 @@ package org.elasticsearch.action.otlp; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.exporter.internal.FailedExportException; +import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; import io.opentelemetry.sdk.resources.Resource; import org.elasticsearch.client.Request; @@ -29,17 +32,29 @@ import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.cluster.local.distribution.DistributionType; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.ObjectPath; import org.junit.Before; import org.junit.ClassRule; +import java.io.IOException; import java.time.Duration; +import java.time.Instant; import java.util.List; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE; +import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA; +import static org.elasticsearch.action.otlp.OTLPMetricsIndexingRestIT.Monotonicity.MONOTONIC; +import static org.elasticsearch.action.otlp.OTLPMetricsIndexingRestIT.Monotonicity.NON_MONOTONIC; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isA; +import static org.hamcrest.Matchers.not; public class OTLPMetricsIndexingRestIT extends ESRestTestCase { @@ -56,6 +71,8 @@ public class OTLPMetricsIndexingRestIT extends ESRestTestCase { .user(USER, PASS, "superuser", false) .setting("xpack.security.autoconfiguration.enabled", "false") .setting("xpack.license.self_generated.type", "trial") + .setting("xpack.security.enabled", "false") + .setting("xpack.watcher.enabled", "false") .build(); @Override @@ -98,21 +115,136 @@ public void tearDown() throws Exception { super.tearDown(); } + public void testIngestMetricViaMeterProvider() throws Exception { + Meter sampleMeter = meterProvider.get("io.opentelemetry.example.metrics"); + + sampleMeter.gaugeBuilder("jvm.memory.total") + .setDescription("Reports JVM memory usage.") + .setUnit("By") + .buildWithCallback(result -> result.record(Runtime.getRuntime().totalMemory(), Attributes.empty())); + + var result = meterProvider.forceFlush().join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess(), is(true)); + + refreshMetricsIndices(); + + ObjectPath search = ObjectPath.createFromResponse( + client().performRequest(new Request("GET", "metrics-generic.otel-default" + "/_search")) + ); + assertThat(search.evaluate("hits.total.value"), equalTo(1)); + } + public void testIngestMetricDataViaMetricExporter() throws Exception { - MetricData jvmMemoryMetricData = createDoubleGauge( - TEST_RESOURCE, - Attributes.empty(), - "jvm.memory.total", - Runtime.getRuntime().totalMemory(), - "By", - Clock.getDefault().now() + long now = Clock.getDefault().now(); + long totalMemory = Runtime.getRuntime().totalMemory(); + MetricData jvmMemoryMetricData = createLongGauge(TEST_RESOURCE, Attributes.empty(), "jvm.memory.total", totalMemory, "By", now); + + export(List.of(jvmMemoryMetricData)); + ObjectPath search = ObjectPath.createFromResponse( + client().performRequest(new Request("GET", "metrics-generic.otel-default" + "/_search")) ); + assertThat(search.evaluate("hits.total.value"), equalTo(1)); + var source = search.evaluate("hits.hits.0._source"); + assertThat(ObjectPath.evaluate(source, "@timestamp"), equalTo(timestampAsString(now))); + assertThat(ObjectPath.evaluate(source, "start_timestamp"), equalTo(timestampAsString(now))); + assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class)); + assertThat(ObjectPath.evaluate(source, "metrics.jvm\\.memory\\.total").toString(), equalTo(Long.toString(totalMemory))); + assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By")); + assertThat(ObjectPath.evaluate(source, "resource.attributes.service\\.name"), equalTo("elasticsearch")); + assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics")); + } - FailedExportException.HttpExportException exception = assertThrows( - FailedExportException.HttpExportException.class, - () -> export(List.of(jvmMemoryMetricData)) + public void testGroupingSameGroup() throws Exception { + long now = Clock.getDefault().now(); + MetricData metric1 = createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now); + // uses an equal but not the same resource to test grouping across resourceMetrics + MetricData metric2 = createDoubleGauge(TEST_RESOURCE.toBuilder().build(), Attributes.empty(), "metric2", 42, "By", now); + + export(List.of(metric1, metric2)); + + ObjectPath path = ObjectPath.createFromResponse( + client().performRequest(new Request("GET", "metrics-generic.otel-default/_search")) ); - assertThat(exception.getResponse().statusCode(), equalTo(RestStatus.NOT_IMPLEMENTED.getStatus())); + assertThat(path.evaluate("hits.total.value"), equalTo(1)); + assertThat(path.evaluate("hits.hits.0._source.metrics"), equalTo(Map.of("metric1", 42.0, "metric2", 42.0))); + assertThat(path.evaluate("hits.hits.0._source.resource"), equalTo(Map.of("attributes", Map.of("service.name", "elasticsearch")))); + } + + public void testGroupingDifferentGroup() throws Exception { + long now = Clock.getDefault().now(); + export( + List.of( + createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now), + createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now + TimeUnit.MILLISECONDS.toNanos(1)), + createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "", now), + createDoubleGauge(TEST_RESOURCE, Attributes.of(stringKey("foo"), "bar"), "metric1", 42, "By", now) + ) + ); + ObjectPath path = ObjectPath.createFromResponse( + client().performRequest(new Request("GET", "metrics-generic.otel-default" + "/_search")) + ); + assertThat(path.evaluate("hits.total.value"), equalTo(4)); + } + + public void testGauge() throws Exception { + long now = Clock.getDefault().now(); + export( + List.of( + createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "double_gauge", 42.0, "By", now), + createLongGauge(TEST_RESOURCE, Attributes.empty(), "long_gauge", 42, "By", now) + ) + ); + Map metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); + assertThat(ObjectPath.evaluate(metrics, "double_gauge.type"), equalTo("double")); + assertThat(ObjectPath.evaluate(metrics, "double_gauge.time_series_metric"), equalTo("gauge")); + assertThat(ObjectPath.evaluate(metrics, "long_gauge.type"), equalTo("long")); + assertThat(ObjectPath.evaluate(metrics, "long_gauge.time_series_metric"), equalTo("gauge")); + } + + public void testCounterTemporality() throws Exception { + long now = Clock.getDefault().now(); + export( + List.of( + createCounter(TEST_RESOURCE, Attributes.empty(), "cumulative_counter", 42, "By", now, CUMULATIVE, MONOTONIC), + createCounter(TEST_RESOURCE, Attributes.empty(), "delta_counter", 42, "By", now, DELTA, MONOTONIC) + ) + ); + + Map metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); + assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.type"), equalTo("long")); + assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.time_series_metric"), equalTo("counter")); + assertThat(ObjectPath.evaluate(metrics, "delta_counter.type"), equalTo("long")); + assertThat(ObjectPath.evaluate(metrics, "delta_counter.time_series_metric"), equalTo("gauge")); + } + + public void testCounterMonotonicity() throws Exception { + long now = Clock.getDefault().now(); + export( + List.of( + createCounter(TEST_RESOURCE, Attributes.empty(), "up_down_counter", 42, "By", now, CUMULATIVE, NON_MONOTONIC), + createCounter(TEST_RESOURCE, Attributes.empty(), "up_down_counter_delta", 42, "By", now, DELTA, NON_MONOTONIC) + + ) + ); + + Map metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); + assertThat(ObjectPath.evaluate(metrics, "up_down_counter.type"), equalTo("long")); + assertThat(ObjectPath.evaluate(metrics, "up_down_counter.time_series_metric"), equalTo("gauge")); + assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.type"), equalTo("long")); + assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.time_series_metric"), equalTo("gauge")); + } + + private static Map getMapping(String target) throws IOException { + Map mappings = ObjectPath.createFromResponse(client().performRequest(new Request("GET", target + "/_mapping"))) + .evaluate(""); + assertThat(mappings, aMapWithSize(1)); + Map mapping = ObjectPath.evaluate(mappings.values().iterator().next(), "mappings"); + assertThat(mapping, not(anEmptyMap())); + return mapping; + } + + private static String timestampAsString(long now) { + return Instant.ofEpochMilli(TimeUnit.NANOSECONDS.toMillis(now)).toString(); } private void export(List metrics) throws Exception { @@ -124,7 +256,11 @@ private void export(List metrics) throws Exception { throw new RuntimeException("Failed to export metrics", failure); } assertThat(result.isSuccess(), is(true)); - assertOK(client().performRequest(new Request("GET", "_refresh/metrics-*"))); + refreshMetricsIndices(); + } + + private static void refreshMetricsIndices() throws IOException { + assertOK(client().performRequest(new Request("GET", "metrics-*/_refresh"))); } private static MetricData createDoubleGauge( @@ -144,4 +280,62 @@ private static MetricData createDoubleGauge( ImmutableGaugeData.create(List.of(ImmutableDoublePointData.create(timeEpochNanos, timeEpochNanos, attributes, value))) ); } + + private static MetricData createLongGauge( + Resource resource, + Attributes attributes, + String name, + long value, + String unit, + long timeEpochNanos + ) { + return ImmutableMetricData.createLongGauge( + resource, + TEST_SCOPE, + name, + "Your description could be here.", + unit, + ImmutableGaugeData.create(List.of(ImmutableLongPointData.create(timeEpochNanos, timeEpochNanos, attributes, value))) + ); + } + + private static MetricData createCounter( + Resource resource, + Attributes attributes, + String name, + long value, + String unit, + long timeEpochNanos, + AggregationTemporality temporality, + Monotonicity monotonicity + ) { + return ImmutableMetricData.createLongSum( + resource, + TEST_SCOPE, + name, + "Your description could be here.", + unit, + ImmutableSumData.create( + monotonicity.isMonotonic(), + temporality, + List.of(ImmutableLongPointData.create(timeEpochNanos, timeEpochNanos, attributes, value)) + ) + ); + } + + // this is just to enhance readability of the createCounter calls (avoid boolean parameter) + enum Monotonicity { + MONOTONIC(true), + NON_MONOTONIC(false); + + private final boolean monotonic; + + Monotonicity(boolean monotonic) { + this.monotonic = monotonic; + } + + public boolean isMonotonic() { + return monotonic; + } + } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java index 23f46363d3060..c21975779d61f 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.oteldata.otlp; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.bytes.BytesArray; @@ -48,8 +49,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli ActionListener.releaseBefore(request.content(), new RestResponseListener<>(channel) { @Override public RestResponse buildResponse(OTLPMetricsTransportAction.MetricsResponse r) { - RestStatus restStatus = r.getStatus(); - return new RestResponse(restStatus, "application/x-protobuf", r.getResponse()); + return new RestResponse(r.getStatus(), "application/x-protobuf", r.getResponse()); } }) ); @@ -59,7 +59,13 @@ public RestResponse buildResponse(OTLPMetricsTransportAction.MetricsResponse r) // (a request that does not carry any telemetry data) // the server SHOULD respond with success. // https://opentelemetry.io/docs/specs/otlp/#full-success-1 - return channel -> channel.sendResponse(new RestResponse(RestStatus.OK, "application/x-protobuf", new BytesArray(new byte[0]))); + return channel -> channel.sendResponse( + new RestResponse( + RestStatus.OK, + "application/x-protobuf", + new BytesArray(ExportMetricsServiceResponse.newBuilder().build().toByteArray()) + ) + ); } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java index 722b727fff40f..11894b676156f 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java @@ -7,6 +7,10 @@ package org.elasticsearch.xpack.oteldata.otlp; +import com.google.protobuf.MessageLite; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsPartialSuccess; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; @@ -15,11 +19,17 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.CompositeIndicesRequest; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.injection.guice.Inject; @@ -27,8 +37,14 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPointGroupingContext; +import org.elasticsearch.xpack.oteldata.otlp.docbuilder.MetricDocumentBuilder; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; import java.io.IOException; +import java.util.Arrays; /** * Transport action for handling OpenTelemetry Protocol (OTLP) Metrics requests. @@ -62,7 +78,110 @@ public OTLPMetricsTransportAction( @Override protected void doExecute(Task task, MetricsRequest request, ActionListener listener) { - listener.onResponse(new MetricsResponse(RestStatus.NOT_IMPLEMENTED, new BytesArray(new byte[0]))); + BufferedByteStringAccessor byteStringAccessor = new BufferedByteStringAccessor(); + DataPointGroupingContext context = new DataPointGroupingContext(byteStringAccessor); + try { + var metricsServiceRequest = ExportMetricsServiceRequest.parseFrom(request.exportMetricsServiceRequest.streamInput()); + context.groupDataPoints(metricsServiceRequest); + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + MetricDocumentBuilder metricDocumentBuilder = new MetricDocumentBuilder(byteStringAccessor); + context.consume(dataPointGroup -> addIndexRequest(bulkRequestBuilder, metricDocumentBuilder, dataPointGroup)); + if (bulkRequestBuilder.numberOfActions() == 0) { + if (context.totalDataPoints() == 0) { + // If the server receives an empty request + // (a request that does not carry any telemetry data) + // the server SHOULD respond with success. + // https://opentelemetry.io/docs/specs/otlp/#full-success-1 + listener.onResponse(new MetricsResponse(RestStatus.OK, ExportMetricsServiceResponse.newBuilder().build())); + } else { + // If the processing of the request fails because the request contains data that cannot be decoded + // or is otherwise invalid and such failure is permanent, + // then the server MUST respond with HTTP 400 Bad Request. + // https://opentelemetry.io/docs/specs/otlp/#bad-data + listener.onResponse( + new MetricsResponse( + RestStatus.BAD_REQUEST, + responseWithRejectedDataPoints(context.totalDataPoints(), context.getIgnoredDataPointsMessage()) + ) + ); + } + return; + } + + bulkRequestBuilder.execute(new ActionListener<>() { + @Override + public void onResponse(BulkResponse bulkItemResponses) { + MessageLite response; + // If the request is only partially accepted + // (i.e. when the server accepts only parts of the data and rejects the rest), + // the server MUST respond with HTTP 200 OK. + // https://opentelemetry.io/docs/specs/otlp/#partial-success-1 + RestStatus status = RestStatus.OK; + if (bulkItemResponses.hasFailures() || context.getIgnoredDataPoints() > 0) { + int failures = (int) Arrays.stream(bulkItemResponses.getItems()).filter(BulkItemResponse::isFailed).count(); + if (failures == bulkItemResponses.getItems().length) { + // If the processing of the request fails, + // the server MUST respond with appropriate HTTP 4xx or HTTP 5xx status code. + // https://opentelemetry.io/docs/specs/otlp/#failures-1 + status = RestStatus.INTERNAL_SERVER_ERROR; + } + response = responseWithRejectedDataPoints( + failures + context.getIgnoredDataPoints(), + bulkItemResponses.buildFailureMessage() + context.getIgnoredDataPointsMessage() + ); + } else { + response = ExportMetricsServiceResponse.newBuilder().build(); + } + listener.onResponse(new MetricsResponse(status, response)); + } + + @Override + public void onFailure(Exception e) { + listener.onResponse( + // https://opentelemetry.io/docs/specs/otlp/#failures-1 + // If the processing of the request fails, + // the server MUST respond with appropriate HTTP 4xx or HTTP 5xx status code. + new MetricsResponse( + RestStatus.INTERNAL_SERVER_ERROR, + responseWithRejectedDataPoints(context.totalDataPoints(), e.getMessage()) + ) + ); + } + }); + + } catch (Exception e) { + logger.error("failed to execute otlp metrics request", e); + listener.onResponse(new MetricsResponse( + RestStatus.INTERNAL_SERVER_ERROR, + responseWithRejectedDataPoints(context.totalDataPoints(), e.getMessage()) + )); + } + } + + private static ExportMetricsPartialSuccess responseWithRejectedDataPoints(int rejectedDataPoints, String message) { + return ExportMetricsServiceResponse.newBuilder() + .getPartialSuccessBuilder() + .setRejectedDataPoints(rejectedDataPoints) + .setErrorMessage(message) + .build(); + } + + private void addIndexRequest( + BulkRequestBuilder bulkRequestBuilder, + MetricDocumentBuilder metricDocumentBuilder, + DataPointGroupingContext.DataPointGroup dataPointGroup + ) throws IOException { + try (XContentBuilder xContentBuilder = XContentFactory.cborBuilder(new BytesStreamOutput())) { + var dynamicTemplates = metricDocumentBuilder.buildMetricDocument(xContentBuilder, dataPointGroup); + bulkRequestBuilder.add( + new IndexRequest(dataPointGroup.targetIndex()).opType(DocWriteRequest.OpType.CREATE) + .setRequireDataStream(true) + .source(xContentBuilder) + // TODO explicitly set _tsid once https://github.com/elastic/elasticsearch/pull/132566 is merged + //.tsid(DataPointGroupTsidFunnel.forDataPointGroup(dataPointGroup).buildTsid()) + .setDynamicTemplates(dynamicTemplates) + ); + } } public static class MetricsRequest extends ActionRequest implements CompositeIndicesRequest { @@ -87,6 +206,10 @@ public static class MetricsResponse extends ActionResponse { private final BytesReference response; private final RestStatus status; + public MetricsResponse(RestStatus status, MessageLite response) { + this(status, new BytesArray(response.toByteArray())); + } + public MetricsResponse(RestStatus status, BytesReference response) { this.response = response; this.status = status; diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java new file mode 100644 index 0000000000000..528e46b301163 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java @@ -0,0 +1,156 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; + +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE; + +/** + * Represents a metrics data point in the OpenTelemetry metrics data model. + * This interface defines methods to access various properties of a data point, + * such as its timestamp, attributes, unit, metric name, and methods to build + * the metric value in a specific format. + * The reason this class is needed is that the generated classes from the + * OpenTelemetry proto definitions don't implement a common interface, + * which makes it difficult to handle different types of data points uniformly. + */ +public interface DataPoint { + + /** + * Returns the timestamp of the data point in Unix nanoseconds. + * + * @return the timestamp in nanoseconds + */ + long getTimestampUnixNano(); + + /** + * Returns the start timestamp of the data point in Unix nanoseconds. + * This allows detecting when a sequence of observations is unbroken. + * This field indicates to consumers the start time for points with cumulative and delta temporality, + * and can support correct rate calculation. + * + * @return the start timestamp in nanoseconds + */ + long getStartTimestampUnixNano(); + + /** + * Returns the attributes associated with the data point. + * + * @return a list of key-value pairs representing the attributes + */ + List getAttributes(); + + /** + * Returns the unit of measurement for the data point. + * + * @return the unit as a string + */ + String getUnit(); + + /** + * Returns the name of the metric associated with the data point. + * + * @return the metric name as a string + */ + String getMetricName(); + + /** + * Builds the metric value for the data point and writes it to the provided XContentBuilder. + * + * @param builder the XContentBuilder to write the metric value to + * @throws IOException if an I/O error occurs while writing to the builder + */ + void buildMetricValue(XContentBuilder builder) throws IOException; + + /** + * Returns the dynamic template name for the data point based on its type and value. + * This is used to dynamically map the appropriate field type according to the data point's characteristics. + * + * @return the dynamic template name as a string + */ + String getDynamicTemplate(); + + /** + * Validates whether the data point can be indexed into Elasticsearch. + * + * @param messages a set to collect validation error messages + * @return true if the data point is valid, false otherwise + */ + boolean isValid(Set messages); + + record Number(NumberDataPoint dataPoint, Metric metric) implements DataPoint { + + @Override + public long getTimestampUnixNano() { + return dataPoint.getTimeUnixNano(); + } + + @Override + public List getAttributes() { + return dataPoint.getAttributesList(); + } + + @Override + public long getStartTimestampUnixNano() { + return dataPoint.getStartTimeUnixNano(); + } + + @Override + public String getUnit() { + return metric.getUnit(); + } + + @Override + public String getMetricName() { + return metric.getName(); + } + + @Override + public void buildMetricValue(XContentBuilder builder) throws IOException { + switch (dataPoint.getValueCase()) { + case AS_DOUBLE -> builder.value(dataPoint.getAsDouble()); + case AS_INT -> builder.value(dataPoint.getAsInt()); + } + } + + @Override + public String getDynamicTemplate() { + String type; + if (metric.hasSum() + // TODO add support for delta counters - for now we represent them as gauges + && metric.getSum().getAggregationTemporality() == AGGREGATION_TEMPORALITY_CUMULATIVE + // TODO add support for up/down counters - for now we represent them as gauges + && metric.getSum().getIsMonotonic()) { + type = "counter_"; + } else { + type = "gauge_"; + } + if (dataPoint.getValueCase() == NumberDataPoint.ValueCase.AS_INT) { + return type + "long"; + } else if (dataPoint.getValueCase() == NumberDataPoint.ValueCase.AS_DOUBLE) { + return type + "double"; + } else { + return null; + } + } + + @Override + public boolean isValid(Set messages) { + return true; + } + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java new file mode 100644 index 0000000000000..20604ccaac023 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java @@ -0,0 +1,316 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.common.v1.InstrumentationScope; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; +import io.opentelemetry.proto.resource.v1.Resource; + +import com.google.protobuf.ByteString; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.common.hash.BufferedMurmur3Hasher; +import org.elasticsearch.common.hash.MurmurHash3.Hash128; +import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; +import org.elasticsearch.xpack.oteldata.otlp.tsid.DataPointTsidFunnel; +import org.elasticsearch.xpack.oteldata.otlp.tsid.ResourceTsidFunnel; +import org.elasticsearch.xpack.oteldata.otlp.tsid.ScopeTsidFunnel; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; + +public class DataPointGroupingContext { + + private final BufferedByteStringAccessor byteStringAccessor; + private final Map resourceGroups = new HashMap<>(); + private final Set ignoredDataPointMessages = new HashSet<>(); + + private int totalDataPoints = 0; + private int ignoredDataPoints = 0; + + public DataPointGroupingContext(BufferedByteStringAccessor byteStringAccessor) { + this.byteStringAccessor = byteStringAccessor; + } + + public void groupDataPoints(ExportMetricsServiceRequest exportMetricsServiceRequest) { + List resourceMetricsList = exportMetricsServiceRequest.getResourceMetricsList(); + for (int i = 0; i < resourceMetricsList.size(); i++) { + ResourceMetrics resourceMetrics = resourceMetricsList.get(i); + ResourceGroup resourceGroup = getOrCreateResourceGroup(resourceMetrics); + List scopeMetricsList = resourceMetrics.getScopeMetricsList(); + for (int j = 0; j < scopeMetricsList.size(); j++) { + ScopeMetrics scopeMetrics = scopeMetricsList.get(j); + ScopeGroup scopeGroup = resourceGroup.getOrCreateScope(scopeMetrics); + List metricsList = scopeMetrics.getMetricsList(); + for (int k = 0; k < metricsList.size(); k++) { + var metric = metricsList.get(k); + switch (metric.getDataCase()) { + case SUM: + scopeGroup.addDataPoints(metric, metric.getSum().getDataPointsList(), DataPoint.Number::new); + break; + case GAUGE: + scopeGroup.addDataPoints(metric, metric.getGauge().getDataPointsList(), DataPoint.Number::new); + break; + case EXPONENTIAL_HISTOGRAM: + ignoredDataPoints += metric.getExponentialHistogram().getDataPointsCount(); + ignoredDataPointMessages.add("Exponential histogram is not supported yet. Dropping " + metric.getName()); + break; + case HISTOGRAM: + ignoredDataPoints += metric.getHistogram().getDataPointsCount(); + ignoredDataPointMessages.add("Histogram is not supported yet. Dropping " + metric.getName()); + break; + case SUMMARY: + ignoredDataPoints += metric.getSummary().getDataPointsList().size(); + ignoredDataPointMessages.add("Summary is not supported yet. Dropping " + metric.getName()); + break; + default: + ignoredDataPoints++; + ignoredDataPointMessages.add("unsupported metric type " + metric.getDataCase()); + break; + } + } + } + } + } + + /** + * Consumes all data point groups in the context, removing them from the context. + * + * @param consumer the consumer to process each {@link DataPointGroup} + * @param the type of exception that can be thrown by the consumer + * @throws E if the consumer throws an exception + */ + public void consume(CheckedConsumer consumer) throws E { + for (Iterator iterator = resourceGroups.values().iterator(); iterator.hasNext();) { + ResourceGroup resourceGroup = iterator.next(); + // Remove the resource group from the map can help to significantly reduce GC overhead. + // This avoids that the resource groups are promoted to survivor space when the context is kept alive for a while, + // for example, when referenced in the bulk response listener. + iterator.remove(); + resourceGroup.forEach(consumer); + } + } + + public int totalDataPoints() { + return totalDataPoints; + } + + public int getIgnoredDataPoints() { + return ignoredDataPoints; + } + + public String getIgnoredDataPointsMessage() { + return ignoredDataPointMessages.isEmpty() ? "" : String.join("\n", ignoredDataPointMessages); + } + + private ResourceGroup getOrCreateResourceGroup(ResourceMetrics resourceMetrics) { + TsidBuilder resourceTsidBuilder = ResourceTsidFunnel.forResource(byteStringAccessor, resourceMetrics); + Hash128 resourceHash = resourceTsidBuilder.hash(); + ResourceGroup resourceGroup = resourceGroups.get(resourceHash); + if (resourceGroup == null) { + resourceGroup = new ResourceGroup(resourceMetrics.getResource(), resourceMetrics.getSchemaUrlBytes()); + resourceGroups.put(resourceHash, resourceGroup); + } + return resourceGroup; + } + + class ResourceGroup { + private final Resource resource; + private final ByteString resourceSchemaUrl; + private final Map scopes; + + ResourceGroup(Resource resource, ByteString resourceSchemaUrl) { + this.resource = resource; + this.resourceSchemaUrl = resourceSchemaUrl; + this.scopes = new HashMap<>(); + } + + public ScopeGroup getOrCreateScope(ScopeMetrics scopeMetrics) { + TsidBuilder scopeTsidBuilder = ScopeTsidFunnel.forScope(byteStringAccessor, scopeMetrics); + Hash128 scopeHash = scopeTsidBuilder.hash(); + ScopeGroup scopeGroup = scopes.get(scopeHash); + if (scopeGroup == null) { + scopeGroup = new ScopeGroup(this, scopeMetrics.getScope(), scopeMetrics.getSchemaUrlBytes()); + scopes.put(scopeHash, scopeGroup); + } + return scopeGroup; + } + + public void forEach(CheckedConsumer consumer) throws E { + for (ScopeGroup scopeGroup : scopes.values()) { + scopeGroup.forEach(consumer); + } + } + } + + class ScopeGroup { + private final ResourceGroup resourceGroup; + private final InstrumentationScope scope; + private final ByteString scopeSchemaUrl; + // index -> timiestamp -> dataPointGroupHash -> DataPointGroup + private final Map>> dataPointGroupsByIndexAndTimestamp; + + ScopeGroup(ResourceGroup resourceGroup, InstrumentationScope scope, ByteString scopeSchemaUrl) { + this.resourceGroup = resourceGroup; + this.scope = scope; + this.scopeSchemaUrl = scopeSchemaUrl; + this.dataPointGroupsByIndexAndTimestamp = new HashMap<>(); + } + + public void addDataPoints(Metric metric, List dataPoints, BiFunction createDataPoint) { + for (int i = 0; i < dataPoints.size(); i++) { + T dataPoint = dataPoints.get(i); + addDataPoint(createDataPoint.apply(dataPoint, metric)); + } + } + + public void addDataPoint(DataPoint dataPoint) { + totalDataPoints++; + if (dataPoint.isValid(ignoredDataPointMessages) == false) { + ignoredDataPoints++; + return; + } + getOrCreateDataPointGroup(dataPoint).addDataPoint(dataPoint); + } + + private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) { + TsidBuilder dataPointGroupTsidBuilder = DataPointTsidFunnel.forDataPoint(byteStringAccessor, dataPoint); + Hash128 dataPointGroupHash = dataPointGroupTsidBuilder.hash(); + // in addition to the fields that go into the _tsid, we also need to group by timestamp and start timestamp + Hash128 timestamp = new Hash128(dataPoint.getTimestampUnixNano(), dataPoint.getStartTimestampUnixNano()); + // TODO determine based on attributes and scope name + String targetIndex = "metrics-generic.otel-default"; + var dataPointGroupsByTimestamp = dataPointGroupsByIndexAndTimestamp.computeIfAbsent(targetIndex, k -> new HashMap<>()); + var dataPointGroups = dataPointGroupsByTimestamp.computeIfAbsent(timestamp, k -> new HashMap<>()); + DataPointGroup dataPointGroup = dataPointGroups.get(dataPointGroupHash); + if (dataPointGroup == null) { + dataPointGroup = new DataPointGroup( + resourceGroup.resource, + resourceGroup.resourceSchemaUrl, + scope, + scopeSchemaUrl, + dataPoint.getAttributes(), + dataPoint.getUnit(), + new ArrayList<>(), + targetIndex + ); + dataPointGroups.put(dataPointGroupHash, dataPointGroup); + } + return dataPointGroup; + } + + public void forEach(CheckedConsumer consumer) throws E { + for (var dataPointGroupsByTime : dataPointGroupsByIndexAndTimestamp.values()) { + for (var dataPointGroups : dataPointGroupsByTime.values()) { + for (DataPointGroup dataPointGroup : dataPointGroups.values()) { + consumer.accept(dataPointGroup); + } + } + } + } + } + + public static final class DataPointGroup { + private final Resource resource; + private final ByteString resourceSchemaUrl; + private final InstrumentationScope scope; + private final ByteString scopeSchemaUrl; + private final List dataPointAttributes; + private final String unit; + private final List dataPoints; + private final String targetIndex; + private String metricNamesHash; + + public DataPointGroup( + Resource resource, + ByteString resourceSchemaUrl, + InstrumentationScope scope, + ByteString scopeSchemaUrl, + List dataPointAttributes, + String unit, + List dataPoints, + String targetIndex + ) { + this.resource = resource; + this.resourceSchemaUrl = resourceSchemaUrl; + this.scope = scope; + this.scopeSchemaUrl = scopeSchemaUrl; + this.dataPointAttributes = dataPointAttributes; + this.unit = unit; + this.dataPoints = dataPoints; + this.targetIndex = targetIndex; + } + + public long getTimestampUnixNano() { + return dataPoints.getFirst().getTimestampUnixNano(); + } + + public long getStartTimestampUnixNano() { + return dataPoints.getFirst().getStartTimestampUnixNano(); + } + + public String getMetricNamesHash() { + if (metricNamesHash == null) { + BufferedMurmur3Hasher hasher = new BufferedMurmur3Hasher(0); + for (int i = 0; i < dataPoints.size(); i++) { + hasher.addString(dataPoints.get(i).getMetricName()); + } + metricNamesHash = Integer.toHexString(hasher.digestHash().hashCode()); + } + return metricNamesHash; + } + + public void addDataPoint(DataPoint dataPoint) { + metricNamesHash = null; // reset the hash when adding a new data point + dataPoints.add(dataPoint); + } + + public Resource resource() { + return resource; + } + + public ByteString resourceSchemaUrl() { + return resourceSchemaUrl; + } + + public InstrumentationScope scope() { + return scope; + } + + public ByteString scopeSchemaUrl() { + return scopeSchemaUrl; + } + + public List dataPointAttributes() { + return dataPointAttributes; + } + + public String unit() { + return unit; + } + + public List dataPoints() { + return dataPoints; + } + + public String targetIndex() { + return targetIndex; + } + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java new file mode 100644 index 0000000000000..78517c2be8b99 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.docbuilder; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.InstrumentationScope; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; + +import com.google.protobuf.ByteString; + +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPoint; +import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPointGroupingContext; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * This class constructs an Elasticsearch document representation of a metric data point group. + * It also handles dynamic templates for metrics based on their attributes. + */ +public class MetricDocumentBuilder { + + private final BufferedByteStringAccessor byteStringAccessor; + + public MetricDocumentBuilder(BufferedByteStringAccessor byteStringAccessor) { + this.byteStringAccessor = byteStringAccessor; + } + + public HashMap buildMetricDocument(XContentBuilder builder, DataPointGroupingContext.DataPointGroup dataPointGroup) + throws IOException { + HashMap dynamicTemplates = new HashMap<>(); + List dataPoints = dataPointGroup.dataPoints(); + builder.startObject(); + builder.field("@timestamp", TimeUnit.NANOSECONDS.toMillis(dataPointGroup.getTimestampUnixNano())); + if (dataPointGroup.getStartTimestampUnixNano() != 0) { + builder.field("start_timestamp", TimeUnit.NANOSECONDS.toMillis(dataPointGroup.getStartTimestampUnixNano())); + } + buildResource(dataPointGroup.resource(), dataPointGroup.resourceSchemaUrl(), builder); + buildScope(builder, dataPointGroup.scopeSchemaUrl(), dataPointGroup.scope()); + buildDataPointAttributes(builder, dataPointGroup.dataPointAttributes(), dataPointGroup.unit()); + builder.field("_metric_names_hash", dataPointGroup.getMetricNamesHash()); + + long docCount = 0; + builder.startObject("metrics"); + for (int i = 0, dataPointsSize = dataPoints.size(); i < dataPointsSize; i++) { + DataPoint dataPoint = dataPoints.get(i); + builder.field(dataPoint.getMetricName()); + dataPoint.buildMetricValue(builder); + String dynamicTemplate = dataPoint.getDynamicTemplate(); + if (dynamicTemplate != null) { + dynamicTemplates.put("metrics." + dataPoint.getMetricName(), dynamicTemplate); + } + } + builder.endObject(); + if (docCount > 0) { + builder.field("_doc_count", docCount); + } + builder.endObject(); + return dynamicTemplates; + } + + private void buildResource(Resource resource, ByteString schemaUrl, XContentBuilder builder) throws IOException { + builder.startObject("resource"); + addFieldIfNotEmpty(builder, "schema_url", schemaUrl); + if (resource.getDroppedAttributesCount() > 0) { + builder.field("dropped_attributes_count", resource.getDroppedAttributesCount()); + } + builder.startObject("attributes"); + buildAttributes(builder, resource.getAttributesList()); + builder.endObject(); + builder.endObject(); + } + + private void buildScope(XContentBuilder builder, ByteString schemaUrl, InstrumentationScope scope) throws IOException { + builder.startObject("scope"); + addFieldIfNotEmpty(builder, "schema_url", schemaUrl); + if (scope.getDroppedAttributesCount() > 0) { + builder.field("dropped_attributes_count", scope.getDroppedAttributesCount()); + } + addFieldIfNotEmpty(builder, "name", scope.getNameBytes()); + addFieldIfNotEmpty(builder, "version", scope.getVersionBytes()); + builder.startObject("attributes"); + buildAttributes(builder, scope.getAttributesList()); + builder.endObject(); + builder.endObject(); + } + + private void addFieldIfNotEmpty(XContentBuilder builder, String name, ByteString value) throws IOException { + if (value != null && value.isEmpty() == false) { + builder.field(name); + byteStringAccessor.utf8Value(builder, value); + } + } + + private void buildDataPointAttributes(XContentBuilder builder, List attributes, String unit) throws IOException { + builder.startObject("attributes"); + buildAttributes(builder, attributes); + builder.endObject(); + builder.field("unit", unit); + } + + private void buildAttributes(XContentBuilder builder, List attributes) throws IOException { + for (int i = 0, size = attributes.size(); i < size; i++) { + KeyValue attribute = attributes.get(i); + builder.field(attribute.getKey()); + attributeValue(builder, attribute.getValue()); + } + } + + private void attributeValue(XContentBuilder builder, AnyValue value) throws IOException { + switch (value.getValueCase()) { + case STRING_VALUE -> byteStringAccessor.utf8Value(builder, value.getStringValueBytes()); + case BOOL_VALUE -> builder.value(value.getBoolValue()); + case INT_VALUE -> builder.value(value.getIntValue()); + case DOUBLE_VALUE -> builder.value(value.getDoubleValue()); + case ARRAY_VALUE -> { + builder.startArray(); + List valuesList = value.getArrayValue().getValuesList(); + for (int i = 0, valuesListSize = valuesList.size(); i < valuesListSize; i++) { + AnyValue arrayValue = valuesList.get(i); + attributeValue(builder, arrayValue); + } + builder.endArray(); + } + default -> throw new IllegalArgumentException("Unsupported attribute value type: " + value.getValueCase()); + } + } + +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessor.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessor.java new file mode 100644 index 0000000000000..747dfc0187859 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessor.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.proto; + +import com.google.protobuf.ByteString; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * A utility class that uses a shared {@code byte[]} buffer to convert {@link ByteString} values to byte arrays. + * This avoids frequent allocations of byte arrays in {@link ByteString#toByteArray()}. + * Note that due to the use of a shared buffer, this class is not thread-safe. + */ +public class BufferedByteStringAccessor { + + private static final int DEFAULT_INITIAL_SIZE = 128; + + private byte[] bytes = new byte[DEFAULT_INITIAL_SIZE]; + + /** + * Adds a string dimension to the given {@link TsidBuilder} using the provided dimension name and {@link ByteString} value. + * The value is converted to a byte array using a shared buffer and added to the builder. + * + * @param tsidBuilder the builder to which the dimension will be added + * @param dimension the name of the dimension to add + * @param value the value of the dimension as a {@link ByteString} + */ + public void addStringDimension(TsidBuilder tsidBuilder, String dimension, ByteString value) { + if (value.isEmpty()) { + return; + } + tsidBuilder.addStringDimension(dimension, toBytes(value), 0, value.size()); + } + + /** + * Writes a UTF-8 encoded value to the provided {@link XContentBuilder} using {@link XContentBuilder#utf8Value}. + * This uses a shared byte array to avoid allocations. + * + * @param value the value to write + */ + public void utf8Value(XContentBuilder builder, ByteString value) throws IOException { + builder.utf8Value(toBytes(value), 0, value.size()); + } + + /* + * Not exposed as a public method to avoid risks of leaking a reference to the reused byte array. + */ + private byte[] toBytes(ByteString byteString) { + int size = byteString.size(); + if (bytes.length < size) { + bytes = new byte[size]; + } + byteString.copyTo(bytes, 0); + return bytes; + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnel.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnel.java new file mode 100644 index 0000000000000..46a2c301593e1 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnel.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.tsid; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +import java.util.List; + +class AttributeListTsidFunnel implements TsidFunnel> { + + private final String prefix; + private final BufferedByteStringAccessor byteStringAccessor; + + private AttributeListTsidFunnel(BufferedByteStringAccessor byteStringAccessor, String prefix) { + this.prefix = prefix; + this.byteStringAccessor = byteStringAccessor; + } + + static AttributeListTsidFunnel get(BufferedByteStringAccessor byteStringAccessor, String prefix) { + return new AttributeListTsidFunnel(byteStringAccessor, prefix); + } + + @Override + public void add(List attributesList, TsidBuilder tsidBuilder) { + for (int i = 0; i < attributesList.size(); i++) { + KeyValue keyValue = attributesList.get(i); + String attributeKey = keyValue.getKey(); + hashValue(tsidBuilder, prefix + attributeKey, keyValue.getValue()); + } + } + + private void hashValue(TsidBuilder tsidBuilder, String key, AnyValue value) { + switch (value.getValueCase()) { + case STRING_VALUE: + byteStringAccessor.addStringDimension(tsidBuilder, key, value.getStringValueBytes()); + break; + case BOOL_VALUE: + tsidBuilder.addBooleanDimension(key, value.getBoolValue()); + break; + case DOUBLE_VALUE: + tsidBuilder.addDoubleDimension(key, value.getDoubleValue()); + break; + case INT_VALUE: + tsidBuilder.addLongDimension(key, value.getIntValue()); + break; + case KVLIST_VALUE: + tsidBuilder.add(value.getKvlistValue().getValuesList(), AttributeListTsidFunnel.get(byteStringAccessor, key + ".")); + break; + case ARRAY_VALUE: + List valuesList = value.getArrayValue().getValuesList(); + for (int i = 0; i < valuesList.size(); i++) { + hashValue(tsidBuilder, key, valuesList.get(i)); + } + break; + } + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointDimensionsTsidFunnel.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointDimensionsTsidFunnel.java new file mode 100644 index 0000000000000..a681026849576 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointDimensionsTsidFunnel.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.tsid; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel; +import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPoint; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +class DataPointDimensionsTsidFunnel implements TsidFunnel { + + private final BufferedByteStringAccessor byteStringAccessor; + + private DataPointDimensionsTsidFunnel(BufferedByteStringAccessor byteStringAccessor) { + this.byteStringAccessor = byteStringAccessor; + } + + static DataPointDimensionsTsidFunnel get(BufferedByteStringAccessor byteStringAccessor) { + return new DataPointDimensionsTsidFunnel(byteStringAccessor); + } + + @Override + public void add(DataPoint dataPoint, TsidBuilder tsidBuilder) { + tsidBuilder.add(dataPoint.getAttributes(), AttributeListTsidFunnel.get(byteStringAccessor, "attributes.")); + tsidBuilder.addStringDimension("unit", dataPoint.getUnit()); + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java new file mode 100644 index 0000000000000..09aba608aa998 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.tsid; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel; +import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPoint; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +public class DataPointTsidFunnel implements TsidFunnel { + + private final BufferedByteStringAccessor byteStringAccessor; + + private DataPointTsidFunnel(BufferedByteStringAccessor byteStringAccessor) { + this.byteStringAccessor = byteStringAccessor; + } + + public static TsidBuilder forDataPoint(BufferedByteStringAccessor byteStringAccessor, DataPoint dataPoint) { + TsidBuilder tsidBuilder = new TsidBuilder(dataPoint.getAttributes().size()); + new DataPointTsidFunnel(byteStringAccessor).add(dataPoint, tsidBuilder); + return tsidBuilder; + } + + @Override + public void add(DataPoint dataPoint, TsidBuilder tsidBuilder) { + tsidBuilder.add(dataPoint, DataPointDimensionsTsidFunnel.get(byteStringAccessor)); + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ResourceTsidFunnel.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ResourceTsidFunnel.java new file mode 100644 index 0000000000000..8c12b4c283e69 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ResourceTsidFunnel.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.tsid; + +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +import java.util.List; + +public class ResourceTsidFunnel implements TsidFunnel { + + private final BufferedByteStringAccessor byteStringAccessor; + + public ResourceTsidFunnel(BufferedByteStringAccessor byteStringAccessor) { + this.byteStringAccessor = byteStringAccessor; + } + + public static TsidBuilder forResource(BufferedByteStringAccessor byteStringAccessor, ResourceMetrics resourceMetrics) { + TsidBuilder tsidBuilder = new TsidBuilder(resourceMetrics.getResource().getAttributesCount() + 1); + new ResourceTsidFunnel(byteStringAccessor).add(resourceMetrics, tsidBuilder); + return tsidBuilder; + } + + @Override + public void add(ResourceMetrics resourceMetrics, TsidBuilder tsidBuilder) { + List resourceAttributes = resourceMetrics.getResource().getAttributesList(); + tsidBuilder.add(resourceAttributes, AttributeListTsidFunnel.get(byteStringAccessor, "resource.attributes.")); + byteStringAccessor.addStringDimension(tsidBuilder, "schema_url", resourceMetrics.getSchemaUrlBytes()); + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java new file mode 100644 index 0000000000000..47bcbb8c15f11 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.tsid; + +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +import java.util.List; + +public class ScopeTsidFunnel implements TsidFunnel { + + private final BufferedByteStringAccessor byteStringAccessor; + + public ScopeTsidFunnel(BufferedByteStringAccessor byteStringAccessor) { + this.byteStringAccessor = byteStringAccessor; + } + + public static TsidBuilder forScope(BufferedByteStringAccessor byteStringAccessor, ScopeMetrics scopeMetrics) { + TsidBuilder tsidBuilder = new TsidBuilder(scopeMetrics.getScope().getAttributesCount() + 3); + new ScopeTsidFunnel(byteStringAccessor).add(scopeMetrics, tsidBuilder); + return tsidBuilder; + } + + @Override + public void add(ScopeMetrics scopeMetrics, TsidBuilder tsidBuilder) { + List resourceAttributes = scopeMetrics.getScope().getAttributesList(); + byteStringAccessor.addStringDimension(tsidBuilder, "scope.name", scopeMetrics.getScope().getNameBytes()); + byteStringAccessor.addStringDimension(tsidBuilder, "scope.schema_url", scopeMetrics.getSchemaUrlBytes()); + tsidBuilder.add(resourceAttributes, AttributeListTsidFunnel.get(byteStringAccessor, "scope.attributes.")); + byteStringAccessor.addStringDimension(tsidBuilder, "scope.version", scopeMetrics.getScope().getVersionBytes()); + } +} From e33f77469c905c5efcb8fdaef7a0968d152f0866 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 28 Aug 2025 08:40:30 +0000 Subject: [PATCH 02/11] [CI] Auto commit changes from spotless --- .../xpack/oteldata/otlp/OTLPMetricsRestAction.java | 1 + .../oteldata/otlp/OTLPMetricsTransportAction.java | 12 ++++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java index c21975779d61f..ff16e40fbcb36 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.oteldata.otlp; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; + import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.bytes.BytesArray; diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java index 11894b676156f..e7c42cb643695 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java @@ -7,10 +7,12 @@ package org.elasticsearch.xpack.oteldata.otlp; -import com.google.protobuf.MessageLite; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsPartialSuccess; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; + +import com.google.protobuf.MessageLite; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; @@ -151,10 +153,12 @@ public void onFailure(Exception e) { } catch (Exception e) { logger.error("failed to execute otlp metrics request", e); - listener.onResponse(new MetricsResponse( + listener.onResponse( + new MetricsResponse( RestStatus.INTERNAL_SERVER_ERROR, responseWithRejectedDataPoints(context.totalDataPoints(), e.getMessage()) - )); + ) + ); } } @@ -178,7 +182,7 @@ private void addIndexRequest( .setRequireDataStream(true) .source(xContentBuilder) // TODO explicitly set _tsid once https://github.com/elastic/elasticsearch/pull/132566 is merged - //.tsid(DataPointGroupTsidFunnel.forDataPointGroup(dataPointGroup).buildTsid()) + // .tsid(DataPointGroupTsidFunnel.forDataPointGroup(dataPointGroup).buildTsid()) .setDynamicTemplates(dynamicTemplates) ); } From f9ecbd2a8f6aac283a34947233f5e761234b9d27 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 29 Aug 2025 12:02:16 +0200 Subject: [PATCH 03/11] Apply suggestions from code review Co-authored-by: Kostas Krikellas <131142368+kkrik-es@users.noreply.github.com> --- .../xpack/oteldata/otlp/datapoint/DataPoint.java | 4 ++-- .../oteldata/otlp/datapoint/DataPointGroupingContext.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java index 528e46b301163..864418cbd98d9 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java @@ -90,7 +90,7 @@ public interface DataPoint { * @param messages a set to collect validation error messages * @return true if the data point is valid, false otherwise */ - boolean isValid(Set messages); + boolean isValid(Set errors); record Number(NumberDataPoint dataPoint, Metric metric) implements DataPoint { @@ -149,7 +149,7 @@ public String getDynamicTemplate() { } @Override - public boolean isValid(Set messages) { + public boolean isValid(Set errors) { return true; } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java index 20604ccaac023..6e84dbb46a979 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java @@ -163,7 +163,7 @@ class ScopeGroup { private final ResourceGroup resourceGroup; private final InstrumentationScope scope; private final ByteString scopeSchemaUrl; - // index -> timiestamp -> dataPointGroupHash -> DataPointGroup + // index -> timestamp -> dataPointGroupHash -> DataPointGroup private final Map>> dataPointGroupsByIndexAndTimestamp; ScopeGroup(ResourceGroup resourceGroup, InstrumentationScope scope, ByteString scopeSchemaUrl) { From 21d656eab121fea8e65b30154e7fb1d7471a713f Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 29 Aug 2025 17:04:08 +0200 Subject: [PATCH 04/11] Revert changes to action and doc builder --- .../otlp/OTLPMetricsIndexingRestIT.java | 220 ++---------------- .../oteldata/otlp/OTLPMetricsRestAction.java | 13 +- .../otlp/OTLPMetricsTransportAction.java | 129 +--------- .../docbuilder/MetricDocumentBuilder.java | 139 ----------- 4 files changed, 17 insertions(+), 484 deletions(-) delete mode 100644 x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java diff --git a/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/action/otlp/OTLPMetricsIndexingRestIT.java b/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/action/otlp/OTLPMetricsIndexingRestIT.java index 236c2bcfd2179..12b30e743194f 100644 --- a/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/action/otlp/OTLPMetricsIndexingRestIT.java +++ b/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/action/otlp/OTLPMetricsIndexingRestIT.java @@ -8,20 +8,17 @@ package org.elasticsearch.action.otlp; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.exporter.internal.FailedExportException; import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.SdkMeterProvider; -import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; -import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; -import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; import io.opentelemetry.sdk.resources.Resource; import org.elasticsearch.client.Request; @@ -32,29 +29,17 @@ import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.cluster.local.distribution.DistributionType; import org.elasticsearch.test.rest.ESRestTestCase; -import org.elasticsearch.test.rest.ObjectPath; import org.junit.Before; import org.junit.ClassRule; -import java.io.IOException; import java.time.Duration; -import java.time.Instant; import java.util.List; -import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static io.opentelemetry.api.common.AttributeKey.stringKey; -import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE; -import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA; -import static org.elasticsearch.action.otlp.OTLPMetricsIndexingRestIT.Monotonicity.MONOTONIC; -import static org.elasticsearch.action.otlp.OTLPMetricsIndexingRestIT.Monotonicity.NON_MONOTONIC; -import static org.hamcrest.Matchers.aMapWithSize; -import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.isA; -import static org.hamcrest.Matchers.not; public class OTLPMetricsIndexingRestIT extends ESRestTestCase { @@ -71,8 +56,6 @@ public class OTLPMetricsIndexingRestIT extends ESRestTestCase { .user(USER, PASS, "superuser", false) .setting("xpack.security.autoconfiguration.enabled", "false") .setting("xpack.license.self_generated.type", "trial") - .setting("xpack.security.enabled", "false") - .setting("xpack.watcher.enabled", "false") .build(); @Override @@ -115,136 +98,21 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testIngestMetricViaMeterProvider() throws Exception { - Meter sampleMeter = meterProvider.get("io.opentelemetry.example.metrics"); - - sampleMeter.gaugeBuilder("jvm.memory.total") - .setDescription("Reports JVM memory usage.") - .setUnit("By") - .buildWithCallback(result -> result.record(Runtime.getRuntime().totalMemory(), Attributes.empty())); - - var result = meterProvider.forceFlush().join(10, TimeUnit.SECONDS); - assertThat(result.isSuccess(), is(true)); - - refreshMetricsIndices(); - - ObjectPath search = ObjectPath.createFromResponse( - client().performRequest(new Request("GET", "metrics-generic.otel-default" + "/_search")) - ); - assertThat(search.evaluate("hits.total.value"), equalTo(1)); - } - public void testIngestMetricDataViaMetricExporter() throws Exception { - long now = Clock.getDefault().now(); - long totalMemory = Runtime.getRuntime().totalMemory(); - MetricData jvmMemoryMetricData = createLongGauge(TEST_RESOURCE, Attributes.empty(), "jvm.memory.total", totalMemory, "By", now); - - export(List.of(jvmMemoryMetricData)); - ObjectPath search = ObjectPath.createFromResponse( - client().performRequest(new Request("GET", "metrics-generic.otel-default" + "/_search")) + MetricData jvmMemoryMetricData = createDoubleGauge( + TEST_RESOURCE, + Attributes.empty(), + "jvm.memory.total", + Runtime.getRuntime().totalMemory(), + "By", + Clock.getDefault().now() ); - assertThat(search.evaluate("hits.total.value"), equalTo(1)); - var source = search.evaluate("hits.hits.0._source"); - assertThat(ObjectPath.evaluate(source, "@timestamp"), equalTo(timestampAsString(now))); - assertThat(ObjectPath.evaluate(source, "start_timestamp"), equalTo(timestampAsString(now))); - assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class)); - assertThat(ObjectPath.evaluate(source, "metrics.jvm\\.memory\\.total").toString(), equalTo(Long.toString(totalMemory))); - assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By")); - assertThat(ObjectPath.evaluate(source, "resource.attributes.service\\.name"), equalTo("elasticsearch")); - assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics")); - } - public void testGroupingSameGroup() throws Exception { - long now = Clock.getDefault().now(); - MetricData metric1 = createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now); - // uses an equal but not the same resource to test grouping across resourceMetrics - MetricData metric2 = createDoubleGauge(TEST_RESOURCE.toBuilder().build(), Attributes.empty(), "metric2", 42, "By", now); - - export(List.of(metric1, metric2)); - - ObjectPath path = ObjectPath.createFromResponse( - client().performRequest(new Request("GET", "metrics-generic.otel-default/_search")) + FailedExportException.HttpExportException exception = assertThrows( + FailedExportException.HttpExportException.class, + () -> export(List.of(jvmMemoryMetricData)) ); - assertThat(path.evaluate("hits.total.value"), equalTo(1)); - assertThat(path.evaluate("hits.hits.0._source.metrics"), equalTo(Map.of("metric1", 42.0, "metric2", 42.0))); - assertThat(path.evaluate("hits.hits.0._source.resource"), equalTo(Map.of("attributes", Map.of("service.name", "elasticsearch")))); - } - - public void testGroupingDifferentGroup() throws Exception { - long now = Clock.getDefault().now(); - export( - List.of( - createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now), - createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now + TimeUnit.MILLISECONDS.toNanos(1)), - createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "", now), - createDoubleGauge(TEST_RESOURCE, Attributes.of(stringKey("foo"), "bar"), "metric1", 42, "By", now) - ) - ); - ObjectPath path = ObjectPath.createFromResponse( - client().performRequest(new Request("GET", "metrics-generic.otel-default" + "/_search")) - ); - assertThat(path.evaluate("hits.total.value"), equalTo(4)); - } - - public void testGauge() throws Exception { - long now = Clock.getDefault().now(); - export( - List.of( - createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "double_gauge", 42.0, "By", now), - createLongGauge(TEST_RESOURCE, Attributes.empty(), "long_gauge", 42, "By", now) - ) - ); - Map metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); - assertThat(ObjectPath.evaluate(metrics, "double_gauge.type"), equalTo("double")); - assertThat(ObjectPath.evaluate(metrics, "double_gauge.time_series_metric"), equalTo("gauge")); - assertThat(ObjectPath.evaluate(metrics, "long_gauge.type"), equalTo("long")); - assertThat(ObjectPath.evaluate(metrics, "long_gauge.time_series_metric"), equalTo("gauge")); - } - - public void testCounterTemporality() throws Exception { - long now = Clock.getDefault().now(); - export( - List.of( - createCounter(TEST_RESOURCE, Attributes.empty(), "cumulative_counter", 42, "By", now, CUMULATIVE, MONOTONIC), - createCounter(TEST_RESOURCE, Attributes.empty(), "delta_counter", 42, "By", now, DELTA, MONOTONIC) - ) - ); - - Map metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); - assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.type"), equalTo("long")); - assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.time_series_metric"), equalTo("counter")); - assertThat(ObjectPath.evaluate(metrics, "delta_counter.type"), equalTo("long")); - assertThat(ObjectPath.evaluate(metrics, "delta_counter.time_series_metric"), equalTo("gauge")); - } - - public void testCounterMonotonicity() throws Exception { - long now = Clock.getDefault().now(); - export( - List.of( - createCounter(TEST_RESOURCE, Attributes.empty(), "up_down_counter", 42, "By", now, CUMULATIVE, NON_MONOTONIC), - createCounter(TEST_RESOURCE, Attributes.empty(), "up_down_counter_delta", 42, "By", now, DELTA, NON_MONOTONIC) - - ) - ); - - Map metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); - assertThat(ObjectPath.evaluate(metrics, "up_down_counter.type"), equalTo("long")); - assertThat(ObjectPath.evaluate(metrics, "up_down_counter.time_series_metric"), equalTo("gauge")); - assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.type"), equalTo("long")); - assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.time_series_metric"), equalTo("gauge")); - } - - private static Map getMapping(String target) throws IOException { - Map mappings = ObjectPath.createFromResponse(client().performRequest(new Request("GET", target + "/_mapping"))) - .evaluate(""); - assertThat(mappings, aMapWithSize(1)); - Map mapping = ObjectPath.evaluate(mappings.values().iterator().next(), "mappings"); - assertThat(mapping, not(anEmptyMap())); - return mapping; - } - - private static String timestampAsString(long now) { - return Instant.ofEpochMilli(TimeUnit.NANOSECONDS.toMillis(now)).toString(); + assertThat(exception.getResponse().statusCode(), equalTo(RestStatus.NOT_IMPLEMENTED.getStatus())); } private void export(List metrics) throws Exception { @@ -256,11 +124,7 @@ private void export(List metrics) throws Exception { throw new RuntimeException("Failed to export metrics", failure); } assertThat(result.isSuccess(), is(true)); - refreshMetricsIndices(); - } - - private static void refreshMetricsIndices() throws IOException { - assertOK(client().performRequest(new Request("GET", "metrics-*/_refresh"))); + assertOK(client().performRequest(new Request("GET", "_refresh/metrics-*"))); } private static MetricData createDoubleGauge( @@ -280,62 +144,4 @@ private static MetricData createDoubleGauge( ImmutableGaugeData.create(List.of(ImmutableDoublePointData.create(timeEpochNanos, timeEpochNanos, attributes, value))) ); } - - private static MetricData createLongGauge( - Resource resource, - Attributes attributes, - String name, - long value, - String unit, - long timeEpochNanos - ) { - return ImmutableMetricData.createLongGauge( - resource, - TEST_SCOPE, - name, - "Your description could be here.", - unit, - ImmutableGaugeData.create(List.of(ImmutableLongPointData.create(timeEpochNanos, timeEpochNanos, attributes, value))) - ); - } - - private static MetricData createCounter( - Resource resource, - Attributes attributes, - String name, - long value, - String unit, - long timeEpochNanos, - AggregationTemporality temporality, - Monotonicity monotonicity - ) { - return ImmutableMetricData.createLongSum( - resource, - TEST_SCOPE, - name, - "Your description could be here.", - unit, - ImmutableSumData.create( - monotonicity.isMonotonic(), - temporality, - List.of(ImmutableLongPointData.create(timeEpochNanos, timeEpochNanos, attributes, value)) - ) - ); - } - - // this is just to enhance readability of the createCounter calls (avoid boolean parameter) - enum Monotonicity { - MONOTONIC(true), - NON_MONOTONIC(false); - - private final boolean monotonic; - - Monotonicity(boolean monotonic) { - this.monotonic = monotonic; - } - - public boolean isMonotonic() { - return monotonic; - } - } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java index ff16e40fbcb36..23f46363d3060 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java @@ -7,8 +7,6 @@ package org.elasticsearch.xpack.oteldata.otlp; -import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; - import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.bytes.BytesArray; @@ -50,7 +48,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli ActionListener.releaseBefore(request.content(), new RestResponseListener<>(channel) { @Override public RestResponse buildResponse(OTLPMetricsTransportAction.MetricsResponse r) { - return new RestResponse(r.getStatus(), "application/x-protobuf", r.getResponse()); + RestStatus restStatus = r.getStatus(); + return new RestResponse(restStatus, "application/x-protobuf", r.getResponse()); } }) ); @@ -60,13 +59,7 @@ public RestResponse buildResponse(OTLPMetricsTransportAction.MetricsResponse r) // (a request that does not carry any telemetry data) // the server SHOULD respond with success. // https://opentelemetry.io/docs/specs/otlp/#full-success-1 - return channel -> channel.sendResponse( - new RestResponse( - RestStatus.OK, - "application/x-protobuf", - new BytesArray(ExportMetricsServiceResponse.newBuilder().build().toByteArray()) - ) - ); + return channel -> channel.sendResponse(new RestResponse(RestStatus.OK, "application/x-protobuf", new BytesArray(new byte[0]))); } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java index e7c42cb643695..722b727fff40f 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java @@ -7,12 +7,6 @@ package org.elasticsearch.xpack.oteldata.otlp; -import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsPartialSuccess; -import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; -import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; - -import com.google.protobuf.MessageLite; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; @@ -21,17 +15,11 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.CompositeIndicesRequest; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.injection.guice.Inject; @@ -39,14 +27,8 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentFactory; -import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPointGroupingContext; -import org.elasticsearch.xpack.oteldata.otlp.docbuilder.MetricDocumentBuilder; -import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; import java.io.IOException; -import java.util.Arrays; /** * Transport action for handling OpenTelemetry Protocol (OTLP) Metrics requests. @@ -80,112 +62,7 @@ public OTLPMetricsTransportAction( @Override protected void doExecute(Task task, MetricsRequest request, ActionListener listener) { - BufferedByteStringAccessor byteStringAccessor = new BufferedByteStringAccessor(); - DataPointGroupingContext context = new DataPointGroupingContext(byteStringAccessor); - try { - var metricsServiceRequest = ExportMetricsServiceRequest.parseFrom(request.exportMetricsServiceRequest.streamInput()); - context.groupDataPoints(metricsServiceRequest); - BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); - MetricDocumentBuilder metricDocumentBuilder = new MetricDocumentBuilder(byteStringAccessor); - context.consume(dataPointGroup -> addIndexRequest(bulkRequestBuilder, metricDocumentBuilder, dataPointGroup)); - if (bulkRequestBuilder.numberOfActions() == 0) { - if (context.totalDataPoints() == 0) { - // If the server receives an empty request - // (a request that does not carry any telemetry data) - // the server SHOULD respond with success. - // https://opentelemetry.io/docs/specs/otlp/#full-success-1 - listener.onResponse(new MetricsResponse(RestStatus.OK, ExportMetricsServiceResponse.newBuilder().build())); - } else { - // If the processing of the request fails because the request contains data that cannot be decoded - // or is otherwise invalid and such failure is permanent, - // then the server MUST respond with HTTP 400 Bad Request. - // https://opentelemetry.io/docs/specs/otlp/#bad-data - listener.onResponse( - new MetricsResponse( - RestStatus.BAD_REQUEST, - responseWithRejectedDataPoints(context.totalDataPoints(), context.getIgnoredDataPointsMessage()) - ) - ); - } - return; - } - - bulkRequestBuilder.execute(new ActionListener<>() { - @Override - public void onResponse(BulkResponse bulkItemResponses) { - MessageLite response; - // If the request is only partially accepted - // (i.e. when the server accepts only parts of the data and rejects the rest), - // the server MUST respond with HTTP 200 OK. - // https://opentelemetry.io/docs/specs/otlp/#partial-success-1 - RestStatus status = RestStatus.OK; - if (bulkItemResponses.hasFailures() || context.getIgnoredDataPoints() > 0) { - int failures = (int) Arrays.stream(bulkItemResponses.getItems()).filter(BulkItemResponse::isFailed).count(); - if (failures == bulkItemResponses.getItems().length) { - // If the processing of the request fails, - // the server MUST respond with appropriate HTTP 4xx or HTTP 5xx status code. - // https://opentelemetry.io/docs/specs/otlp/#failures-1 - status = RestStatus.INTERNAL_SERVER_ERROR; - } - response = responseWithRejectedDataPoints( - failures + context.getIgnoredDataPoints(), - bulkItemResponses.buildFailureMessage() + context.getIgnoredDataPointsMessage() - ); - } else { - response = ExportMetricsServiceResponse.newBuilder().build(); - } - listener.onResponse(new MetricsResponse(status, response)); - } - - @Override - public void onFailure(Exception e) { - listener.onResponse( - // https://opentelemetry.io/docs/specs/otlp/#failures-1 - // If the processing of the request fails, - // the server MUST respond with appropriate HTTP 4xx or HTTP 5xx status code. - new MetricsResponse( - RestStatus.INTERNAL_SERVER_ERROR, - responseWithRejectedDataPoints(context.totalDataPoints(), e.getMessage()) - ) - ); - } - }); - - } catch (Exception e) { - logger.error("failed to execute otlp metrics request", e); - listener.onResponse( - new MetricsResponse( - RestStatus.INTERNAL_SERVER_ERROR, - responseWithRejectedDataPoints(context.totalDataPoints(), e.getMessage()) - ) - ); - } - } - - private static ExportMetricsPartialSuccess responseWithRejectedDataPoints(int rejectedDataPoints, String message) { - return ExportMetricsServiceResponse.newBuilder() - .getPartialSuccessBuilder() - .setRejectedDataPoints(rejectedDataPoints) - .setErrorMessage(message) - .build(); - } - - private void addIndexRequest( - BulkRequestBuilder bulkRequestBuilder, - MetricDocumentBuilder metricDocumentBuilder, - DataPointGroupingContext.DataPointGroup dataPointGroup - ) throws IOException { - try (XContentBuilder xContentBuilder = XContentFactory.cborBuilder(new BytesStreamOutput())) { - var dynamicTemplates = metricDocumentBuilder.buildMetricDocument(xContentBuilder, dataPointGroup); - bulkRequestBuilder.add( - new IndexRequest(dataPointGroup.targetIndex()).opType(DocWriteRequest.OpType.CREATE) - .setRequireDataStream(true) - .source(xContentBuilder) - // TODO explicitly set _tsid once https://github.com/elastic/elasticsearch/pull/132566 is merged - // .tsid(DataPointGroupTsidFunnel.forDataPointGroup(dataPointGroup).buildTsid()) - .setDynamicTemplates(dynamicTemplates) - ); - } + listener.onResponse(new MetricsResponse(RestStatus.NOT_IMPLEMENTED, new BytesArray(new byte[0]))); } public static class MetricsRequest extends ActionRequest implements CompositeIndicesRequest { @@ -210,10 +87,6 @@ public static class MetricsResponse extends ActionResponse { private final BytesReference response; private final RestStatus status; - public MetricsResponse(RestStatus status, MessageLite response) { - this(status, new BytesArray(response.toByteArray())); - } - public MetricsResponse(RestStatus status, BytesReference response) { this.response = response; this.status = status; diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java deleted file mode 100644 index 78517c2be8b99..0000000000000 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.oteldata.otlp.docbuilder; - -import io.opentelemetry.proto.common.v1.AnyValue; -import io.opentelemetry.proto.common.v1.InstrumentationScope; -import io.opentelemetry.proto.common.v1.KeyValue; -import io.opentelemetry.proto.resource.v1.Resource; - -import com.google.protobuf.ByteString; - -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPoint; -import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPointGroupingContext; -import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * This class constructs an Elasticsearch document representation of a metric data point group. - * It also handles dynamic templates for metrics based on their attributes. - */ -public class MetricDocumentBuilder { - - private final BufferedByteStringAccessor byteStringAccessor; - - public MetricDocumentBuilder(BufferedByteStringAccessor byteStringAccessor) { - this.byteStringAccessor = byteStringAccessor; - } - - public HashMap buildMetricDocument(XContentBuilder builder, DataPointGroupingContext.DataPointGroup dataPointGroup) - throws IOException { - HashMap dynamicTemplates = new HashMap<>(); - List dataPoints = dataPointGroup.dataPoints(); - builder.startObject(); - builder.field("@timestamp", TimeUnit.NANOSECONDS.toMillis(dataPointGroup.getTimestampUnixNano())); - if (dataPointGroup.getStartTimestampUnixNano() != 0) { - builder.field("start_timestamp", TimeUnit.NANOSECONDS.toMillis(dataPointGroup.getStartTimestampUnixNano())); - } - buildResource(dataPointGroup.resource(), dataPointGroup.resourceSchemaUrl(), builder); - buildScope(builder, dataPointGroup.scopeSchemaUrl(), dataPointGroup.scope()); - buildDataPointAttributes(builder, dataPointGroup.dataPointAttributes(), dataPointGroup.unit()); - builder.field("_metric_names_hash", dataPointGroup.getMetricNamesHash()); - - long docCount = 0; - builder.startObject("metrics"); - for (int i = 0, dataPointsSize = dataPoints.size(); i < dataPointsSize; i++) { - DataPoint dataPoint = dataPoints.get(i); - builder.field(dataPoint.getMetricName()); - dataPoint.buildMetricValue(builder); - String dynamicTemplate = dataPoint.getDynamicTemplate(); - if (dynamicTemplate != null) { - dynamicTemplates.put("metrics." + dataPoint.getMetricName(), dynamicTemplate); - } - } - builder.endObject(); - if (docCount > 0) { - builder.field("_doc_count", docCount); - } - builder.endObject(); - return dynamicTemplates; - } - - private void buildResource(Resource resource, ByteString schemaUrl, XContentBuilder builder) throws IOException { - builder.startObject("resource"); - addFieldIfNotEmpty(builder, "schema_url", schemaUrl); - if (resource.getDroppedAttributesCount() > 0) { - builder.field("dropped_attributes_count", resource.getDroppedAttributesCount()); - } - builder.startObject("attributes"); - buildAttributes(builder, resource.getAttributesList()); - builder.endObject(); - builder.endObject(); - } - - private void buildScope(XContentBuilder builder, ByteString schemaUrl, InstrumentationScope scope) throws IOException { - builder.startObject("scope"); - addFieldIfNotEmpty(builder, "schema_url", schemaUrl); - if (scope.getDroppedAttributesCount() > 0) { - builder.field("dropped_attributes_count", scope.getDroppedAttributesCount()); - } - addFieldIfNotEmpty(builder, "name", scope.getNameBytes()); - addFieldIfNotEmpty(builder, "version", scope.getVersionBytes()); - builder.startObject("attributes"); - buildAttributes(builder, scope.getAttributesList()); - builder.endObject(); - builder.endObject(); - } - - private void addFieldIfNotEmpty(XContentBuilder builder, String name, ByteString value) throws IOException { - if (value != null && value.isEmpty() == false) { - builder.field(name); - byteStringAccessor.utf8Value(builder, value); - } - } - - private void buildDataPointAttributes(XContentBuilder builder, List attributes, String unit) throws IOException { - builder.startObject("attributes"); - buildAttributes(builder, attributes); - builder.endObject(); - builder.field("unit", unit); - } - - private void buildAttributes(XContentBuilder builder, List attributes) throws IOException { - for (int i = 0, size = attributes.size(); i < size; i++) { - KeyValue attribute = attributes.get(i); - builder.field(attribute.getKey()); - attributeValue(builder, attribute.getValue()); - } - } - - private void attributeValue(XContentBuilder builder, AnyValue value) throws IOException { - switch (value.getValueCase()) { - case STRING_VALUE -> byteStringAccessor.utf8Value(builder, value.getStringValueBytes()); - case BOOL_VALUE -> builder.value(value.getBoolValue()); - case INT_VALUE -> builder.value(value.getIntValue()); - case DOUBLE_VALUE -> builder.value(value.getDoubleValue()); - case ARRAY_VALUE -> { - builder.startArray(); - List valuesList = value.getArrayValue().getValuesList(); - for (int i = 0, valuesListSize = valuesList.size(); i < valuesListSize; i++) { - AnyValue arrayValue = valuesList.get(i); - attributeValue(builder, arrayValue); - } - builder.endArray(); - } - default -> throw new IllegalArgumentException("Unsupported attribute value type: " + value.getValueCase()); - } - } - -} From bc35f6ed0e6376a88f748c668d68db91a61fe209 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 29 Aug 2025 18:27:57 +0200 Subject: [PATCH 05/11] Add unit tests --- .../oteldata/otlp/datapoint/DataPoint.java | 21 +- .../xpack/oteldata/otlp/OtlpUtils.java | 98 ++++++++++ .../DataPointGroupingContextTests.java | 183 ++++++++++++++++++ .../otlp/datapoint/DataPointNumberTests.java | 75 +++++++ 4 files changed, 357 insertions(+), 20 deletions(-) create mode 100644 x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java create mode 100644 x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java create mode 100644 x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointNumberTests.java diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java index 864418cbd98d9..b23fe891db944 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java @@ -11,9 +11,6 @@ import io.opentelemetry.proto.metrics.v1.Metric; import io.opentelemetry.proto.metrics.v1.NumberDataPoint; -import org.elasticsearch.xcontent.XContentBuilder; - -import java.io.IOException; import java.util.List; import java.util.Set; @@ -68,14 +65,6 @@ public interface DataPoint { */ String getMetricName(); - /** - * Builds the metric value for the data point and writes it to the provided XContentBuilder. - * - * @param builder the XContentBuilder to write the metric value to - * @throws IOException if an I/O error occurs while writing to the builder - */ - void buildMetricValue(XContentBuilder builder) throws IOException; - /** * Returns the dynamic template name for the data point based on its type and value. * This is used to dynamically map the appropriate field type according to the data point's characteristics. @@ -87,7 +76,7 @@ public interface DataPoint { /** * Validates whether the data point can be indexed into Elasticsearch. * - * @param messages a set to collect validation error messages + * @param errors a set to collect validation error messages * @return true if the data point is valid, false otherwise */ boolean isValid(Set errors); @@ -119,14 +108,6 @@ public String getMetricName() { return metric.getName(); } - @Override - public void buildMetricValue(XContentBuilder builder) throws IOException { - switch (dataPoint.getValueCase()) { - case AS_DOUBLE -> builder.value(dataPoint.getAsDouble()); - case AS_INT -> builder.value(dataPoint.getAsInt()); - } - } - @Override public String getDynamicTemplate() { String type; diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java new file mode 100644 index 0000000000000..9d453aae4d29f --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.oteldata.otlp; + +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.InstrumentationScope; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.AggregationTemporality; +import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; +import io.opentelemetry.proto.metrics.v1.Sum; +import io.opentelemetry.proto.resource.v1.Resource; + +import java.util.ArrayList; +import java.util.List; + +public class OtlpUtils { + public static KeyValue createKeyValue(String key, String value) { + return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setStringValue(value).build()).build(); + } + + private static Resource createResource(List attributes) { + return Resource.newBuilder().addAllAttributes(attributes).build(); + } + + public static ResourceMetrics createResourceMetrics(List attributes, List scopeMetrics) { + return ResourceMetrics.newBuilder().setResource(createResource(attributes)).addAllScopeMetrics(scopeMetrics).build(); + } + + private static InstrumentationScope createScope(String name, String version) { + return InstrumentationScope.newBuilder().setName(name).setVersion(version).build(); + } + + public static ScopeMetrics createScopeMetrics(String name, String version, Iterable metrics) { + return ScopeMetrics.newBuilder().setScope(createScope(name, version)).addAllMetrics(metrics).build(); + } + + public static Metric createGaugeMetric(String name, String unit, List dataPoints) { + return Metric.newBuilder().setName(name).setUnit(unit).setGauge(Gauge.newBuilder().addAllDataPoints(dataPoints).build()).build(); + } + + public static Metric createSumMetric( + String name, + String unit, + List dataPoints, + boolean isMonotonic, + AggregationTemporality temporality + ) { + return Metric.newBuilder() + .setName(name) + .setUnit(unit) + .setSum( + Sum.newBuilder().addAllDataPoints(dataPoints).setIsMonotonic(isMonotonic).setAggregationTemporality(temporality).build() + ) + .build(); + } + + public static NumberDataPoint createDoubleDataPoint(long timestamp, List attributes, double value) { + return NumberDataPoint.newBuilder() + .setTimeUnixNano(timestamp) + .setStartTimeUnixNano(timestamp) + .addAllAttributes(attributes) + .setAsDouble(value) + .build(); + } + + public static NumberDataPoint createLongDataPoint(long timestamp, List attributes, long value) { + return NumberDataPoint.newBuilder() + .setTimeUnixNano(timestamp) + .setStartTimeUnixNano(timestamp) + .addAllAttributes(attributes) + .setAsInt(value) + .build(); + } + + public static ExportMetricsServiceRequest createMetricsRequest(List metrics) { + + List resourceMetrics = new ArrayList<>(); + for (Metric metric : metrics) { + resourceMetrics.add( + createResourceMetrics( + List.of(createKeyValue("service.name", "test-service")), + List.of(createScopeMetrics("test", "1.0.0", List.of(metric))) + ) + ); + } + + return ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(resourceMetrics).build(); + } +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java new file mode 100644 index 0000000000000..3d25e0ab74c73 --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java @@ -0,0 +1,183 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createDoubleDataPoint; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createGaugeMetric; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createKeyValue; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createLongDataPoint; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createMetricsRequest; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createResourceMetrics; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createScopeMetrics; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSumMetric; + +public class DataPointGroupingContextTests extends ESTestCase { + + private final DataPointGroupingContext context = new DataPointGroupingContext(new BufferedByteStringAccessor()); + private final long nowUnixNanos = System.currentTimeMillis() * 1_000_000L; + + public void testGroupingSameGroup() throws Exception { + // Group data points + ExportMetricsServiceRequest metricsRequest = createMetricsRequest( + List.of( + createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of(), 42.0))), + createGaugeMetric("system.memory.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of(), 84.5))), + createSumMetric( + "http.requests.count", + "", + List.of(createLongDataPoint(nowUnixNanos, List.of(), 100L)), + true, + AGGREGATION_TEMPORALITY_CUMULATIVE + ) + ) + ); + context.groupDataPoints(metricsRequest); + assertEquals(3, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + AtomicInteger groupCount = new AtomicInteger(0); + context.consume(dataPointGroup -> groupCount.incrementAndGet()); + assertEquals(1, groupCount.get()); + } + + public void testGroupingDifferentGroupUnit() throws Exception { + // Group data points + ExportMetricsServiceRequest metricsRequest = createMetricsRequest( + List.of( + createGaugeMetric("system.cpu.usage", "{percent}", List.of(createDoubleDataPoint(nowUnixNanos, List.of(), 0.42))), + createGaugeMetric("system.memory.usage", "By", List.of(createLongDataPoint(nowUnixNanos, List.of(), 42L))) + ) + ); + context.groupDataPoints(metricsRequest); + assertEquals(2, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + AtomicInteger groupCount = new AtomicInteger(0); + context.consume(dataPointGroup -> groupCount.incrementAndGet()); + assertEquals(2, groupCount.get()); + } + + public void testGroupingDifferentResource() throws Exception { + ResourceMetrics resource1 = createResourceMetrics( + List.of(createKeyValue("service.name", "test-service_1")), + List.of( + createScopeMetrics( + "test", + "1.0.0", + List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of(), 0.42)))) + ) + ) + ); + ResourceMetrics resource2 = createResourceMetrics( + List.of(createKeyValue("service.name", "test-service_2")), + List.of( + createScopeMetrics( + "test", + "1.0.0", + List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of(), 42L)))) + ) + ) + ); + + context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource1, resource2)).build()); + assertEquals(2, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + AtomicInteger groupCount = new AtomicInteger(0); + context.consume(dataPointGroup -> groupCount.incrementAndGet()); + assertEquals(2, groupCount.get()); + } + + public void testGroupingDifferentScope() throws Exception { + ResourceMetrics resource1 = createResourceMetrics( + List.of(createKeyValue("service.name", "test-service")), + List.of( + createScopeMetrics( + "test_scope_1", + "1.0.0", + List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of(), 0.42)))) + ) + ) + ); + ResourceMetrics resource2 = createResourceMetrics( + List.of(createKeyValue("service.name", "test-service")), + List.of( + createScopeMetrics( + "test_scope_2", + "1.0.0", + List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of(), 42L)))) + ) + ) + ); + + context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource1, resource2)).build()); + assertEquals(2, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + AtomicInteger groupCount = new AtomicInteger(0); + context.consume(dataPointGroup -> groupCount.incrementAndGet()); + assertEquals(2, groupCount.get()); + } + + public void testGroupingDifferentGroupTimestamp() throws Exception { + // Group data points + ExportMetricsServiceRequest metricsRequest = createMetricsRequest( + List.of( + createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos + 1, List.of(), 0.42))), + createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of(), 42L))) + ) + ); + context.groupDataPoints(metricsRequest); + assertEquals(2, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + AtomicInteger groupCount = new AtomicInteger(0); + context.consume(dataPointGroup -> groupCount.incrementAndGet()); + assertEquals(2, groupCount.get()); + } + + public void testGroupingDifferentGroupAttributes() throws Exception { + // Group data points + ExportMetricsServiceRequest metricsRequest = createMetricsRequest( + List.of( + createGaugeMetric( + "system.cpu.usage", + "", + List.of(createDoubleDataPoint(nowUnixNanos, List.of(createKeyValue("core", "cpu0")), 0.42)) + ), + createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of(), 42L))) + ) + ); + context.groupDataPoints(metricsRequest); + assertEquals(2, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + AtomicInteger groupCount = new AtomicInteger(0); + context.consume(dataPointGroup -> groupCount.incrementAndGet()); + assertEquals(2, groupCount.get()); + } + + // Helper methods to create OpenTelemetry proto objects + +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointNumberTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointNumberTests.java new file mode 100644 index 0000000000000..33a23c03f0a54 --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointNumberTests.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import org.elasticsearch.test.ESTestCase; + +import java.util.List; + +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE; +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createDoubleDataPoint; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createGaugeMetric; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createLongDataPoint; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSumMetric; +import static org.hamcrest.Matchers.equalTo; + +public class DataPointNumberTests extends ESTestCase { + + private final long nowUnixNanos = System.currentTimeMillis() * 1_000_000L; + + public void testGauge() { + DataPoint.Number doubleGauge = new DataPoint.Number( + createDoubleDataPoint(nowUnixNanos, List.of(), 42.0), + createGaugeMetric("system.cpu.usage", "", List.of()) + ); + assertThat(doubleGauge.getDynamicTemplate(), equalTo("gauge_double")); + DataPoint.Number longGauge = new DataPoint.Number( + createLongDataPoint(nowUnixNanos, List.of(), 42L), + createGaugeMetric("system.cpu.usage", "", List.of()) + ); + assertThat(longGauge.getDynamicTemplate(), equalTo("gauge_long")); + } + + public void testCounterTemporality() { + DataPoint.Number doubleCumulative = new DataPoint.Number( + createDoubleDataPoint(nowUnixNanos, List.of(), 42.0), + createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_CUMULATIVE) + ); + assertThat(doubleCumulative.getDynamicTemplate(), equalTo("counter_double")); + DataPoint.Number longCumulative = new DataPoint.Number( + createLongDataPoint(nowUnixNanos, List.of(), 42L), + createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_CUMULATIVE) + ); + assertThat(longCumulative.getDynamicTemplate(), equalTo("counter_long")); + DataPoint.Number doubleDelta = new DataPoint.Number( + createDoubleDataPoint(nowUnixNanos, List.of(), 42.0), + createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_DELTA) + ); + assertThat(doubleDelta.getDynamicTemplate(), equalTo("gauge_double")); + DataPoint.Number longDelta = new DataPoint.Number( + createLongDataPoint(nowUnixNanos, List.of(), 42L), + createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_DELTA) + ); + assertThat(longDelta.getDynamicTemplate(), equalTo("gauge_long")); + } + + public void testCounterNonMonotonic() { + DataPoint.Number doubleNonMonotonic = new DataPoint.Number( + createDoubleDataPoint(nowUnixNanos, List.of(), 42.0), + createSumMetric("http.requests.count", "", List.of(), false, AGGREGATION_TEMPORALITY_CUMULATIVE) + ); + assertThat(doubleNonMonotonic.getDynamicTemplate(), equalTo("gauge_double")); + DataPoint.Number longNonMonotonic = new DataPoint.Number( + createLongDataPoint(nowUnixNanos, List.of(), 42L), + createSumMetric("http.requests.count", "", List.of(), false, AGGREGATION_TEMPORALITY_DELTA) + ); + assertThat(longNonMonotonic.getDynamicTemplate(), equalTo("gauge_long")); + } + +} From dcfa5b6ec785808c13d3a1118cb92ab6e349ce8d Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 29 Aug 2025 18:46:54 +0200 Subject: [PATCH 06/11] Add unit tests for BufferedByteStringAccessor --- .../BufferedByteStringAccessorTests.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessorTests.java diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessorTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessorTests.java new file mode 100644 index 0000000000000..a028cabd1948c --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessorTests.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.proto; + +import com.google.protobuf.ByteString; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; + +import static org.hamcrest.Matchers.equalTo; + +public class BufferedByteStringAccessorTests extends ESTestCase { + + private final BufferedByteStringAccessor accessor = new BufferedByteStringAccessor(); + + public void testAddStringDimension() { + String value = randomUnicodeOfLengthBetween(10, 1000); + TsidBuilder byteStringTsidBuilder = new TsidBuilder(); + accessor.addStringDimension(byteStringTsidBuilder, "test", ByteString.copyFromUtf8(value)); + TsidBuilder basicTsidBuilder = new TsidBuilder(); + basicTsidBuilder.addStringDimension("test", value); + assertThat(byteStringTsidBuilder.hash(), equalTo(basicTsidBuilder.hash())); + } + + public void testUtf8Value() throws Exception { + String value = randomUnicodeOfLengthBetween(10, 1000); + ByteString byteString = ByteString.copyFromUtf8(value); + String json; + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + builder.field("value"); + accessor.utf8Value(builder, byteString); + builder.endObject(); + json = Strings.toString(builder); + } + + assertThat(XContentHelper.convertToMap(JsonXContent.jsonXContent, json, false).get("value"), equalTo(value)); + } + +} From 2f7bcd3e1405592b83422ab2ce5f9f38a089f9c5 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 29 Aug 2025 19:18:30 +0200 Subject: [PATCH 07/11] Add unit tests for AttributeListTsidFunnel --- .../xpack/oteldata/otlp/OtlpUtils.java | 43 +++++++++++++- .../DataPointGroupingContextTests.java | 12 ++-- .../tsid/AttributeListTsidFunnelTests.java | 59 +++++++++++++++++++ 3 files changed, 106 insertions(+), 8 deletions(-) create mode 100644 x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnelTests.java diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java index 9d453aae4d29f..47d2383f85c10 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java @@ -8,8 +8,10 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.ArrayValue; import io.opentelemetry.proto.common.v1.InstrumentationScope; import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.common.v1.KeyValueList; import io.opentelemetry.proto.metrics.v1.AggregationTemporality; import io.opentelemetry.proto.metrics.v1.Gauge; import io.opentelemetry.proto.metrics.v1.Metric; @@ -20,13 +22,50 @@ import io.opentelemetry.proto.resource.v1.Resource; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; public class OtlpUtils { - public static KeyValue createKeyValue(String key, String value) { + + public static KeyValueList keyValueList(KeyValue... values) { + return KeyValueList.newBuilder().addAllValues(List.of(values)).build(); + } + + public static KeyValue keyValue(String key, String value) { return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setStringValue(value).build()).build(); } + public static KeyValue keyValue(String key, String... values) { + return KeyValue.newBuilder() + .setKey(key) + .setValue( + AnyValue.newBuilder() + .setArrayValue( + ArrayValue.newBuilder() + .addAllValues(Arrays.stream(values).map(v -> AnyValue.newBuilder().setStringValue(v).build()).toList()) + .build() + ) + .build() + ) + .build(); + } + + public static KeyValue keyValue(String key, long value) { + return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setIntValue(value).build()).build(); + } + + public static KeyValue keyValue(String key, double value) { + return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setDoubleValue(value).build()).build(); + } + + public static KeyValue keyValue(String key, boolean value) { + return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setBoolValue(value).build()).build(); + } + + public static KeyValue keyValue(String key, KeyValueList keyValueList) { + return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setKvlistValue(keyValueList).build()).build(); + } + private static Resource createResource(List attributes) { return Resource.newBuilder().addAllAttributes(attributes).build(); } @@ -87,7 +126,7 @@ public static ExportMetricsServiceRequest createMetricsRequest(List metr for (Metric metric : metrics) { resourceMetrics.add( createResourceMetrics( - List.of(createKeyValue("service.name", "test-service")), + List.of(keyValue("service.name", "test-service")), List.of(createScopeMetrics("test", "1.0.0", List.of(metric))) ) ); diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java index 3d25e0ab74c73..8dd10f353b350 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java @@ -19,12 +19,12 @@ import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createDoubleDataPoint; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createGaugeMetric; -import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createKeyValue; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createLongDataPoint; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createMetricsRequest; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createResourceMetrics; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createScopeMetrics; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSumMetric; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue; public class DataPointGroupingContextTests extends ESTestCase { @@ -76,7 +76,7 @@ public void testGroupingDifferentGroupUnit() throws Exception { public void testGroupingDifferentResource() throws Exception { ResourceMetrics resource1 = createResourceMetrics( - List.of(createKeyValue("service.name", "test-service_1")), + List.of(keyValue("service.name", "test-service_1")), List.of( createScopeMetrics( "test", @@ -86,7 +86,7 @@ public void testGroupingDifferentResource() throws Exception { ) ); ResourceMetrics resource2 = createResourceMetrics( - List.of(createKeyValue("service.name", "test-service_2")), + List.of(keyValue("service.name", "test-service_2")), List.of( createScopeMetrics( "test", @@ -108,7 +108,7 @@ public void testGroupingDifferentResource() throws Exception { public void testGroupingDifferentScope() throws Exception { ResourceMetrics resource1 = createResourceMetrics( - List.of(createKeyValue("service.name", "test-service")), + List.of(keyValue("service.name", "test-service")), List.of( createScopeMetrics( "test_scope_1", @@ -118,7 +118,7 @@ public void testGroupingDifferentScope() throws Exception { ) ); ResourceMetrics resource2 = createResourceMetrics( - List.of(createKeyValue("service.name", "test-service")), + List.of(keyValue("service.name", "test-service")), List.of( createScopeMetrics( "test_scope_2", @@ -163,7 +163,7 @@ public void testGroupingDifferentGroupAttributes() throws Exception { createGaugeMetric( "system.cpu.usage", "", - List.of(createDoubleDataPoint(nowUnixNanos, List.of(createKeyValue("core", "cpu0")), 0.42)) + List.of(createDoubleDataPoint(nowUnixNanos, List.of(keyValue("core", "cpu0")), 0.42)) ), createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of(), 42L))) ) diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnelTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnelTests.java new file mode 100644 index 0000000000000..5db5517356481 --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnelTests.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.tsid; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +import java.util.List; + +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValueList; + +public class AttributeListTsidFunnelTests extends ESTestCase { + + private final TsidBuilder plainBuilder = new TsidBuilder(); + private final TsidBuilder funnelBuilder = new TsidBuilder(); + private final BufferedByteStringAccessor byteStringAccessor = new BufferedByteStringAccessor(); + private final AttributeListTsidFunnel funnel = AttributeListTsidFunnel.get(byteStringAccessor, "attributes."); + + public void testSimpleAttributes() { + funnel.add( + List.of( + keyValue("string", "value"), + keyValue("array", "value1", "value2"), + keyValue("bool", true), + keyValue("double", 1.5), + keyValue("long", 42L), + keyValue("int", 42) + ), + funnelBuilder + ); + plainBuilder.addStringDimension("attributes.string", "value"); + plainBuilder.addStringDimension("attributes.array", "value1"); + plainBuilder.addStringDimension("attributes.array", "value2"); + plainBuilder.addBooleanDimension("attributes.bool", true); + plainBuilder.addDoubleDimension("attributes.double", 1.5); + plainBuilder.addLongDimension("attributes.long", 42); + plainBuilder.addIntDimension("attributes.int", 42); + assertEquals(plainBuilder.hash(), funnelBuilder.hash()); + } + + public void testNestedAttributes() { + funnel.add( + List.of(keyValue("foo", "bar"), keyValue("nested", keyValueList(keyValue("string", "value"), keyValue("int", 42)))), + funnelBuilder + ); + plainBuilder.addStringDimension("attributes.nested.string", "value"); + plainBuilder.addLongDimension("attributes.nested.int", 42); + plainBuilder.addStringDimension("attributes.foo", "bar"); + assertEquals(plainBuilder.hash(), funnelBuilder.hash()); + } + +} From 27ee86e703267eba76ad275b96e536f772b723d2 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 29 Aug 2025 19:30:22 +0200 Subject: [PATCH 08/11] Address review comments --- .../proto/BufferedByteStringAccessor.java | 3 ++ .../xpack/oteldata/otlp/OtlpUtils.java | 11 +++++--- .../DataPointGroupingContextTests.java | 28 +++++++++---------- .../otlp/datapoint/DataPointNumberTests.java | 16 +++++------ .../BufferedByteStringAccessorTests.java | 13 ++++++--- 5 files changed, 40 insertions(+), 31 deletions(-) diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessor.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessor.java index 747dfc0187859..4ef2828670fc3 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessor.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessor.java @@ -35,6 +35,9 @@ public class BufferedByteStringAccessor { */ public void addStringDimension(TsidBuilder tsidBuilder, String dimension, ByteString value) { if (value.isEmpty()) { + // Ignoring invalid values + // According to the spec https://opentelemetry.io/docs/specs/otel/common/#attribute: + // The attribute key MUST be a non-null and non-empty string. return; } tsidBuilder.addStringDimension(dimension, toBytes(value), 0, value.size()); diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java index 47d2383f85c10..ef08aee6a2082 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java @@ -25,6 +25,9 @@ import java.util.Arrays; import java.util.List; +import static org.elasticsearch.test.ESTestCase.randomDouble; +import static org.elasticsearch.test.ESTestCase.randomLong; + public class OtlpUtils { public static KeyValueList keyValueList(KeyValue... values) { @@ -102,21 +105,21 @@ public static Metric createSumMetric( .build(); } - public static NumberDataPoint createDoubleDataPoint(long timestamp, List attributes, double value) { + public static NumberDataPoint createDoubleDataPoint(long timestamp, List attributes) { return NumberDataPoint.newBuilder() .setTimeUnixNano(timestamp) .setStartTimeUnixNano(timestamp) .addAllAttributes(attributes) - .setAsDouble(value) + .setAsDouble(randomDouble()) .build(); } - public static NumberDataPoint createLongDataPoint(long timestamp, List attributes, long value) { + public static NumberDataPoint createLongDataPoint(long timestamp, List attributes) { return NumberDataPoint.newBuilder() .setTimeUnixNano(timestamp) .setStartTimeUnixNano(timestamp) .addAllAttributes(attributes) - .setAsInt(value) + .setAsInt(randomLong()) .build(); } diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java index 8dd10f353b350..9cd0cd5122716 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java @@ -35,12 +35,12 @@ public void testGroupingSameGroup() throws Exception { // Group data points ExportMetricsServiceRequest metricsRequest = createMetricsRequest( List.of( - createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of(), 42.0))), - createGaugeMetric("system.memory.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of(), 84.5))), + createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))), + createGaugeMetric("system.memory.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))), createSumMetric( "http.requests.count", "", - List.of(createLongDataPoint(nowUnixNanos, List.of(), 100L)), + List.of(createLongDataPoint(nowUnixNanos, List.of())), true, AGGREGATION_TEMPORALITY_CUMULATIVE ) @@ -60,8 +60,8 @@ public void testGroupingDifferentGroupUnit() throws Exception { // Group data points ExportMetricsServiceRequest metricsRequest = createMetricsRequest( List.of( - createGaugeMetric("system.cpu.usage", "{percent}", List.of(createDoubleDataPoint(nowUnixNanos, List.of(), 0.42))), - createGaugeMetric("system.memory.usage", "By", List.of(createLongDataPoint(nowUnixNanos, List.of(), 42L))) + createGaugeMetric("system.cpu.usage", "{percent}", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))), + createGaugeMetric("system.memory.usage", "By", List.of(createLongDataPoint(nowUnixNanos, List.of()))) ) ); context.groupDataPoints(metricsRequest); @@ -81,7 +81,7 @@ public void testGroupingDifferentResource() throws Exception { createScopeMetrics( "test", "1.0.0", - List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of(), 0.42)))) + List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of())))) ) ) ); @@ -91,7 +91,7 @@ public void testGroupingDifferentResource() throws Exception { createScopeMetrics( "test", "1.0.0", - List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of(), 42L)))) + List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of())))) ) ) ); @@ -113,7 +113,7 @@ public void testGroupingDifferentScope() throws Exception { createScopeMetrics( "test_scope_1", "1.0.0", - List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of(), 0.42)))) + List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of())))) ) ) ); @@ -123,7 +123,7 @@ public void testGroupingDifferentScope() throws Exception { createScopeMetrics( "test_scope_2", "1.0.0", - List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of(), 42L)))) + List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of())))) ) ) ); @@ -142,8 +142,8 @@ public void testGroupingDifferentGroupTimestamp() throws Exception { // Group data points ExportMetricsServiceRequest metricsRequest = createMetricsRequest( List.of( - createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos + 1, List.of(), 0.42))), - createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of(), 42L))) + createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos + 1, List.of()))), + createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of()))) ) ); context.groupDataPoints(metricsRequest); @@ -163,9 +163,9 @@ public void testGroupingDifferentGroupAttributes() throws Exception { createGaugeMetric( "system.cpu.usage", "", - List.of(createDoubleDataPoint(nowUnixNanos, List.of(keyValue("core", "cpu0")), 0.42)) + List.of(createDoubleDataPoint(nowUnixNanos, List.of(keyValue("core", "cpu0")))) ), - createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of(), 42L))) + createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of()))) ) ); context.groupDataPoints(metricsRequest); @@ -178,6 +178,4 @@ public void testGroupingDifferentGroupAttributes() throws Exception { assertEquals(2, groupCount.get()); } - // Helper methods to create OpenTelemetry proto objects - } diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointNumberTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointNumberTests.java index 33a23c03f0a54..46dcc1ca65de6 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointNumberTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointNumberTests.java @@ -25,12 +25,12 @@ public class DataPointNumberTests extends ESTestCase { public void testGauge() { DataPoint.Number doubleGauge = new DataPoint.Number( - createDoubleDataPoint(nowUnixNanos, List.of(), 42.0), + createDoubleDataPoint(nowUnixNanos, List.of()), createGaugeMetric("system.cpu.usage", "", List.of()) ); assertThat(doubleGauge.getDynamicTemplate(), equalTo("gauge_double")); DataPoint.Number longGauge = new DataPoint.Number( - createLongDataPoint(nowUnixNanos, List.of(), 42L), + createLongDataPoint(nowUnixNanos, List.of()), createGaugeMetric("system.cpu.usage", "", List.of()) ); assertThat(longGauge.getDynamicTemplate(), equalTo("gauge_long")); @@ -38,22 +38,22 @@ public void testGauge() { public void testCounterTemporality() { DataPoint.Number doubleCumulative = new DataPoint.Number( - createDoubleDataPoint(nowUnixNanos, List.of(), 42.0), + createDoubleDataPoint(nowUnixNanos, List.of()), createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_CUMULATIVE) ); assertThat(doubleCumulative.getDynamicTemplate(), equalTo("counter_double")); DataPoint.Number longCumulative = new DataPoint.Number( - createLongDataPoint(nowUnixNanos, List.of(), 42L), + createLongDataPoint(nowUnixNanos, List.of()), createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_CUMULATIVE) ); assertThat(longCumulative.getDynamicTemplate(), equalTo("counter_long")); DataPoint.Number doubleDelta = new DataPoint.Number( - createDoubleDataPoint(nowUnixNanos, List.of(), 42.0), + createDoubleDataPoint(nowUnixNanos, List.of()), createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_DELTA) ); assertThat(doubleDelta.getDynamicTemplate(), equalTo("gauge_double")); DataPoint.Number longDelta = new DataPoint.Number( - createLongDataPoint(nowUnixNanos, List.of(), 42L), + createLongDataPoint(nowUnixNanos, List.of()), createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_DELTA) ); assertThat(longDelta.getDynamicTemplate(), equalTo("gauge_long")); @@ -61,12 +61,12 @@ public void testCounterTemporality() { public void testCounterNonMonotonic() { DataPoint.Number doubleNonMonotonic = new DataPoint.Number( - createDoubleDataPoint(nowUnixNanos, List.of(), 42.0), + createDoubleDataPoint(nowUnixNanos, List.of()), createSumMetric("http.requests.count", "", List.of(), false, AGGREGATION_TEMPORALITY_CUMULATIVE) ); assertThat(doubleNonMonotonic.getDynamicTemplate(), equalTo("gauge_double")); DataPoint.Number longNonMonotonic = new DataPoint.Number( - createLongDataPoint(nowUnixNanos, List.of(), 42L), + createLongDataPoint(nowUnixNanos, List.of()), createSumMetric("http.requests.count", "", List.of(), false, AGGREGATION_TEMPORALITY_DELTA) ); assertThat(longNonMonotonic.getDynamicTemplate(), equalTo("gauge_long")); diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessorTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessorTests.java index a028cabd1948c..620e4027cd44c 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessorTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessorTests.java @@ -23,7 +23,7 @@ public class BufferedByteStringAccessorTests extends ESTestCase { private final BufferedByteStringAccessor accessor = new BufferedByteStringAccessor(); public void testAddStringDimension() { - String value = randomUnicodeOfLengthBetween(10, 1000); + String value = randomUnicodeOfLengthBetween(1, 1000); TsidBuilder byteStringTsidBuilder = new TsidBuilder(); accessor.addStringDimension(byteStringTsidBuilder, "test", ByteString.copyFromUtf8(value)); TsidBuilder basicTsidBuilder = new TsidBuilder(); @@ -31,14 +31,19 @@ public void testAddStringDimension() { assertThat(byteStringTsidBuilder.hash(), equalTo(basicTsidBuilder.hash())); } + public void testAddEmptyStringDimension() { + TsidBuilder byteStringTsidBuilder = new TsidBuilder(); + accessor.addStringDimension(byteStringTsidBuilder, "test", ByteString.copyFromUtf8("")); + assertThat(byteStringTsidBuilder.size(), equalTo(0)); + } + public void testUtf8Value() throws Exception { - String value = randomUnicodeOfLengthBetween(10, 1000); - ByteString byteString = ByteString.copyFromUtf8(value); + String value = randomUnicodeOfLengthBetween(0, 1000); String json; try (XContentBuilder builder = JsonXContent.contentBuilder()) { builder.startObject(); builder.field("value"); - accessor.utf8Value(builder, byteString); + accessor.utf8Value(builder, ByteString.copyFromUtf8(value)); builder.endObject(); json = Strings.toString(builder); } From c0ae9d753f6354dbf37f1ad834576a14c580d414 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 29 Aug 2025 17:39:45 +0000 Subject: [PATCH 09/11] [CI] Auto commit changes from spotless --- .../otlp/datapoint/DataPointGroupingContextTests.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java index 9cd0cd5122716..9667684cbb81d 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java @@ -160,11 +160,7 @@ public void testGroupingDifferentGroupAttributes() throws Exception { // Group data points ExportMetricsServiceRequest metricsRequest = createMetricsRequest( List.of( - createGaugeMetric( - "system.cpu.usage", - "", - List.of(createDoubleDataPoint(nowUnixNanos, List.of(keyValue("core", "cpu0")))) - ), + createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of(keyValue("core", "cpu0"))))), createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of()))) ) ); From 0241e48e5685d30d2749c9a4d800eb63c6de1ad8 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 29 Aug 2025 19:45:14 +0200 Subject: [PATCH 10/11] Inline and remove unnecessary DataPointDimensionsTsidFunnel --- .../tsid/DataPointDimensionsTsidFunnel.java | 32 ------------------- .../otlp/tsid/DataPointTsidFunnel.java | 3 +- 2 files changed, 2 insertions(+), 33 deletions(-) delete mode 100644 x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointDimensionsTsidFunnel.java diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointDimensionsTsidFunnel.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointDimensionsTsidFunnel.java deleted file mode 100644 index a681026849576..0000000000000 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointDimensionsTsidFunnel.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.oteldata.otlp.tsid; - -import org.elasticsearch.cluster.routing.TsidBuilder; -import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel; -import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPoint; -import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; - -class DataPointDimensionsTsidFunnel implements TsidFunnel { - - private final BufferedByteStringAccessor byteStringAccessor; - - private DataPointDimensionsTsidFunnel(BufferedByteStringAccessor byteStringAccessor) { - this.byteStringAccessor = byteStringAccessor; - } - - static DataPointDimensionsTsidFunnel get(BufferedByteStringAccessor byteStringAccessor) { - return new DataPointDimensionsTsidFunnel(byteStringAccessor); - } - - @Override - public void add(DataPoint dataPoint, TsidBuilder tsidBuilder) { - tsidBuilder.add(dataPoint.getAttributes(), AttributeListTsidFunnel.get(byteStringAccessor, "attributes.")); - tsidBuilder.addStringDimension("unit", dataPoint.getUnit()); - } -} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java index 09aba608aa998..5543b41e15863 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java @@ -28,6 +28,7 @@ public static TsidBuilder forDataPoint(BufferedByteStringAccessor byteStringAcce @Override public void add(DataPoint dataPoint, TsidBuilder tsidBuilder) { - tsidBuilder.add(dataPoint, DataPointDimensionsTsidFunnel.get(byteStringAccessor)); + tsidBuilder.add(dataPoint.getAttributes(), AttributeListTsidFunnel.get(byteStringAccessor, "attributes.")); + tsidBuilder.addStringDimension("unit", dataPoint.getUnit()); } } From 744ff192236f6b23838bfee305bbba96d4cd8e3d Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 29 Aug 2025 19:45:59 +0200 Subject: [PATCH 11/11] Remove scope schema_url and version from grouping This is in line with that the ES exporter is doing and these fields aren't mapped as a dimension anyway --- .../elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java index 47bcbb8c15f11..53b4d29307bef 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java @@ -34,8 +34,6 @@ public static TsidBuilder forScope(BufferedByteStringAccessor byteStringAccessor public void add(ScopeMetrics scopeMetrics, TsidBuilder tsidBuilder) { List resourceAttributes = scopeMetrics.getScope().getAttributesList(); byteStringAccessor.addStringDimension(tsidBuilder, "scope.name", scopeMetrics.getScope().getNameBytes()); - byteStringAccessor.addStringDimension(tsidBuilder, "scope.schema_url", scopeMetrics.getSchemaUrlBytes()); tsidBuilder.add(resourceAttributes, AttributeListTsidFunnel.get(byteStringAccessor, "scope.attributes.")); - byteStringAccessor.addStringDimension(tsidBuilder, "scope.version", scopeMetrics.getScope().getVersionBytes()); } }