Skip to content

Commit edb6690

Browse files
authored
OTLP: determine target data stream based on attributes and scope name (elastic#133903)
1 parent 4c4e49b commit edb6690

File tree

10 files changed

+483
-31
lines changed

10 files changed

+483
-31
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.ingest.common;
1111

12+
import org.elasticsearch.cluster.metadata.DataStream;
1213
import org.elasticsearch.cluster.metadata.ProjectId;
1314
import org.elasticsearch.core.Nullable;
1415
import org.elasticsearch.ingest.AbstractProcessor;
@@ -17,11 +18,9 @@
1718
import org.elasticsearch.ingest.Processor;
1819

1920
import java.util.List;
20-
import java.util.Locale;
2121
import java.util.Map;
2222
import java.util.Objects;
2323
import java.util.function.Function;
24-
import java.util.regex.Pattern;
2524

2625
import static org.elasticsearch.core.Strings.format;
2726
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
@@ -221,11 +220,6 @@ public RerouteProcessor create(
221220
*/
222221
static final class DataStreamValueSource {
223222

224-
private static final int MAX_LENGTH = 100;
225-
private static final String REPLACEMENT = "_";
226-
private static final Pattern DISALLOWED_IN_TYPE = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
227-
private static final Pattern DISALLOWED_IN_DATASET = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
228-
private static final Pattern DISALLOWED_IN_NAMESPACE = Pattern.compile("[\\\\/*?\"<>| ,#:]");
229223
static final DataStreamValueSource TYPE_VALUE_SOURCE = type("{{" + DATA_STREAM_TYPE + "}}");
230224
static final DataStreamValueSource DATASET_VALUE_SOURCE = dataset("{{" + DATA_STREAM_DATASET + "}}");
231225
static final DataStreamValueSource NAMESPACE_VALUE_SOURCE = namespace("{{" + DATA_STREAM_NAMESPACE + "}}");
@@ -235,24 +229,15 @@ static final class DataStreamValueSource {
235229
private final Function<String, String> sanitizer;
236230

237231
public static DataStreamValueSource type(String type) {
238-
return new DataStreamValueSource(type, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_TYPE));
232+
return new DataStreamValueSource(type, DataStream::sanitizeType);
239233
}
240234

241235
public static DataStreamValueSource dataset(String dataset) {
242-
return new DataStreamValueSource(dataset, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_DATASET));
236+
return new DataStreamValueSource(dataset, DataStream::sanitizeDataset);
243237
}
244238

245239
public static DataStreamValueSource namespace(String namespace) {
246-
return new DataStreamValueSource(namespace, nsp -> sanitizeDataStreamField(nsp, DISALLOWED_IN_NAMESPACE));
247-
}
248-
249-
private static String sanitizeDataStreamField(String s, Pattern disallowedInDataset) {
250-
if (s == null) {
251-
return null;
252-
}
253-
s = s.toLowerCase(Locale.ROOT);
254-
s = s.substring(0, Math.min(s.length(), MAX_LENGTH));
255-
return disallowedInDataset.matcher(s).replaceAll(REPLACEMENT);
240+
return new DataStreamValueSource(namespace, DataStream::sanitizeNamespace);
256241
}
257242

258243
private DataStreamValueSource(String value, Function<String, String> sanitizer) {

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import java.util.function.Function;
7575
import java.util.function.LongSupplier;
7676
import java.util.function.Predicate;
77+
import java.util.regex.Pattern;
7778
import java.util.stream.Collectors;
7879

7980
import static org.elasticsearch.cluster.metadata.ComposableIndexTemplate.EMPTY_MAPPINGS;
@@ -98,6 +99,33 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
9899
public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd");
99100
public static final String TIMESTAMP_FIELD_NAME = "@timestamp";
100101

102+
private static final int MAX_LENGTH = 100;
103+
private static final String REPLACEMENT = "_";
104+
private static final Pattern DISALLOWED_IN_TYPE = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
105+
private static final Pattern DISALLOWED_IN_DATASET = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
106+
private static final Pattern DISALLOWED_IN_NAMESPACE = Pattern.compile("[\\\\/*?\"<>| ,#:]");
107+
108+
public static String sanitizeType(String type) {
109+
return sanitizeDataStreamField(type, DISALLOWED_IN_TYPE);
110+
}
111+
112+
public static String sanitizeDataset(String dataset) {
113+
return sanitizeDataStreamField(dataset, DISALLOWED_IN_DATASET);
114+
}
115+
116+
public static String sanitizeNamespace(String namespace) {
117+
return sanitizeDataStreamField(namespace, DISALLOWED_IN_NAMESPACE);
118+
}
119+
120+
private static String sanitizeDataStreamField(String s, Pattern disallowedInDataset) {
121+
if (s == null) {
122+
return null;
123+
}
124+
s = s.toLowerCase(Locale.ROOT);
125+
s = s.substring(0, Math.min(s.length(), MAX_LENGTH));
126+
return disallowedInDataset.matcher(s).replaceAll(REPLACEMENT);
127+
}
128+
101129
// Timeseries indices' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations
102130
public static final Comparator<LeafReader> TIMESERIES_LEAF_READERS_SORTER = Comparator.comparingLong((LeafReader r) -> {
103131
try {

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.hash.BufferedMurmur3Hasher;
2222
import org.elasticsearch.common.hash.MurmurHash3.Hash128;
2323
import org.elasticsearch.core.CheckedConsumer;
24+
import org.elasticsearch.core.Nullable;
2425
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;
2526
import org.elasticsearch.xpack.oteldata.otlp.tsid.DataPointTsidFunnel;
2627
import org.elasticsearch.xpack.oteldata.otlp.tsid.ResourceTsidFunnel;
@@ -160,17 +161,36 @@ public <E extends Exception> void forEach(CheckedConsumer<DataPointGroup, E> con
160161
}
161162

162163
class ScopeGroup {
164+
private static final String RECEIVER = "/receiver/";
165+
163166
private final ResourceGroup resourceGroup;
164167
private final InstrumentationScope scope;
165168
private final ByteString scopeSchemaUrl;
169+
@Nullable
170+
private final String receiverName;
166171
// index -> timestamp -> dataPointGroupHash -> DataPointGroup
167-
private final Map<String, Map<Hash128, Map<Hash128, DataPointGroup>>> dataPointGroupsByIndexAndTimestamp;
172+
private final Map<TargetIndex, Map<Hash128, Map<Hash128, DataPointGroup>>> dataPointGroupsByIndexAndTimestamp;
168173

169174
/**
 * Creates the group for one instrumentation scope inside the given resource group.
 * The receiver name is derived once from the scope here and stored, and the map of data point groups starts empty.
 *
 * @param resourceGroup  the resource group this scope belongs to
 * @param scope          the instrumentation scope
 * @param scopeSchemaUrl the schema URL of the scope
 */
ScopeGroup(ResourceGroup resourceGroup, InstrumentationScope scope, ByteString scopeSchemaUrl) {
    this.resourceGroup = resourceGroup;
    this.scope = scope;
    this.scopeSchemaUrl = scopeSchemaUrl;
    // Derived solely from the scope argument, so it does not depend on the other fields.
    this.receiverName = extractReceiverName(scope);
    this.dataPointGroupsByIndexAndTimestamp = new HashMap<>();
}
181+
182+
private @Nullable String extractReceiverName(InstrumentationScope scope) {
183+
String scopeName = scope.getName();
184+
int indexOfReceiver = scopeName.indexOf(RECEIVER);
185+
if (indexOfReceiver >= 0) {
186+
int beginIndex = indexOfReceiver + RECEIVER.length();
187+
int endIndex = scopeName.indexOf('/', beginIndex);
188+
if (endIndex < 0) {
189+
endIndex = scopeName.length();
190+
}
191+
return scopeName.substring(beginIndex, endIndex);
192+
}
193+
return null;
174194
}
175195

176196
public <T> void addDataPoints(Metric metric, List<T> dataPoints, BiFunction<T, Metric, DataPoint> createDataPoint) {
@@ -197,8 +217,13 @@ private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
197217
Hash128 dataPointGroupHash = dataPointGroupTsidBuilder.hash();
198218
// in addition to the fields that go into the _tsid, we also need to group by timestamp and start timestamp
199219
Hash128 timestamp = new Hash128(dataPoint.getTimestampUnixNano(), dataPoint.getStartTimestampUnixNano());
200-
// TODO determine based on attributes and scope name
201-
String targetIndex = "metrics-generic.otel-default";
220+
TargetIndex targetIndex = TargetIndex.evaluate(
221+
TargetIndex.TYPE_METRICS,
222+
dataPoint.getAttributes(),
223+
receiverName,
224+
scope.getAttributesList(),
225+
resourceGroup.resource.getAttributesList()
226+
);
202227
var dataPointGroupsByTimestamp = dataPointGroupsByIndexAndTimestamp.computeIfAbsent(targetIndex, k -> new HashMap<>());
203228
var dataPointGroups = dataPointGroupsByTimestamp.computeIfAbsent(timestamp, k -> new HashMap<>());
204229
DataPointGroup dataPointGroup = dataPointGroups.get(dataPointGroupHash);
@@ -237,7 +262,7 @@ public static final class DataPointGroup {
237262
private final String unit;
238263
private final Set<String> metricNames = new HashSet<>();
239264
private final List<DataPoint> dataPoints = new ArrayList<>();
240-
private final String targetIndex;
265+
private final TargetIndex targetIndex;
241266
private String metricNamesHash;
242267

243268
public DataPointGroup(
@@ -247,7 +272,7 @@ public DataPointGroup(
247272
ByteString scopeSchemaUrl,
248273
List<KeyValue> dataPointAttributes,
249274
String unit,
250-
String targetIndex
275+
TargetIndex targetIndex
251276
) {
252277
this.resource = resource;
253278
this.resourceSchemaUrl = resourceSchemaUrl;
@@ -318,7 +343,7 @@ public List<DataPoint> dataPoints() {
318343
return dataPoints;
319344
}
320345

321-
public String targetIndex() {
346+
public TargetIndex targetIndex() {
322347
return targetIndex;
323348
}
324349
}
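x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/TargetIndex.java

Lines changed: 158 additions & 0 deletions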
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.oteldata.otlp.datapoint;
9+
10+
import io.opentelemetry.proto.common.v1.KeyValue;
11+
12+
import org.elasticsearch.cluster.metadata.DataStream;
13+
import org.elasticsearch.core.Nullable;
14+
15+
import java.util.List;
16+
17+
/**
18+
* Represents the target index for a data point, which can be either a specific index or a data stream.
19+
* The index is determined based on attributes, scope name, and default values.
20+
*/
21+
public final class TargetIndex {
22+
23+
public static final String TYPE_METRICS = "metrics";
24+
25+
private static final String ELASTICSEARCH_INDEX = "elasticsearch.index";
26+
private static final String DATA_STREAM_DATASET = "data_stream.dataset";
27+
private static final String DATA_STREAM_NAMESPACE = "data_stream.namespace";
28+
private static final String DEFAULT_DATASET = "generic";
29+
private static final String OTEL_DATASET_SUFFIX = ".otel";
30+
private static final String DEFAULT_NAMESPACE = "default";
31+
private static final TargetIndex DEFAULT_METRICS_TARGET = evaluate(TYPE_METRICS, List.of(), null, List.of(), List.of());
32+
33+
private String index;
34+
private String type;
35+
private String dataset;
36+
private String namespace;
37+
38+
public static TargetIndex defaultMetrics() {
39+
return DEFAULT_METRICS_TARGET;
40+
}
41+
42+
public static boolean isTargetIndexAttribute(String attributeKey) {
43+
return attributeKey.equals(ELASTICSEARCH_INDEX)
44+
|| attributeKey.equals(DATA_STREAM_DATASET)
45+
|| attributeKey.equals(DATA_STREAM_NAMESPACE);
46+
}
47+
48+
/**
49+
* Determines the target index for a data point.
50+
*
51+
* @param type The data stream type (e.g., "metrics", "logs").
52+
* @param attributes The attributes associated with the data point.
53+
* @param receiverName The name of the receiver, which may influence the dataset (receiver-based routing).
54+
* @param scopeAttributes Attributes associated with the scope.
55+
* @param resourceAttributes Attributes associated with the resource.
56+
* @return A TargetIndex instance representing the target index for the data point.
57+
*/
58+
public static TargetIndex evaluate(
59+
String type,
60+
List<KeyValue> attributes,
61+
@Nullable String receiverName,
62+
List<KeyValue> scopeAttributes,
63+
List<KeyValue> resourceAttributes
64+
) {
65+
// Order:
66+
// 1. elasticsearch.index from attributes, scope.attributes, resource.attributes
67+
// 2. read data_stream.* from attributes, scope.attributes, resource.attributes
68+
// 3. receiver-based routing based on scope.name
69+
// 4. use default hardcoded data_stream.* (<type>-generic-default)
70+
TargetIndex target = new TargetIndex();
71+
target.populateFrom(attributes);
72+
target.populateFrom(scopeAttributes);
73+
target.populateFrom(resourceAttributes);
74+
if (target.index == null) {
75+
target.type = type;
76+
if (target.dataset == null && receiverName != null) {
77+
target.dataset = receiverName;
78+
}
79+
target.dataset = DataStream.sanitizeDataset(target.dataset);
80+
if (target.dataset == null) {
81+
target.dataset = DEFAULT_DATASET;
82+
}
83+
// add otel suffix to match OTel index template
84+
target.dataset = target.dataset + OTEL_DATASET_SUFFIX;
85+
target.namespace = DataStream.sanitizeNamespace(target.namespace);
86+
87+
if (target.namespace == null) {
88+
target.namespace = DEFAULT_NAMESPACE;
89+
}
90+
target.index = target.type + "-" + target.dataset + "-" + target.namespace;
91+
} else {
92+
target.type = null;
93+
target.dataset = null;
94+
target.namespace = null;
95+
}
96+
return target;
97+
}
98+
99+
private TargetIndex() {}
100+
101+
private void populateFrom(List<KeyValue> attributes) {
102+
if (isPopulated()) {
103+
return;
104+
}
105+
for (int i = 0, size = attributes.size(); i < size; i++) {
106+
KeyValue attr = attributes.get(i);
107+
if (attr.getKey().equals(ELASTICSEARCH_INDEX)) {
108+
index = attr.getValue().getStringValue();
109+
} else if (dataset == null && attr.getKey().equals(DATA_STREAM_DATASET)) {
110+
dataset = attr.getValue().getStringValue();
111+
} else if (namespace == null && attr.getKey().equals(DATA_STREAM_NAMESPACE)) {
112+
namespace = attr.getValue().getStringValue();
113+
}
114+
}
115+
}
116+
117+
private boolean isPopulated() {
118+
return (dataset != null && namespace != null) || index != null;
119+
}
120+
121+
public boolean isDataStream() {
122+
return type != null && dataset != null && namespace != null;
123+
}
124+
125+
public String index() {
126+
return index;
127+
}
128+
129+
public String type() {
130+
return type;
131+
}
132+
133+
public String dataset() {
134+
return dataset;
135+
}
136+
137+
public String namespace() {
138+
return namespace;
139+
}
140+
141+
@Override
142+
public String toString() {
143+
return index;
144+
}
145+
146+
@Override
147+
public boolean equals(Object o) {
148+
if (o == null || getClass() != o.getClass()) return false;
149+
150+
TargetIndex that = (TargetIndex) o;
151+
return index.equals(that.index);
152+
}
153+
154+
@Override
155+
public int hashCode() {
156+
return index.hashCode();
157+
}
158+
}

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MappingHints.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@
2727
* In these cases, the behavior is undefined but does not lead to data loss.
2828
*/
2929
public record MappingHints(boolean aggregateMetricDouble, boolean docCount) {
30-
public static final String MAPPING_HINTS = "elasticsearch.mapping.hints";
31-
30+
private static final String MAPPING_HINTS = "elasticsearch.mapping.hints";
3231
private static final MappingHints EMPTY = new MappingHints(false, false);
3332
private static final String AGGREGATE_METRIC_DOUBLE = "aggregate_metric_double";
3433
private static final String DOC_COUNT = "_doc_count";
@@ -62,4 +61,8 @@ public static MappingHints fromAttributes(List<KeyValue> attributes) {
6261
/**
 * Returns the shared {@code EMPTY} instance, which is created with both hints set to {@code false}.
 */
public static MappingHints empty() {
6362
return EMPTY;
6463
}
64+
65+
public static boolean isMappingHintsAttribute(String attributeKey) {
66+
return attributeKey.equals(MAPPING_HINTS);
67+
}
6568
}

0 commit comments

Comments
 (0)