Skip to content

Commit 82431fe

Browse files
committed
Fixed all tags
1 parent 30b796e commit 82431fe

File tree

7 files changed

+54
-53
lines changed

7 files changed

+54
-53
lines changed

dd-java-agent/instrumentation/armeria/armeria-grpc-0.84/src/test/groovy/ArmeriaGrpcTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ abstract class ArmeriaGrpcTest extends VersionedNamingTestBase {
254254
if (isDataStreamsEnabled()) {
255255
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
256256
verifyAll(first) {
257-
tags.hasAllTags("direction:in", "topic:somequeue", "type:grpc")
257+
tags.hasAllTags("direction:out", "type:grpc")
258258
}
259259

260260
StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }

dd-java-agent/instrumentation/google-pubsub/src/main/java/datadog/trace/instrumentation/googlepubsub/PubSubDecorator.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,7 @@
1212
import datadog.trace.api.datastreams.DataStreamsContext;
1313
import datadog.trace.api.datastreams.DataStreamsTags;
1414
import datadog.trace.api.naming.SpanNaming;
15-
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
16-
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
17-
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
18-
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
19-
import datadog.trace.bootstrap.instrumentation.api.Tags;
20-
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
15+
import datadog.trace.bootstrap.instrumentation.api.*;
2116
import datadog.trace.bootstrap.instrumentation.decorator.MessagingClientDecorator;
2217
import java.util.function.Function;
2318
import java.util.function.Supplier;
@@ -130,7 +125,7 @@ public AgentSpan onConsume(final PubsubMessage message, final String subscriptio
130125
final AgentSpan span = startSpan(PUBSUB_CONSUME, spanContext);
131126
final CharSequence parsedSubscription = extractSubscription(subscription);
132127
DataStreamsTags tags =
133-
DataStreamsTags.create(
128+
DataStreamsTags.createWithSubscription(
134129
"google-pubsub", DataStreamsTags.Direction.Inbound, parsedSubscription.toString());
135130
final Timestamp publishTime = message.getPublishTime();
136131
// FIXME: use full nanosecond resolution when this method will accept nanos

dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTestBase.groovy

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
1-
import datadog.trace.api.datastreams.DataStreamsTags
2-
3-
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
4-
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
5-
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPropagationEnabled
6-
71
import datadog.trace.agent.test.asserts.TraceAssert
82
import datadog.trace.agent.test.naming.VersionedNamingTestBase
93
import datadog.trace.api.Config
104
import datadog.trace.api.DDTags
5+
import datadog.trace.api.datastreams.DataStreamsTags
116
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags
127
import datadog.trace.bootstrap.instrumentation.api.Tags
138
import datadog.trace.common.writer.ListWriter
@@ -23,15 +18,19 @@ import org.junit.Rule
2318
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
2419
import org.springframework.kafka.listener.KafkaMessageListenerContainer
2520
import org.springframework.kafka.listener.MessageListener
21+
import org.springframework.kafka.test.EmbeddedKafkaBroker
22+
import org.springframework.kafka.test.rule.EmbeddedKafkaRule
2623
import org.springframework.kafka.test.utils.ContainerTestUtils
2724
import org.springframework.kafka.test.utils.KafkaTestUtils
28-
import org.springframework.kafka.test.rule.EmbeddedKafkaRule
29-
import org.springframework.kafka.test.EmbeddedKafkaBroker
3025
import spock.lang.Shared
3126

3227
import java.util.concurrent.LinkedBlockingQueue
3328
import java.util.concurrent.TimeUnit
3429

30+
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
31+
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
32+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPropagationEnabled
33+
3534
abstract class KafkaClientTestBase extends VersionedNamingTestBase {
3635
static final SHARED_TOPIC = "shared.topic"
3736

@@ -233,10 +232,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
233232
new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${traces[produceTraceIdx][2].spanId}"
234233

235234
if (isDataStreamsEnabled()) {
236-
def val = DataStreamsTags.hasAllTags("direction:out", "kafka_cluster_id:$clusterId", "topic:$SHARED_TOPIC".toString(), "type:kafka")
237235
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
238236
verifyAll(first) {
239-
tags.toString() == val.toString()
237+
tags.hasAllTags("direction:out", "kafka_cluster_id:$clusterId", "topic:$SHARED_TOPIC".toString(), "type:kafka")
240238
}
241239

242240
StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
@@ -249,22 +247,18 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
249247
"type:kafka"
250248
)
251249
}
252-
List<String> produce = [
253-
"kafka_cluster_id:$clusterId",
254-
"partition:" + received.partition(),
255-
"topic:" + SHARED_TOPIC,
256-
"type:kafka_produce"
257-
]
258-
List<String> commit = [
259-
"consumer_group:sender",
260-
"kafka_cluster_id:$clusterId",
261-
"partition:" + received.partition(),
262-
"topic:" + SHARED_TOPIC,
263-
"type:kafka_commit"
264-
]
265-
verifyAll(TEST_DATA_STREAMS_WRITER.backlogs) {
266-
contains(new AbstractMap.SimpleEntry<List<String>, Long>(commit, 1).toString())
267-
contains(new AbstractMap.SimpleEntry<List<String>, Long>(produce, 0).toString())
250+
def sorted = new ArrayList<DataStreamsTags>(TEST_DATA_STREAMS_WRITER.backlogs).sort()
251+
verifyAll(sorted) {
252+
size() == 2
253+
get(0).hasAllTags("consumer_group:sender",
254+
"kafka_cluster_id:$clusterId",
255+
"partition:" + received.partition(),
256+
"topic:" + SHARED_TOPIC,
257+
"type:kafka_commit")
258+
get(1).hasAllTags("kafka_cluster_id:$clusterId",
259+
"partition:" + received.partition(),
260+
"topic:" + SHARED_TOPIC,
261+
"type:kafka_produce")
268262
}
269263
}
270264

dd-java-agent/instrumentation/kafka-connect-0.11/src/test/groovy/ConnectWorkerInstrumentationTest.groovy

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import datadog.trace.agent.test.AgentTestRunner
2-
import datadog.trace.api.datastreams.DataStreamsTags
32
import datadog.trace.core.datastreams.StatsGroup
43
import org.apache.kafka.clients.admin.AdminClient
54
import org.apache.kafka.clients.admin.AdminClientConfig
@@ -14,12 +13,12 @@ import org.apache.kafka.common.utils.Time
1413
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
1514
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
1615
import org.apache.kafka.connect.runtime.Herder
17-
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo
18-
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig
19-
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder
2016
import org.apache.kafka.connect.runtime.Worker
2117
import org.apache.kafka.connect.runtime.WorkerConfig
2218
import org.apache.kafka.connect.runtime.isolation.Plugins
19+
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo
20+
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig
21+
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder
2322
import org.apache.kafka.connect.storage.FileOffsetBackingStore
2423
import org.apache.kafka.connect.util.Callback
2524
import org.springframework.kafka.test.EmbeddedKafkaBroker
@@ -162,12 +161,8 @@ class ConnectWorkerInstrumentationTest extends AgentTestRunner {
162161
}
163162

164163
StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
165-
def control = DataStreamsTags.hasAllTags("direction:in", "group:test-consumer-group", "topic:test-topic", "type:kafka")
166164
verifyAll(second) {
167-
tags.direction == control.direction
168-
tags.group == control.group
169-
tags.topic == control.topic
170-
tags.type == control.type
165+
tags.hasAllTags("direction:in", "group:test-consumer-group", "topic:test-topic", "type:kafka")
171166
}
172167
TEST_DATA_STREAMS_WRITER.getServices().contains('file-source-connector')
173168

@@ -289,13 +284,8 @@ class ConnectWorkerInstrumentationTest extends AgentTestRunner {
289284
}
290285

291286
StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
292-
def control = DataStreamsTags.hasAllTags("direction:in", "group:connect-file-sink-connector", "topic:test-topic", "type:kafka", "kafka_cluster_id:" + clusterId)
293287
verifyAll(second) {
294-
tags.direction == control.direction
295-
tags.group == control.group
296-
tags.topic == control.topic
297-
tags.type == control.type
298-
tags.kafkaClusterId == control.kafkaClusterId
288+
tags.hasAllTags("direction:in", "group:connect-file-sink-connector", "topic:test-topic", "type:kafka", "kafka_cluster_id:" + clusterId)
299289
}
300290
TEST_DATA_STREAMS_WRITER.getServices().contains('file-sink-connector')
301291

dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.agent.test.datastreams
22

3+
import datadog.trace.api.datastreams.DataStreamsTags
34
import datadog.trace.core.datastreams.DatastreamsPayloadWriter
45
import datadog.trace.core.datastreams.StatsBucket
56
import datadog.trace.core.datastreams.StatsGroup
@@ -16,7 +17,7 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
1617
private final List<StatsGroup> groups = []
1718

1819
@SuppressWarnings('UnusedPrivateField')
19-
private final Set<String> backlogs = []
20+
private final Set<DataStreamsTags> backlogs = []
2021

2122
private final Set<String> serviceNameOverrides = []
2223

@@ -28,8 +29,8 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
2829
data.each { this.@groups.addAll(it.groups) }
2930
for (StatsBucket bucket : data) {
3031
if (bucket.backlogs != null) {
31-
for (Map.Entry<List<String>, Long> backlog : bucket.backlogs) {
32-
this.@backlogs.add(backlog.toString())
32+
for (Map.Entry<DataStreamsTags, Long> backlog : bucket.backlogs) {
33+
this.@backlogs.add(backlog.getKey())
3334
}
3435
}
3536
}
@@ -47,7 +48,7 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
4748
Collections.unmodifiableList(new ArrayList<>(this.@groups))
4849
}
4950

50-
synchronized List<String> getBacklogs() {
51+
synchronized List<DataStreamsTags> getBacklogs() {
5152
Collections.unmodifiableList(new ArrayList<>(this.@backlogs))
5253
}
5354

internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTags.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,25 @@ public static DataStreamsTags create(String type, Direction direction, String to
7575
return DataStreamsTags.createWithGroup(type, direction, topic, null);
7676
}
7777

78+
public static DataStreamsTags createWithSubscription(
79+
String type, Direction direction, String subscription) {
80+
return new DataStreamsTags(
81+
null,
82+
direction,
83+
null,
84+
null,
85+
type,
86+
subscription,
87+
null,
88+
null,
89+
null,
90+
null,
91+
null,
92+
null,
93+
null,
94+
null);
95+
}
96+
7897
public static DataStreamsTags create(
7998
String type, Direction direction, String topic, String group, String kafkaClusterId) {
8099
return new DataStreamsTags(

internal-api/src/test/groovy/datadog/trace/api/datastreams/DataStreamsTagsTest.groovy

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,13 @@ class DataStreamsTagsTest extends Specification {
110110
def three = DataStreamsTags.create("type", DataStreamsTags.Direction.Outbound, "topic", "group", "cluster")
111111
def four = DataStreamsTags.createWithPartition("type", "topic", "partition", "cluster", "group")
112112
def five = DataStreamsTags.createWithDataset("type", DataStreamsTags.Direction.Outbound, "topic", "dataset", "namespace")
113+
def six = DataStreamsTags.createWithSubscription("type", DataStreamsTags.Direction.Inbound, "subscription")
113114
expect:
114115
one.hasAllTags("type:type", "direction:out")
115116
two.hasAllTags("type:type", "direction:out", "topic:topic")
116117
three.hasAllTags("type:type", "direction:out", "topic:topic", "group:group", "kafka_cluster_id:cluster")
117118
four.hasAllTags("type:type", "topic:topic", "partition:partition", "kafka_cluster_id:cluster", "consumer_group:group")
118119
five.hasAllTags("type:type", "direction:out", "topic:topic", "ds.name:dataset", "ds.namespace:namespace")
120+
six.hasAllTags("type:type", "direction:in", "subscription:subscription")
119121
}
120122
}

0 commit comments

Comments
 (0)