Skip to content

Commit 662882a

Browse files
authored
OTLP: create ES documents (#133898)
1 parent f9e404b commit 662882a

File tree

6 files changed

+390
-27
lines changed

6 files changed

+390
-27
lines changed

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
import io.opentelemetry.proto.metrics.v1.Metric;
1212
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
1313

14+
import org.elasticsearch.xcontent.XContentBuilder;
15+
16+
import java.io.IOException;
1417
import java.util.List;
1518
import java.util.Set;
1619

@@ -65,6 +68,14 @@ public interface DataPoint {
6568
*/
6669
String getMetricName();
6770

71+
/**
72+
* Builds the metric value for the data point and writes it to the provided XContentBuilder.
73+
*
74+
* @param builder the XContentBuilder to write the metric value to
75+
* @throws IOException if an I/O error occurs while writing to the builder
76+
*/
77+
void buildMetricValue(XContentBuilder builder) throws IOException;
78+
6879
/**
6980
* Returns the dynamic template name for the data point based on its type and value.
7081
* This is used to dynamically map the appropriate field type according to the data point's characteristics.
@@ -108,6 +119,14 @@ public String getMetricName() {
108119
return metric.getName();
109120
}
110121

122+
@Override
123+
public void buildMetricValue(XContentBuilder builder) throws IOException {
124+
switch (dataPoint.getValueCase()) {
125+
case AS_DOUBLE -> builder.value(dataPoint.getAsDouble());
126+
case AS_INT -> builder.value(dataPoint.getAsInt());
127+
}
128+
}
129+
111130
@Override
112131
public String getDynamicTemplate() {
113132
String type;
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.oteldata.otlp.docbuilder;
9+
10+
import io.opentelemetry.proto.common.v1.AnyValue;
11+
import io.opentelemetry.proto.common.v1.InstrumentationScope;
12+
import io.opentelemetry.proto.common.v1.KeyValue;
13+
import io.opentelemetry.proto.resource.v1.Resource;
14+
15+
import com.google.protobuf.ByteString;
16+
17+
import org.elasticsearch.common.Strings;
18+
import org.elasticsearch.xcontent.XContentBuilder;
19+
import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPoint;
20+
import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPointGroupingContext;
21+
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;
22+
23+
import java.io.IOException;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.concurrent.TimeUnit;
27+
28+
/**
29+
* This class constructs an Elasticsearch document representation of a metric data point group.
30+
* It also handles dynamic templates for metrics based on their attributes.
31+
*/
32+
public class MetricDocumentBuilder {
33+
34+
private final BufferedByteStringAccessor byteStringAccessor;
35+
36+
public MetricDocumentBuilder(BufferedByteStringAccessor byteStringAccessor) {
37+
this.byteStringAccessor = byteStringAccessor;
38+
}
39+
40+
public HashMap<String, String> buildMetricDocument(XContentBuilder builder, DataPointGroupingContext.DataPointGroup dataPointGroup)
41+
throws IOException {
42+
HashMap<String, String> dynamicTemplates = new HashMap<>();
43+
List<DataPoint> dataPoints = dataPointGroup.dataPoints();
44+
builder.startObject();
45+
builder.field("@timestamp", TimeUnit.NANOSECONDS.toMillis(dataPointGroup.getTimestampUnixNano()));
46+
if (dataPointGroup.getStartTimestampUnixNano() != 0) {
47+
builder.field("start_timestamp", TimeUnit.NANOSECONDS.toMillis(dataPointGroup.getStartTimestampUnixNano()));
48+
}
49+
buildResource(dataPointGroup.resource(), dataPointGroup.resourceSchemaUrl(), builder);
50+
buildScope(builder, dataPointGroup.scopeSchemaUrl(), dataPointGroup.scope());
51+
buildDataPointAttributes(builder, dataPointGroup.dataPointAttributes(), dataPointGroup.unit());
52+
builder.field("_metric_names_hash", dataPointGroup.getMetricNamesHash());
53+
54+
builder.startObject("metrics");
55+
for (int i = 0, dataPointsSize = dataPoints.size(); i < dataPointsSize; i++) {
56+
DataPoint dataPoint = dataPoints.get(i);
57+
builder.field(dataPoint.getMetricName());
58+
dataPoint.buildMetricValue(builder);
59+
String dynamicTemplate = dataPoint.getDynamicTemplate();
60+
if (dynamicTemplate != null) {
61+
dynamicTemplates.put("metrics." + dataPoint.getMetricName(), dynamicTemplate);
62+
}
63+
}
64+
builder.endObject();
65+
builder.endObject();
66+
return dynamicTemplates;
67+
}
68+
69+
private void buildResource(Resource resource, ByteString schemaUrl, XContentBuilder builder) throws IOException {
70+
builder.startObject("resource");
71+
addFieldIfNotEmpty(builder, "schema_url", schemaUrl);
72+
if (resource.getDroppedAttributesCount() > 0) {
73+
builder.field("dropped_attributes_count", resource.getDroppedAttributesCount());
74+
}
75+
builder.startObject("attributes");
76+
buildAttributes(builder, resource.getAttributesList());
77+
builder.endObject();
78+
builder.endObject();
79+
}
80+
81+
private void buildScope(XContentBuilder builder, ByteString schemaUrl, InstrumentationScope scope) throws IOException {
82+
builder.startObject("scope");
83+
addFieldIfNotEmpty(builder, "schema_url", schemaUrl);
84+
if (scope.getDroppedAttributesCount() > 0) {
85+
builder.field("dropped_attributes_count", scope.getDroppedAttributesCount());
86+
}
87+
addFieldIfNotEmpty(builder, "name", scope.getNameBytes());
88+
addFieldIfNotEmpty(builder, "version", scope.getVersionBytes());
89+
builder.startObject("attributes");
90+
buildAttributes(builder, scope.getAttributesList());
91+
builder.endObject();
92+
builder.endObject();
93+
}
94+
95+
private void addFieldIfNotEmpty(XContentBuilder builder, String name, ByteString value) throws IOException {
96+
if (value != null && value.isEmpty() == false) {
97+
builder.field(name);
98+
byteStringAccessor.utf8Value(builder, value);
99+
}
100+
}
101+
102+
private void buildDataPointAttributes(XContentBuilder builder, List<KeyValue> attributes, String unit) throws IOException {
103+
builder.startObject("attributes");
104+
buildAttributes(builder, attributes);
105+
builder.endObject();
106+
if (Strings.hasLength(unit)) {
107+
builder.field("unit", unit);
108+
}
109+
}
110+
111+
private void buildAttributes(XContentBuilder builder, List<KeyValue> attributes) throws IOException {
112+
for (int i = 0, size = attributes.size(); i < size; i++) {
113+
KeyValue attribute = attributes.get(i);
114+
builder.field(attribute.getKey());
115+
attributeValue(builder, attribute.getValue());
116+
}
117+
}
118+
119+
private void attributeValue(XContentBuilder builder, AnyValue value) throws IOException {
120+
switch (value.getValueCase()) {
121+
case STRING_VALUE -> byteStringAccessor.utf8Value(builder, value.getStringValueBytes());
122+
case BOOL_VALUE -> builder.value(value.getBoolValue());
123+
case INT_VALUE -> builder.value(value.getIntValue());
124+
case DOUBLE_VALUE -> builder.value(value.getDoubleValue());
125+
case ARRAY_VALUE -> {
126+
builder.startArray();
127+
List<AnyValue> valuesList = value.getArrayValue().getValuesList();
128+
for (int i = 0, valuesListSize = valuesList.size(); i < valuesListSize; i++) {
129+
AnyValue arrayValue = valuesList.get(i);
130+
attributeValue(builder, arrayValue);
131+
}
132+
builder.endArray();
133+
}
134+
default -> throw new IllegalArgumentException("Unsupported attribute value type: " + value.getValueCase());
135+
}
136+
}
137+
138+
}

x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,19 +105,27 @@ public static Metric createSumMetric(
105105
.build();
106106
}
107107

108-
public static NumberDataPoint createDoubleDataPoint(long timestamp, List<KeyValue> attributes) {
108+
public static NumberDataPoint createDoubleDataPoint(long timestamp) {
109+
return createDoubleDataPoint(timestamp, timestamp, List.of());
110+
}
111+
112+
public static NumberDataPoint createDoubleDataPoint(long timeUnixNano, long startTimeUnixNano, List<KeyValue> attributes) {
109113
return NumberDataPoint.newBuilder()
110-
.setTimeUnixNano(timestamp)
111-
.setStartTimeUnixNano(timestamp)
114+
.setTimeUnixNano(timeUnixNano)
115+
.setStartTimeUnixNano(startTimeUnixNano)
112116
.addAllAttributes(attributes)
113117
.setAsDouble(randomDouble())
114118
.build();
115119
}
116120

117-
public static NumberDataPoint createLongDataPoint(long timestamp, List<KeyValue> attributes) {
121+
public static NumberDataPoint createLongDataPoint(long timestamp) {
122+
return createLongDataPoint(timestamp, timestamp, List.of());
123+
}
124+
125+
public static NumberDataPoint createLongDataPoint(long timeUnixNano, long startTimeUnixNano, List<KeyValue> attributes) {
118126
return NumberDataPoint.newBuilder()
119-
.setTimeUnixNano(timestamp)
120-
.setStartTimeUnixNano(timestamp)
127+
.setTimeUnixNano(timeUnixNano)
128+
.setStartTimeUnixNano(startTimeUnixNano)
121129
.addAllAttributes(attributes)
122130
.setAsInt(randomLong())
123131
.build();

x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@ public void testGroupingSameGroup() throws Exception {
3535
// Group data points
3636
ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
3737
List.of(
38-
createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))),
39-
createGaugeMetric("system.memory.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))),
38+
createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos))),
39+
createGaugeMetric("system.memory.usage", "", List.of(createDoubleDataPoint(nowUnixNanos))),
4040
createSumMetric(
4141
"http.requests.count",
4242
"",
43-
List.of(createLongDataPoint(nowUnixNanos, List.of())),
43+
List.of(createLongDataPoint(nowUnixNanos)),
4444
true,
4545
AGGREGATION_TEMPORALITY_CUMULATIVE
4646
)
@@ -60,8 +60,8 @@ public void testGroupingDifferentGroupUnit() throws Exception {
6060
// Group data points
6161
ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
6262
List.of(
63-
createGaugeMetric("system.cpu.usage", "{percent}", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))),
64-
createGaugeMetric("system.memory.usage", "By", List.of(createLongDataPoint(nowUnixNanos, List.of())))
63+
createGaugeMetric("system.cpu.usage", "{percent}", List.of(createDoubleDataPoint(nowUnixNanos))),
64+
createGaugeMetric("system.memory.usage", "By", List.of(createLongDataPoint(nowUnixNanos)))
6565
)
6666
);
6767
context.groupDataPoints(metricsRequest);
@@ -81,7 +81,7 @@ public void testGroupingDifferentResource() throws Exception {
8181
createScopeMetrics(
8282
"test",
8383
"1.0.0",
84-
List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))))
84+
List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos))))
8585
)
8686
)
8787
);
@@ -91,7 +91,7 @@ public void testGroupingDifferentResource() throws Exception {
9191
createScopeMetrics(
9292
"test",
9393
"1.0.0",
94-
List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of()))))
94+
List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos))))
9595
)
9696
)
9797
);
@@ -113,7 +113,7 @@ public void testGroupingDifferentScope() throws Exception {
113113
createScopeMetrics(
114114
"test_scope_1",
115115
"1.0.0",
116-
List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))))
116+
List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos))))
117117
)
118118
)
119119
);
@@ -123,7 +123,7 @@ public void testGroupingDifferentScope() throws Exception {
123123
createScopeMetrics(
124124
"test_scope_2",
125125
"1.0.0",
126-
List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of()))))
126+
List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos))))
127127
)
128128
)
129129
);
@@ -142,8 +142,8 @@ public void testGroupingDifferentGroupTimestamp() throws Exception {
142142
// Group data points
143143
ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
144144
List.of(
145-
createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos + 1, List.of()))),
146-
createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of())))
145+
createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos + 1))),
146+
createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos)))
147147
)
148148
);
149149
context.groupDataPoints(metricsRequest);
@@ -160,8 +160,12 @@ public void testGroupingDifferentGroupAttributes() throws Exception {
160160
// Group data points
161161
ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
162162
List.of(
163-
createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of(keyValue("core", "cpu0"))))),
164-
createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of())))
163+
createGaugeMetric(
164+
"system.cpu.usage",
165+
"",
166+
List.of(createDoubleDataPoint(nowUnixNanos, nowUnixNanos, List.of(keyValue("core", "cpu0"))))
167+
),
168+
createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos)))
165169
)
166170
);
167171
context.groupDataPoints(metricsRequest);

