Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;

import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -65,6 +68,14 @@ public interface DataPoint {
*/
String getMetricName();

/**
* Builds the metric value for the data point and writes it to the provided XContentBuilder.
*
* @param builder the XContentBuilder to write the metric value to
* @throws IOException if an I/O error occurs while writing to the builder
*/
void buildMetricValue(XContentBuilder builder) throws IOException;

/**
* Returns the dynamic template name for the data point based on its type and value.
* This is used to dynamically map the appropriate field type according to the data point's characteristics.
Expand Down Expand Up @@ -108,6 +119,14 @@ public String getMetricName() {
return metric.getName();
}

@Override
public void buildMetricValue(XContentBuilder builder) throws IOException {
switch (dataPoint.getValueCase()) {
case AS_DOUBLE -> builder.value(dataPoint.getAsDouble());
case AS_INT -> builder.value(dataPoint.getAsInt());
}
}

@Override
public String getDynamicTemplate() {
String type;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.oteldata.otlp.docbuilder;

import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.resource.v1.Resource;

import com.google.protobuf.ByteString;

import org.elasticsearch.common.Strings;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPoint;
import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPointGroupingContext;
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* This class constructs an Elasticsearch document representation of a metric data point group.
* It also handles dynamic templates for metrics based on their attributes.
*/
public class MetricDocumentBuilder {

private final BufferedByteStringAccessor byteStringAccessor;

public MetricDocumentBuilder(BufferedByteStringAccessor byteStringAccessor) {
this.byteStringAccessor = byteStringAccessor;
}

public HashMap<String, String> buildMetricDocument(XContentBuilder builder, DataPointGroupingContext.DataPointGroup dataPointGroup)
throws IOException {
HashMap<String, String> dynamicTemplates = new HashMap<>();
List<DataPoint> dataPoints = dataPointGroup.dataPoints();
builder.startObject();
builder.field("@timestamp", TimeUnit.NANOSECONDS.toMillis(dataPointGroup.getTimestampUnixNano()));
if (dataPointGroup.getStartTimestampUnixNano() != 0) {
builder.field("start_timestamp", TimeUnit.NANOSECONDS.toMillis(dataPointGroup.getStartTimestampUnixNano()));
}
buildResource(dataPointGroup.resource(), dataPointGroup.resourceSchemaUrl(), builder);
buildScope(builder, dataPointGroup.scopeSchemaUrl(), dataPointGroup.scope());
buildDataPointAttributes(builder, dataPointGroup.dataPointAttributes(), dataPointGroup.unit());
builder.field("_metric_names_hash", dataPointGroup.getMetricNamesHash());

builder.startObject("metrics");
for (int i = 0, dataPointsSize = dataPoints.size(); i < dataPointsSize; i++) {
DataPoint dataPoint = dataPoints.get(i);
builder.field(dataPoint.getMetricName());
dataPoint.buildMetricValue(builder);
String dynamicTemplate = dataPoint.getDynamicTemplate();
if (dynamicTemplate != null) {
dynamicTemplates.put("metrics." + dataPoint.getMetricName(), dynamicTemplate);
}
}
builder.endObject();
builder.endObject();
return dynamicTemplates;
}

private void buildResource(Resource resource, ByteString schemaUrl, XContentBuilder builder) throws IOException {
builder.startObject("resource");
addFieldIfNotEmpty(builder, "schema_url", schemaUrl);
if (resource.getDroppedAttributesCount() > 0) {
builder.field("dropped_attributes_count", resource.getDroppedAttributesCount());
}
builder.startObject("attributes");
buildAttributes(builder, resource.getAttributesList());
builder.endObject();
builder.endObject();
}

private void buildScope(XContentBuilder builder, ByteString schemaUrl, InstrumentationScope scope) throws IOException {
builder.startObject("scope");
addFieldIfNotEmpty(builder, "schema_url", schemaUrl);
if (scope.getDroppedAttributesCount() > 0) {
builder.field("dropped_attributes_count", scope.getDroppedAttributesCount());
}
addFieldIfNotEmpty(builder, "name", scope.getNameBytes());
addFieldIfNotEmpty(builder, "version", scope.getVersionBytes());
builder.startObject("attributes");
buildAttributes(builder, scope.getAttributesList());
builder.endObject();
builder.endObject();
}

private void addFieldIfNotEmpty(XContentBuilder builder, String name, ByteString value) throws IOException {
if (value != null && value.isEmpty() == false) {
builder.field(name);
byteStringAccessor.utf8Value(builder, value);
}
}

private void buildDataPointAttributes(XContentBuilder builder, List<KeyValue> attributes, String unit) throws IOException {
builder.startObject("attributes");
buildAttributes(builder, attributes);
builder.endObject();
if (Strings.hasLength(unit)) {
builder.field("unit", unit);
}
}

private void buildAttributes(XContentBuilder builder, List<KeyValue> attributes) throws IOException {
for (int i = 0, size = attributes.size(); i < size; i++) {
KeyValue attribute = attributes.get(i);
builder.field(attribute.getKey());
attributeValue(builder, attribute.getValue());
}
}

private void attributeValue(XContentBuilder builder, AnyValue value) throws IOException {
switch (value.getValueCase()) {
case STRING_VALUE -> byteStringAccessor.utf8Value(builder, value.getStringValueBytes());
case BOOL_VALUE -> builder.value(value.getBoolValue());
case INT_VALUE -> builder.value(value.getIntValue());
case DOUBLE_VALUE -> builder.value(value.getDoubleValue());
case ARRAY_VALUE -> {
builder.startArray();
List<AnyValue> valuesList = value.getArrayValue().getValuesList();
for (int i = 0, valuesListSize = valuesList.size(); i < valuesListSize; i++) {
AnyValue arrayValue = valuesList.get(i);
attributeValue(builder, arrayValue);
}
builder.endArray();
}
default -> throw new IllegalArgumentException("Unsupported attribute value type: " + value.getValueCase());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,27 @@ public static Metric createSumMetric(
.build();
}

public static NumberDataPoint createDoubleDataPoint(long timestamp, List<KeyValue> attributes) {
public static NumberDataPoint createDoubleDataPoint(long timestamp) {
return createDoubleDataPoint(timestamp, timestamp, List.of());
}

public static NumberDataPoint createDoubleDataPoint(long timeUnixNano, long startTimeUnixNano, List<KeyValue> attributes) {
return NumberDataPoint.newBuilder()
.setTimeUnixNano(timestamp)
.setStartTimeUnixNano(timestamp)
.setTimeUnixNano(timeUnixNano)
.setStartTimeUnixNano(startTimeUnixNano)
.addAllAttributes(attributes)
.setAsDouble(randomDouble())
.build();
}

public static NumberDataPoint createLongDataPoint(long timestamp, List<KeyValue> attributes) {
public static NumberDataPoint createLongDataPoint(long timestamp) {
return createLongDataPoint(timestamp, timestamp, List.of());
}

public static NumberDataPoint createLongDataPoint(long timeUnixNano, long startTimeUnixNano, List<KeyValue> attributes) {
return NumberDataPoint.newBuilder()
.setTimeUnixNano(timestamp)
.setStartTimeUnixNano(timestamp)
.setTimeUnixNano(timeUnixNano)
.setStartTimeUnixNano(startTimeUnixNano)
.addAllAttributes(attributes)
.setAsInt(randomLong())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ public void testGroupingSameGroup() throws Exception {
// Group data points
ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
List.of(
createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))),
createGaugeMetric("system.memory.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))),
createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos))),
createGaugeMetric("system.memory.usage", "", List.of(createDoubleDataPoint(nowUnixNanos))),
createSumMetric(
"http.requests.count",
"",
List.of(createLongDataPoint(nowUnixNanos, List.of())),
List.of(createLongDataPoint(nowUnixNanos)),
true,
AGGREGATION_TEMPORALITY_CUMULATIVE
)
Expand All @@ -60,8 +60,8 @@ public void testGroupingDifferentGroupUnit() throws Exception {
// Group data points
ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
List.of(
createGaugeMetric("system.cpu.usage", "{percent}", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))),
createGaugeMetric("system.memory.usage", "By", List.of(createLongDataPoint(nowUnixNanos, List.of())))
createGaugeMetric("system.cpu.usage", "{percent}", List.of(createDoubleDataPoint(nowUnixNanos))),
createGaugeMetric("system.memory.usage", "By", List.of(createLongDataPoint(nowUnixNanos)))
)
);
context.groupDataPoints(metricsRequest);
Expand All @@ -81,7 +81,7 @@ public void testGroupingDifferentResource() throws Exception {
createScopeMetrics(
"test",
"1.0.0",
List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))))
List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos))))
)
)
);
Expand All @@ -91,7 +91,7 @@ public void testGroupingDifferentResource() throws Exception {
createScopeMetrics(
"test",
"1.0.0",
List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of()))))
List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos))))
)
)
);
Expand All @@ -113,7 +113,7 @@ public void testGroupingDifferentScope() throws Exception {
createScopeMetrics(
"test_scope_1",
"1.0.0",
List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))))
List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos))))
)
)
);
Expand All @@ -123,7 +123,7 @@ public void testGroupingDifferentScope() throws Exception {
createScopeMetrics(
"test_scope_2",
"1.0.0",
List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of()))))
List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos))))
)
)
);
Expand All @@ -142,8 +142,8 @@ public void testGroupingDifferentGroupTimestamp() throws Exception {
// Group data points
ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
List.of(
createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos + 1, List.of()))),
createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of())))
createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos + 1))),
createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos)))
)
);
context.groupDataPoints(metricsRequest);
Expand All @@ -160,8 +160,12 @@ public void testGroupingDifferentGroupAttributes() throws Exception {
// Group data points
ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
List.of(
createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of(keyValue("core", "cpu0"))))),
createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of())))
createGaugeMetric(
"system.cpu.usage",
"",
List.of(createDoubleDataPoint(nowUnixNanos, nowUnixNanos, List.of(keyValue("core", "cpu0"))))
),
createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos)))
)
);
context.groupDataPoints(metricsRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,48 +25,48 @@ public class DataPointNumberTests extends ESTestCase {

public void testGauge() {
DataPoint.Number doubleGauge = new DataPoint.Number(
createDoubleDataPoint(nowUnixNanos, List.of()),
createDoubleDataPoint(nowUnixNanos),
createGaugeMetric("system.cpu.usage", "", List.of())
);
assertThat(doubleGauge.getDynamicTemplate(), equalTo("gauge_double"));
DataPoint.Number longGauge = new DataPoint.Number(
createLongDataPoint(nowUnixNanos, List.of()),
createLongDataPoint(nowUnixNanos),
createGaugeMetric("system.cpu.usage", "", List.of())
);
assertThat(longGauge.getDynamicTemplate(), equalTo("gauge_long"));
}

public void testCounterTemporality() {
DataPoint.Number doubleCumulative = new DataPoint.Number(
createDoubleDataPoint(nowUnixNanos, List.of()),
createDoubleDataPoint(nowUnixNanos),
createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_CUMULATIVE)
);
assertThat(doubleCumulative.getDynamicTemplate(), equalTo("counter_double"));
DataPoint.Number longCumulative = new DataPoint.Number(
createLongDataPoint(nowUnixNanos, List.of()),
createLongDataPoint(nowUnixNanos),
createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_CUMULATIVE)
);
assertThat(longCumulative.getDynamicTemplate(), equalTo("counter_long"));
DataPoint.Number doubleDelta = new DataPoint.Number(
createDoubleDataPoint(nowUnixNanos, List.of()),
createDoubleDataPoint(nowUnixNanos),
createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_DELTA)
);
assertThat(doubleDelta.getDynamicTemplate(), equalTo("gauge_double"));
DataPoint.Number longDelta = new DataPoint.Number(
createLongDataPoint(nowUnixNanos, List.of()),
createLongDataPoint(nowUnixNanos),
createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_DELTA)
);
assertThat(longDelta.getDynamicTemplate(), equalTo("gauge_long"));
}

public void testCounterNonMonotonic() {
DataPoint.Number doubleNonMonotonic = new DataPoint.Number(
createDoubleDataPoint(nowUnixNanos, List.of()),
createDoubleDataPoint(nowUnixNanos),
createSumMetric("http.requests.count", "", List.of(), false, AGGREGATION_TEMPORALITY_CUMULATIVE)
);
assertThat(doubleNonMonotonic.getDynamicTemplate(), equalTo("gauge_double"));
DataPoint.Number longNonMonotonic = new DataPoint.Number(
createLongDataPoint(nowUnixNanos, List.of()),
createLongDataPoint(nowUnixNanos),
createSumMetric("http.requests.count", "", List.of(), false, AGGREGATION_TEMPORALITY_DELTA)
);
assertThat(longNonMonotonic.getDynamicTemplate(), equalTo("gauge_long"));
Expand Down
Loading