Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;

import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.oteldata.otlp.docbuilder.MappingHints;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -80,9 +81,10 @@ public interface DataPoint {
* Returns the dynamic template name for the data point based on its type and value.
* This is used to dynamically map the appropriate field type according to the data point's characteristics.
*
* @param mappingHints hints for building the dynamic template
* @return the dynamic template name as a string
*/
String getDynamicTemplate();
String getDynamicTemplate(MappingHints mappingHints);

/**
* Validates whether the data point can be indexed into Elasticsearch.
Expand All @@ -92,6 +94,14 @@ public interface DataPoint {
*/
boolean isValid(Set<String> errors);

/**
* Returns the {@code _doc_count} for the data point.
* This is used when {@link MappingHints#docCount()} is true.
*
* @return the {@code _doc_count}
*/
long getDocCount();

record Number(NumberDataPoint dataPoint, Metric metric) implements DataPoint {

@Override
Expand Down Expand Up @@ -128,7 +138,12 @@ public void buildMetricValue(XContentBuilder builder) throws IOException {
}

@Override
public String getDynamicTemplate() {
public long getDocCount() {
return 1;
}

@Override
public String getDynamicTemplate(MappingHints mappingHints) {
String type;
if (metric.hasSum()
// TODO add support for delta counters - for now we represent them as gauges
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.oteldata.otlp.docbuilder;

import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;

import java.util.List;

/**
* Represents mapping hints that can be used to influence how data is indexed in Elasticsearch.
* These hints can be provided by users via data point attributes.
*
* @param aggregateMetricDouble Indicates that the metric should be mapped as an aggregate_metric_double.
* This hint is available for histogram and exponential histogram metrics.
* @param docCount Indicates that the metric should be mapped with a _doc_count field.
* This hint is available for all metric types.
* When used for a histogram, exponential histogram, or summary metric,
* the _doc_count field will be populated with the number of total counts.
* It is not recommended to use this hint for multiple metrics that are grouped together
* into the same document.
* In these cases, the behavior is undefined but does not lead to data loss.
*/
public record MappingHints(boolean aggregateMetricDouble, boolean docCount) {
public static final String MAPPING_HINTS = "elasticsearch.mapping.hints";

private static final MappingHints EMPTY = new MappingHints(false, false);
private static final String AGGREGATE_METRIC_DOUBLE = "aggregate_metric_double";
private static final String DOC_COUNT = "_doc_count";

public static MappingHints fromAttributes(List<KeyValue> attributes) {
boolean aggregateMetricDouble = false;
boolean docCount = false;
for (int i = 0, attributesSize = attributes.size(); i < attributesSize; i++) {
KeyValue attribute = attributes.get(i);
if (attribute.getKey().equals(MAPPING_HINTS)) {
if (attribute.getValue().hasArrayValue()) {
List<AnyValue> valuesList = attribute.getValue().getArrayValue().getValuesList();
for (int j = 0, valuesListSize = valuesList.size(); j < valuesListSize; j++) {
AnyValue hint = valuesList.get(j);
if (hint.hasStringValue()) {
String value = hint.getStringValue();
if (value.equals(AGGREGATE_METRIC_DOUBLE)) {
aggregateMetricDouble = true;
} else if (value.equals(DOC_COUNT)) {
docCount = true;
}
}
}
}
return new MappingHints(aggregateMetricDouble, docCount);
}
}
return EMPTY;
}

public static MappingHints empty() {
return EMPTY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public HashMap<String, String> buildMetricDocument(XContentBuilder builder, Data
DataPoint dataPoint = dataPoints.get(i);
builder.field(dataPoint.getMetricName());
dataPoint.buildMetricValue(builder);
String dynamicTemplate = dataPoint.getDynamicTemplate();
String dynamicTemplate = dataPoint.getDynamicTemplate(MappingHints.empty());
if (dynamicTemplate != null) {
dynamicTemplates.put("metrics." + dataPoint.getMetricName(), dynamicTemplate);
}
Expand Down Expand Up @@ -111,11 +111,26 @@ private void buildDataPointAttributes(XContentBuilder builder, List<KeyValue> at
private void buildAttributes(XContentBuilder builder, List<KeyValue> attributes) throws IOException {
for (int i = 0, size = attributes.size(); i < size; i++) {
KeyValue attribute = attributes.get(i);
builder.field(attribute.getKey());
attributeValue(builder, attribute.getValue());
String key = attribute.getKey();
if (isIgnoredAttribute(key) == false) {
builder.field(key);
attributeValue(builder, attribute.getValue());
}
}
}

/**
* Checks if the given attribute key is an ignored attribute.
* Ignored attributes are well-known Elastic-specific attributes
* that influence how the documents are indexed but are not stored themselves.
*
* @param attributeKey the attribute key to check
* @return true if the attribute is ignored, false otherwise
*/
public static boolean isIgnoredAttribute(String attributeKey) {
return attributeKey.equals(MappingHints.MAPPING_HINTS);
}

private void attributeValue(XContentBuilder builder, AnyValue value) throws IOException {
switch (value.getValueCase()) {
case STRING_VALUE -> byteStringAccessor.utf8Value(builder, value.getStringValueBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import org.elasticsearch.cluster.routing.TsidBuilder;
import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel;
import org.elasticsearch.xpack.oteldata.otlp.docbuilder.MetricDocumentBuilder;
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;

import java.util.List;
Expand All @@ -35,7 +36,9 @@ public void add(List<KeyValue> attributesList, TsidBuilder tsidBuilder) {
for (int i = 0; i < attributesList.size(); i++) {
KeyValue keyValue = attributesList.get(i);
String attributeKey = keyValue.getKey();
hashValue(tsidBuilder, prefix + attributeKey, keyValue.getValue());
if (MetricDocumentBuilder.isIgnoredAttribute(attributeKey) == false) {
hashValue(tsidBuilder, prefix + attributeKey, keyValue.getValue());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.oteldata.otlp.datapoint;

import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.oteldata.otlp.docbuilder.MappingHints;

import java.util.List;

Expand All @@ -28,48 +29,48 @@ public void testGauge() {
createDoubleDataPoint(nowUnixNanos),
createGaugeMetric("system.cpu.usage", "", List.of())
);
assertThat(doubleGauge.getDynamicTemplate(), equalTo("gauge_double"));
assertThat(doubleGauge.getDynamicTemplate(MappingHints.empty()), equalTo("gauge_double"));
DataPoint.Number longGauge = new DataPoint.Number(
createLongDataPoint(nowUnixNanos),
createGaugeMetric("system.cpu.usage", "", List.of())
);
assertThat(longGauge.getDynamicTemplate(), equalTo("gauge_long"));
assertThat(longGauge.getDynamicTemplate(MappingHints.empty()), equalTo("gauge_long"));
}

public void testCounterTemporality() {
DataPoint.Number doubleCumulative = new DataPoint.Number(
createDoubleDataPoint(nowUnixNanos),
createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_CUMULATIVE)
);
assertThat(doubleCumulative.getDynamicTemplate(), equalTo("counter_double"));
assertThat(doubleCumulative.getDynamicTemplate(MappingHints.empty()), equalTo("counter_double"));
DataPoint.Number longCumulative = new DataPoint.Number(
createLongDataPoint(nowUnixNanos),
createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_CUMULATIVE)
);
assertThat(longCumulative.getDynamicTemplate(), equalTo("counter_long"));
assertThat(longCumulative.getDynamicTemplate(MappingHints.empty()), equalTo("counter_long"));
DataPoint.Number doubleDelta = new DataPoint.Number(
createDoubleDataPoint(nowUnixNanos),
createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_DELTA)
);
assertThat(doubleDelta.getDynamicTemplate(), equalTo("gauge_double"));
assertThat(doubleDelta.getDynamicTemplate(MappingHints.empty()), equalTo("gauge_double"));
DataPoint.Number longDelta = new DataPoint.Number(
createLongDataPoint(nowUnixNanos),
createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_DELTA)
);
assertThat(longDelta.getDynamicTemplate(), equalTo("gauge_long"));
assertThat(longDelta.getDynamicTemplate(MappingHints.empty()), equalTo("gauge_long"));
}

public void testCounterNonMonotonic() {
DataPoint.Number doubleNonMonotonic = new DataPoint.Number(
createDoubleDataPoint(nowUnixNanos),
createSumMetric("http.requests.count", "", List.of(), false, AGGREGATION_TEMPORALITY_CUMULATIVE)
);
assertThat(doubleNonMonotonic.getDynamicTemplate(), equalTo("gauge_double"));
assertThat(doubleNonMonotonic.getDynamicTemplate(MappingHints.empty()), equalTo("gauge_double"));
DataPoint.Number longNonMonotonic = new DataPoint.Number(
createLongDataPoint(nowUnixNanos),
createSumMetric("http.requests.count", "", List.of(), false, AGGREGATION_TEMPORALITY_DELTA)
);
assertThat(longNonMonotonic.getDynamicTemplate(), equalTo("gauge_long"));
assertThat(longNonMonotonic.getDynamicTemplate(MappingHints.empty()), equalTo("gauge_long"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.oteldata.otlp.datapoint;

import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.ArrayValue;
import io.opentelemetry.proto.common.v1.KeyValue;

import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.oteldata.otlp.docbuilder.MappingHints;

import java.util.List;

public class MappingHintsTests extends ESTestCase {

public void testEmptyAttributes() {
MappingHints hints = MappingHints.fromAttributes(List.of());
assertFalse(hints.aggregateMetricDouble());
assertFalse(hints.docCount());
}

public void testNoMappingHints() {
KeyValue kv = KeyValue.newBuilder()
.setKey("some.other.key")
.setValue(AnyValue.newBuilder().setStringValue("some_value").build())
.build();
MappingHints hints = MappingHints.fromAttributes(List.of(kv));
assertFalse(hints.aggregateMetricDouble());
assertFalse(hints.docCount());
}

public void testSingleMappingHint() {
// Test with just aggregate_metric_double hint
KeyValue aggregateMetricHint = createMappingHint("aggregate_metric_double");
MappingHints hints = MappingHints.fromAttributes(List.of(aggregateMetricHint));
assertTrue(hints.aggregateMetricDouble());
assertFalse(hints.docCount());

// Test with just _doc_count hint
KeyValue docCountHint = createMappingHint("_doc_count");
hints = MappingHints.fromAttributes(List.of(docCountHint));
assertFalse(hints.aggregateMetricDouble());
assertTrue(hints.docCount());
}

public void testMultipleMappingHints() {
// Test with both hints
KeyValue bothHints = createMappingHint("aggregate_metric_double", "_doc_count");
MappingHints hints = MappingHints.fromAttributes(List.of(bothHints));
assertTrue(hints.aggregateMetricDouble());
assertTrue(hints.docCount());
}

public void testInvalidHints() {
// Test with invalid hint
KeyValue invalidHint = createMappingHint("invalid_hint");
MappingHints hints = MappingHints.fromAttributes(List.of(invalidHint));
assertFalse(hints.aggregateMetricDouble());
assertFalse(hints.docCount());

// Test with mix of valid and invalid hints
KeyValue mixedHints = createMappingHint("aggregate_metric_double", "invalid_hint", "_doc_count");
hints = MappingHints.fromAttributes(List.of(mixedHints));
assertTrue(hints.aggregateMetricDouble());
assertTrue(hints.docCount());
}

public void testNonArrayValue() {
// Test with non-array value
KeyValue nonArrayHint = KeyValue.newBuilder()
.setKey("elasticsearch.mapping.hints")
.setValue(AnyValue.newBuilder().setStringValue("aggregate_metric_double").build())
.build();
MappingHints hints = MappingHints.fromAttributes(List.of(nonArrayHint));
assertFalse(hints.aggregateMetricDouble());
assertFalse(hints.docCount());
}

public void testNonStringArrayValues() {
// Test with non-string array values
AnyValue numberValue = AnyValue.newBuilder().setIntValue(42).build();
AnyValue boolValue = AnyValue.newBuilder().setBoolValue(true).build();

ArrayValue.Builder arrayBuilder = ArrayValue.newBuilder();
arrayBuilder.addValues(numberValue);
arrayBuilder.addValues(boolValue);

KeyValue invalidTypeHints = KeyValue.newBuilder()
.setKey("elasticsearch.mapping.hints")
.setValue(AnyValue.newBuilder().setArrayValue(arrayBuilder).build())
.build();

MappingHints hints = MappingHints.fromAttributes(List.of(invalidTypeHints));
assertFalse(hints.aggregateMetricDouble());
assertFalse(hints.docCount());
}

private KeyValue createMappingHint(String... hintValues) {
ArrayValue.Builder arrayBuilder = ArrayValue.newBuilder();
for (String hint : hintValues) {
arrayBuilder.addValues(AnyValue.newBuilder().setStringValue(hint));
}

return KeyValue.newBuilder()
.setKey("elasticsearch.mapping.hints")
.setValue(AnyValue.newBuilder().setArrayValue(arrayBuilder))
.build();
}
}