Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;

import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.oteldata.otlp.docbuilder.MappingHints;
Expand Down Expand Up @@ -72,10 +73,11 @@ public interface DataPoint {
/**
* Builds the metric value for the data point and writes it to the provided XContentBuilder.
*
* @param mappingHints hints for building the metric value
* @param builder the XContentBuilder to write the metric value to
* @throws IOException if an I/O error occurs while writing to the builder
*/
void buildMetricValue(XContentBuilder builder) throws IOException;
void buildMetricValue(MappingHints mappingHints, XContentBuilder builder) throws IOException;

/**
* Returns the dynamic template name for the data point based on its type and value.
Expand Down Expand Up @@ -130,7 +132,7 @@ public String getMetricName() {
}

@Override
public void buildMetricValue(XContentBuilder builder) throws IOException {
public void buildMetricValue(MappingHints mappingHints, XContentBuilder builder) throws IOException {
switch (dataPoint.getValueCase()) {
case AS_DOUBLE -> builder.value(dataPoint.getAsDouble());
case AS_INT -> builder.value(dataPoint.getAsInt());
Expand Down Expand Up @@ -168,4 +170,60 @@ public boolean isValid(Set<String> errors) {
return true;
}
}

record Summary(SummaryDataPoint dataPoint, Metric metric) implements DataPoint {

@Override
public long getTimestampUnixNano() {
return dataPoint.getTimeUnixNano();
}

@Override
public List<KeyValue> getAttributes() {
return dataPoint.getAttributesList();
}

@Override
public long getStartTimestampUnixNano() {
return dataPoint.getStartTimeUnixNano();
}

@Override
public String getUnit() {
return metric.getUnit();
}

@Override
public String getMetricName() {
return metric.getName();
}

@Override
public void buildMetricValue(MappingHints mappingHints, XContentBuilder builder) throws IOException {
// TODO: Add support for quantiles
buildAggregateMetricDouble(builder, dataPoint.getSum(), dataPoint.getCount());
}

@Override
public long getDocCount() {
return dataPoint.getCount();
}

@Override
public String getDynamicTemplate(MappingHints mappingHints) {
return "summary";
}

@Override
public boolean isValid(Set<String> errors) {
return true;
}
}

private static void buildAggregateMetricDouble(XContentBuilder builder, double sum, long valueCount) throws IOException {
builder.startObject();
builder.field("sum", sum);
builder.field("value_count", valueCount);
builder.endObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ public void groupDataPoints(ExportMetricsServiceRequest exportMetricsServiceRequ
ignoredDataPointMessages.add("Histogram is not supported yet. Dropping " + metric.getName());
break;
case SUMMARY:
ignoredDataPoints += metric.getSummary().getDataPointsList().size();
ignoredDataPointMessages.add("Summary is not supported yet. Dropping " + metric.getName());
scopeGroup.addDataPoints(metric, metric.getSummary().getDataPointsList(), DataPoint.Summary::new);
break;
default:
ignoredDataPoints++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
* In these cases, the behavior is undefined but does not lead to data loss.
*/
public record MappingHints(boolean aggregateMetricDouble, boolean docCount) {
private static final String MAPPING_HINTS = "elasticsearch.mapping.hints";

public static final String MAPPING_HINTS = "elasticsearch.mapping.hints";
public static final String AGGREGATE_METRIC_DOUBLE = "aggregate_metric_double";
public static final String DOC_COUNT = "_doc_count";

private static final MappingHints EMPTY = new MappingHints(false, false);
private static final String AGGREGATE_METRIC_DOUBLE = "aggregate_metric_double";
private static final String DOC_COUNT = "_doc_count";

public static MappingHints fromAttributes(List<KeyValue> attributes) {
boolean aggregateMetricDouble = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,25 @@ public HashMap<String, String> buildMetricDocument(XContentBuilder builder, Data
buildDataPointAttributes(builder, dataPointGroup.dataPointAttributes(), dataPointGroup.unit());
builder.field("_metric_names_hash", dataPointGroup.getMetricNamesHash(hasher));

long docCount = 0;
builder.startObject("metrics");
for (int i = 0, dataPointsSize = dataPoints.size(); i < dataPointsSize; i++) {
DataPoint dataPoint = dataPoints.get(i);
builder.field(dataPoint.getMetricName());
dataPoint.buildMetricValue(builder);
String dynamicTemplate = dataPoint.getDynamicTemplate(MappingHints.empty());
MappingHints mappingHints = MappingHints.fromAttributes(dataPoint.getAttributes());
dataPoint.buildMetricValue(mappingHints, builder);
String dynamicTemplate = dataPoint.getDynamicTemplate(mappingHints);
if (dynamicTemplate != null) {
dynamicTemplates.put("metrics." + dataPoint.getMetricName(), dynamicTemplate);
}
if (mappingHints.docCount()) {
docCount = dataPoint.getDocCount();
}
}
builder.endObject();
if (docCount > 0) {
builder.field("_doc_count", docCount);
}
builder.endObject();
return dynamicTemplates;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.proto.metrics.v1.Sum;
import io.opentelemetry.proto.metrics.v1.Summary;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.opentelemetry.proto.resource.v1.Resource;

import org.elasticsearch.xpack.oteldata.otlp.docbuilder.MappingHints;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -38,6 +42,10 @@ public static KeyValue keyValue(String key, String value) {
return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setStringValue(value).build()).build();
}

public static List<KeyValue> mappingHints(String... mappingHints) {
return List.of(keyValue(MappingHints.MAPPING_HINTS, mappingHints));
}

public static KeyValue keyValue(String key, String... values) {
return KeyValue.newBuilder()
.setKey(key)
Expand Down Expand Up @@ -105,6 +113,14 @@ public static Metric createSumMetric(
.build();
}

public static Metric createSummaryMetric(String name, String unit, List<SummaryDataPoint> dataPoints) {
return Metric.newBuilder()
.setName(name)
.setUnit(unit)
.setSummary(Summary.newBuilder().addAllDataPoints(dataPoints).build())
.build();
}

public static NumberDataPoint createDoubleDataPoint(long timestamp) {
return createDoubleDataPoint(timestamp, timestamp, List.of());
}
Expand All @@ -131,6 +147,16 @@ public static NumberDataPoint createLongDataPoint(long timeUnixNano, long startT
.build();
}

public static SummaryDataPoint createSummaryDataPoint(long timestamp, List<KeyValue> attributes) {
return SummaryDataPoint.newBuilder()
.setTimeUnixNano(timestamp)
.setStartTimeUnixNano(timestamp)
.addAllAttributes(attributes)
.setCount(randomLong())
.setSum(randomDouble())
.build();
}

public static ExportMetricsServiceRequest createMetricsRequest(List<Metric> metrics) {

List<ResourceMetrics> resourceMetrics = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createResourceMetrics;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createScopeMetrics;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSumMetric;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSummaryDataPoint;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSummaryMetric;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
Expand All @@ -46,11 +48,12 @@ public void testGroupingSameGroup() throws Exception {
List.of(createLongDataPoint(nowUnixNanos)),
true,
AGGREGATION_TEMPORALITY_CUMULATIVE
)
),
createSummaryMetric("summary", "", List.of(createSummaryDataPoint(nowUnixNanos, List.of())))
)
);
context.groupDataPoints(metricsRequest);
assertEquals(3, context.totalDataPoints());
assertEquals(4, context.totalDataPoints());
assertEquals(0, context.getIgnoredDataPoints());
assertEquals("", context.getIgnoredDataPointsMessage());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.opentelemetry.proto.resource.v1.Resource;

import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -37,7 +38,9 @@
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createGaugeMetric;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createLongDataPoint;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSumMetric;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSummaryMetric;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.mappingHints;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -197,4 +200,40 @@ public void testEmptyFields() throws IOException {
assertThat(doc.evaluate("unit"), is(nullValue()));
}

public void testSummary() throws Exception {
Resource resource = Resource.newBuilder().build();
InstrumentationScope scope = InstrumentationScope.newBuilder().build();

DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup(
resource,
null,
scope,
null,
List.of(),
"",
TargetIndex.defaultMetrics()
);
dataPointGroup.addDataPoint(
Set.of(),
new DataPoint.Summary(
SummaryDataPoint.newBuilder()
.setTimeUnixNano(timestamp)
.setStartTimeUnixNano(startTimestamp)
.setCount(1)
.setSum(42.0)
.addAllAttributes(mappingHints(MappingHints.DOC_COUNT))
.build(),
createSummaryMetric("summary", "", List.of())
)
);

XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
HashMap<String, String> dynamicTemplates = documentBuilder.buildMetricDocument(builder, dataPointGroup);

ObjectPath doc = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder));
assertThat(doc.evaluate("metrics.summary.sum"), equalTo(42.0));
assertThat(doc.evaluate("metrics.summary.value_count"), equalTo(1));
assertThat(doc.evaluate("_doc_count"), equalTo(1));
assertThat(dynamicTemplates, hasEntry("metrics.summary", "summary"));
}
}