Skip to content

Commit bd3f6f5

Browse files
Remove high cardinality tags from Data Streams hashes (#10287)
* fix high cardinality hashes * Add unit tests
1 parent de09bad commit bd3f6f5

File tree

2 files changed

+150
-13
lines changed

2 files changed

+150
-13
lines changed

internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTags.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ public DataStreamsTags(
334334
}
335335
}
336336

337-
// hashable tags are 0-4
337+
// hashable tags are 0-6: bus, direction, exchange, topic, type, subscription, kafkaClusterId
338338
for (int i = 0; i < 7; i++) {
339339
String tag = this.tagByIndex(i);
340340
if (tag != null) {
@@ -343,9 +343,9 @@ public DataStreamsTags(
343343
}
344344
}
345345

346-
// aggregation tags are 5-7
346+
// aggregation tags are 7-11: datasetName, datasetNamespace, isManual, group, consumerGroup
347347
this.aggregationHash = this.hash;
348-
for (int i = 7; i < 10; i++) {
348+
for (int i = 7; i < 12; i++) {
349349
String tag = this.tagByIndex(i);
350350
if (tag != null) {
351351
this.nonNullSize++;
@@ -354,9 +354,9 @@ public DataStreamsTags(
354354
}
355355
}
356356

357-
// the rest are values
357+
// values are 12-13: partition, hasRoutingKey
358358
this.completeHash = aggregationHash;
359-
for (int i = 10; i < this.size(); i++) {
359+
for (int i = 12; i < this.size(); i++) {
360360
String tag = this.tagByIndex(i);
361361
if (tag != null) {
362362
this.nonNullSize++;
@@ -370,6 +370,8 @@ public int size() {
370370
return 14;
371371
}
372372

373+
// WARNING: DO NOT REORDER! Indices 0-6 are hash tags, 7-11 are aggregation tags, 12-13 are
374+
// values. Reordering breaks hashing!
373375
public String tagByIndex(int index) {
374376
switch (index) {
375377
case 0:
@@ -385,21 +387,21 @@ public String tagByIndex(int index) {
385387
case 5:
386388
return this.subscription;
387389
case 6:
388-
return this.datasetName;
390+
return this.kafkaClusterId;
389391
case 7:
390-
return this.datasetNamespace;
392+
return this.datasetName;
391393
case 8:
392-
return this.isManual;
394+
return this.datasetNamespace;
393395
case 9:
394-
return this.group;
396+
return this.isManual;
395397
case 10:
396-
return this.consumerGroup;
398+
return this.group;
397399
case 11:
398-
return this.hasRoutingKey;
400+
return this.consumerGroup;
399401
case 12:
400-
return this.kafkaClusterId;
401-
case 13:
402402
return this.partition;
403+
case 13:
404+
return this.hasRoutingKey;
403405
default:
404406
return null;
405407
}

internal-api/src/test/groovy/datadog/trace/api/datastreams/DataStreamsTagsTest.groovy

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,139 @@ class DataStreamsTagsTest extends Specification {
119119
five.hasAllTags("type:type", "direction:out", "topic:topic", "ds.name:dataset", "ds.namespace:namespace")
120120
six.hasAllTags("type:type", "direction:in", "subscription:subscription")
121121
}
122+
123+
def 'test tagByIndex returns tags in correct order'() {
124+
setup:
125+
def tags = getTags(0)
126+
127+
expect: "Hash tags (0-6): bus, direction, exchange, topic, type, subscription, kafkaClusterId"
128+
tags.tagByIndex(0) == "bus:bus0"
129+
tags.tagByIndex(1) == "direction:out"
130+
tags.tagByIndex(2) == "exchange:exchange0"
131+
tags.tagByIndex(3) == "topic:topic0"
132+
tags.tagByIndex(4) == "type:type0"
133+
tags.tagByIndex(5) == "subscription:subscription0"
134+
tags.tagByIndex(6) == "kafka_cluster_id:kafka_cluster_id0"
135+
136+
and: "Aggregation tags (7-11): datasetName, datasetNamespace, isManual, group, consumerGroup"
137+
tags.tagByIndex(7) == "ds.name:dataset_name0"
138+
tags.tagByIndex(8) == "ds.namespace:dataset_namespace0"
139+
tags.tagByIndex(9) == "manual_checkpoint:true"
140+
tags.tagByIndex(10) == "group:group0"
141+
tags.tagByIndex(11) == "consumer_group:consumer_group0"
142+
143+
and: "Values (12-13): partition, hasRoutingKey"
144+
tags.tagByIndex(12) == "partition:partition0"
145+
tags.tagByIndex(13) == "has_routing_key:true"
146+
147+
and: "Out of bounds returns null"
148+
tags.tagByIndex(14) == null
149+
tags.tagByIndex(-1) == null
150+
}
151+
152+
def 'test only hash tags affect primary hash'() {
153+
setup: "Create base tags with all hash tags (0-6) set"
154+
def base = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
155+
"type", "subscription", null, null, null, null, null, null, "cluster", null)
156+
157+
when: "Change only aggregation tag (datasetName)"
158+
def withDataset = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
159+
"type", "subscription", "dataset", null, null, null, null, null, "cluster", null)
160+
161+
then: "Primary hash should be the same (aggregation tag doesn't affect it)"
162+
base.getHash() == withDataset.getHash()
163+
164+
when: "Change only a value tag (partition)"
165+
def withPartition = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
166+
"type", "subscription", null, null, null, null, null, null, "cluster", "partition")
167+
168+
then: "Primary hash should still be the same (value tag doesn't affect it)"
169+
base.getHash() == withPartition.getHash()
170+
171+
when: "Change a hash tag (topic)"
172+
def withDifferentTopic = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic2",
173+
"type", "subscription", null, null, null, null, null, null, "cluster", null)
174+
175+
then: "Primary hash should be different"
176+
base.getHash() != withDifferentTopic.getHash()
177+
}
178+
179+
def 'test aggregation tags affect aggregation hash but not primary hash'() {
180+
setup: "Create base tags"
181+
def base = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
182+
"type", "subscription", null, null, null, null, null, null, "cluster", null)
183+
184+
when: "Add aggregation tag (datasetName)"
185+
def withDataset = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
186+
"type", "subscription", "dataset", null, null, null, null, null, "cluster", null)
187+
188+
then: "Primary hash is same, but aggregation hash is different"
189+
base.getHash() == withDataset.getHash()
190+
base.getAggregationHash() != withDataset.getAggregationHash()
191+
192+
when: "Add different aggregation tag (group)"
193+
def withGroup = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
194+
"type", "subscription", null, null, null, "group", null, null, "cluster", null)
195+
196+
then: "Primary hash is same, but aggregation hash is different"
197+
base.getHash() == withGroup.getHash()
198+
base.getAggregationHash() != withGroup.getAggregationHash()
199+
}
200+
201+
def 'test values affect only complete hash'() {
202+
setup: "Create base tags"
203+
def base = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
204+
"type", "subscription", null, null, null, null, null, null, "cluster", null)
205+
206+
when: "Add value tag (partition)"
207+
def withPartition = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
208+
"type", "subscription", null, null, null, null, null, null, "cluster", "partition")
209+
210+
then: "Primary and aggregation hashes are same, complete hash is different (via equals)"
211+
base.getHash() == withPartition.getHash()
212+
base.getAggregationHash() == withPartition.getAggregationHash()
213+
base != withPartition // equals uses completeHash
214+
215+
when: "Add different value tag (hasRoutingKey)"
216+
def withRoutingKey = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
217+
"type", "subscription", null, null, null, null, null, true, "cluster", null)
218+
219+
then: "Primary and aggregation hashes are same, but objects are different"
220+
base.getHash() == withRoutingKey.getHash()
221+
base.getAggregationHash() == withRoutingKey.getAggregationHash()
222+
base != withRoutingKey
223+
}
224+
225+
def 'test all three hash levels are different when appropriate tags change'() {
226+
setup:
227+
def base = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, null, "topic",
228+
"type", null, null, null, null, null, null, null, null, null)
229+
230+
when: "Add hash tag -> all hashes change"
231+
def withExchange = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
232+
"type", null, null, null, null, null, null, null, null, null)
233+
234+
then:
235+
base.getHash() != withExchange.getHash()
236+
base.getAggregationHash() != withExchange.getAggregationHash()
237+
base != withExchange
238+
239+
when: "Add aggregation tag -> only aggregation and complete hashes change"
240+
def withDataset = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, null, "topic",
241+
"type", null, "dataset", null, null, null, null, null, null, null)
242+
243+
then:
244+
base.getHash() == withDataset.getHash() // primary hash unchanged
245+
base.getAggregationHash() != withDataset.getAggregationHash()
246+
base != withDataset
247+
248+
when: "Add value tag -> only complete hash changes"
249+
def withPartition = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, null, "topic",
250+
"type", null, null, null, null, null, null, null, null, "partition")
251+
252+
then:
253+
base.getHash() == withPartition.getHash() // primary hash unchanged
254+
base.getAggregationHash() == withPartition.getAggregationHash() // aggregation hash unchanged
255+
base != withPartition // but complete hash changed
256+
}
122257
}

0 commit comments

Comments
 (0)