x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointNumberTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,48 +25,48 @@ public class DataPointNumberTests extends ESTestCase {
2525

2626
public void testGauge() {
2727
DataPoint.Number doubleGauge = new DataPoint.Number(
28-
createDoubleDataPoint(nowUnixNanos, List.of()),
28+
createDoubleDataPoint(nowUnixNanos),
2929
createGaugeMetric("system.cpu.usage", "", List.of())
3030
);
3131
assertThat(doubleGauge.getDynamicTemplate(), equalTo("gauge_double"));
3232
DataPoint.Number longGauge = new DataPoint.Number(
33-
createLongDataPoint(nowUnixNanos, List.of()),
33+
createLongDataPoint(nowUnixNanos),
3434
createGaugeMetric("system.cpu.usage", "", List.of())
3535
);
3636
assertThat(longGauge.getDynamicTemplate(), equalTo("gauge_long"));
3737
}
3838

3939
public void testCounterTemporality() {
4040
DataPoint.Number doubleCumulative = new DataPoint.Number(
41-
createDoubleDataPoint(nowUnixNanos, List.of()),
41+
createDoubleDataPoint(nowUnixNanos),
4242
createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_CUMULATIVE)
4343
);
4444
assertThat(doubleCumulative.getDynamicTemplate(), equalTo("counter_double"));
4545
DataPoint.Number longCumulative = new DataPoint.Number(
46-
createLongDataPoint(nowUnixNanos, List.of()),
46+
createLongDataPoint(nowUnixNanos),
4747
createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_CUMULATIVE)
4848
);
4949
assertThat(longCumulative.getDynamicTemplate(), equalTo("counter_long"));
5050
DataPoint.Number doubleDelta = new DataPoint.Number(
51-
createDoubleDataPoint(nowUnixNanos, List.of()),
51+
createDoubleDataPoint(nowUnixNanos),
5252
createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_DELTA)
5353
);
5454
assertThat(doubleDelta.getDynamicTemplate(), equalTo("gauge_double"));
5555
DataPoint.Number longDelta = new DataPoint.Number(
56-
createLongDataPoint(nowUnixNanos, List.of()),
56+
createLongDataPoint(nowUnixNanos),
5757
createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_DELTA)
5858
);
5959
assertThat(longDelta.getDynamicTemplate(), equalTo("gauge_long"));
6060
}
6161

6262
public void testCounterNonMonotonic() {
6363
DataPoint.Number doubleNonMonotonic = new DataPoint.Number(
64-
createDoubleDataPoint(nowUnixNanos, List.of()),
64+
createDoubleDataPoint(nowUnixNanos),
6565
createSumMetric("http.requests.count", "", List.of(), false, AGGREGATION_TEMPORALITY_CUMULATIVE)
6666
);
6767
assertThat(doubleNonMonotonic.getDynamicTemplate(), equalTo("gauge_double"));
6868
DataPoint.Number longNonMonotonic = new DataPoint.Number(
69-
createLongDataPoint(nowUnixNanos, List.of()),
69+
createLongDataPoint(nowUnixNanos),
7070
createSumMetric("http.requests.count", "", List.of(), false, AGGREGATION_TEMPORALITY_DELTA)
7171
);
7272
assertThat(longNonMonotonic.getDynamicTemplate(), equalTo("gauge_long"));

0 commit comments

Comments
 (0)