RerouteProcessor.java
@@ -9,6 +9,7 @@
 
 package org.elasticsearch.ingest.common;
 
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.ingest.AbstractProcessor;
@@ -17,11 +18,9 @@
 import org.elasticsearch.ingest.Processor;
 
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
-import java.util.regex.Pattern;
 
 import static org.elasticsearch.core.Strings.format;
 import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
@@ -221,11 +220,6 @@ public RerouteProcessor create(
      */
     static final class DataStreamValueSource {
 
-        private static final int MAX_LENGTH = 100;
-        private static final String REPLACEMENT = "_";
-        private static final Pattern DISALLOWED_IN_TYPE = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
-        private static final Pattern DISALLOWED_IN_DATASET = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
-        private static final Pattern DISALLOWED_IN_NAMESPACE = Pattern.compile("[\\\\/*?\"<>| ,#:]");
         static final DataStreamValueSource TYPE_VALUE_SOURCE = type("{{" + DATA_STREAM_TYPE + "}}");
         static final DataStreamValueSource DATASET_VALUE_SOURCE = dataset("{{" + DATA_STREAM_DATASET + "}}");
         static final DataStreamValueSource NAMESPACE_VALUE_SOURCE = namespace("{{" + DATA_STREAM_NAMESPACE + "}}");
@@ -235,24 +229,15 @@ static final class DataStreamValueSource {
         private final Function<String, String> sanitizer;
 
         public static DataStreamValueSource type(String type) {
-            return new DataStreamValueSource(type, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_TYPE));
+            return new DataStreamValueSource(type, DataStream::sanitizeType);
         }
 
         public static DataStreamValueSource dataset(String dataset) {
-            return new DataStreamValueSource(dataset, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_DATASET));
+            return new DataStreamValueSource(dataset, DataStream::sanitizeDataset);
         }
 
         public static DataStreamValueSource namespace(String namespace) {
-            return new DataStreamValueSource(namespace, nsp -> sanitizeDataStreamField(nsp, DISALLOWED_IN_NAMESPACE));
-        }
-
-        private static String sanitizeDataStreamField(String s, Pattern disallowedInDataset) {
-            if (s == null) {
-                return null;
-            }
-            s = s.toLowerCase(Locale.ROOT);
-            s = s.substring(0, Math.min(s.length(), MAX_LENGTH));
-            return disallowedInDataset.matcher(s).replaceAll(REPLACEMENT);
+            return new DataStreamValueSource(namespace, DataStream::sanitizeNamespace);
         }
 
         private DataStreamValueSource(String value, Function<String, String> sanitizer) {
DataStream.java
@@ -74,6 +74,7 @@
 import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Predicate;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.metadata.ComposableIndexTemplate.EMPTY_MAPPINGS;
@@ -98,6 +99,33 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd");
     public static final String TIMESTAMP_FIELD_NAME = "@timestamp";
 
+    private static final int MAX_LENGTH = 100;
+    private static final String REPLACEMENT = "_";
+    private static final Pattern DISALLOWED_IN_TYPE = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
+    private static final Pattern DISALLOWED_IN_DATASET = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
+    private static final Pattern DISALLOWED_IN_NAMESPACE = Pattern.compile("[\\\\/*?\"<>| ,#:]");
+
+    public static String sanitizeType(String type) {
+        return sanitizeDataStreamField(type, DISALLOWED_IN_TYPE);
+    }
+
+    public static String sanitizeDataset(String dataset) {
+        return sanitizeDataStreamField(dataset, DISALLOWED_IN_DATASET);
+    }
+
+    public static String sanitizeNamespace(String namespace) {
+        return sanitizeDataStreamField(namespace, DISALLOWED_IN_NAMESPACE);
+    }
+
+    private static String sanitizeDataStreamField(String s, Pattern disallowed) {
+        if (s == null) {
+            return null;
+        }
+        s = s.toLowerCase(Locale.ROOT);
+        s = s.substring(0, Math.min(s.length(), MAX_LENGTH));
+        return disallowed.matcher(s).replaceAll(REPLACEMENT);
+    }
+
     // Timeseries indices' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations
     public static final Comparator<LeafReader> TIMESERIES_LEAF_READERS_SORTER = Comparator.comparingLong((LeafReader r) -> {
         try {
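
Note: with the sanitizers now public on DataStream, their behavior can be checked in isolation. A minimal sketch, assuming only the constants and methods added above (sample inputs are illustrative):

import org.elasticsearch.cluster.metadata.DataStream;

public class SanitizeDemo {
    public static void main(String[] args) {
        // Lowercased, then ' ' and '-' replaced: both are disallowed in datasets.
        System.out.println(DataStream.sanitizeDataset("My Data-Set!"));  // my_data_set!
        // '-' is allowed in namespaces, so only the space is replaced.
        System.out.println(DataStream.sanitizeNamespace("Prod Env-1"));  // prod_env-1
        // null passes through unchanged so callers can apply their own defaults.
        System.out.println(DataStream.sanitizeType(null));               // null
    }
}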
DataPointGroupingContext.java
@@ -21,6 +21,7 @@
 import org.elasticsearch.common.hash.BufferedMurmur3Hasher;
 import org.elasticsearch.common.hash.MurmurHash3.Hash128;
 import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;
 import org.elasticsearch.xpack.oteldata.otlp.tsid.DataPointTsidFunnel;
 import org.elasticsearch.xpack.oteldata.otlp.tsid.ResourceTsidFunnel;
@@ -160,17 +161,36 @@ public <E extends Exception> void forEach(CheckedConsumer<DataPointGroup, E> con
     }
 
     class ScopeGroup {
+        private static final String RECEIVER = "/receiver/";
+
         private final ResourceGroup resourceGroup;
         private final InstrumentationScope scope;
         private final ByteString scopeSchemaUrl;
+        @Nullable
+        private final String receiverName;
         // index -> timestamp -> dataPointGroupHash -> DataPointGroup
-        private final Map<String, Map<Hash128, Map<Hash128, DataPointGroup>>> dataPointGroupsByIndexAndTimestamp;
+        private final Map<TargetIndex, Map<Hash128, Map<Hash128, DataPointGroup>>> dataPointGroupsByIndexAndTimestamp;
 
         ScopeGroup(ResourceGroup resourceGroup, InstrumentationScope scope, ByteString scopeSchemaUrl) {
            this.resourceGroup = resourceGroup;
            this.scope = scope;
            this.scopeSchemaUrl = scopeSchemaUrl;
            this.dataPointGroupsByIndexAndTimestamp = new HashMap<>();
+           this.receiverName = extractReceiverName(scope);
         }
 
+        private @Nullable String extractReceiverName(InstrumentationScope scope) {
+            String scopeName = scope.getName();
+            int indexOfReceiver = scopeName.indexOf(RECEIVER);
+            if (indexOfReceiver >= 0) {
+                int beginIndex = indexOfReceiver + RECEIVER.length();
+                int endIndex = scopeName.indexOf('/', beginIndex);
+                if (endIndex < 0) {
+                    endIndex = scopeName.length();
+                }
+                return scopeName.substring(beginIndex, endIndex);
+            }
+            return null;
+        }
+
         public <T> void addDataPoints(Metric metric, List<T> dataPoints, BiFunction<T, Metric, DataPoint> createDataPoint) {
@@ -197,8 +217,13 @@ private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
             Hash128 dataPointGroupHash = dataPointGroupTsidBuilder.hash();
             // in addition to the fields that go into the _tsid, we also need to group by timestamp and start timestamp
             Hash128 timestamp = new Hash128(dataPoint.getTimestampUnixNano(), dataPoint.getStartTimestampUnixNano());
-            // TODO determine based on attributes and scope name
-            String targetIndex = "metrics-generic.otel-default";
+            TargetIndex targetIndex = TargetIndex.evaluate(
+                TargetIndex.TYPE_METRICS,
+                dataPoint.getAttributes(),
+                receiverName,
+                scope.getAttributesList(),
+                resourceGroup.resource.getAttributesList()
+            );
             var dataPointGroupsByTimestamp = dataPointGroupsByIndexAndTimestamp.computeIfAbsent(targetIndex, k -> new HashMap<>());
             var dataPointGroups = dataPointGroupsByTimestamp.computeIfAbsent(timestamp, k -> new HashMap<>());
             DataPointGroup dataPointGroup = dataPointGroups.get(dataPointGroupHash);
@@ -237,7 +262,7 @@ public static final class DataPointGroup {
         private final String unit;
         private final Set<String> metricNames = new HashSet<>();
         private final List<DataPoint> dataPoints = new ArrayList<>();
-        private final String targetIndex;
+        private final TargetIndex targetIndex;
         private String metricNamesHash;
 
@@ -247,7 +272,7 @@ public DataPointGroup(
             ByteString scopeSchemaUrl,
             List<KeyValue> dataPointAttributes,
             String unit,
-            String targetIndex
+            TargetIndex targetIndex
         ) {
             this.resource = resource;
             this.resourceSchemaUrl = resourceSchemaUrl;
@@ -318,7 +343,7 @@ public List<DataPoint> dataPoints() {
             return dataPoints;
         }
 
-        public String targetIndex() {
+        public TargetIndex targetIndex() {
             return targetIndex;
         }
     }
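
Note: extractReceiverName above takes the path segment that follows "/receiver/" in the scope name, up to the next '/'. Since the method is private to ScopeGroup, here is a standalone sketch of the same substring logic (the scope names below are illustrative, not taken from this change):

public class ReceiverNameDemo {
    private static final String RECEIVER = "/receiver/";

    static String extractReceiverName(String scopeName) {
        int indexOfReceiver = scopeName.indexOf(RECEIVER);
        if (indexOfReceiver >= 0) {
            int beginIndex = indexOfReceiver + RECEIVER.length();
            int endIndex = scopeName.indexOf('/', beginIndex);
            if (endIndex < 0) {
                // no trailing '/', take the rest of the scope name
                endIndex = scopeName.length();
            }
            return scopeName.substring(beginIndex, endIndex);
        }
        return null; // no "/receiver/" segment in the scope name
    }

    public static void main(String[] args) {
        // Collector-contrib style scope name -> "hostmetricsreceiver"
        System.out.println(extractReceiverName(
            "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper"
        ));
        // No receiver segment -> null
        System.out.println(extractReceiverName("io.opentelemetry.sdk.metrics"));
    }
}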
TargetIndex.java (new file)
@@ -0,0 +1,158 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.oteldata.otlp.datapoint;
+
+import io.opentelemetry.proto.common.v1.KeyValue;
+
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.core.Nullable;
+
+import java.util.List;
+
+/**
+ * Represents the target index for a data point, which can be either a specific index or a data stream.
+ * The index is determined based on attributes, scope name, and default values.
+ */
+public final class TargetIndex {
+
+    public static final String TYPE_METRICS = "metrics";
+
+    private static final String ELASTICSEARCH_INDEX = "elasticsearch.index";
+    private static final String DATA_STREAM_DATASET = "data_stream.dataset";
+    private static final String DATA_STREAM_NAMESPACE = "data_stream.namespace";
+    private static final String DEFAULT_DATASET = "generic";
+    private static final String OTEL_DATASET_SUFFIX = ".otel";
+    private static final String DEFAULT_NAMESPACE = "default";
+    private static final TargetIndex DEFAULT_METRICS_TARGET = evaluate(TYPE_METRICS, List.of(), null, List.of(), List.of());
+
+    private String index;
+    private String type;
+    private String dataset;
+    private String namespace;
+
+    public static TargetIndex defaultMetrics() {
+        return DEFAULT_METRICS_TARGET;
+    }
+
+    public static boolean isTargetIndexAttribute(String attributeKey) {
+        return attributeKey.equals(ELASTICSEARCH_INDEX)
+            || attributeKey.equals(DATA_STREAM_DATASET)
+            || attributeKey.equals(DATA_STREAM_NAMESPACE);
+    }
+
+    /**
+     * Determines the target index for a data point.
+     *
+     * @param type               The data stream type (e.g., "metrics", "logs").
+     * @param attributes         The attributes associated with the data point.
+     * @param receiverName       The name of the receiver, which may influence the dataset (receiver-based routing).
+     * @param scopeAttributes    Attributes associated with the scope.
+     * @param resourceAttributes Attributes associated with the resource.
+     * @return A TargetIndex instance representing the target index for the data point.
+     */
+    public static TargetIndex evaluate(
+        String type,
+        List<KeyValue> attributes,
+        @Nullable String receiverName,
+        List<KeyValue> scopeAttributes,
+        List<KeyValue> resourceAttributes
+    ) {
+        // Order:
+        // 1. elasticsearch.index from attributes, scope.attributes, resource.attributes
+        // 2. read data_stream.* from attributes, scope.attributes, resource.attributes
+        // 3. receiver-based routing based on scope.name
+        // 4. use default hardcoded data_stream.* (<type>-generic-default)
+        TargetIndex target = new TargetIndex();
+        target.populateFrom(attributes);
+        target.populateFrom(scopeAttributes);
+        target.populateFrom(resourceAttributes);
+        if (target.index == null) {
+            target.type = type;
+            if (target.dataset == null && receiverName != null) {
+                target.dataset = receiverName;
+            }
+            target.dataset = DataStream.sanitizeDataset(target.dataset);
+            if (target.dataset == null) {
+                target.dataset = DEFAULT_DATASET;
+            }
+            // add otel suffix to match OTel index template
+            target.dataset = target.dataset + OTEL_DATASET_SUFFIX;
+            target.namespace = DataStream.sanitizeNamespace(target.namespace);
+            if (target.namespace == null) {
+                target.namespace = DEFAULT_NAMESPACE;
+            }
+            target.index = target.type + "-" + target.dataset + "-" + target.namespace;
+        } else {
+            target.type = null;
+            target.dataset = null;
+            target.namespace = null;
+        }
+        return target;
+    }
+
+    private TargetIndex() {}
+
+    private void populateFrom(List<KeyValue> attributes) {
+        if (isPopulated()) {
+            return;
+        }
+        for (int i = 0, size = attributes.size(); i < size; i++) {
+            KeyValue attr = attributes.get(i);
+            if (attr.getKey().equals(ELASTICSEARCH_INDEX)) {
+                index = attr.getValue().getStringValue();
+            } else if (dataset == null && attr.getKey().equals(DATA_STREAM_DATASET)) {
+                dataset = attr.getValue().getStringValue();
+            } else if (namespace == null && attr.getKey().equals(DATA_STREAM_NAMESPACE)) {
+                namespace = attr.getValue().getStringValue();
+            }
+        }
+    }
+
+    private boolean isPopulated() {
+        return (dataset != null && namespace != null) || index != null;
+    }
+
+    public boolean isDataStream() {
+        return type != null && dataset != null && namespace != null;
+    }
+
+    public String index() {
+        return index;
+    }
+
+    public String type() {
+        return type;
+    }
+
+    public String dataset() {
+        return dataset;
+    }
+
+    public String namespace() {
+        return namespace;
+    }
+
+    @Override
+    public String toString() {
+        return index;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) return false;
+        TargetIndex that = (TargetIndex) o;
+        return index.equals(that.index);
+    }
+
+    @Override
+    public int hashCode() {
+        return index.hashCode();
+    }
+}
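
Note: the four-step precedence documented in evaluate can be exercised directly, since both the method and TYPE_METRICS are public. A hedged sketch, assuming the otel-data plugin classes and the OTLP protobuf classes are on the classpath (attribute values are illustrative):

import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;

import org.elasticsearch.xpack.oteldata.otlp.datapoint.TargetIndex;

import java.util.List;

public class TargetIndexDemo {
    static KeyValue attr(String key, String value) {
        return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setStringValue(value)).build();
    }

    public static void main(String[] args) {
        // 1. elasticsearch.index wins outright; no data stream parts are set.
        TargetIndex byIndex = TargetIndex.evaluate(
            TargetIndex.TYPE_METRICS, List.of(attr("elasticsearch.index", "my-index")), null, List.of(), List.of());
        System.out.println(byIndex.index());        // my-index
        System.out.println(byIndex.isDataStream()); // false

        // 2. data_stream.* attributes are sanitized and suffixed with ".otel".
        TargetIndex byDataset = TargetIndex.evaluate(
            TargetIndex.TYPE_METRICS, List.of(attr("data_stream.dataset", "My Dataset")), null, List.of(), List.of());
        System.out.println(byDataset.index());      // metrics-my_dataset.otel-default

        // 3. With no routing attributes, the receiver name seeds the dataset.
        TargetIndex byReceiver = TargetIndex.evaluate(
            TargetIndex.TYPE_METRICS, List.of(), "hostmetricsreceiver", List.of(), List.of());
        System.out.println(byReceiver.index());     // metrics-hostmetricsreceiver.otel-default

        // 4. Otherwise the hardcoded default applies.
        System.out.println(TargetIndex.defaultMetrics().index()); // metrics-generic.otel-default
    }
}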
MappingHints.java
@@ -27,8 +27,7 @@
  * In these cases, the behavior is undefined but does not lead to data loss.
  */
 public record MappingHints(boolean aggregateMetricDouble, boolean docCount) {
-    public static final String MAPPING_HINTS = "elasticsearch.mapping.hints";
-
+    private static final String MAPPING_HINTS = "elasticsearch.mapping.hints";
     private static final MappingHints EMPTY = new MappingHints(false, false);
     private static final String AGGREGATE_METRIC_DOUBLE = "aggregate_metric_double";
     private static final String DOC_COUNT = "_doc_count";
@@ -62,4 +61,8 @@ public static MappingHints fromAttributes(List<KeyValue> attributes) {
     public static MappingHints empty() {
         return EMPTY;
     }
+
+    public static boolean isMappingHintsAttribute(String attributeKey) {
+        return attributeKey.equals(MAPPING_HINTS);
+    }
 }
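
Note: together with TargetIndex.isTargetIndexAttribute, the new predicate presumably lets attribute-handling code skip these control attributes rather than index them. A hedged sketch of such a caller (assumed, not part of this change; MappingHints and TargetIndex are taken to be importable from the otel-data plugin):

import io.opentelemetry.proto.common.v1.KeyValue;

final class ControlAttributes {
    // Hypothetical helper: true for control attributes such as
    // "elasticsearch.mapping.hints" and "elasticsearch.index".
    static boolean isControlAttribute(KeyValue attribute) {
        String key = attribute.getKey();
        return MappingHints.isMappingHintsAttribute(key) || TargetIndex.isTargetIndexAttribute(key);
    }
}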