From c345e988ca3bb5e65a0a413941cb830c641f592c Mon Sep 17 00:00:00 2001 From: Anh Nguyen Date: Tue, 30 Sep 2025 16:59:00 -0400 Subject: [PATCH] Mirror upstream elastic/elasticsearch#133903 as single snapshot commit for AI review BASE=4c4e49b2e9cbae0572760a0fc38dbbd58f7fa2be HEAD=051723c20ef007ff63b8b530a5b3805947404935 Branch=main --- .../ingest/common/RerouteProcessor.java | 23 +-- .../cluster/metadata/DataStream.java | 28 ++++ .../datapoint/DataPointGroupingContext.java | 37 +++- .../oteldata/otlp/datapoint/TargetIndex.java | 158 ++++++++++++++++++ .../otlp/docbuilder/MappingHints.java | 7 +- .../docbuilder/MetricDocumentBuilder.java | 15 +- .../DataPointGroupingContextTests.java | 76 +++++++++ .../otlp/datapoint/TargetIndexTests.java | 147 ++++++++++++++++ .../MetricDocumentBuilderTests.java | 10 +- .../tsid/AttributeListTsidFunnelTests.java | 13 ++ 10 files changed, 483 insertions(+), 31 deletions(-) create mode 100644 x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/TargetIndex.java create mode 100644 x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/TargetIndexTests.java diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java index 6580a5af3d005..11bfa05abeccf 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java @@ -9,6 +9,7 @@ package org.elasticsearch.ingest.common; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.core.Nullable; import org.elasticsearch.ingest.AbstractProcessor; @@ -17,11 +18,9 @@ import org.elasticsearch.ingest.Processor; import java.util.List; -import java.util.Locale; import java.util.Map; import 
java.util.Objects; import java.util.function.Function; -import java.util.regex.Pattern; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; @@ -221,11 +220,6 @@ public RerouteProcessor create( */ static final class DataStreamValueSource { - private static final int MAX_LENGTH = 100; - private static final String REPLACEMENT = "_"; - private static final Pattern DISALLOWED_IN_TYPE = Pattern.compile("[\\\\/*?\"<>| ,#:-]"); - private static final Pattern DISALLOWED_IN_DATASET = Pattern.compile("[\\\\/*?\"<>| ,#:-]"); - private static final Pattern DISALLOWED_IN_NAMESPACE = Pattern.compile("[\\\\/*?\"<>| ,#:]"); static final DataStreamValueSource TYPE_VALUE_SOURCE = type("{{" + DATA_STREAM_TYPE + "}}"); static final DataStreamValueSource DATASET_VALUE_SOURCE = dataset("{{" + DATA_STREAM_DATASET + "}}"); static final DataStreamValueSource NAMESPACE_VALUE_SOURCE = namespace("{{" + DATA_STREAM_NAMESPACE + "}}"); @@ -235,24 +229,15 @@ static final class DataStreamValueSource { private final Function sanitizer; public static DataStreamValueSource type(String type) { - return new DataStreamValueSource(type, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_TYPE)); + return new DataStreamValueSource(type, DataStream::sanitizeType); } public static DataStreamValueSource dataset(String dataset) { - return new DataStreamValueSource(dataset, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_DATASET)); + return new DataStreamValueSource(dataset, DataStream::sanitizeDataset); } public static DataStreamValueSource namespace(String namespace) { - return new DataStreamValueSource(namespace, nsp -> sanitizeDataStreamField(nsp, DISALLOWED_IN_NAMESPACE)); - } - - private static String sanitizeDataStreamField(String s, Pattern disallowedInDataset) { - if (s == null) { - return null; - } - s = s.toLowerCase(Locale.ROOT); - s = s.substring(0, Math.min(s.length(), MAX_LENGTH)); - return 
disallowedInDataset.matcher(s).replaceAll(REPLACEMENT); + return new DataStreamValueSource(namespace, DataStream::sanitizeNamespace); } private DataStreamValueSource(String value, Function sanitizer) { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 3d5dd4ce9e17c..fddcc838e3a11 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -74,6 +74,7 @@ import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Predicate; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.ComposableIndexTemplate.EMPTY_MAPPINGS; @@ -98,6 +99,33 @@ public final class DataStream implements SimpleDiffable, ToXContentO public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd"); public static final String TIMESTAMP_FIELD_NAME = "@timestamp"; + private static final int MAX_LENGTH = 100; + private static final String REPLACEMENT = "_"; + private static final Pattern DISALLOWED_IN_TYPE = Pattern.compile("[\\\\/*?\"<>| ,#:-]"); + private static final Pattern DISALLOWED_IN_DATASET = Pattern.compile("[\\\\/*?\"<>| ,#:-]"); + private static final Pattern DISALLOWED_IN_NAMESPACE = Pattern.compile("[\\\\/*?\"<>| ,#:]"); + + public static String sanitizeType(String type) { + return sanitizeDataStreamField(type, DISALLOWED_IN_TYPE); + } + + public static String sanitizeDataset(String dataset) { + return sanitizeDataStreamField(dataset, DISALLOWED_IN_DATASET); + } + + public static String sanitizeNamespace(String namespace) { + return sanitizeDataStreamField(namespace, DISALLOWED_IN_NAMESPACE); + } + + private static String sanitizeDataStreamField(String s, Pattern disallowedInDataset) { + if (s == null) { + return null; + } + s = 
s.toLowerCase(Locale.ROOT); + s = s.substring(0, Math.min(s.length(), MAX_LENGTH)); + return disallowedInDataset.matcher(s).replaceAll(REPLACEMENT); + } + // Timeseries indices' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations public static final Comparator TIMESERIES_LEAF_READERS_SORTER = Comparator.comparingLong((LeafReader r) -> { try { diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java index 6bde8dbd5b5e4..d1b662c3e7f3f 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.hash.BufferedMurmur3Hasher; import org.elasticsearch.common.hash.MurmurHash3.Hash128; import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.core.Nullable; import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; import org.elasticsearch.xpack.oteldata.otlp.tsid.DataPointTsidFunnel; import org.elasticsearch.xpack.oteldata.otlp.tsid.ResourceTsidFunnel; @@ -160,17 +161,36 @@ public void forEach(CheckedConsumer con } class ScopeGroup { + private static final String RECEIVER = "/receiver/"; + private final ResourceGroup resourceGroup; private final InstrumentationScope scope; private final ByteString scopeSchemaUrl; + @Nullable + private final String receiverName; // index -> timestamp -> dataPointGroupHash -> DataPointGroup - private final Map>> dataPointGroupsByIndexAndTimestamp; + private final Map>> dataPointGroupsByIndexAndTimestamp; ScopeGroup(ResourceGroup resourceGroup, InstrumentationScope scope, ByteString scopeSchemaUrl) { this.resourceGroup = 
resourceGroup;
             this.scope = scope;
             this.scopeSchemaUrl = scopeSchemaUrl;
             this.dataPointGroupsByIndexAndTimestamp = new HashMap<>();
+            this.receiverName = extractReceiverName(scope);
+        }
+
+        private @Nullable String extractReceiverName(InstrumentationScope scope) { // path segment following "/receiver/"
+            String scopeName = scope.getName();
+            int indexOfReceiver = scopeName.indexOf(RECEIVER);
+            if (indexOfReceiver >= 0) {
+                int beginIndex = indexOfReceiver + RECEIVER.length();
+                int endIndex = scopeName.indexOf('/', beginIndex);
+                if (endIndex < 0) {
+                    endIndex = scopeName.length(); // no trailing slash: the segment runs to the end of the name
+                }
+                return endIndex > beginIndex ? scopeName.substring(beginIndex, endIndex) : null; // blank segment means no receiver
+            }
+            return null;
         }

         public void addDataPoints(Metric metric, List dataPoints, BiFunction createDataPoint) {
@@ -197,8 +217,13 @@ private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
             Hash128 dataPointGroupHash = dataPointGroupTsidBuilder.hash();
             // in addition to the fields that go into the _tsid, we also need to group by timestamp and start timestamp
             Hash128 timestamp = new Hash128(dataPoint.getTimestampUnixNano(), dataPoint.getStartTimestampUnixNano());
-            // TODO determine based on attributes and scope name
-            String targetIndex = "metrics-generic.otel-default";
+            TargetIndex targetIndex = TargetIndex.evaluate(
+                TargetIndex.TYPE_METRICS,
+                dataPoint.getAttributes(),
+                receiverName,
+                scope.getAttributesList(),
+                resourceGroup.resource.getAttributesList()
+            );
             var dataPointGroupsByTimestamp = dataPointGroupsByIndexAndTimestamp.computeIfAbsent(targetIndex, k -> new HashMap<>());
             var dataPointGroups = dataPointGroupsByTimestamp.computeIfAbsent(timestamp, k -> new HashMap<>());
             DataPointGroup dataPointGroup = dataPointGroups.get(dataPointGroupHash);
@@ -237,7 +262,7 @@ public static final class DataPointGroup {
         private final String unit;
         private final Set metricNames = new HashSet<>();
         private final List dataPoints = new ArrayList<>();
-        private final String targetIndex;
+        private final TargetIndex targetIndex;
         private String metricNamesHash;

         public
DataPointGroup( @@ -247,7 +272,7 @@ public DataPointGroup( ByteString scopeSchemaUrl, List dataPointAttributes, String unit, - String targetIndex + TargetIndex targetIndex ) { this.resource = resource; this.resourceSchemaUrl = resourceSchemaUrl; @@ -318,7 +343,7 @@ public List dataPoints() { return dataPoints; } - public String targetIndex() { + public TargetIndex targetIndex() { return targetIndex; } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/TargetIndex.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/TargetIndex.java new file mode 100644 index 0000000000000..6dc0aa114bfd4 --- /dev/null +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/TargetIndex.java @@ -0,0 +1,158 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import io.opentelemetry.proto.common.v1.KeyValue; + +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.core.Nullable; + +import java.util.List; + +/** + * Represents the target index for a data point, which can be either a specific index or a data stream. + * The index is determined based on attributes, scope name, and default values. 
+ */ +public final class TargetIndex { + + public static final String TYPE_METRICS = "metrics"; + + private static final String ELASTICSEARCH_INDEX = "elasticsearch.index"; + private static final String DATA_STREAM_DATASET = "data_stream.dataset"; + private static final String DATA_STREAM_NAMESPACE = "data_stream.namespace"; + private static final String DEFAULT_DATASET = "generic"; + private static final String OTEL_DATASET_SUFFIX = ".otel"; + private static final String DEFAULT_NAMESPACE = "default"; + private static final TargetIndex DEFAULT_METRICS_TARGET = evaluate(TYPE_METRICS, List.of(), null, List.of(), List.of()); + + private String index; + private String type; + private String dataset; + private String namespace; + + public static TargetIndex defaultMetrics() { + return DEFAULT_METRICS_TARGET; + } + + public static boolean isTargetIndexAttribute(String attributeKey) { + return attributeKey.equals(ELASTICSEARCH_INDEX) + || attributeKey.equals(DATA_STREAM_DATASET) + || attributeKey.equals(DATA_STREAM_NAMESPACE); + } + + /** + * Determines the target index for a data point. + * + * @param type The data stream type (e.g., "metrics", "logs"). + * @param attributes The attributes associated with the data point. + * @param receiverName The name of the receiver, which may influence the dataset (receiver-based routing). + * @param scopeAttributes Attributes associated with the scope. + * @param resourceAttributes Attributes associated with the resource. + * @return A TargetIndex instance representing the target index for the data point. + */ + public static TargetIndex evaluate( + String type, + List attributes, + @Nullable String receiverName, + List scopeAttributes, + List resourceAttributes + ) { + // Order: + // 1. elasticsearch.index from attributes, scope.attributes, resource.attributes + // 2. read data_stream.* from attributes, scope.attributes, resource.attributes + // 3. receiver-based routing based on scope.name + // 4. 
use default hardcoded data_stream.* ({type}-generic.otel-default)
+        TargetIndex target = new TargetIndex();
+        target.populateFrom(attributes);
+        target.populateFrom(scopeAttributes);
+        target.populateFrom(resourceAttributes);
+        if (target.index == null) {
+            target.type = type;
+            if (target.dataset == null && receiverName != null && receiverName.isEmpty() == false) {
+                target.dataset = receiverName;
+            }
+            target.dataset = DataStream.sanitizeDataset(target.dataset);
+            if (target.dataset == null) {
+                target.dataset = DEFAULT_DATASET;
+            }
+            // add otel suffix to match OTel index template
+            target.dataset = target.dataset + OTEL_DATASET_SUFFIX;
+            target.namespace = DataStream.sanitizeNamespace(target.namespace);
+            if (target.namespace == null) {
+                target.namespace = DEFAULT_NAMESPACE;
+            }
+            target.index = target.type + "-" + target.dataset + "-" + target.namespace;
+        } else {
+            target.type = null;
+            target.dataset = null;
+            target.namespace = null;
+        }
+        return target;
+    }
+
+    private TargetIndex() {}
+
+    private void populateFrom(List attributes) {
+        if (isPopulated()) {
+            return;
+        }
+        for (int i = 0, size = attributes.size(); i < size; i++) {
+            KeyValue attr = attributes.get(i);
+            // blank values are skipped so that a lower-precedence attribute list or the defaults can still apply
+            if (attr.getKey().equals(ELASTICSEARCH_INDEX)) {
+                index = attr.getValue().getStringValue().isEmpty() ? index : attr.getValue().getStringValue();
+            } else if (dataset == null && attr.getKey().equals(DATA_STREAM_DATASET)) {
+                dataset = attr.getValue().getStringValue().isEmpty() ? null : attr.getValue().getStringValue();
+            } else if (namespace == null && attr.getKey().equals(DATA_STREAM_NAMESPACE)) {
+                namespace = attr.getValue().getStringValue().isEmpty() ? null : attr.getValue().getStringValue();
+            }
+        }
+    }
+
+    private boolean isPopulated() {
+        return (dataset != null && namespace != null) || index != null;
+    }
+
+    public boolean isDataStream() {
+        return type != null && dataset != null && namespace != null;
+    }
+
+    public String index() {
+        return index;
+    }
+
+    public String type() {
+        return type;
+    }
+
+    public String dataset() {
+        return dataset;
+    }
+
+    public String namespace() {
+        return namespace;
+    }
+
+    @Override
+    public String toString() {
+        return index;
+    }
+
+    @Override
+ public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + + TargetIndex that = (TargetIndex) o; + return index.equals(that.index); + } + + @Override + public int hashCode() { + return index.hashCode(); + } +} diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MappingHints.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MappingHints.java index 1c13bd4473eee..da630bed95d8e 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MappingHints.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MappingHints.java @@ -27,8 +27,7 @@ * In these cases, the behavior is undefined but does not lead to data loss. */ public record MappingHints(boolean aggregateMetricDouble, boolean docCount) { - public static final String MAPPING_HINTS = "elasticsearch.mapping.hints"; - + private static final String MAPPING_HINTS = "elasticsearch.mapping.hints"; private static final MappingHints EMPTY = new MappingHints(false, false); private static final String AGGREGATE_METRIC_DOUBLE = "aggregate_metric_double"; private static final String DOC_COUNT = "_doc_count"; @@ -62,4 +61,8 @@ public static MappingHints fromAttributes(List attributes) { public static MappingHints empty() { return EMPTY; } + + public static boolean isMappingHintsAttribute(String attributeKey) { + return attributeKey.equals(MAPPING_HINTS); + } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java index 6ca7a8e7a82d3..9d289a7f8b2b3 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java +++ 
b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilder.java @@ -19,6 +19,7 @@ import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPoint; import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPointGroupingContext; +import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex; import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; import java.io.IOException; @@ -49,6 +50,7 @@ public HashMap buildMetricDocument(XContentBuilder builder, Data builder.field("start_timestamp", TimeUnit.NANOSECONDS.toMillis(dataPointGroup.getStartTimestampUnixNano())); } buildResource(dataPointGroup.resource(), dataPointGroup.resourceSchemaUrl(), builder); + buildDataStream(builder, dataPointGroup.targetIndex()); buildScope(builder, dataPointGroup.scopeSchemaUrl(), dataPointGroup.scope()); buildDataPointAttributes(builder, dataPointGroup.dataPointAttributes(), dataPointGroup.unit()); builder.field("_metric_names_hash", dataPointGroup.getMetricNamesHash(hasher)); @@ -110,6 +112,17 @@ private void buildDataPointAttributes(XContentBuilder builder, List at } } + private void buildDataStream(XContentBuilder builder, TargetIndex targetIndex) throws IOException { + if (targetIndex.isDataStream() == false) { + return; + } + builder.startObject("data_stream"); + builder.field("type", targetIndex.type()); + builder.field("dataset", targetIndex.dataset()); + builder.field("namespace", targetIndex.namespace()); + builder.endObject(); + } + private void buildAttributes(XContentBuilder builder, List attributes) throws IOException { for (int i = 0, size = attributes.size(); i < size; i++) { KeyValue attribute = attributes.get(i); @@ -130,7 +143,7 @@ private void buildAttributes(XContentBuilder builder, List attributes) * @return true if the attribute is ignored, false otherwise */ public static boolean isIgnoredAttribute(String attributeKey) { - return 
attributeKey.equals(MappingHints.MAPPING_HINTS); + return TargetIndex.isTargetIndexAttribute(attributeKey) || MappingHints.isMappingHintsAttribute(attributeKey); } private void attributeValue(XContentBuilder builder, AnyValue value) throws IOException { diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java index 2981a390ea416..bdb8f2ec66593 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -25,6 +26,7 @@ import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createScopeMetrics; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSumMetric; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; public class DataPointGroupingContextTests extends ESTestCase { @@ -57,6 +59,33 @@ public void testGroupingSameGroup() throws Exception { assertEquals(1, groupCount.get()); } + public void testGroupingDifferentTargetIndex() throws Exception { + // Group data points + ExportMetricsServiceRequest metricsRequest = createMetricsRequest( + List.of( + createGaugeMetric( + "system.cpu.usage", + "", + List.of(createDoubleDataPoint(nowUnixNanos, nowUnixNanos, List.of(keyValue("data_stream.dataset", "custom")))) + ), + createGaugeMetric("system.memory.usage", "", 
List.of(createDoubleDataPoint(nowUnixNanos))) + ) + ); + context.groupDataPoints(metricsRequest); + assertEquals(2, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + AtomicInteger groupCount = new AtomicInteger(0); + List targetIndexes = new ArrayList<>(); + context.consume(dataPointGroup -> { + groupCount.incrementAndGet(); + targetIndexes.add(dataPointGroup.targetIndex().index()); + }); + assertEquals(2, groupCount.get()); + assertThat(targetIndexes, containsInAnyOrder("metrics-custom.otel-default", "metrics-generic.otel-default")); + } + public void testGroupingDifferentGroupUnit() throws Exception { // Group data points ExportMetricsServiceRequest metricsRequest = createMetricsRequest( @@ -212,4 +241,51 @@ public void testGroupingDifferentGroupAttributes() throws Exception { assertEquals(2, groupCount.get()); } + public void testReceiverBasedRouting() throws Exception { + String scopeName = + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper"; + ResourceMetrics resource = createResourceMetrics( + List.of(keyValue("service.name", "test-service_1")), + List.of( + createScopeMetrics( + scopeName, + "1.0.0", + List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos)))) + ) + ) + ); + + context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource)).build()); + assertEquals(1, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + List targetIndexes = new ArrayList<>(); + context.consume(dataPointGroup -> targetIndexes.add(dataPointGroup.targetIndex().index())); + assertThat(targetIndexes, containsInAnyOrder("metrics-hostmetricsreceiver.otel-default")); + } + + public void testReceiverBasedRoutingWithoutTrailingSlash() throws Exception { + 
String scopeName = "/receiver/foo"; + ResourceMetrics resource = createResourceMetrics( + List.of(keyValue("service.name", "test-service_1")), + List.of( + createScopeMetrics( + scopeName, + "1.0.0", + List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos)))) + ) + ) + ); + + context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource)).build()); + assertEquals(1, context.totalDataPoints()); + assertEquals(0, context.getIgnoredDataPoints()); + assertEquals("", context.getIgnoredDataPointsMessage()); + + List targetIndexes = new ArrayList<>(); + context.consume(dataPointGroup -> targetIndexes.add(dataPointGroup.targetIndex().index())); + assertThat(targetIndexes, containsInAnyOrder("metrics-foo.otel-default")); + } + } diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/TargetIndexTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/TargetIndexTests.java new file mode 100644 index 0000000000000..db675b547ee79 --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/TargetIndexTests.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.oteldata.otlp.datapoint; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; + +import org.elasticsearch.test.ESTestCase; + +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class TargetIndexTests extends ESTestCase { + + public void testEvaluateWithExplicitIndex() { + List attributes = List.of( + createStringAttribute("elasticsearch.index", "custom-index"), + createStringAttribute("data_stream.dataset", "should-be-ignored"), + createStringAttribute("data_stream.namespace", "should-be-ignored") + ); + + TargetIndex index = TargetIndex.evaluate("metrics", attributes, null, List.of(), List.of()); + + assertThat(index.index(), equalTo("custom-index")); + assertThat(index.isDataStream(), is(false)); + assertThat(index.type(), nullValue()); + assertThat(index.dataset(), nullValue()); + assertThat(index.namespace(), nullValue()); + } + + public void testEvaluateWithDataStreamAttributes() { + List attributes = List.of( + createStringAttribute("data_stream.dataset", "custom-dataset"), + createStringAttribute("data_stream.namespace", "custom-namespace") + ); + + TargetIndex index = TargetIndex.evaluate("metrics", attributes, null, List.of(), List.of()); + + // DataStream.sanitizeDataset replaces hyphens with underscores + assertThat(index.index(), equalTo("metrics-custom_dataset.otel-custom-namespace")); + assertThat(index.isDataStream(), is(true)); + assertThat(index.type(), equalTo("metrics")); + assertThat(index.dataset(), equalTo("custom_dataset.otel")); + assertThat(index.namespace(), equalTo("custom-namespace")); + } + + public void testEvaluateWithScopeAttributes() { + List scopeAttributes = List.of( + createStringAttribute("data_stream.dataset", "scope-dataset"), + createStringAttribute("data_stream.namespace", "scope-namespace") + ); + + TargetIndex index 
= TargetIndex.evaluate("metrics", List.of(), null, scopeAttributes, List.of()); + + // DataStream.sanitizeDataset replaces hyphens with underscores + assertThat(index.index(), equalTo("metrics-scope_dataset.otel-scope-namespace")); + assertThat(index.isDataStream(), is(true)); + assertThat(index.type(), equalTo("metrics")); + assertThat(index.dataset(), equalTo("scope_dataset.otel")); + assertThat(index.namespace(), equalTo("scope-namespace")); + } + + public void testEvaluateWithResourceAttributes() { + List resourceAttributes = List.of( + createStringAttribute("data_stream.dataset", "resource-dataset"), + createStringAttribute("data_stream.namespace", "resource-namespace") + ); + + TargetIndex index = TargetIndex.evaluate("metrics", List.of(), null, List.of(), resourceAttributes); + + // DataStream.sanitizeDataset replaces hyphens with underscores + assertThat(index.index(), equalTo("metrics-resource_dataset.otel-resource-namespace")); + assertThat(index.isDataStream(), is(true)); + assertThat(index.type(), equalTo("metrics")); + assertThat(index.dataset(), equalTo("resource_dataset.otel")); + assertThat(index.namespace(), equalTo("resource-namespace")); + } + + public void testAttributePrecedence() { + // The order of precedence should be: attributes > scopeAttributes > resourceAttributes + List attributes = List.of(createStringAttribute("data_stream.dataset", "attr-dataset")); + + List scopeAttributes = List.of( + createStringAttribute("data_stream.dataset", "scope-dataset"), + createStringAttribute("data_stream.namespace", "scope-namespace") + ); + + List resourceAttributes = List.of( + createStringAttribute("data_stream.dataset", "resource-dataset"), + createStringAttribute("data_stream.namespace", "resource-namespace") + ); + + TargetIndex index = TargetIndex.evaluate("metrics", attributes, null, scopeAttributes, resourceAttributes); + + // DataStream.sanitizeDataset replaces hyphens with underscores + assertThat(index.index(), 
equalTo("metrics-attr_dataset.otel-scope-namespace")); + assertThat(index.isDataStream(), is(true)); + assertThat(index.type(), equalTo("metrics")); + assertThat(index.dataset(), equalTo("attr_dataset.otel")); + assertThat(index.namespace(), equalTo("scope-namespace")); + } + + public void testEvaluateWithReceiverInScopeName() { + TargetIndex index = TargetIndex.evaluate("metrics", List.of(), "hostmetrics-receiver", List.of(), List.of()); + + assertThat(index.index(), equalTo("metrics-hostmetrics_receiver.otel-default")); + assertThat(index.isDataStream(), is(true)); + assertThat(index.type(), equalTo("metrics")); + assertThat(index.dataset(), equalTo("hostmetrics_receiver.otel")); + assertThat(index.namespace(), equalTo("default")); + } + + public void testEvaluateWithDefaultValues() { + TargetIndex index = TargetIndex.evaluate("metrics", List.of(), null, List.of(), List.of()); + + assertThat(index.index(), equalTo("metrics-generic.otel-default")); + assertThat(index.isDataStream(), is(true)); + assertThat(index.type(), equalTo("metrics")); + assertThat(index.dataset(), equalTo("generic.otel")); + assertThat(index.namespace(), equalTo("default")); + } + + public void testDataStreamSanitization() { + List attributes = List.of( + createStringAttribute("data_stream.dataset", "Some-Dataset"), + createStringAttribute("data_stream.namespace", "Some*Namespace") + ); + + TargetIndex index = TargetIndex.evaluate("metrics", attributes, null, List.of(), List.of()); + + // DataStream.sanitizeDataset and DataStream.sanitizeNamespace should be applied + assertThat(index.dataset(), equalTo("some_dataset.otel")); + assertThat(index.namespace(), equalTo("some_namespace")); + } + + private KeyValue createStringAttribute(String key, String value) { + return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setStringValue(value).build()).build(); + } +} diff --git 
a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilderTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilderTests.java index ce86395384746..396dee3380bfb 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilderTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilderTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPoint; import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPointGroupingContext; +import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex; import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; import java.io.IOException; @@ -73,7 +74,7 @@ public void testBuildMetricDocument() throws IOException { scopeSchemaUrl, dataPointAttributes, "{test}", - "metrics-generic.otel-default" + TargetIndex.defaultMetrics() ); dataPointGroup.addDataPoint( Set.of(), @@ -102,6 +103,9 @@ public void testBuildMetricDocument() throws IOException { assertThat(doc.evaluate("@timestamp"), equalTo(TimeUnit.NANOSECONDS.toMillis(timestamp))); assertThat(doc.evaluate("start_timestamp"), equalTo(TimeUnit.NANOSECONDS.toMillis(startTimestamp))); + assertThat(doc.evaluate("data_stream.type"), equalTo("metrics")); + assertThat(doc.evaluate("data_stream.dataset"), equalTo("generic.otel")); + assertThat(doc.evaluate("data_stream.namespace"), equalTo("default")); assertThat(doc.evaluate("resource.schema_url"), equalTo("https://opentelemetry.io/schemas/1.0.0")); assertThat(doc.evaluate("resource.dropped_attributes_count"), equalTo(1)); assertThat(doc.evaluate("resource.attributes.service\\.name"), equalTo("test-service")); @@ -139,7 +143,7 @@ public void testAttributeTypes() throws IOException { null, 
List.of(), "", - "metrics-generic.otel-default" + TargetIndex.defaultMetrics() ); dataPointGroup.addDataPoint( Set.of(), @@ -171,7 +175,7 @@ public void testEmptyFields() throws IOException { null, List.of(), "", - "metrics-generic.otel-default" + TargetIndex.defaultMetrics() ); dataPointGroup.addDataPoint( diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnelTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnelTests.java index 5db5517356481..4d081714477aa 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnelTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnelTests.java @@ -15,6 +15,7 @@ import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue; import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValueList; +import static org.hamcrest.Matchers.equalTo; public class AttributeListTsidFunnelTests extends ESTestCase { @@ -56,4 +57,16 @@ public void testNestedAttributes() { assertEquals(plainBuilder.hash(), funnelBuilder.hash()); } + public void testIgnoredAttributes() { + funnel.add( + List.of( + keyValue("elasticsearch.index", "index"), + keyValue("data_stream.dataset", "dataset"), + keyValue("data_stream.namespace", "namespace") + ), + funnelBuilder + ); + assertThat(funnelBuilder.size(), equalTo(0)); + } + }