Merged
Changes from 9 commits
@@ -207,7 +207,6 @@ public TsidBuilder addAll(TsidBuilder other) {
* @throws IllegalArgumentException if no dimensions have been added
*/
public MurmurHash3.Hash128 hash() {
throwIfEmpty();
Collections.sort(dimensions);
murmur3Hasher.reset();
for (Dimension dim : dimensions) {
@@ -299,6 +298,10 @@ public int size() {
return dimensions.size();
}

public void sortDimensions() {
Collections.sort(dimensions);
}

/**
* A functional interface that describes how objects of a complex type are added to a TSID.
*
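With throwIfEmpty() removed, hash() on an empty builder no longer throws; per the updated test below it returns a zero-valued Hash128, while buildTsid() still rejects an empty builder. A minimal sketch of the resulting contract, reusing the TsidBuilder and MurmurHash3 types from the diff (assertion style is illustrative):

TsidBuilder empty = TsidBuilder.newBuilder();
MurmurHash3.Hash128 emptyHash = empty.hash(); // no longer throws; equals new MurmurHash3.Hash128()
// empty.buildTsid();                         // still throws IllegalArgumentException ("Dimensions are empty")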
@@ -10,6 +10,7 @@
package org.elasticsearch.cluster.routing;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.hash.MurmurHash3;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.Text;

@@ -115,11 +116,9 @@ public void testAddAllWithNullOrEmpty() {
}

public void testExceptionWhenNoDimensions() {
// Test that exception is thrown when no dimensions are added
TsidBuilder builder = TsidBuilder.newBuilder();

IllegalArgumentException hashException = expectThrows(IllegalArgumentException.class, builder::hash);
assertTrue(hashException.getMessage().contains("Dimensions are empty"));
assertThat(builder.hash(), equalTo(new MurmurHash3.Hash128()));

IllegalArgumentException tsidException = expectThrows(IllegalArgumentException.class, builder::buildTsid);
assertTrue(tsidException.getMessage().contains("Dimensions are empty"));
@@ -15,6 +15,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
@@ -35,6 +36,7 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
@@ -47,6 +49,7 @@
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;

import java.io.IOException;
import java.util.Map;

/**
* Transport action for handling OpenTelemetry Protocol (OTLP) Metrics requests.
@@ -126,11 +129,13 @@ private void addIndexRequest(
DataPointGroupingContext.DataPointGroup dataPointGroup
) throws IOException {
try (XContentBuilder xContentBuilder = XContentFactory.cborBuilder(new BytesStreamOutput())) {
var dynamicTemplates = metricDocumentBuilder.buildMetricDocument(xContentBuilder, dataPointGroup);
Map<String, String> dynamicTemplates = Maps.newHashMapWithExpectedSize(dataPointGroup.dataPoints().size());
BytesRef tsid = metricDocumentBuilder.buildMetricDocument(xContentBuilder, dynamicTemplates, dataPointGroup);
bulkRequestBuilder.add(
new IndexRequest(dataPointGroup.targetIndex().index()).opType(DocWriteRequest.OpType.CREATE)
.setRequireDataStream(true)
.source(xContentBuilder)
.tsid(tsid)
.setDynamicTemplates(dynamicTemplates)
);
}
@@ -128,7 +128,7 @@ private ResourceGroup getOrCreateResourceGroup(ResourceMetrics resourceMetrics)
Hash128 resourceHash = resourceTsidBuilder.hash();
ResourceGroup resourceGroup = resourceGroups.get(resourceHash);
if (resourceGroup == null) {
resourceGroup = new ResourceGroup(resourceMetrics.getResource(), resourceMetrics.getSchemaUrlBytes());
resourceGroup = new ResourceGroup(resourceMetrics.getResource(), resourceMetrics.getSchemaUrlBytes(), resourceTsidBuilder);
resourceGroups.put(resourceHash, resourceGroup);
}
return resourceGroup;
@@ -137,20 +137,25 @@ private ResourceGroup getOrCreateResourceGroup(ResourceMetrics resourceMetrics)
class ResourceGroup {
private final Resource resource;
private final ByteString resourceSchemaUrl;
private final TsidBuilder resourceTsidBuilder;
private final Map<Hash128, ScopeGroup> scopes;

ResourceGroup(Resource resource, ByteString resourceSchemaUrl) {
ResourceGroup(Resource resource, ByteString resourceSchemaUrl, TsidBuilder resourceTsidBuilder) {
this.resource = resource;
this.resourceSchemaUrl = resourceSchemaUrl;
this.resourceTsidBuilder = resourceTsidBuilder;
this.scopes = new HashMap<>();
}

public ScopeGroup getOrCreateScope(ScopeMetrics scopeMetrics) {
TsidBuilder scopeTsidBuilder = ScopeTsidFunnel.forScope(byteStringAccessor, scopeMetrics);
Hash128 scopeHash = scopeTsidBuilder.hash();
scopeTsidBuilder.addAll(resourceTsidBuilder);
// functionally not required, but may speed up sorting of all dimensions if they are already partially sorted
scopeTsidBuilder.sortDimensions();
ScopeGroup scopeGroup = scopes.get(scopeHash);
if (scopeGroup == null) {
scopeGroup = new ScopeGroup(this, scopeMetrics.getScope(), scopeMetrics.getSchemaUrlBytes());
scopeGroup = new ScopeGroup(this, scopeMetrics.getScope(), scopeMetrics.getSchemaUrlBytes(), scopeTsidBuilder);
scopes.put(scopeHash, scopeGroup);
}
return scopeGroup;
@@ -169,15 +174,17 @@ class ScopeGroup {
private final ResourceGroup resourceGroup;
private final InstrumentationScope scope;
private final ByteString scopeSchemaUrl;
private final TsidBuilder scopeTsidBuilder;
@Nullable
private final String receiverName;
// index -> timestamp -> dataPointGroupHash -> DataPointGroup
private final Map<TargetIndex, Map<Hash128, Map<Hash128, DataPointGroup>>> dataPointGroupsByIndexAndTimestamp;

ScopeGroup(ResourceGroup resourceGroup, InstrumentationScope scope, ByteString scopeSchemaUrl) {
ScopeGroup(ResourceGroup resourceGroup, InstrumentationScope scope, ByteString scopeSchemaUrl, TsidBuilder scopeTsidBuilder) {
this.resourceGroup = resourceGroup;
this.scope = scope;
this.scopeSchemaUrl = scopeSchemaUrl;
this.scopeTsidBuilder = scopeTsidBuilder;
this.dataPointGroupsByIndexAndTimestamp = new HashMap<>();
this.receiverName = extractReceiverName(scope);
}
@@ -216,8 +223,13 @@ public void addDataPoint(DataPoint dataPoint) {
}

private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
TsidBuilder dataPointGroupTsidBuilder = DataPointTsidFunnel.forDataPoint(byteStringAccessor, dataPoint);
TsidBuilder dataPointGroupTsidBuilder = DataPointTsidFunnel.forDataPoint(
byteStringAccessor,
dataPoint,
scopeTsidBuilder.size()
);
Hash128 dataPointGroupHash = dataPointGroupTsidBuilder.hash();
dataPointGroupTsidBuilder.addAll(scopeTsidBuilder);
// in addition to the fields that go into the _tsid, we also need to group by timestamp and start timestamp
Hash128 timestamp = new Hash128(dataPoint.getTimestampUnixNano(), dataPoint.getStartTimestampUnixNano());
TargetIndex targetIndex = TargetIndex.evaluate(
@@ -236,6 +248,7 @@ private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
resourceGroup.resourceSchemaUrl,
scope,
scopeSchemaUrl,
dataPointGroupTsidBuilder,
dataPoint.getAttributes(),
dataPoint.getUnit(),
targetIndex
@@ -261,6 +274,7 @@ public static final class DataPointGroup {
private final ByteString resourceSchemaUrl;
private final InstrumentationScope scope;
private final ByteString scopeSchemaUrl;
private final TsidBuilder tsidBuilder;
private final List<KeyValue> dataPointAttributes;
private final String unit;
private final Set<String> metricNames = new HashSet<>();
@@ -273,6 +287,7 @@ public DataPointGroup(
ByteString resourceSchemaUrl,
InstrumentationScope scope,
ByteString scopeSchemaUrl,
TsidBuilder tsidBuilder,
List<KeyValue> dataPointAttributes,
String unit,
TargetIndex targetIndex
@@ -281,6 +296,7 @@ public DataPointGroup(
this.resourceSchemaUrl = resourceSchemaUrl;
this.scope = scope;
this.scopeSchemaUrl = scopeSchemaUrl;
this.tsidBuilder = tsidBuilder;
this.dataPointAttributes = dataPointAttributes;
this.unit = unit;
this.targetIndex = targetIndex;
@@ -334,6 +350,10 @@ public ByteString scopeSchemaUrl() {
return scopeSchemaUrl;
}

public TsidBuilder tsidBuilder() {
return tsidBuilder;
}

public List<KeyValue> dataPointAttributes() {
return dataPointAttributes;
}
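A condensed, hedged sketch of the per-level pattern the grouping context now follows; the calls are the ones from the hunks above, stripped of the surrounding map lookups:

// resource level: the hash keys the ResourceGroup, the builder itself is kept for reuse
TsidBuilder resourceTsidBuilder = ResourceTsidFunnel.forResource(byteStringAccessor, resourceMetrics);
Hash128 resourceHash = resourceTsidBuilder.hash();

// scope level: hash before addAll, so the ScopeGroup key covers scope attributes only;
// afterwards the builder carries resource + scope dimensions, pre-sorted for cheap reuse
TsidBuilder scopeTsidBuilder = ScopeTsidFunnel.forScope(byteStringAccessor, scopeMetrics);
Hash128 scopeHash = scopeTsidBuilder.hash();
scopeTsidBuilder.addAll(resourceTsidBuilder);
scopeTsidBuilder.sortDimensions();

// data point level: same hash-then-merge order; the merged builder travels with the
// DataPointGroup so MetricDocumentBuilder can append "_metric_names_hash" and build the _tsid
TsidBuilder dataPointGroupTsidBuilder = DataPointTsidFunnel.forDataPoint(byteStringAccessor, dataPoint, scopeTsidBuilder.size());
Hash128 dataPointGroupHash = dataPointGroupTsidBuilder.hash();
dataPointGroupTsidBuilder.addAll(scopeTsidBuilder);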
@@ -14,6 +14,8 @@

import com.google.protobuf.ByteString;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.routing.TsidBuilder;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.hash.BufferedMurmur3Hasher;
import org.elasticsearch.xcontent.XContentBuilder;
@@ -23,8 +25,8 @@
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
@@ -40,9 +42,11 @@ public MetricDocumentBuilder(BufferedByteStringAccessor byteStringAccessor) {
this.byteStringAccessor = byteStringAccessor;
}

public HashMap<String, String> buildMetricDocument(XContentBuilder builder, DataPointGroupingContext.DataPointGroup dataPointGroup)
throws IOException {
HashMap<String, String> dynamicTemplates = new HashMap<>();
public BytesRef buildMetricDocument(
XContentBuilder builder,
Map<String, String> dynamicTemplates,
DataPointGroupingContext.DataPointGroup dataPointGroup
) throws IOException {
List<DataPoint> dataPoints = dataPointGroup.dataPoints();
builder.startObject();
builder.field("@timestamp", TimeUnit.NANOSECONDS.toMillis(dataPointGroup.getTimestampUnixNano()));
@@ -53,7 +57,8 @@ public HashMap<String, String> buildMetricDocument(XContentBuilder builder, Data
buildDataStream(builder, dataPointGroup.targetIndex());
buildScope(builder, dataPointGroup.scopeSchemaUrl(), dataPointGroup.scope());
buildDataPointAttributes(builder, dataPointGroup.dataPointAttributes(), dataPointGroup.unit());
builder.field("_metric_names_hash", dataPointGroup.getMetricNamesHash(hasher));
String metricNamesHash = dataPointGroup.getMetricNamesHash(hasher);
builder.field("_metric_names_hash", metricNamesHash);

long docCount = 0;
builder.startObject("metrics");
@@ -75,7 +80,9 @@ public HashMap<String, String> buildMetricDocument(XContentBuilder builder, Data
builder.field("_doc_count", docCount);
}
builder.endObject();
return dynamicTemplates;
TsidBuilder tsidBuilder = dataPointGroup.tsidBuilder();
tsidBuilder.addStringDimension("_metric_names_hash", metricNamesHash);
return tsidBuilder.buildTsid();
}

private void buildResource(Resource resource, ByteString schemaUrl, XContentBuilder builder) throws IOException {
@@ -14,14 +14,17 @@

public class DataPointTsidFunnel implements TsidFunnel<DataPoint> {

// for "unit" and "_metric_names_hash" that will be added in
// MetricDocumentBuilder once the data point group is complete
private static final int EXTRA_DIMENSIONS_SIZE = 2;
private final BufferedByteStringAccessor byteStringAccessor;

private DataPointTsidFunnel(BufferedByteStringAccessor byteStringAccessor) {
this.byteStringAccessor = byteStringAccessor;
}

public static TsidBuilder forDataPoint(BufferedByteStringAccessor byteStringAccessor, DataPoint dataPoint) {
TsidBuilder tsidBuilder = new TsidBuilder(dataPoint.getAttributes().size());
public static TsidBuilder forDataPoint(BufferedByteStringAccessor byteStringAccessor, DataPoint dataPoint, int scopeTsidBuilderSize) {
TsidBuilder tsidBuilder = new TsidBuilder(dataPoint.getAttributes().size() + scopeTsidBuilderSize + EXTRA_DIMENSIONS_SIZE);
new DataPointTsidFunnel(byteStringAccessor).add(dataPoint, tsidBuilder);
return tsidBuilder;
}
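The builder's initial capacity now covers everything that will eventually be merged in: the data point's own attributes, the dimensions inherited from the scope builder, and the two dimensions ("unit" and "_metric_names_hash") appended later in MetricDocumentBuilder. As a worked example, a data point with 3 attributes under a scope builder that already holds 6 resource and scope dimensions starts at 3 + 6 + 2 = 11, so the backing list should not need to grow when the group is finalized.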
@@ -25,7 +25,7 @@ public ResourceTsidFunnel(BufferedByteStringAccessor byteStringAccessor) {
}

public static TsidBuilder forResource(BufferedByteStringAccessor byteStringAccessor, ResourceMetrics resourceMetrics) {
TsidBuilder tsidBuilder = new TsidBuilder(resourceMetrics.getResource().getAttributesCount() + 1);
TsidBuilder tsidBuilder = new TsidBuilder(resourceMetrics.getResource().getAttributesCount());
new ResourceTsidFunnel(byteStringAccessor).add(resourceMetrics, tsidBuilder);
return tsidBuilder;
}
@@ -34,6 +34,5 @@ public static TsidBuilder forResource(BufferedByteStringAccess
public void add(ResourceMetrics resourceMetrics, TsidBuilder tsidBuilder) {
List<KeyValue> resourceAttributes = resourceMetrics.getResource().getAttributesList();
tsidBuilder.add(resourceAttributes, AttributeListTsidFunnel.get(byteStringAccessor, "resource.attributes."));
byteStringAccessor.addStringDimension(tsidBuilder, "schema_url", resourceMetrics.getSchemaUrlBytes());
}
}
@@ -190,15 +190,13 @@ public static SummaryDataPoint createSummaryDataPoint(long timestamp, List<KeyVa
}

public static ExportMetricsServiceRequest createMetricsRequest(List<Metric> metrics) {
return createMetricsRequest(List.of(keyValue("service.name", "test-service")), metrics);
}

public static ExportMetricsServiceRequest createMetricsRequest(List<KeyValue> resourceAttributes, List<Metric> metrics) {
List<ResourceMetrics> resourceMetrics = new ArrayList<>();
for (Metric metric : metrics) {
resourceMetrics.add(
createResourceMetrics(
List.of(keyValue("service.name", "test-service")),
List.of(createScopeMetrics("test", "1.0.0", List.of(metric)))
)
);
resourceMetrics.add(createResourceMetrics(resourceAttributes, List.of(createScopeMetrics("test", "1.0.0", List.of(metric)))));
}

return ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(resourceMetrics).build();