Skip to content

Commit 7f639fa

Browse files
committed
Fix messaging client id
1 parent 2f03ab3 commit 7f639fa

File tree

12 files changed

+55
-75
lines changed

12 files changed

+55
-75
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public abstract class KafkaClientBaseTest {
5454
private static final Logger logger = LoggerFactory.getLogger(KafkaClientBaseTest.class);
5555

5656
protected static final String SHARED_TOPIC = "shared.topic";
57+
protected static final AttributeKey<String> MESSAGING_CLIENT_ID =
58+
AttributeKey.stringKey("messaging.client_id");
5759

5860
private KafkaContainer kafka;
5961
protected Producer<Integer, String> producer;
@@ -162,9 +164,7 @@ protected static List<AttributeAssertion> sendAttributes(
162164
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
163165
equalTo(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
164166
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
165-
satisfies(
166-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
167-
stringAssert -> stringAssert.startsWith("producer")),
167+
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")),
168168
satisfies(
169169
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
170170
AbstractStringAssert::isNotEmpty),
@@ -196,9 +196,7 @@ protected static List<AttributeAssertion> receiveAttributes(boolean testHeaders)
196196
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
197197
equalTo(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
198198
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "receive"),
199-
satisfies(
200-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
201-
stringAssert -> stringAssert.startsWith("consumer")),
199+
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
202200
satisfies(
203201
MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT,
204202
AbstractLongAssert::isPositive)));
@@ -224,9 +222,7 @@ protected static List<AttributeAssertion> processAttributes(
224222
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
225223
equalTo(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
226224
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"),
227-
satisfies(
228-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
229-
stringAssert -> stringAssert.startsWith("consumer")),
225+
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
230226
satisfies(
231227
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
232228
AbstractStringAssert::isNotEmpty),

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ void assertTraces() {
3434
SHARED_TOPIC),
3535
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
3636
satisfies(
37-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
37+
MESSAGING_CLIENT_ID,
3838
stringAssert -> stringAssert.startsWith("producer"))),
3939
span ->
4040
span.hasName(SHARED_TOPIC + " process")
@@ -59,7 +59,7 @@ void assertTraces() {
5959
MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP,
6060
"test"),
6161
satisfies(
62-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
62+
MESSAGING_CLIENT_ID,
6363
stringAssert -> stringAssert.startsWith("consumer"))),
6464
span ->
6565
span.hasName("process child")

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ void assertTraces() {
4141
SHARED_TOPIC),
4242
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
4343
satisfies(
44-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
44+
MESSAGING_CLIENT_ID,
4545
stringAssert -> stringAssert.startsWith("producer"))));
4646
SpanContext spanContext = trace.getSpan(1).getSpanContext();
4747
producerSpanContext.set(
@@ -68,7 +68,7 @@ void assertTraces() {
6868
MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP,
6969
"test"),
7070
satisfies(
71-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
71+
MESSAGING_CLIENT_ID,
7272
stringAssert -> stringAssert.startsWith("consumer")),
7373
equalTo(
7474
MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1)),
@@ -96,7 +96,7 @@ void assertTraces() {
9696
MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP,
9797
"test"),
9898
satisfies(
99-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
99+
MESSAGING_CLIENT_ID,
100100
stringAssert -> stringAssert.startsWith("consumer"))),
101101
span ->
102102
span.hasName("process child")

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
6161
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
6262
equalTo(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
6363
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
64-
satisfies(
65-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
66-
stringAssert -> stringAssert.startsWith("producer")),
64+
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")),
6765
satisfies(
6866
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
6967
AbstractStringAssert::isNotEmpty),
@@ -101,8 +99,7 @@ private static List<AttributeAssertion> processAttributes(String greeting, boole
10199
AbstractLongAssert::isNotNegative),
102100
equalTo(MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
103101
satisfies(
104-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
105-
stringAssert -> stringAssert.startsWith("consumer"))));
102+
MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer"))));
106103
if (testHeaders) {
107104
assertions.add(
108105
equalTo(

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,7 @@ protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
8383
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
8484
equalTo(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
8585
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
86-
satisfies(
87-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
88-
stringAssert -> stringAssert.startsWith("producer")),
86+
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")),
8987
satisfies(
9088
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
9189
AbstractStringAssert::isNotEmpty),
@@ -123,8 +121,7 @@ private static List<AttributeAssertion> processAttributes(String greeting, boole
123121
AbstractLongAssert::isNotNegative),
124122
equalTo(MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
125123
satisfies(
126-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
127-
stringAssert -> stringAssert.startsWith("consumer"))));
124+
MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer"))));
128125
if (testHeaders) {
129126
assertions.add(
130127
equalTo(
@@ -143,9 +140,7 @@ protected static List<AttributeAssertion> receiveAttributes(boolean testHeaders)
143140
equalTo(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
144141
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "receive"),
145142
equalTo(MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
146-
satisfies(
147-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
148-
stringAssert -> stringAssert.startsWith("consumer")),
143+
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
149144
equalTo(MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1)));
150145
if (testHeaders) {
151146
assertions.add(

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
100100
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
101101
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
102102
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
103-
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.startsWith("producer") }
103+
"messaging.client_id" { it.startsWith("producer") }
104104
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
105105
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
106106
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
@@ -119,7 +119,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
119119
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
120120
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
121121
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "receive"
122-
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.endsWith("consumer") }
122+
"messaging.client_id" { it.endsWith("consumer") }
123123
"$MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT" 1
124124
if (Boolean.getBoolean("testLatestDeps")) {
125125
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test-application"
@@ -136,7 +136,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
136136
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
137137
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
138138
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
139-
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.endsWith("consumer") }
139+
"messaging.client_id" { it.endsWith("consumer") }
140140
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
141141
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
142142
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
@@ -157,7 +157,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
157157
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
158158
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
159159
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
160-
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.endsWith("producer") }
160+
"messaging.client_id" { it.endsWith("producer") }
161161
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
162162
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
163163
}
@@ -175,7 +175,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
175175
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
176176
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
177177
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "receive"
178-
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.startsWith("consumer") }
178+
"messaging.client_id" { it.startsWith("consumer") }
179179
"$MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT" 1
180180
if (Boolean.getBoolean("testLatestDeps")) {
181181
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test"
@@ -192,7 +192,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
192192
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
193193
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
194194
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
195-
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.startsWith("consumer") }
195+
"messaging.client_id" { it.startsWith("consumer") }
196196
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
197197
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
198198
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
9595
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
9696
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
9797
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
98-
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" "producer-1"
98+
"messaging.client_id" "producer-1"
9999
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
100100
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
101101
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
@@ -110,7 +110,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
110110
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
111111
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
112112
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
113-
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.endsWith("consumer") }
113+
"messaging.client_id" { it.endsWith("consumer") }
114114
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
115115
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
116116
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
@@ -134,7 +134,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
134134
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
135135
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
136136
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
137-
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" String
137+
"messaging.client_id" String
138138
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
139139
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
140140
}
@@ -148,7 +148,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
148148
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
149149
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
150150
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
151-
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.startsWith("consumer") }
151+
"messaging.client_id" { it.startsWith("consumer") }
152152
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
153153
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
154154
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0

0 commit comments

Comments
 (0)