Skip to content

Commit 4165e79

Browse files
committed
Fix _tsid calculation and add test for _tsid equivalence
1 parent eb52d6b commit 4165e79

File tree

6 files changed, +101 -17 lines changed

server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -47,15 +47,15 @@ public void addIntDimension(String path, int value) {
4747
}
4848

4949
public void addLongDimension(String path, long value) {
50-
addDimension(path, new HashValue128(2, value));
50+
addDimension(path, new HashValue128(1, value));
5151
}
5252

5353
public void addDoubleDimension(String path, double value) {
5454
addDimension(path, new HashValue128(2, Double.doubleToLongBits(value)));
5555
}
5656

5757
public void addBooleanDimension(String path, boolean value) {
58-
addDimension(path, new HashValue128(4, value ? 1 : 0));
58+
addDimension(path, new HashValue128(3, value ? 1 : 0));
5959
}
6060

6161
public void addStringDimension(String path, String value) {
@@ -107,6 +107,10 @@ public void addAll(TsidBuilder other) {
107107
}
108108

109109
public HashValue128 hash() {
110+
return hashStream().get();
111+
}
112+
113+
public HashStream128 hashStream() {
110114
if (dimensions.isEmpty()) {
111115
throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
112116
}
@@ -118,7 +122,7 @@ public HashValue128 hash() {
118122
hashStream.putLong(dim.value.getMostSignificantBits());
119123
hashStream.putLong(dim.value.getLeastSignificantBits());
120124
}
121-
return hashStream.get();
125+
return hashStream;
122126
}
123127

124128
/**

x-pack/plugin/otel-data/src/internalClusterTest/java/org/elasticsearch/action/otlp/OTLPMetricsIndexingIT.java

Lines changed: 77 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,9 +33,13 @@
3333
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryPointData;
3434
import io.opentelemetry.sdk.resources.Resource;
3535

36+
import org.elasticsearch.action.DocWriteRequest;
3637
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
3738
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
3839
import org.elasticsearch.action.admin.indices.template.get.GetComposableIndexTemplateAction;
40+
import org.elasticsearch.action.bulk.BulkResponse;
41+
import org.elasticsearch.action.index.IndexRequest;
42+
import org.elasticsearch.action.support.WriteRequest;
3943
import org.elasticsearch.cluster.metadata.MappingMetadata;
4044
import org.elasticsearch.common.settings.Settings;
4145
import org.elasticsearch.common.transport.TransportAddress;
@@ -48,8 +52,10 @@
4852
import org.elasticsearch.painless.PainlessPlugin;
4953
import org.elasticsearch.plugins.Plugin;
5054
import org.elasticsearch.script.field.WriteField;
55+
import org.elasticsearch.search.SearchHits;
5156
import org.elasticsearch.test.ESSingleNodeTestCase;
5257
import org.elasticsearch.test.InternalSettingsPlugin;
58+
import org.elasticsearch.xcontent.XContentType;
5359
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
5460
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
5561
import org.elasticsearch.xpack.constantkeyword.ConstantKeywordMapperPlugin;
@@ -68,6 +74,7 @@
6874
import java.net.InetSocketAddress;
6975
import java.time.Duration;
7076
import java.util.ArrayList;
77+
import java.util.Arrays;
7178
import java.util.Collection;
7279
import java.util.List;
7380
import java.util.Map;
@@ -85,6 +92,7 @@
8592
import static org.hamcrest.Matchers.emptyArray;
8693
import static org.hamcrest.Matchers.equalTo;
8794
import static org.hamcrest.Matchers.greaterThan;
95+
import static org.hamcrest.Matchers.hasSize;
8896
import static org.hamcrest.Matchers.is;
8997
import static org.hamcrest.Matchers.not;
9098

@@ -491,6 +499,75 @@ public void testSummary() throws Exception {
491499
});
492500
}
493501

502+
public void testTsidForBulkIsSame() throws IOException {
503+
// This test is to ensure that the _tsid is the same when indexing via a bulk request or OTLP.
504+
long now = Clock.getDefault().now();
505+
506+
export(
507+
List.of(
508+
createDoubleGauge(
509+
TEST_RESOURCE,
510+
Attributes.builder()
511+
.put("string", "foo")
512+
.put("boolean", true)
513+
.put("long", 42L)
514+
.put("double", 42.0)
515+
.put("host.ip", "127.0.0.1")
516+
.build(),
517+
"metric",
518+
42,
519+
"By",
520+
now
521+
)
522+
)
523+
);
524+
525+
// Index the same metric via a bulk request
526+
BulkResponse bulkResp = client().prepareBulk("metrics-generic.otel-default")
527+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
528+
.add(new IndexRequest().opType(DocWriteRequest.OpType.CREATE).source("""
529+
{
530+
"@timestamp": $time,
531+
"start_timestamp": $time,
532+
"data_stream": {
533+
"type": "metrics",
534+
"dataset": "generic.otel",
535+
"namespace": "default"
536+
},
537+
"_metric_names_hash": "ef2c1a68",
538+
"metrics": {
539+
"metric": 42.0
540+
},
541+
"attributes": {
542+
"string": "foo",
543+
"boolean": true,
544+
"long": 42,
545+
"double": 42.0,
546+
"host.ip": "127.0.0.1"
547+
},
548+
"resource": {
549+
"attributes": {
550+
"service.name": "elasticsearch"
551+
}
552+
},
553+
"scope": {
554+
"name": "io.opentelemetry.example.metrics"
555+
},
556+
"unit": "By"
557+
}
558+
""".replace("$time", Long.toString(TimeUnit.NANOSECONDS.toMillis(now) + 1)), XContentType.JSON))
559+
.get();
560+
561+
assertThat(bulkResp.hasFailures(), equalTo(false));
562+
563+
assertResponse(client().prepareSearch("metrics-generic.otel-default").addDocValueField("_tsid"), resp -> {
564+
SearchHits hits = resp.getHits();
565+
assertThat(hits.getHits(), arrayWithSize(2));
566+
List<String> tsids = Arrays.stream(hits.getHits()).map(h -> h.field("_tsid").<String>getValue()).distinct().toList();
567+
assertThat(tsids, hasSize(1));
568+
});
569+
}
570+
494571
@SuppressWarnings("unchecked")
495572
private static <T> T evaluate(Map<String, Object> map, String path) {
496573
return (T) new WriteField(path, () -> map).get(null);

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java

Lines changed: 10 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -7,20 +7,18 @@
77

88
package org.elasticsearch.xpack.oteldata.otlp.datapoint;
99

10+
import com.dynatrace.hash4j.hashing.HashStream32;
11+
import com.dynatrace.hash4j.hashing.HashValue128;
12+
import com.dynatrace.hash4j.hashing.Hasher32;
13+
import com.dynatrace.hash4j.hashing.Hashing;
14+
import com.google.protobuf.ByteString;
1015
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
1116
import io.opentelemetry.proto.common.v1.InstrumentationScope;
1217
import io.opentelemetry.proto.common.v1.KeyValue;
1318
import io.opentelemetry.proto.metrics.v1.Metric;
1419
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
1520
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
1621
import io.opentelemetry.proto.resource.v1.Resource;
17-
18-
import com.dynatrace.hash4j.hashing.HashStream32;
19-
import com.dynatrace.hash4j.hashing.HashValue128;
20-
import com.dynatrace.hash4j.hashing.Hasher32;
21-
import com.dynatrace.hash4j.hashing.Hashing;
22-
import com.google.protobuf.ByteString;
23-
2422
import org.elasticsearch.cluster.routing.TsidBuilder;
2523
import org.elasticsearch.core.CheckedConsumer;
2624
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;
@@ -198,7 +196,11 @@ public void addDataPoint(DataPoint dataPoint) {
198196

199197
private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
200198
TsidBuilder dataPointGroupTsidBuilder = DataPointTsidFunnel.forDataPoint(byteStringAccessor, dataPoint);
201-
HashValue128 dataPointGroupHash = dataPointGroupTsidBuilder.hash();
199+
// in addition to the fields that go into the _tsid, we also need to group by timestamp and start timestamp
200+
HashValue128 dataPointGroupHash = dataPointGroupTsidBuilder.hashStream()
201+
.putLong(dataPoint.getTimestampUnixNano())
202+
.putLong(dataPoint.getStartTimestampUnixNano())
203+
.get();
202204
TargetIndex targetIndex = TargetIndex.route(
203205
"metrics",
204206
dataPoint.getAttributes(),

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessor.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,9 @@ public class BufferedByteStringAccessor {
3434
* @param value the value of the dimension as a {@link ByteString}
3535
*/
3636
public void addStringDimension(TsidBuilder tsidBuilder, String dimension, ByteString value) {
37+
if (value.isEmpty()) {
38+
return;
39+
}
3740
tsidBuilder.addStringDimension(dimension, toBytes(value), 0, value.size());
3841
}
3942

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -21,15 +21,13 @@ private DataPointTsidFunnel(BufferedByteStringAccessor byteStringAccessor) {
2121
}
2222

2323
public static TsidBuilder forDataPoint(BufferedByteStringAccessor byteStringAccessor, DataPoint dataPoint) {
24-
TsidBuilder tsidBuilder = new TsidBuilder(dataPoint.getAttributes().size() + 3);
24+
TsidBuilder tsidBuilder = new TsidBuilder(dataPoint.getAttributes().size());
2525
new DataPointTsidFunnel(byteStringAccessor).add(dataPoint, tsidBuilder);
2626
return tsidBuilder;
2727
}
2828

2929
@Override
3030
public void add(DataPoint dataPoint, TsidBuilder tsidBuilder) {
31-
tsidBuilder.addLongDimension("@timestamp", dataPoint.getTimestampUnixNano());
3231
tsidBuilder.add(dataPoint, DataPointDimensionsTsidFunnel.get(byteStringAccessor));
33-
tsidBuilder.addLongDimension("start_timestamp", dataPoint.getStartTimestampUnixNano());
3432
}
3533
}

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -33,9 +33,9 @@ public static TsidBuilder forScope(BufferedByteStringAccessor byteStringAccessor
3333
@Override
3434
public void add(ScopeMetrics scopeMetrics, TsidBuilder tsidBuilder) {
3535
List<KeyValue> resourceAttributes = scopeMetrics.getScope().getAttributesList();
36-
byteStringAccessor.addStringDimension(tsidBuilder, "name", scopeMetrics.getScope().getNameBytes());
37-
byteStringAccessor.addStringDimension(tsidBuilder, "schema_url", scopeMetrics.getSchemaUrlBytes());
36+
byteStringAccessor.addStringDimension(tsidBuilder, "scope.name", scopeMetrics.getScope().getNameBytes());
37+
byteStringAccessor.addStringDimension(tsidBuilder, "scope.schema_url", scopeMetrics.getSchemaUrlBytes());
3838
tsidBuilder.add(resourceAttributes, AttributeListTsidFunnel.get(byteStringAccessor, "scope.attributes."));
39-
byteStringAccessor.addStringDimension(tsidBuilder, "version", scopeMetrics.getScope().getVersionBytes());
39+
byteStringAccessor.addStringDimension(tsidBuilder, "scope.version", scopeMetrics.getScope().getVersionBytes());
4040
}
4141
}

0 commit comments

Comments
 (0)