diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java b/server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java index e4e2a438335d9..210612d7737a0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java @@ -207,7 +207,6 @@ public TsidBuilder addAll(TsidBuilder other) { * @throws IllegalArgumentException if no dimensions have been added */ public MurmurHash3.Hash128 hash() { - throwIfEmpty(); Collections.sort(dimensions); murmur3Hasher.reset(); for (Dimension dim : dimensions) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/TsidBuilderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/TsidBuilderTests.java index d1d4c174af108..726b02c268aeb 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/TsidBuilderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/TsidBuilderTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.cluster.routing; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.hash.MurmurHash3; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.Text; @@ -115,11 +116,9 @@ public void testAddAllWithNullOrEmpty() { } public void testExceptionWhenNoDimensions() { - // Test that exception is thrown when no dimensions are added TsidBuilder builder = TsidBuilder.newBuilder(); - IllegalArgumentException hashException = expectThrows(IllegalArgumentException.class, builder::hash); - assertTrue(hashException.getMessage().contains("Dimensions are empty")); + assertThat(builder.hash(), equalTo(new MurmurHash3.Hash128())); IllegalArgumentException tsidException = expectThrows(IllegalArgumentException.class, builder::buildTsid); assertTrue(tsidException.getMessage().contains("Dimensions are empty")); diff --git 
a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java index a3c3c59c6d28e..3dcc22e3229bb 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java @@ -15,6 +15,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; @@ -35,6 +36,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; @@ -47,6 +49,7 @@ import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; import java.io.IOException; +import java.util.Map; /** * Transport action for handling OpenTelemetry Protocol (OTLP) Metrics requests. 
@@ -126,11 +129,13 @@ private void addIndexRequest( DataPointGroupingContext.DataPointGroup dataPointGroup ) throws IOException { try (XContentBuilder xContentBuilder = XContentFactory.cborBuilder(new BytesStreamOutput())) { - var dynamicTemplates = metricDocumentBuilder.buildMetricDocument(xContentBuilder, dataPointGroup); + Map dynamicTemplates = Maps.newHashMapWithExpectedSize(dataPointGroup.dataPoints().size()); + BytesRef tsid = metricDocumentBuilder.buildMetricDocument(xContentBuilder, dynamicTemplates, dataPointGroup); bulkRequestBuilder.add( new IndexRequest(dataPointGroup.targetIndex().index()).opType(DocWriteRequest.OpType.CREATE) .setRequireDataStream(true) .source(xContentBuilder) + .tsid(tsid) .setDynamicTemplates(dynamicTemplates) ); } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java index e20edb7a5092a..3dad2dfcfd945 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java @@ -128,7 +128,7 @@ private ResourceGroup getOrCreateResourceGroup(ResourceMetrics resourceMetrics) Hash128 resourceHash = resourceTsidBuilder.hash(); ResourceGroup resourceGroup = resourceGroups.get(resourceHash); if (resourceGroup == null) { - resourceGroup = new ResourceGroup(resourceMetrics.getResource(), resourceMetrics.getSchemaUrlBytes()); + resourceGroup = new ResourceGroup(resourceMetrics.getResource(), resourceMetrics.getSchemaUrlBytes(), resourceTsidBuilder); resourceGroups.put(resourceHash, resourceGroup); } return resourceGroup; @@ -137,20 +137,23 @@ private ResourceGroup getOrCreateResourceGroup(ResourceMetrics resourceMetrics) class ResourceGroup { private final Resource resource; 
private final ByteString resourceSchemaUrl; + private final TsidBuilder resourceTsidBuilder; private final Map scopes; - ResourceGroup(Resource resource, ByteString resourceSchemaUrl) { + ResourceGroup(Resource resource, ByteString resourceSchemaUrl, TsidBuilder resourceTsidBuilder) { this.resource = resource; this.resourceSchemaUrl = resourceSchemaUrl; + this.resourceTsidBuilder = resourceTsidBuilder; this.scopes = new HashMap<>(); } public ScopeGroup getOrCreateScope(ScopeMetrics scopeMetrics) { TsidBuilder scopeTsidBuilder = ScopeTsidFunnel.forScope(byteStringAccessor, scopeMetrics); Hash128 scopeHash = scopeTsidBuilder.hash(); + scopeTsidBuilder.addAll(resourceTsidBuilder); ScopeGroup scopeGroup = scopes.get(scopeHash); if (scopeGroup == null) { - scopeGroup = new ScopeGroup(this, scopeMetrics.getScope(), scopeMetrics.getSchemaUrlBytes()); + scopeGroup = new ScopeGroup(this, scopeMetrics.getScope(), scopeMetrics.getSchemaUrlBytes(), scopeTsidBuilder); scopes.put(scopeHash, scopeGroup); } return scopeGroup; @@ -169,15 +172,17 @@ class ScopeGroup { private final ResourceGroup resourceGroup; private final InstrumentationScope scope; private final ByteString scopeSchemaUrl; + private final TsidBuilder scopeTsidBuilder; @Nullable private final String receiverName; // index -> timestamp -> dataPointGroupHash -> DataPointGroup private final Map>> dataPointGroupsByIndexAndTimestamp; - ScopeGroup(ResourceGroup resourceGroup, InstrumentationScope scope, ByteString scopeSchemaUrl) { + ScopeGroup(ResourceGroup resourceGroup, InstrumentationScope scope, ByteString scopeSchemaUrl, TsidBuilder scopeTsidBuilder) { this.resourceGroup = resourceGroup; this.scope = scope; this.scopeSchemaUrl = scopeSchemaUrl; + this.scopeTsidBuilder = scopeTsidBuilder; this.dataPointGroupsByIndexAndTimestamp = new HashMap<>(); this.receiverName = extractReceiverName(scope); } @@ -216,8 +221,13 @@ public void addDataPoint(DataPoint dataPoint) { } private DataPointGroup 
getOrCreateDataPointGroup(DataPoint dataPoint) { - TsidBuilder dataPointGroupTsidBuilder = DataPointTsidFunnel.forDataPoint(byteStringAccessor, dataPoint); + TsidBuilder dataPointGroupTsidBuilder = DataPointTsidFunnel.forDataPoint( + byteStringAccessor, + dataPoint, + scopeTsidBuilder.size() + ); Hash128 dataPointGroupHash = dataPointGroupTsidBuilder.hash(); + dataPointGroupTsidBuilder.addAll(scopeTsidBuilder); // in addition to the fields that go into the _tsid, we also need to group by timestamp and start timestamp Hash128 timestamp = new Hash128(dataPoint.getTimestampUnixNano(), dataPoint.getStartTimestampUnixNano()); TargetIndex targetIndex = TargetIndex.evaluate( @@ -236,6 +246,7 @@ private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) { resourceGroup.resourceSchemaUrl, scope, scopeSchemaUrl, + dataPointGroupTsidBuilder, dataPoint.getAttributes(), dataPoint.getUnit(), targetIndex @@ -261,6 +272,7 @@ public static final class DataPointGroup { private final ByteString resourceSchemaUrl; private final InstrumentationScope scope; private final ByteString scopeSchemaUrl; + private final TsidBuilder tsidBuilder; private final List dataPointAttributes; private final String unit; private final Set metricNames = new HashSet<>(); @@ -273,6 +285,7 @@ public DataPointGroup( ByteString resourceSchemaUrl, InstrumentationScope scope, ByteString scopeSchemaUrl, + TsidBuilder tsidBuilder, List dataPointAttributes, String unit, TargetIndex targetIndex @@ -281,6 +294,7 @@ public DataPointGroup( this.resourceSchemaUrl = resourceSchemaUrl; this.scope = scope; this.scopeSchemaUrl = scopeSchemaUrl; + this.tsidBuilder = tsidBuilder; this.dataPointAttributes = dataPointAttributes; this.unit = unit; this.targetIndex = targetIndex; @@ -334,6 +348,10 @@ public ByteString scopeSchemaUrl() { return scopeSchemaUrl; } + public TsidBuilder tsidBuilder() { + return tsidBuilder; + } + public List dataPointAttributes() { return dataPointAttributes; } diff --git 
a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java index 9d3023faa1568..258c209f091e2 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java @@ -14,6 +14,8 @@ import com.google.protobuf.ByteString; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.cluster.routing.TsidBuilder; import org.elasticsearch.common.Strings; import org.elasticsearch.common.hash.BufferedMurmur3Hasher; import org.elasticsearch.xcontent.XContentBuilder; @@ -23,8 +25,8 @@ import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; import java.io.IOException; -import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -40,9 +42,11 @@ public MetricDocumentBuilder(BufferedByteStringAccessor byteStringAccessor) { this.byteStringAccessor = byteStringAccessor; } - public HashMap buildMetricDocument(XContentBuilder builder, DataPointGroupingContext.DataPointGroup dataPointGroup) - throws IOException { - HashMap dynamicTemplates = new HashMap<>(); + public BytesRef buildMetricDocument( + XContentBuilder builder, + Map dynamicTemplates, + DataPointGroupingContext.DataPointGroup dataPointGroup + ) throws IOException { List dataPoints = dataPointGroup.dataPoints(); builder.startObject(); builder.field("@timestamp", TimeUnit.NANOSECONDS.toMillis(dataPointGroup.getTimestampUnixNano())); @@ -53,7 +57,8 @@ public HashMap buildMetricDocument(XContentBuilder builder, Data buildDataStream(builder, dataPointGroup.targetIndex()); buildScope(builder, dataPointGroup.scopeSchemaUrl(), dataPointGroup.scope()); buildDataPointAttributes(builder, 
dataPointGroup.dataPointAttributes(), dataPointGroup.unit()); - builder.field("_metric_names_hash", dataPointGroup.getMetricNamesHash(hasher)); + String metricNamesHash = dataPointGroup.getMetricNamesHash(hasher); + builder.field("_metric_names_hash", metricNamesHash); long docCount = 0; builder.startObject("metrics"); @@ -75,7 +80,9 @@ public HashMap buildMetricDocument(XContentBuilder builder, Data builder.field("_doc_count", docCount); } builder.endObject(); - return dynamicTemplates; + TsidBuilder tsidBuilder = dataPointGroup.tsidBuilder(); + tsidBuilder.addStringDimension("_metric_names_hash", metricNamesHash); + return tsidBuilder.buildTsid(); } private void buildResource(Resource resource, ByteString schemaUrl, XContentBuilder builder) throws IOException { diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java index 5543b41e15863..787a7d41c27d4 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java @@ -14,14 +14,17 @@ public class DataPointTsidFunnel implements TsidFunnel { + // for "unit" and "_metric_names_hash" that will be added in + // MetricDocumentBuilder once the data point group is complete + private static final int EXTRA_DIMENSIONS_SIZE = 2; private final BufferedByteStringAccessor byteStringAccessor; private DataPointTsidFunnel(BufferedByteStringAccessor byteStringAccessor) { this.byteStringAccessor = byteStringAccessor; } - public static TsidBuilder forDataPoint(BufferedByteStringAccessor byteStringAccessor, DataPoint dataPoint) { - TsidBuilder tsidBuilder = new TsidBuilder(dataPoint.getAttributes().size()); + public static TsidBuilder forDataPoint(BufferedByteStringAccessor byteStringAccessor, DataPoint 
dataPoint, int scopeTsidBuilderSize) { + TsidBuilder tsidBuilder = new TsidBuilder(dataPoint.getAttributes().size() + scopeTsidBuilderSize + EXTRA_DIMENSIONS_SIZE); new DataPointTsidFunnel(byteStringAccessor).add(dataPoint, tsidBuilder); return tsidBuilder; } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ResourceTsidFunnel.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ResourceTsidFunnel.java index 8c12b4c283e69..80662f5c74194 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ResourceTsidFunnel.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ResourceTsidFunnel.java @@ -25,7 +25,7 @@ public ResourceTsidFunnel(BufferedByteStringAccessor byteStringAccessor) { } public static TsidBuilder forResource(BufferedByteStringAccessor byteStringAccessor, ResourceMetrics resourceMetrics) { - TsidBuilder tsidBuilder = new TsidBuilder(resourceMetrics.getResource().getAttributesCount() + 1); + TsidBuilder tsidBuilder = new TsidBuilder(resourceMetrics.getResource().getAttributesCount()); new ResourceTsidFunnel(byteStringAccessor).add(resourceMetrics, tsidBuilder); return tsidBuilder; } @@ -34,6 +34,5 @@ public static TsidBuilder forResource(BufferedByteStringAccessor byteStringAcces public void add(ResourceMetrics resourceMetrics, TsidBuilder tsidBuilder) { List resourceAttributes = resourceMetrics.getResource().getAttributesList(); tsidBuilder.add(resourceAttributes, AttributeListTsidFunnel.get(byteStringAccessor, "resource.attributes.")); - byteStringAccessor.addStringDimension(tsidBuilder, "schema_url", resourceMetrics.getSchemaUrlBytes()); } } diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java index 51014fbb64a97..317f6954785a6 100644 --- 
a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java @@ -190,15 +190,13 @@ public static SummaryDataPoint createSummaryDataPoint(long timestamp, List metrics) { + return createMetricsRequest(List.of(keyValue("service.name", "test-service")), metrics); + } + public static ExportMetricsServiceRequest createMetricsRequest(List resourceAttributes, List metrics) { List resourceMetrics = new ArrayList<>(); for (Metric metric : metrics) { - resourceMetrics.add( - createResourceMetrics( - List.of(keyValue("service.name", "test-service")), - List.of(createScopeMetrics("test", "1.0.0", List.of(metric))) - ) - ); + resourceMetrics.add(createResourceMetrics(resourceAttributes, List.of(createScopeMetrics("test", "1.0.0", List.of(metric))))); } return ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(resourceMetrics).build(); diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilderTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilderTests.java index 78bd6efb659a3..541bb6065068d 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilderTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilderTests.java @@ -7,16 +7,20 @@ package org.elasticsearch.xpack.oteldata.otlp.docbuilder; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.common.v1.InstrumentationScope; import io.opentelemetry.proto.common.v1.KeyValue; import io.opentelemetry.proto.metrics.v1.AggregationTemporality; import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; +import 
io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; import io.opentelemetry.proto.metrics.v1.SummaryDataPoint; import io.opentelemetry.proto.resource.v1.Resource; -import com.google.protobuf.ByteString; - +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.cluster.routing.TsidBuilder; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.ObjectPath; @@ -24,16 +28,14 @@ import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; -import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPoint; +import org.elasticsearch.xpack.oteldata.otlp.OtlpUtils; import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPointGroupingContext; -import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex; import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.Set; import java.util.concurrent.TimeUnit; import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA; @@ -55,6 +57,7 @@ public class MetricDocumentBuilderTests extends ESTestCase { private final MetricDocumentBuilder documentBuilder = new MetricDocumentBuilder(new BufferedByteStringAccessor()); + private final DataPointGroupingContext dataPointGroupingContext = new DataPointGroupingContext(new BufferedByteStringAccessor()); private final long timestamp = randomLong(); private final long startTimestamp = randomLong(); @@ -63,74 +66,85 @@ public void testBuildMetricDocument() throws IOException { resourceAttributes.add(keyValue("service.name", "test-service")); resourceAttributes.add(keyValue("host.name", "test-host")); Resource resource = 
Resource.newBuilder().addAllAttributes(resourceAttributes).setDroppedAttributesCount(1).build(); - ByteString resourceSchemaUrl = ByteString.copyFromUtf8("https://opentelemetry.io/schemas/1.0.0"); - InstrumentationScope scope = InstrumentationScope.newBuilder() .setName("test-scope") .setVersion("1.0.0") .setDroppedAttributesCount(2) .addAttributes(keyValue("scope_attr", "value")) .build(); - ByteString scopeSchemaUrl = ByteString.copyFromUtf8("https://opentelemetry.io/schemas/1.0.0"); List dataPointAttributes = List.of(keyValue("operation", "test"), (keyValue("environment", "production"))); - - DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup( - resource, - resourceSchemaUrl, - scope, - scopeSchemaUrl, - dataPointAttributes, + Metric gaugeMetric = createGaugeMetric( + "system.cpu.usage", "{test}", - TargetIndex.defaultMetrics() + List.of(createDoubleDataPoint(timestamp, startTimestamp, dataPointAttributes)) ); - dataPointGroup.addDataPoint( - Set.of(), - new DataPoint.Number( - createDoubleDataPoint(timestamp, startTimestamp, dataPointAttributes), - createGaugeMetric("system.cpu.usage", "", List.of()) - ) + Metric sumMetric = createSumMetric( + "system.network.packets", + "{test}", + List.of(createLongDataPoint(timestamp, startTimestamp, dataPointAttributes)), + true, + AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE ); - dataPointGroup.addDataPoint( - Set.of(), - new DataPoint.Number( - createLongDataPoint(timestamp, startTimestamp, dataPointAttributes), - createSumMetric( - "system.network.packets", - "{test}", - List.of(), - true, - AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE + dataPointGroupingContext.groupDataPoints( + ExportMetricsServiceRequest.newBuilder() + .addResourceMetrics( + ResourceMetrics.newBuilder() + .setResource(resource) + .setSchemaUrl("https://opentelemetry.io/schemas/1.0.0") + .addScopeMetrics( + ScopeMetrics.newBuilder() + .setScope(scope) + 
.setSchemaUrl("https://opentelemetry.io/schemas/1.0.0") + .addMetrics(gaugeMetric) + .addMetrics(sumMetric) + .build() + ) + .build() ) - ) + .build() ); - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); - HashMap dynamicTemplates = documentBuilder.buildMetricDocument(builder, dataPointGroup); - ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); - - assertThat(doc.evaluate("@timestamp").longValue(), equalTo(TimeUnit.NANOSECONDS.toMillis(timestamp))); - assertThat(doc.evaluate("start_timestamp").longValue(), equalTo(TimeUnit.NANOSECONDS.toMillis(startTimestamp))); - assertThat(doc.evaluate("data_stream.type"), equalTo("metrics")); - assertThat(doc.evaluate("data_stream.dataset"), equalTo("generic.otel")); - assertThat(doc.evaluate("data_stream.namespace"), equalTo("default")); - assertThat(doc.evaluate("resource.schema_url"), equalTo("https://opentelemetry.io/schemas/1.0.0")); - assertThat(doc.evaluate("resource.dropped_attributes_count"), equalTo(1)); - assertThat(doc.evaluate("resource.attributes.service\\.name"), equalTo("test-service")); - assertThat(doc.evaluate("resource.attributes.host\\.name"), equalTo("test-host")); - assertThat(doc.evaluate("scope.name"), equalTo("test-scope")); - assertThat(doc.evaluate("scope.version"), equalTo("1.0.0")); - assertThat(doc.evaluate("scope.schema_url"), equalTo("https://opentelemetry.io/schemas/1.0.0")); - assertThat(doc.evaluate("scope.dropped_attributes_count"), equalTo(2)); - assertThat(doc.evaluate("scope.attributes.scope_attr"), equalTo("value")); - assertThat(doc.evaluate("_metric_names_hash"), isA(String.class)); - assertThat(doc.evaluate("attributes.operation"), equalTo("test")); - assertThat(doc.evaluate("attributes.environment"), equalTo("production")); - assertThat(doc.evaluate("unit"), equalTo("{test}")); - assertThat(doc.evaluate("metrics.system\\.cpu\\.usage"), isA(Number.class)); - 
assertThat(doc.evaluate("metrics.system\\.network\\.packets"), isA(Number.class)); - assertThat(dynamicTemplates, hasEntry("metrics.system.cpu.usage", "gauge_double")); - assertThat(dynamicTemplates, hasEntry("metrics.system.network.packets", "counter_long")); + dataPointGroupingContext.consume(dataPointGroup -> { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + HashMap dynamicTemplates = new HashMap<>(); + BytesRef tsid = documentBuilder.buildMetricDocument(builder, dynamicTemplates, dataPointGroup); + ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); + + assertThat(doc.evaluate("@timestamp").longValue(), equalTo(TimeUnit.NANOSECONDS.toMillis(timestamp))); + assertThat(doc.evaluate("start_timestamp").longValue(), equalTo(TimeUnit.NANOSECONDS.toMillis(startTimestamp))); + assertThat(doc.evaluate("data_stream.type"), equalTo("metrics")); + assertThat(doc.evaluate("data_stream.dataset"), equalTo("generic.otel")); + assertThat(doc.evaluate("data_stream.namespace"), equalTo("default")); + assertThat(doc.evaluate("resource.schema_url"), equalTo("https://opentelemetry.io/schemas/1.0.0")); + assertThat(doc.evaluate("resource.dropped_attributes_count"), equalTo(1)); + assertThat(doc.evaluate("resource.attributes.service\\.name"), equalTo("test-service")); + assertThat(doc.evaluate("resource.attributes.host\\.name"), equalTo("test-host")); + assertThat(doc.evaluate("scope.name"), equalTo("test-scope")); + assertThat(doc.evaluate("scope.version"), equalTo("1.0.0")); + assertThat(doc.evaluate("scope.schema_url"), equalTo("https://opentelemetry.io/schemas/1.0.0")); + assertThat(doc.evaluate("scope.dropped_attributes_count"), equalTo(2)); + assertThat(doc.evaluate("scope.attributes.scope_attr"), equalTo("value")); + assertThat(doc.evaluate("_metric_names_hash"), isA(String.class)); + assertThat(doc.evaluate("attributes.operation"), equalTo("test")); + 
assertThat(doc.evaluate("attributes.environment"), equalTo("production")); + assertThat(doc.evaluate("unit"), equalTo("{test}")); + assertThat(doc.evaluate("metrics.system\\.cpu\\.usage"), isA(Number.class)); + assertThat(doc.evaluate("metrics.system\\.network\\.packets"), isA(Number.class)); + assertThat(dynamicTemplates, hasEntry("metrics.system.cpu.usage", "gauge_double")); + assertThat(dynamicTemplates, hasEntry("metrics.system.network.packets", "counter_long")); + + TsidBuilder expectedTsidBuilder = new TsidBuilder(); + expectedTsidBuilder.addStringDimension("resource.attributes.service.name", "test-service"); + expectedTsidBuilder.addStringDimension("resource.attributes.host.name", "test-host"); + expectedTsidBuilder.addStringDimension("scope.name", "test-scope"); + expectedTsidBuilder.addStringDimension("scope.attributes.scope_attr", "value"); + expectedTsidBuilder.addStringDimension("_metric_names_hash", doc.evaluate("_metric_names_hash")); + expectedTsidBuilder.addStringDimension("attributes.operation", "test"); + expectedTsidBuilder.addStringDimension("attributes.environment", "production"); + expectedTsidBuilder.addStringDimension("unit", "{test}"); + assertThat(tsid, equalTo(expectedTsidBuilder.buildTsid())); + }); } public void testAttributeTypes() throws IOException { @@ -140,248 +154,183 @@ public void testAttributeTypes() throws IOException { resourceAttributes.add(keyValue("int_attr", 123L)); resourceAttributes.add(keyValue("double_attr", 123.45)); resourceAttributes.add(keyValue("array_attr", "value1", "value2")); - - Resource resource = Resource.newBuilder().addAllAttributes(resourceAttributes).build(); - InstrumentationScope scope = InstrumentationScope.newBuilder().build(); - - DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup( - resource, - null, - scope, - null, - List.of(), - "", - TargetIndex.defaultMetrics() + ExportMetricsServiceRequest metricsRequest = OtlpUtils.createMetricsRequest( + 
resourceAttributes, + List.of(createGaugeMetric("test.metric", "", List.of(createDoubleDataPoint(timestamp, startTimestamp, List.of())))) ); - dataPointGroup.addDataPoint( - Set.of(), - new DataPoint.Number(createDoubleDataPoint(timestamp), createGaugeMetric("test.metric", "", List.of())) - ); - - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); - documentBuilder.buildMetricDocument(builder, dataPointGroup); - - ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); - - assertThat(doc.evaluate("resource.attributes.string_attr"), equalTo("string_value")); - assertThat(doc.evaluate("resource.attributes.bool_attr"), equalTo(true)); - assertThat(doc.evaluate("resource.attributes.int_attr"), equalTo(123)); - assertThat(doc.evaluate("resource.attributes.double_attr"), equalTo(123.45)); - - assertThat(doc.evaluate("resource.attributes.array_attr.0"), equalTo("value1")); - assertThat(doc.evaluate("resource.attributes.array_attr.1"), equalTo("value2")); + dataPointGroupingContext.groupDataPoints(metricsRequest); + assertThat(dataPointGroupingContext.totalDataPoints(), equalTo(1)); + dataPointGroupingContext.consume(dataPointGroup -> { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + HashMap dynamicTemplates = new HashMap<>(); + documentBuilder.buildMetricDocument(builder, dynamicTemplates, dataPointGroup); + ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); + + assertThat(doc.evaluate("resource.attributes.string_attr"), equalTo("string_value")); + assertThat(doc.evaluate("resource.attributes.bool_attr"), equalTo(true)); + assertThat(doc.evaluate("resource.attributes.int_attr"), equalTo(123)); + assertThat(doc.evaluate("resource.attributes.double_attr"), equalTo(123.45)); + + assertThat(doc.evaluate("resource.attributes.array_attr.0"), equalTo("value1")); + 
assertThat(doc.evaluate("resource.attributes.array_attr.1"), equalTo("value2")); + }); } public void testEmptyFields() throws IOException { - Resource resource = Resource.newBuilder().build(); - InstrumentationScope scope = InstrumentationScope.newBuilder().build(); - - DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup( - resource, - null, - scope, - null, - List.of(), - "", - TargetIndex.defaultMetrics() + Metric metric = createGaugeMetric("test.metric", "", List.of(createDoubleDataPoint(timestamp, startTimestamp, List.of()))); + ResourceMetrics resourceMetrics = OtlpUtils.createResourceMetrics( + List.of(keyValue("service.name", "test-service")), + List.of(ScopeMetrics.newBuilder().addMetrics(metric).build()) ); - dataPointGroup.addDataPoint( - Set.of(), - new DataPoint.Number(createDoubleDataPoint(timestamp), createGaugeMetric("test.metric", "", List.of())) - ); - - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); - documentBuilder.buildMetricDocument(builder, dataPointGroup); - - ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); - // Verify that empty fields are not included - assertThat(doc.evaluate("resource.schema_url"), is(nullValue())); - assertThat(doc.evaluate("resource.dropped_attributes_count"), is(nullValue())); - assertThat(doc.evaluate("scope.name"), is(nullValue())); - assertThat(doc.evaluate("scope.schema_url"), is(nullValue())); - assertThat(doc.evaluate("scope.dropped_attributes_count"), is(nullValue())); - assertThat(doc.evaluate("scope.version"), is(nullValue())); - assertThat(doc.evaluate("unit"), is(nullValue())); + ExportMetricsServiceRequest metricsRequest = ExportMetricsServiceRequest.newBuilder().addResourceMetrics(resourceMetrics).build(); + dataPointGroupingContext.groupDataPoints(metricsRequest); + assertThat(dataPointGroupingContext.totalDataPoints(), equalTo(1)); + 
dataPointGroupingContext.consume(dataPointGroup -> { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + HashMap dynamicTemplates = new HashMap<>(); + documentBuilder.buildMetricDocument(builder, dynamicTemplates, dataPointGroup); + ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); + + // Verify that empty fields are not included + assertThat(doc.evaluate("resource.schema_url"), is(nullValue())); + assertThat(doc.evaluate("resource.dropped_attributes_count"), is(nullValue())); + assertThat(doc.evaluate("scope.name"), is(nullValue())); + assertThat(doc.evaluate("scope.schema_url"), is(nullValue())); + assertThat(doc.evaluate("scope.dropped_attributes_count"), is(nullValue())); + assertThat(doc.evaluate("scope.version"), is(nullValue())); + assertThat(doc.evaluate("unit"), is(nullValue())); + }); } public void testExponentialHistogram() throws Exception { - Resource resource = Resource.newBuilder().build(); - InstrumentationScope scope = InstrumentationScope.newBuilder().build(); - - DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup( - resource, - null, - scope, - null, - List.of(), - "", - TargetIndex.defaultMetrics() - ); - dataPointGroup.addDataPoint( - Set.of(), - new DataPoint.ExponentialHistogram( - ExponentialHistogramDataPoint.newBuilder() - .setTimeUnixNano(timestamp) - .setStartTimeUnixNano(startTimestamp) - .setZeroCount(1) - .setPositive(ExponentialHistogramDataPoint.Buckets.newBuilder().setOffset(0).addAllBucketCounts(List.of(1L, 1L))) - .setNegative(ExponentialHistogramDataPoint.Buckets.newBuilder().setOffset(0).addAllBucketCounts(List.of(1L, 1L))) - .build(), - createExponentialHistogramMetric("exponential_histogram", "", List.of(), AGGREGATION_TEMPORALITY_DELTA) - ) - ); - - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); - HashMap dynamicTemplates = 
documentBuilder.buildMetricDocument(builder, dataPointGroup); + ExponentialHistogramDataPoint dataPoint = ExponentialHistogramDataPoint.newBuilder() + .setTimeUnixNano(timestamp) + .setStartTimeUnixNano(startTimestamp) + .setZeroCount(1) + .setPositive(ExponentialHistogramDataPoint.Buckets.newBuilder().setOffset(0).addAllBucketCounts(List.of(1L, 1L))) + .setNegative(ExponentialHistogramDataPoint.Buckets.newBuilder().setOffset(0).addAllBucketCounts(List.of(1L, 1L))) + .build(); - ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); - assertThat(doc.evaluate("metrics.exponential_histogram.values"), equalTo(List.of(-3.0, -1.5, 0.0, 1.5, 3.0))); - assertThat(doc.evaluate("metrics.exponential_histogram.counts"), equalTo(List.of(1, 1, 1, 1, 1))); - assertThat(dynamicTemplates, hasEntry("metrics.exponential_histogram", "histogram")); + ExportMetricsServiceRequest metricsRequest = OtlpUtils.createMetricsRequest( + List.of(createExponentialHistogramMetric("exponential_histogram", "", List.of(dataPoint), AGGREGATION_TEMPORALITY_DELTA)) + ); + dataPointGroupingContext.groupDataPoints(metricsRequest); + assertThat(dataPointGroupingContext.totalDataPoints(), equalTo(1)); + dataPointGroupingContext.consume(dataPointGroup -> { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + HashMap dynamicTemplates = new HashMap<>(); + documentBuilder.buildMetricDocument(builder, dynamicTemplates, dataPointGroup); + ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); + + assertThat(doc.evaluate("metrics.exponential_histogram.values"), equalTo(List.of(-3.0, -1.5, 0.0, 1.5, 3.0))); + assertThat(doc.evaluate("metrics.exponential_histogram.counts"), equalTo(List.of(1, 1, 1, 1, 1))); + assertThat(dynamicTemplates, hasEntry("metrics.exponential_histogram", "histogram")); + }); } public void testExponentialHistogramAsAggregateMetricDouble() throws Exception { - 
Resource resource = Resource.newBuilder().build(); - InstrumentationScope scope = InstrumentationScope.newBuilder().build(); - - DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup( - resource, - null, - scope, - null, - List.of(), - "", - TargetIndex.defaultMetrics() - ); - dataPointGroup.addDataPoint( - Set.of(), - new DataPoint.ExponentialHistogram( - ExponentialHistogramDataPoint.newBuilder() - .setTimeUnixNano(timestamp) - .setStartTimeUnixNano(startTimestamp) - .setSum(42) - .setCount(1L) - .addAllAttributes(mappingHints(MappingHints.AGGREGATE_METRIC_DOUBLE)) - .build(), - createExponentialHistogramMetric("histogram", "", List.of(), AGGREGATION_TEMPORALITY_DELTA) - ) - ); - - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); - HashMap dynamicTemplates = documentBuilder.buildMetricDocument(builder, dataPointGroup); + ExponentialHistogramDataPoint dataPoint = ExponentialHistogramDataPoint.newBuilder() + .setTimeUnixNano(timestamp) + .setStartTimeUnixNano(startTimestamp) + .setSum(42) + .setCount(1L) + .addAllAttributes(mappingHints(MappingHints.AGGREGATE_METRIC_DOUBLE)) + .build(); - ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); - assertThat(doc.evaluate("metrics.histogram.sum"), equalTo(42.0)); - assertThat(doc.evaluate("metrics.histogram.value_count"), equalTo(1)); - assertThat(dynamicTemplates, hasEntry("metrics.histogram", "summary")); + ExportMetricsServiceRequest metricsRequest = OtlpUtils.createMetricsRequest( + List.of(createExponentialHistogramMetric("histogram", "", List.of(dataPoint), AGGREGATION_TEMPORALITY_DELTA)) + ); + dataPointGroupingContext.groupDataPoints(metricsRequest); + assertThat(dataPointGroupingContext.totalDataPoints(), equalTo(1)); + dataPointGroupingContext.consume(dataPointGroup -> { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + HashMap dynamicTemplates = new 
HashMap<>(); + documentBuilder.buildMetricDocument(builder, dynamicTemplates, dataPointGroup); + ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); + + assertThat(doc.evaluate("metrics.histogram.sum"), equalTo(42.0)); + assertThat(doc.evaluate("metrics.histogram.value_count"), equalTo(1)); + assertThat(dynamicTemplates, hasEntry("metrics.histogram", "summary")); + }); } public void testHistogram() throws Exception { - Resource resource = Resource.newBuilder().build(); - InstrumentationScope scope = InstrumentationScope.newBuilder().build(); - - DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup( - resource, - null, - scope, - null, - List.of(), - "", - TargetIndex.defaultMetrics() - ); - dataPointGroup.addDataPoint( - Set.of(), - new DataPoint.Histogram( - HistogramDataPoint.newBuilder() - .setTimeUnixNano(timestamp) - .setStartTimeUnixNano(startTimestamp) - .addBucketCounts(10L) - .addExplicitBounds(5.0) - .build(), - createHistogramMetric("histogram", "", List.of(), AGGREGATION_TEMPORALITY_DELTA) - ) - ); - - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); - HashMap dynamicTemplates = documentBuilder.buildMetricDocument(builder, dataPointGroup); + HistogramDataPoint dataPoint = HistogramDataPoint.newBuilder() + .setTimeUnixNano(timestamp) + .setStartTimeUnixNano(startTimestamp) + .addBucketCounts(10L) + .addExplicitBounds(5.0) + .build(); - ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); - assertThat(doc.evaluate("metrics.histogram.values"), equalTo(List.of(2.5))); - assertThat(doc.evaluate("metrics.histogram.counts"), equalTo(List.of(10))); - assertThat(dynamicTemplates, hasEntry("metrics.histogram", "histogram")); + ExportMetricsServiceRequest metricsRequest = OtlpUtils.createMetricsRequest( + List.of(createHistogramMetric("histogram", "", List.of(dataPoint), 
AGGREGATION_TEMPORALITY_DELTA)) + ); + dataPointGroupingContext.groupDataPoints(metricsRequest); + assertThat(dataPointGroupingContext.totalDataPoints(), equalTo(1)); + dataPointGroupingContext.consume(dataPointGroup -> { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + HashMap dynamicTemplates = new HashMap<>(); + documentBuilder.buildMetricDocument(builder, dynamicTemplates, dataPointGroup); + ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); + + assertThat(doc.evaluate("metrics.histogram.values"), equalTo(List.of(2.5))); + assertThat(doc.evaluate("metrics.histogram.counts"), equalTo(List.of(10))); + assertThat(dynamicTemplates, hasEntry("metrics.histogram", "histogram")); + }); } public void testHistogramAsAggregateMetricDouble() throws Exception { - Resource resource = Resource.newBuilder().build(); - InstrumentationScope scope = InstrumentationScope.newBuilder().build(); - - DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup( - resource, - null, - scope, - null, - List.of(), - "", - TargetIndex.defaultMetrics() - ); - dataPointGroup.addDataPoint( - Set.of(), - new DataPoint.Histogram( - HistogramDataPoint.newBuilder() - .setTimeUnixNano(timestamp) - .setStartTimeUnixNano(startTimestamp) - .setSum(42) - .setCount(1L) - .addAllAttributes(mappingHints(MappingHints.AGGREGATE_METRIC_DOUBLE)) - .build(), - createHistogramMetric("histogram", "", List.of(), AGGREGATION_TEMPORALITY_DELTA) - ) - ); - - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); - HashMap dynamicTemplates = documentBuilder.buildMetricDocument(builder, dataPointGroup); + HistogramDataPoint dataPoint = HistogramDataPoint.newBuilder() + .setTimeUnixNano(timestamp) + .setStartTimeUnixNano(startTimestamp) + .setSum(42) + .setCount(1L) + .addAllAttributes(mappingHints(MappingHints.AGGREGATE_METRIC_DOUBLE)) + .build(); - ObjectPath doc = 
ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); - assertThat(doc.evaluate("metrics.histogram.sum"), equalTo(42.0)); - assertThat(doc.evaluate("metrics.histogram.value_count"), equalTo(1)); - assertThat(dynamicTemplates, hasEntry("metrics.histogram", "summary")); + ExportMetricsServiceRequest metricsRequest = OtlpUtils.createMetricsRequest( + List.of(createHistogramMetric("histogram", "", List.of(dataPoint), AGGREGATION_TEMPORALITY_DELTA)) + ); + dataPointGroupingContext.groupDataPoints(metricsRequest); + assertThat(dataPointGroupingContext.totalDataPoints(), equalTo(1)); + dataPointGroupingContext.consume(dataPointGroup -> { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + HashMap dynamicTemplates = new HashMap<>(); + documentBuilder.buildMetricDocument(builder, dynamicTemplates, dataPointGroup); + ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder)); + + assertThat(doc.evaluate("metrics.histogram.sum"), equalTo(42.0)); + assertThat(doc.evaluate("metrics.histogram.value_count"), equalTo(1)); + assertThat(dynamicTemplates, hasEntry("metrics.histogram", "summary")); + }); } public void testSummary() throws Exception { - Resource resource = Resource.newBuilder().build(); - InstrumentationScope scope = InstrumentationScope.newBuilder().build(); - - DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup( - resource, - null, - scope, - null, - List.of(), - "", - TargetIndex.defaultMetrics() - ); - dataPointGroup.addDataPoint( - Set.of(), - new DataPoint.Summary( - SummaryDataPoint.newBuilder() - .setTimeUnixNano(timestamp) - .setStartTimeUnixNano(startTimestamp) - .setCount(1) - .setSum(42.0) - .addAllAttributes(mappingHints(MappingHints.DOC_COUNT)) - .build(), - createSummaryMetric("summary", "", List.of()) - ) - ); - - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); - 
/**
 * A summary data point with the {@code DOC_COUNT} mapping hint produces sum/value_count
 * fields, sets {@code _doc_count} on the document, and is mapped with the
 * {@code summary} dynamic template.
 */
public void testSummary() throws Exception {
    SummaryDataPoint dataPoint = SummaryDataPoint.newBuilder()
        .setTimeUnixNano(timestamp)
        .setStartTimeUnixNano(startTimestamp)
        .setCount(1)
        .setSum(42.0)
        .addAllAttributes(mappingHints(MappingHints.DOC_COUNT))
        .build();

    ExportMetricsServiceRequest metricsRequest = OtlpUtils.createMetricsRequest(
        List.of(createSummaryMetric("summary", "", List.of(dataPoint)))
    );
    dataPointGroupingContext.groupDataPoints(metricsRequest);
    assertThat(dataPointGroupingContext.totalDataPoints(), equalTo(1));
    dataPointGroupingContext.consume(dataPointGroup -> {
        XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
        // typed rather than raw HashMap: values are field-name -> dynamic-template-name strings
        HashMap<String, String> dynamicTemplates = new HashMap<>();
        documentBuilder.buildMetricDocument(builder, dynamicTemplates, dataPointGroup);
        ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder));

        assertThat(doc.evaluate("metrics.summary.sum"), equalTo(42.0));
        assertThat(doc.evaluate("metrics.summary.value_count"), equalTo(1));
        assertThat(doc.evaluate("_doc_count"), equalTo(1));
        assertThat(dynamicTemplates, hasEntry("metrics.summary", "summary"));
    });
}
}