diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java b/server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java index d29dced2adb28..3a3c47d8c5bb8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java @@ -34,7 +34,15 @@ public class TsidBuilder { private static final int MAX_TSID_VALUE_FIELDS = 16; private final BufferedMurmur3Hasher murmur3Hasher = new BufferedMurmur3Hasher(0L); - private final List dimensions = new ArrayList<>(); + private final List dimensions; + + public TsidBuilder() { + this.dimensions = new ArrayList<>(); + } + + public TsidBuilder(int size) { + this.dimensions = new ArrayList<>(size); + } public static TsidBuilder newBuilder() { return new TsidBuilder(); @@ -281,6 +289,10 @@ private static int writeHash128(MurmurHash3.Hash128 hash128, byte[] buffer, int return index; } + public int size() { + return dimensions.size(); + } + /** * A functional interface that describes how objects of a complex type are added to a TSID. * diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java new file mode 100644 index 0000000000000..b23fe891db944 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java @@ -0,0 +1,137 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; + +import java.util.List; +import java.util.Set; + +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE; + +/** + * Represents a metrics data point in the OpenTelemetry metrics data model. + * This interface defines methods to access various properties of a data point, + * such as its timestamp, attributes, unit, metric name, and methods to build + * the metric value in a specific format. + * The reason this class is needed is that the generated classes from the + * OpenTelemetry proto definitions don't implement a common interface, + * which makes it difficult to handle different types of data points uniformly. + */ +public interface DataPoint { + + /** + * Returns the timestamp of the data point in Unix nanoseconds. + * + * @return the timestamp in nanoseconds + */ + long getTimestampUnixNano(); + + /** + * Returns the start timestamp of the data point in Unix nanoseconds. + * This allows detecting when a sequence of observations is unbroken. + * This field indicates to consumers the start time for points with cumulative and delta temporality, + * and can support correct rate calculation. + * + * @return the start timestamp in nanoseconds + */ + long getStartTimestampUnixNano(); + + /** + * Returns the attributes associated with the data point. + * + * @return a list of key-value pairs representing the attributes + */ + List getAttributes(); + + /** + * Returns the unit of measurement for the data point. + * + * @return the unit as a string + */ + String getUnit(); + + /** + * Returns the name of the metric associated with the data point. + * + * @return the metric name as a string + */ + String getMetricName(); + + /** + * Returns the dynamic template name for the data point based on its type and value. + * This is used to dynamically map the appropriate field type according to the data point's characteristics. + * + * @return the dynamic template name as a string + */ + String getDynamicTemplate(); + + /** + * Validates whether the data point can be indexed into Elasticsearch. + * + * @param errors a set to collect validation error messages + * @return true if the data point is valid, false otherwise + */ + boolean isValid(Set errors); + + record Number(NumberDataPoint dataPoint, Metric metric) implements DataPoint { + + @Override + public long getTimestampUnixNano() { + return dataPoint.getTimeUnixNano(); + } + + @Override + public List getAttributes() { + return dataPoint.getAttributesList(); + } + + @Override + public long getStartTimestampUnixNano() { + return dataPoint.getStartTimeUnixNano(); + } + + @Override + public String getUnit() { + return metric.getUnit(); + } + + @Override + public String getMetricName() { + return metric.getName(); + } + + @Override + public String getDynamicTemplate() { + String type; + if (metric.hasSum() + // TODO add support for delta counters - for now we represent them as gauges + && metric.getSum().getAggregationTemporality() == AGGREGATION_TEMPORALITY_CUMULATIVE + // TODO add support for up/down counters - for now we represent them as gauges + && metric.getSum().getIsMonotonic()) { + type = "counter_"; + } else { + type = "gauge_"; + } + if (dataPoint.getValueCase() == NumberDataPoint.ValueCase.AS_INT) { + return type + "long"; + } else if (dataPoint.getValueCase() == NumberDataPoint.ValueCase.AS_DOUBLE) { + return type + "double"; + } else { + return null; + } + } + + @Override + public boolean isValid(Set errors) { + return true; + } + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java new file mode 100644 index 0000000000000..6e84dbb46a979 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java @@ -0,0 +1,316 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.common.v1.InstrumentationScope; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; +import io.opentelemetry.proto.resource.v1.Resource; + +import com.google.protobuf.ByteString; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.common.hash.BufferedMurmur3Hasher; +import org.elasticsearch.common.hash.MurmurHash3.Hash128; +import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; +import org.elasticsearch.xpack.oteldata.otlp.tsid.DataPointTsidFunnel; +import org.elasticsearch.xpack.oteldata.otlp.tsid.ResourceTsidFunnel; +import org.elasticsearch.xpack.oteldata.otlp.tsid.ScopeTsidFunnel; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; + +public class DataPointGroupingContext { + + private final BufferedByteStringAccessor byteStringAccessor; + private final Map resourceGroups = new HashMap<>(); + private final Set ignoredDataPointMessages = new HashSet<>(); + + private int totalDataPoints = 0; + private int ignoredDataPoints = 0; + + public DataPointGroupingContext(BufferedByteStringAccessor byteStringAccessor) { + this.byteStringAccessor = byteStringAccessor; + } + + public void groupDataPoints(ExportMetricsServiceRequest exportMetricsServiceRequest) { + List resourceMetricsList = exportMetricsServiceRequest.getResourceMetricsList(); + for (int i = 0; i < resourceMetricsList.size(); i++) { + ResourceMetrics resourceMetrics = resourceMetricsList.get(i); + ResourceGroup resourceGroup = getOrCreateResourceGroup(resourceMetrics); + List scopeMetricsList = resourceMetrics.getScopeMetricsList(); + for (int j = 0; j < scopeMetricsList.size(); j++) { + ScopeMetrics scopeMetrics = scopeMetricsList.get(j); + ScopeGroup scopeGroup = resourceGroup.getOrCreateScope(scopeMetrics); + List metricsList = scopeMetrics.getMetricsList(); + for (int k = 0; k < metricsList.size(); k++) { + var metric = metricsList.get(k); + switch (metric.getDataCase()) { + case SUM: + scopeGroup.addDataPoints(metric, metric.getSum().getDataPointsList(), DataPoint.Number::new); + break; + case GAUGE: + scopeGroup.addDataPoints(metric, metric.getGauge().getDataPointsList(), DataPoint.Number::new); + break; + case EXPONENTIAL_HISTOGRAM: + ignoredDataPoints += metric.getExponentialHistogram().getDataPointsCount(); + ignoredDataPointMessages.add("Exponential histogram is not supported yet. Dropping " + metric.getName()); + break; + case HISTOGRAM: + ignoredDataPoints += metric.getHistogram().getDataPointsCount(); + ignoredDataPointMessages.add("Histogram is not supported yet. Dropping " + metric.getName()); + break; + case SUMMARY: + ignoredDataPoints += metric.getSummary().getDataPointsList().size(); + ignoredDataPointMessages.add("Summary is not supported yet. Dropping " + metric.getName()); + break; + default: + ignoredDataPoints++; + ignoredDataPointMessages.add("unsupported metric type " + metric.getDataCase()); + break; + } + } + } + } + } + + /** + * Consumes all data point groups in the context, removing them from the context. + * + * @param consumer the consumer to process each {@link DataPointGroup} + * @param the type of exception that can be thrown by the consumer + * @throws E if the consumer throws an exception + */ + public void consume(CheckedConsumer consumer) throws E { + for (Iterator iterator = resourceGroups.values().iterator(); iterator.hasNext();) { + ResourceGroup resourceGroup = iterator.next(); + // Remove the resource group from the map can help to significantly reduce GC overhead. + // This avoids that the resource groups are promoted to survivor space when the context is kept alive for a while, + // for example, when referenced in the bulk response listener. + iterator.remove(); + resourceGroup.forEach(consumer); + } + } + + public int totalDataPoints() { + return totalDataPoints; + } + + public int getIgnoredDataPoints() { + return ignoredDataPoints; + } + + public String getIgnoredDataPointsMessage() { + return ignoredDataPointMessages.isEmpty() ? "" : String.join("\n", ignoredDataPointMessages); + } + + private ResourceGroup getOrCreateResourceGroup(ResourceMetrics resourceMetrics) { + TsidBuilder resourceTsidBuilder = ResourceTsidFunnel.forResource(byteStringAccessor, resourceMetrics); + Hash128 resourceHash = resourceTsidBuilder.hash(); + ResourceGroup resourceGroup = resourceGroups.get(resourceHash); + if (resourceGroup == null) { + resourceGroup = new ResourceGroup(resourceMetrics.getResource(), resourceMetrics.getSchemaUrlBytes()); + resourceGroups.put(resourceHash, resourceGroup); + } + return resourceGroup; + } + + class ResourceGroup { + private final Resource resource; + private final ByteString resourceSchemaUrl; + private final Map scopes; + + ResourceGroup(Resource resource, ByteString resourceSchemaUrl) { + this.resource = resource; + this.resourceSchemaUrl = resourceSchemaUrl; + this.scopes = new HashMap<>(); + } + + public ScopeGroup getOrCreateScope(ScopeMetrics scopeMetrics) { + TsidBuilder scopeTsidBuilder = ScopeTsidFunnel.forScope(byteStringAccessor, scopeMetrics); + Hash128 scopeHash = scopeTsidBuilder.hash(); + ScopeGroup scopeGroup = scopes.get(scopeHash); + if (scopeGroup == null) { + scopeGroup = new ScopeGroup(this, scopeMetrics.getScope(), scopeMetrics.getSchemaUrlBytes()); + scopes.put(scopeHash, scopeGroup); + } + return scopeGroup; + } + + public void forEach(CheckedConsumer consumer) throws E { + for (ScopeGroup scopeGroup : scopes.values()) { + scopeGroup.forEach(consumer); + } + } + } + + class ScopeGroup { + private final ResourceGroup resourceGroup; + private final InstrumentationScope scope; + private final ByteString scopeSchemaUrl; + // index -> timestamp -> dataPointGroupHash -> DataPointGroup + private final Map>> dataPointGroupsByIndexAndTimestamp; + + ScopeGroup(ResourceGroup resourceGroup, InstrumentationScope scope, ByteString scopeSchemaUrl) { + this.resourceGroup = resourceGroup; + this.scope = scope; + this.scopeSchemaUrl = scopeSchemaUrl; + this.dataPointGroupsByIndexAndTimestamp = new HashMap<>(); + } + + public void addDataPoints(Metric metric, List dataPoints, BiFunction createDataPoint) { + for (int i = 0; i < dataPoints.size(); i++) { + T dataPoint = dataPoints.get(i); + addDataPoint(createDataPoint.apply(dataPoint, metric)); + } + } + + public void addDataPoint(DataPoint dataPoint) { + totalDataPoints++; + if (dataPoint.isValid(ignoredDataPointMessages) == false) { + ignoredDataPoints++; + return; + } + getOrCreateDataPointGroup(dataPoint).addDataPoint(dataPoint); + } + + private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) { + TsidBuilder dataPointGroupTsidBuilder = DataPointTsidFunnel.forDataPoint(byteStringAccessor, dataPoint); + Hash128 dataPointGroupHash = dataPointGroupTsidBuilder.hash(); + // in addition to the fields that go into the _tsid, we also need to group by timestamp and start timestamp + Hash128 timestamp = new Hash128(dataPoint.getTimestampUnixNano(), dataPoint.getStartTimestampUnixNano()); + // TODO determine based on attributes and scope name + String targetIndex = "metrics-generic.otel-default"; + var dataPointGroupsByTimestamp = dataPointGroupsByIndexAndTimestamp.computeIfAbsent(targetIndex, k -> new HashMap<>()); + var dataPointGroups = dataPointGroupsByTimestamp.computeIfAbsent(timestamp, k -> new HashMap<>()); + DataPointGroup dataPointGroup = dataPointGroups.get(dataPointGroupHash); + if (dataPointGroup == null) { + dataPointGroup = new DataPointGroup( + resourceGroup.resource, + resourceGroup.resourceSchemaUrl, + scope, + scopeSchemaUrl, + dataPoint.getAttributes(), + dataPoint.getUnit(), + new ArrayList<>(), + targetIndex + ); + dataPointGroups.put(dataPointGroupHash, dataPointGroup); + } + return dataPointGroup; + } + + public void forEach(CheckedConsumer consumer) throws E { + for (var dataPointGroupsByTime : dataPointGroupsByIndexAndTimestamp.values()) { + for (var dataPointGroups : dataPointGroupsByTime.values()) { + for (DataPointGroup dataPointGroup : dataPointGroups.values()) { + consumer.accept(dataPointGroup); + } + } + } + } + } + + public static final class DataPointGroup { + private final Resource resource; + private final ByteString resourceSchemaUrl; + private final InstrumentationScope scope; + private final ByteString scopeSchemaUrl; + private final List dataPointAttributes; + private final String unit; + private final List dataPoints; + private final String targetIndex; + private String metricNamesHash; + + public DataPointGroup( + Resource resource, + ByteString resourceSchemaUrl, + InstrumentationScope scope, + ByteString scopeSchemaUrl, + List dataPointAttributes, + String unit, + List dataPoints, + String targetIndex + ) { + this.resource = resource; + this.resourceSchemaUrl = resourceSchemaUrl; + this.scope = scope; + this.scopeSchemaUrl = scopeSchemaUrl; + this.dataPointAttributes = dataPointAttributes; + this.unit = unit; + this.dataPoints = dataPoints; + this.targetIndex = targetIndex; + } + + public long getTimestampUnixNano() { + return dataPoints.getFirst().getTimestampUnixNano(); + } + + public long getStartTimestampUnixNano() { + return dataPoints.getFirst().getStartTimestampUnixNano(); + } + + public String getMetricNamesHash() { + if (metricNamesHash == null) { + BufferedMurmur3Hasher hasher = new BufferedMurmur3Hasher(0); + for (int i = 0; i < dataPoints.size(); i++) { + hasher.addString(dataPoints.get(i).getMetricName()); + } + metricNamesHash = Integer.toHexString(hasher.digestHash().hashCode()); + } + return metricNamesHash; + } + + public void addDataPoint(DataPoint dataPoint) { + metricNamesHash = null; // reset the hash when adding a new data point + dataPoints.add(dataPoint); + } + + public Resource resource() { + return resource; + } + + public ByteString resourceSchemaUrl() { + return resourceSchemaUrl; + } + + public InstrumentationScope scope() { + return scope; + } + + public ByteString scopeSchemaUrl() { + return scopeSchemaUrl; + } + + public List dataPointAttributes() { + return dataPointAttributes; + } + + public String unit() { + return unit; + } + + public List dataPoints() { + return dataPoints; + } + + public String targetIndex() { + return targetIndex; + } + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessor.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessor.java new file mode 100644 index 0000000000000..4ef2828670fc3 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessor.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.proto; + +import com.google.protobuf.ByteString; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * A utility class that uses a shared {@code byte[]} buffer to convert {@link ByteString} values to byte arrays. + * This avoids frequent allocations of byte arrays in {@link ByteString#toByteArray()}. + * Note that due to the use of a shared buffer, this class is not thread-safe. + */ +public class BufferedByteStringAccessor { + + private static final int DEFAULT_INITIAL_SIZE = 128; + + private byte[] bytes = new byte[DEFAULT_INITIAL_SIZE]; + + /** + * Adds a string dimension to the given {@link TsidBuilder} using the provided dimension name and {@link ByteString} value. + * The value is converted to a byte array using a shared buffer and added to the builder. + * + * @param tsidBuilder the builder to which the dimension will be added + * @param dimension the name of the dimension to add + * @param value the value of the dimension as a {@link ByteString} + */ + public void addStringDimension(TsidBuilder tsidBuilder, String dimension, ByteString value) { + if (value.isEmpty()) { + // Ignoring invalid values + // According to the spec https://opentelemetry.io/docs/specs/otel/common/#attribute: + // The attribute key MUST be a non-null and non-empty string. + return; + } + tsidBuilder.addStringDimension(dimension, toBytes(value), 0, value.size()); + } + + /** + * Writes a UTF-8 encoded value to the provided {@link XContentBuilder} using {@link XContentBuilder#utf8Value}. + * This uses a shared byte array to avoid allocations. + * + * @param value the value to write + */ + public void utf8Value(XContentBuilder builder, ByteString value) throws IOException { + builder.utf8Value(toBytes(value), 0, value.size()); + } + + /* + * Not exposed as a public method to avoid risks of leaking a reference to the reused byte array. + */ + private byte[] toBytes(ByteString byteString) { + int size = byteString.size(); + if (bytes.length < size) { + bytes = new byte[size]; + } + byteString.copyTo(bytes, 0); + return bytes; + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnel.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnel.java new file mode 100644 index 0000000000000..46a2c301593e1 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnel.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.tsid; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +import java.util.List; + +class AttributeListTsidFunnel implements TsidFunnel> { + + private final String prefix; + private final BufferedByteStringAccessor byteStringAccessor; + + private AttributeListTsidFunnel(BufferedByteStringAccessor byteStringAccessor, String prefix) { + this.prefix = prefix; + this.byteStringAccessor = byteStringAccessor; + } + + static AttributeListTsidFunnel get(BufferedByteStringAccessor byteStringAccessor, String prefix) { + return new AttributeListTsidFunnel(byteStringAccessor, prefix); + } + + @Override + public void add(List attributesList, TsidBuilder tsidBuilder) { + for (int i = 0; i < attributesList.size(); i++) { + KeyValue keyValue = attributesList.get(i); + String attributeKey = keyValue.getKey(); + hashValue(tsidBuilder, prefix + attributeKey, keyValue.getValue()); + } + } + + private void hashValue(TsidBuilder tsidBuilder, String key, AnyValue value) { + switch (value.getValueCase()) { + case STRING_VALUE: + byteStringAccessor.addStringDimension(tsidBuilder, key, value.getStringValueBytes()); + break; + case BOOL_VALUE: + tsidBuilder.addBooleanDimension(key, value.getBoolValue()); + break; + case DOUBLE_VALUE: + tsidBuilder.addDoubleDimension(key, value.getDoubleValue()); + break; + case INT_VALUE: + tsidBuilder.addLongDimension(key, value.getIntValue()); + break; + case KVLIST_VALUE: + tsidBuilder.add(value.getKvlistValue().getValuesList(), AttributeListTsidFunnel.get(byteStringAccessor, key + ".")); + break; + case ARRAY_VALUE: + List valuesList = value.getArrayValue().getValuesList(); + for (int i = 0; i < valuesList.size(); i++) { + hashValue(tsidBuilder, key, valuesList.get(i)); + } + break; + } + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java new file mode 100644 index 0000000000000..5543b41e15863 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.tsid; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel; +import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPoint; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +public class DataPointTsidFunnel implements TsidFunnel { + + private final BufferedByteStringAccessor byteStringAccessor; + + private DataPointTsidFunnel(BufferedByteStringAccessor byteStringAccessor) { + this.byteStringAccessor = byteStringAccessor; + } + + public static TsidBuilder forDataPoint(BufferedByteStringAccessor byteStringAccessor, DataPoint dataPoint) { + TsidBuilder tsidBuilder = new TsidBuilder(dataPoint.getAttributes().size()); + new DataPointTsidFunnel(byteStringAccessor).add(dataPoint, tsidBuilder); + return tsidBuilder; + } + + @Override + public void add(DataPoint dataPoint, TsidBuilder tsidBuilder) { + tsidBuilder.add(dataPoint.getAttributes(), AttributeListTsidFunnel.get(byteStringAccessor, "attributes.")); + tsidBuilder.addStringDimension("unit", dataPoint.getUnit()); + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ResourceTsidFunnel.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ResourceTsidFunnel.java new file mode 100644 index 0000000000000..8c12b4c283e69 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ResourceTsidFunnel.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.tsid; + +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +import java.util.List; + +public class ResourceTsidFunnel implements TsidFunnel { + + private final BufferedByteStringAccessor byteStringAccessor; + + public ResourceTsidFunnel(BufferedByteStringAccessor byteStringAccessor) { + this.byteStringAccessor = byteStringAccessor; + } + + public static TsidBuilder forResource(BufferedByteStringAccessor byteStringAccessor, ResourceMetrics resourceMetrics) { + TsidBuilder tsidBuilder = new TsidBuilder(resourceMetrics.getResource().getAttributesCount() + 1); + new ResourceTsidFunnel(byteStringAccessor).add(resourceMetrics, tsidBuilder); + return tsidBuilder; + } + + @Override + public void add(ResourceMetrics resourceMetrics, TsidBuilder tsidBuilder) { + List resourceAttributes = resourceMetrics.getResource().getAttributesList(); + tsidBuilder.add(resourceAttributes, AttributeListTsidFunnel.get(byteStringAccessor, "resource.attributes.")); + byteStringAccessor.addStringDimension(tsidBuilder, "schema_url", resourceMetrics.getSchemaUrlBytes()); + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java new file mode 100644 index 0000000000000..53b4d29307bef --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.tsid; + +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +import java.util.List; + +public class ScopeTsidFunnel implements TsidFunnel { + + private final BufferedByteStringAccessor byteStringAccessor; + + public ScopeTsidFunnel(BufferedByteStringAccessor byteStringAccessor) { + this.byteStringAccessor = byteStringAccessor; + } + + public static TsidBuilder forScope(BufferedByteStringAccessor byteStringAccessor, ScopeMetrics scopeMetrics) { + TsidBuilder tsidBuilder = new TsidBuilder(scopeMetrics.getScope().getAttributesCount() + 3); + new ScopeTsidFunnel(byteStringAccessor).add(scopeMetrics, tsidBuilder); + return tsidBuilder; + } + + @Override + public void add(ScopeMetrics scopeMetrics, TsidBuilder tsidBuilder) { + List resourceAttributes = scopeMetrics.getScope().getAttributesList(); + byteStringAccessor.addStringDimension(tsidBuilder, "scope.name", scopeMetrics.getScope().getNameBytes()); + tsidBuilder.add(resourceAttributes, AttributeListTsidFunnel.get(byteStringAccessor, "scope.attributes.")); + } +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java new file mode 100644 index 0000000000000..ef08aee6a2082 --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.oteldata.otlp; + +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.ArrayValue; +import io.opentelemetry.proto.common.v1.InstrumentationScope; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.common.v1.KeyValueList; +import io.opentelemetry.proto.metrics.v1.AggregationTemporality; +import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; +import io.opentelemetry.proto.metrics.v1.Sum; +import io.opentelemetry.proto.resource.v1.Resource; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.elasticsearch.test.ESTestCase.randomDouble; +import static org.elasticsearch.test.ESTestCase.randomLong; + +public class OtlpUtils { + + public static KeyValueList keyValueList(KeyValue... values) { + return KeyValueList.newBuilder().addAllValues(List.of(values)).build(); + } + + public static KeyValue keyValue(String key, String value) { + return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setStringValue(value).build()).build(); + } + + public static KeyValue keyValue(String key, String... values) { + return KeyValue.newBuilder() + .setKey(key) + .setValue( + AnyValue.newBuilder() + .setArrayValue( + ArrayValue.newBuilder() + .addAllValues(Arrays.stream(values).map(v -> AnyValue.newBuilder().setStringValue(v).build()).toList()) + .build() + ) + .build() + ) + .build(); + } + + public static KeyValue keyValue(String key, long value) { + return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setIntValue(value).build()).build(); + } + + public static KeyValue keyValue(String key, double value) { + return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setDoubleValue(value).build()).build(); + } + + public static KeyValue keyValue(String key, boolean value) { + return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setBoolValue(value).build()).build(); + } + + public static KeyValue keyValue(String key, KeyValueList keyValueList) { + return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setKvlistValue(keyValueList).build()).build(); + } + + private static Resource createResource(List attributes) { + return Resource.newBuilder().addAllAttributes(attributes).build(); + } + + public static ResourceMetrics createResourceMetrics(List attributes, List scopeMetrics) { + return ResourceMetrics.newBuilder().setResource(createResource(attributes)).addAllScopeMetrics(scopeMetrics).build(); + } + + private static InstrumentationScope createScope(String name, String version) { + return InstrumentationScope.newBuilder().setName(name).setVersion(version).build(); + } + + public static ScopeMetrics createScopeMetrics(String name, String version, Iterable metrics) { + return ScopeMetrics.newBuilder().setScope(createScope(name, version)).addAllMetrics(metrics).build(); + } + + public static Metric createGaugeMetric(String name, String unit, List dataPoints) { + return Metric.newBuilder().setName(name).setUnit(unit).setGauge(Gauge.newBuilder().addAllDataPoints(dataPoints).build()).build(); + } + + public static Metric createSumMetric( + String name, + String unit, + List dataPoints, + boolean isMonotonic, + AggregationTemporality temporality + ) { + return Metric.newBuilder() + .setName(name) + .setUnit(unit) + .setSum( + Sum.newBuilder().addAllDataPoints(dataPoints).setIsMonotonic(isMonotonic).setAggregationTemporality(temporality).build() + ) + .build(); + } + + public static NumberDataPoint createDoubleDataPoint(long timestamp, List attributes) { + return NumberDataPoint.newBuilder() + .setTimeUnixNano(timestamp) + .setStartTimeUnixNano(timestamp) + .addAllAttributes(attributes) + .setAsDouble(randomDouble()) + .build(); + } + + public static NumberDataPoint createLongDataPoint(long timestamp, List attributes) { + return NumberDataPoint.newBuilder() + .setTimeUnixNano(timestamp) + .setStartTimeUnixNano(timestamp) + .addAllAttributes(attributes) + .setAsInt(randomLong()) + .build(); + } + + public static ExportMetricsServiceRequest createMetricsRequest(List metrics) { + + List resourceMetrics = new ArrayList<>(); + for (Metric metric : metrics) { + resourceMetrics.add( + createResourceMetrics( + List.of(keyValue("service.name", "test-service")), + List.of(createScopeMetrics("test", "1.0.0", List.of(metric))) + ) + ); + } + + return ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(resourceMetrics).build(); + } +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java new file mode 100644 index 0000000000000..9667684cbb81d --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java @@ -0,0 +1,177 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createDoubleDataPoint; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createGaugeMetric; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createLongDataPoint; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createMetricsRequest; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createResourceMetrics; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createScopeMetrics; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSumMetric; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue; + +public class DataPointGroupingContextTests extends ESTestCase { + + private final DataPointGroupingContext context = new DataPointGroupingContext(new BufferedByteStringAccessor()); + private final long nowUnixNanos = System.currentTimeMillis() * 1_000_000L; + + public void testGroupingSameGroup() throws Exception { + // Group data points + ExportMetricsServiceRequest metricsRequest = createMetricsRequest( + List.of( + createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))), + createGaugeMetric("system.memory.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))), + createSumMetric( + "http.requests.count", + "", + List.of(createLongDataPoint(nowUnixNanos, List.of())), + true, + AGGREGATION_TEMPORALITY_CUMULATIVE + ) + ) + ); + context.groupDataPoints(metricsRequest); + assertEquals(3, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + AtomicInteger groupCount = new AtomicInteger(0); + context.consume(dataPointGroup -> groupCount.incrementAndGet()); + assertEquals(1, groupCount.get()); + } + + public void testGroupingDifferentGroupUnit() throws Exception { + // Group data points + ExportMetricsServiceRequest metricsRequest = createMetricsRequest( + List.of( + createGaugeMetric("system.cpu.usage", "{percent}", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))), + createGaugeMetric("system.memory.usage", "By", List.of(createLongDataPoint(nowUnixNanos, List.of()))) + ) + ); + context.groupDataPoints(metricsRequest); + assertEquals(2, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + AtomicInteger groupCount = new AtomicInteger(0); + context.consume(dataPointGroup -> groupCount.incrementAndGet()); + assertEquals(2, groupCount.get()); + } + + public void testGroupingDifferentResource() throws Exception { + ResourceMetrics resource1 = createResourceMetrics( + List.of(keyValue("service.name", "test-service_1")), + List.of( + createScopeMetrics( + "test", + "1.0.0", + List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of())))) + ) + ) + ); + ResourceMetrics resource2 = createResourceMetrics( + List.of(keyValue("service.name", "test-service_2")), + List.of( + createScopeMetrics( + "test", + "1.0.0", + List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of())))) + ) + ) + ); + + context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource1, resource2)).build()); + assertEquals(2, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + AtomicInteger groupCount = new AtomicInteger(0); + context.consume(dataPointGroup -> groupCount.incrementAndGet()); + assertEquals(2, groupCount.get()); + } + + public void testGroupingDifferentScope() throws Exception { + ResourceMetrics resource1 = createResourceMetrics( + List.of(keyValue("service.name", "test-service")), + List.of( + createScopeMetrics( + "test_scope_1", + "1.0.0", + List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of())))) + ) + ) + ); + ResourceMetrics resource2 = createResourceMetrics( + List.of(keyValue("service.name", "test-service")), + List.of( + createScopeMetrics( + "test_scope_2", + "1.0.0", + List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of())))) + ) + ) + ); + + context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource1, resource2)).build()); + assertEquals(2, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + AtomicInteger groupCount = new AtomicInteger(0); + context.consume(dataPointGroup -> groupCount.incrementAndGet()); + assertEquals(2, groupCount.get()); + } + + public void testGroupingDifferentGroupTimestamp() throws Exception { + // Group data points + ExportMetricsServiceRequest metricsRequest = createMetricsRequest( + List.of( + createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos + 1, List.of()))), + createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of()))) + ) + ); + context.groupDataPoints(metricsRequest); + assertEquals(2, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + AtomicInteger groupCount = new AtomicInteger(0); + context.consume(dataPointGroup -> groupCount.incrementAndGet()); + assertEquals(2, groupCount.get()); + } + + public void testGroupingDifferentGroupAttributes() throws Exception { + // Group data points + ExportMetricsServiceRequest metricsRequest = createMetricsRequest( + List.of( + createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of(keyValue("core", "cpu0"))))), + createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of()))) + ) + ); + context.groupDataPoints(metricsRequest); + assertEquals(2, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + AtomicInteger groupCount = new AtomicInteger(0); + context.consume(dataPointGroup -> groupCount.incrementAndGet()); + assertEquals(2, groupCount.get()); + } + +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointNumberTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointNumberTests.java new file mode 100644 index 0000000000000..46dcc1ca65de6 --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointNumberTests.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import org.elasticsearch.test.ESTestCase; + +import java.util.List; + +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE; +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createDoubleDataPoint; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createGaugeMetric; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createLongDataPoint; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSumMetric; +import static org.hamcrest.Matchers.equalTo; + +public class DataPointNumberTests extends ESTestCase { + + private final long nowUnixNanos = System.currentTimeMillis() * 1_000_000L; + + public void testGauge() { + DataPoint.Number doubleGauge = new DataPoint.Number( + createDoubleDataPoint(nowUnixNanos, List.of()), + createGaugeMetric("system.cpu.usage", "", List.of()) + ); + assertThat(doubleGauge.getDynamicTemplate(), equalTo("gauge_double")); + DataPoint.Number longGauge = new DataPoint.Number( + createLongDataPoint(nowUnixNanos, List.of()), + createGaugeMetric("system.cpu.usage", "", List.of()) + ); + assertThat(longGauge.getDynamicTemplate(), equalTo("gauge_long")); + } + + public void testCounterTemporality() { + DataPoint.Number doubleCumulative = new DataPoint.Number( + createDoubleDataPoint(nowUnixNanos, List.of()), + createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_CUMULATIVE) + ); + assertThat(doubleCumulative.getDynamicTemplate(), equalTo("counter_double")); + DataPoint.Number longCumulative = new DataPoint.Number( + createLongDataPoint(nowUnixNanos, List.of()), + createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_CUMULATIVE) + ); + assertThat(longCumulative.getDynamicTemplate(), equalTo("counter_long")); + DataPoint.Number doubleDelta = new DataPoint.Number( + createDoubleDataPoint(nowUnixNanos, List.of()), + createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_DELTA) + ); + assertThat(doubleDelta.getDynamicTemplate(), equalTo("gauge_double")); + DataPoint.Number longDelta = new DataPoint.Number( + createLongDataPoint(nowUnixNanos, List.of()), + createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_DELTA) + ); + assertThat(longDelta.getDynamicTemplate(), equalTo("gauge_long")); + } + + public void testCounterNonMonotonic() { + DataPoint.Number doubleNonMonotonic = new DataPoint.Number( + createDoubleDataPoint(nowUnixNanos, List.of()), + createSumMetric("http.requests.count", "", List.of(), false, AGGREGATION_TEMPORALITY_CUMULATIVE) + ); + assertThat(doubleNonMonotonic.getDynamicTemplate(), equalTo("gauge_double")); + DataPoint.Number longNonMonotonic = new DataPoint.Number( + createLongDataPoint(nowUnixNanos, List.of()), + createSumMetric("http.requests.count", "", List.of(), false, AGGREGATION_TEMPORALITY_DELTA) + ); + assertThat(longNonMonotonic.getDynamicTemplate(), equalTo("gauge_long")); + } + +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessorTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessorTests.java new file mode 100644 index 0000000000000..620e4027cd44c --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessorTests.java @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.proto; + +import com.google.protobuf.ByteString; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; + +import static org.hamcrest.Matchers.equalTo; + +public class BufferedByteStringAccessorTests extends ESTestCase { + + private final BufferedByteStringAccessor accessor = new BufferedByteStringAccessor(); + + public void testAddStringDimension() { + String value = randomUnicodeOfLengthBetween(1, 1000); + TsidBuilder byteStringTsidBuilder = new TsidBuilder(); + accessor.addStringDimension(byteStringTsidBuilder, "test", ByteString.copyFromUtf8(value)); + TsidBuilder basicTsidBuilder = new TsidBuilder(); + basicTsidBuilder.addStringDimension("test", value); + assertThat(byteStringTsidBuilder.hash(), equalTo(basicTsidBuilder.hash())); + } + + public void testAddEmptyStringDimension() { + TsidBuilder byteStringTsidBuilder = new TsidBuilder(); + accessor.addStringDimension(byteStringTsidBuilder, "test", ByteString.copyFromUtf8("")); + assertThat(byteStringTsidBuilder.size(), equalTo(0)); + } + + public void testUtf8Value() throws Exception { + String value = randomUnicodeOfLengthBetween(0, 1000); + String json; + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + builder.field("value"); + accessor.utf8Value(builder, ByteString.copyFromUtf8(value)); + builder.endObject(); + json = Strings.toString(builder); + } + + assertThat(XContentHelper.convertToMap(JsonXContent.jsonXContent, json, false).get("value"), equalTo(value)); + } + +} diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnelTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnelTests.java new file mode 100644 index 0000000000000..5db5517356481 --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnelTests.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.tsid; + +import org.elasticsearch.cluster.routing.TsidBuilder; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; + +import java.util.List; + +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue; +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValueList; + +public class AttributeListTsidFunnelTests extends ESTestCase { + + private final TsidBuilder plainBuilder = new TsidBuilder(); + private final TsidBuilder funnelBuilder = new TsidBuilder(); + private final BufferedByteStringAccessor byteStringAccessor = new BufferedByteStringAccessor(); + private final AttributeListTsidFunnel funnel = AttributeListTsidFunnel.get(byteStringAccessor, "attributes."); + + public void testSimpleAttributes() { + funnel.add( + List.of( + keyValue("string", "value"), + keyValue("array", "value1", "value2"), + keyValue("bool", true), + keyValue("double", 1.5), + keyValue("long", 42L), + keyValue("int", 42) + ), + funnelBuilder + ); + plainBuilder.addStringDimension("attributes.string", "value"); + plainBuilder.addStringDimension("attributes.array", "value1"); + plainBuilder.addStringDimension("attributes.array", "value2"); + plainBuilder.addBooleanDimension("attributes.bool", true); + plainBuilder.addDoubleDimension("attributes.double", 1.5); + plainBuilder.addLongDimension("attributes.long", 42); + plainBuilder.addIntDimension("attributes.int", 42); + assertEquals(plainBuilder.hash(), funnelBuilder.hash()); + } + + public void testNestedAttributes() { + funnel.add( + List.of(keyValue("foo", "bar"), keyValue("nested", keyValueList(keyValue("string", "value"), keyValue("int", 42)))), + funnelBuilder + ); + plainBuilder.addStringDimension("attributes.nested.string", "value"); + plainBuilder.addLongDimension("attributes.nested.int", 42); + plainBuilder.addStringDimension("attributes.foo", "bar"); + assertEquals(plainBuilder.hash(), funnelBuilder.hash()); + } + +}