Skip to content

Commit 9f393c0

Browse files
committed
refactor(kafka): fix && support the experimental span attribute
1 parent 07e095d commit 9f393c0

File tree

38 files changed

+744
-234
lines changed

38 files changed

+744
-234
lines changed

docs/instrumentation-list.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1428,7 +1428,7 @@ libraries:
14281428
type: boolean
14291429
default: true
14301430
- name: otel.instrumentation.kafka.experimental-span-attributes
1431-
description: Enables the capture of the experimental consumer attribute "kafka.record.queue_time_ms"
1431+
description: Enables the capture of the experimental consumer attributes "kafka.record.queue_time_ms" and "messaging.kafka.bootstrap.servers"
14321432
type: boolean
14331433
default: false
14341434
kotlinx:

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717
import io.opentelemetry.context.Context;
1818
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
1919
import io.opentelemetry.instrumentation.api.internal.Timer;
20-
import io.opentelemetry.instrumentation.api.util.VirtualField;
2120
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
2221
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
22+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil;
2323
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
2424
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2525
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
2626
import java.time.Duration;
27+
import java.util.List;
2728
import java.util.Map;
2829
import java.util.Properties;
2930
import net.bytebuddy.asm.Advice;
@@ -44,11 +45,7 @@ public ElementMatcher<TypeDescription> typeMatcher() {
4445
@Override
4546
public void transform(TypeTransformer transformer) {
4647
transformer.applyAdviceToMethod(
47-
isConstructor().and(takesArgument(0, Map.class)),
48-
this.getClass().getName() + "$ConstructorAdvice");
49-
50-
transformer.applyAdviceToMethod(
51-
isConstructor().and(takesArgument(0, Properties.class)),
48+
isConstructor().and(takesArgument(0, Map.class).or(takesArgument(0, Properties.class))),
5249
this.getClass().getName() + "$ConstructorAdvice");
5350

5451
transformer.applyAdviceToMethod(
@@ -67,23 +64,15 @@ public static class ConstructorAdvice {
6764
public static void onExit(
6865
@Advice.This Consumer<?, ?> consumer, @Advice.Argument(0) Object configs) {
6966

70-
String bootstrapServers = null;
67+
Object bootstrapServersConfig = null;
7168
if (configs instanceof Map) {
72-
Object servers = ((Map<?, ?>) configs).get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
73-
if (servers != null) {
74-
bootstrapServers = servers.toString();
75-
}
76-
} else if (configs instanceof Properties) {
77-
bootstrapServers =
78-
((Properties) configs).getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
69+
bootstrapServersConfig = ((Map<?, ?>) configs).get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
7970
}
8071

81-
if (bootstrapServers != null) {
82-
VirtualField<Consumer<?, ?>, String> consumerStringVirtualField =
83-
VirtualField.find(Consumer.class, String.class);
84-
if (consumerStringVirtualField.get(consumer) == null) {
85-
consumerStringVirtualField.set(consumer, bootstrapServers);
86-
}
72+
if (bootstrapServersConfig != null
73+
&& KafkaSingletons.CONSUMER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.get(consumer) == null) {
74+
List<String> bootstrapServers = KafkaUtil.parseBootstrapServers(bootstrapServersConfig);
75+
KafkaSingletons.CONSUMER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.set(consumer, bootstrapServers);
8776
}
8877
}
8978
}

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,13 @@
1414

1515
import io.opentelemetry.context.Context;
1616
import io.opentelemetry.context.Scope;
17-
import io.opentelemetry.instrumentation.api.util.VirtualField;
1817
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
1918
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaPropagation;
19+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil;
2020
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
2121
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2222
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
23+
import java.util.List;
2324
import java.util.Map;
2425
import java.util.Properties;
2526
import net.bytebuddy.asm.Advice;
@@ -41,11 +42,7 @@ public ElementMatcher<TypeDescription> typeMatcher() {
4142
@Override
4243
public void transform(TypeTransformer transformer) {
4344
transformer.applyAdviceToMethod(
44-
isConstructor().and(takesArgument(0, Map.class)),
45-
this.getClass().getName() + "$ConstructorAdvice");
46-
47-
transformer.applyAdviceToMethod(
48-
isConstructor().and(takesArgument(0, Properties.class)),
45+
isConstructor().and(takesArgument(0, Map.class).or(takesArgument(0, Properties.class))),
4946
this.getClass().getName() + "$ConstructorAdvice");
5047

5148
transformer.applyAdviceToMethod(
@@ -64,23 +61,15 @@ public static class ConstructorAdvice {
6461
public static void onExit(
6562
@Advice.This Producer<?, ?> producer, @Advice.Argument(0) Object configs) {
6663

67-
String bootstrapServers = null;
64+
Object bootstrapServersConfig = null;
6865
if (configs instanceof Map) {
69-
Object servers = ((Map<?, ?>) configs).get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
70-
if (servers != null) {
71-
bootstrapServers = servers.toString();
72-
}
73-
} else if (configs instanceof Properties) {
74-
bootstrapServers =
75-
((Properties) configs).getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
66+
bootstrapServersConfig = ((Map<?, ?>) configs).get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
7667
}
7768

78-
if (bootstrapServers != null) {
79-
VirtualField<Producer<?, ?>, String> producerStringVirtualField =
80-
VirtualField.find(Producer.class, String.class);
81-
if (producerStringVirtualField.get(producer) == null) {
82-
producerStringVirtualField.set(producer, bootstrapServers);
83-
}
69+
if (bootstrapServersConfig != null
70+
&& KafkaSingletons.PRODUCER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.get(producer) == null) {
71+
List<String> bootstrapServers = KafkaUtil.parseBootstrapServers(bootstrapServersConfig);
72+
KafkaSingletons.PRODUCER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.set(producer, bootstrapServers);
8473
}
8574
}
8675
}
@@ -98,7 +87,8 @@ public static KafkaProducerRequest onEnter(
9887
@Advice.Local("otelContext") Context context,
9988
@Advice.Local("otelScope") Scope scope) {
10089

101-
String bootstrapServers = VirtualField.find(Producer.class, String.class).get(producer);
90+
List<String> bootstrapServers =
91+
KafkaSingletons.PRODUCER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.get(producer);
10292
KafkaProducerRequest request =
10393
KafkaProducerRequest.create(record, clientId, bootstrapServers);
10494
Context parentContext = Java8BytecodeBridge.currentContext();

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,16 @@
77

88
import io.opentelemetry.api.GlobalOpenTelemetry;
99
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
10+
import io.opentelemetry.instrumentation.api.util.VirtualField;
1011
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaInstrumenterFactory;
1112
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest;
1213
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
1314
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
1415
import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig;
1516
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
17+
import java.util.List;
18+
import org.apache.kafka.clients.consumer.Consumer;
19+
import org.apache.kafka.clients.producer.Producer;
1620
import org.apache.kafka.clients.producer.RecordMetadata;
1721

1822
public final class KafkaSingletons {
@@ -26,6 +30,11 @@ public final class KafkaSingletons {
2630
private static final Instrumenter<KafkaReceiveRequest, Void> CONSUMER_RECEIVE_INSTRUMENTER;
2731
private static final Instrumenter<KafkaProcessRequest, Void> CONSUMER_PROCESS_INSTRUMENTER;
2832

33+
public static final VirtualField<Consumer<?, ?>, List<String>>
34+
CONSUMER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD = VirtualField.find(Consumer.class, List.class);
35+
public static final VirtualField<Producer<?, ?>, List<String>>
36+
PRODUCER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD = VirtualField.find(Producer.class, List.class);
37+
2938
static {
3039
KafkaInstrumenterFactory instrumenterFactory =
3140
new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)

instrumentation/kafka/kafka-clients/kafka-clients-0.11/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ configurations:
44
type: boolean
55
default: true
66
- name: otel.instrumentation.kafka.experimental-span-attributes
7-
description: Enables the capture of the experimental consumer attribute "kafka.record.queue_time_ms"
7+
description: Enables the capture of the experimental consumer attributes "kafka.record.queue_time_ms" and "messaging.kafka.bootstrap.servers"
88
type: boolean
99
default: false

instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ public abstract class KafkaClientBaseTest {
6666
protected static final String SHARED_TOPIC = "shared.topic";
6767
protected static final AttributeKey<String> MESSAGING_CLIENT_ID =
6868
AttributeKey.stringKey("messaging.client_id");
69-
protected static final AttributeKey<String> MESSAGING_KAFKA_BOOTSTRAP_SERVERS =
70-
AttributeKey.stringKey("messaging.kafka.bootstrap.servers");
69+
protected static final AttributeKey<List<String>> MESSAGING_KAFKA_BOOTSTRAP_SERVERS =
70+
AttributeKey.stringArrayKey("messaging.kafka.bootstrap.servers");
7171

7272
private KafkaContainer kafka;
7373
protected Producer<Integer, String> producer;
@@ -179,7 +179,6 @@ protected static List<AttributeAssertion> sendAttributes(
179179
equalTo(MESSAGING_SYSTEM, "kafka"),
180180
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
181181
equalTo(MESSAGING_OPERATION, "publish"),
182-
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
183182
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")),
184183
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
185184
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative)));
@@ -189,6 +188,17 @@ protected static List<AttributeAssertion> sendAttributes(
189188
if (messageValue == null) {
190189
assertions.add(equalTo(MESSAGING_KAFKA_MESSAGE_TOMBSTONE, true));
191190
}
191+
if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) {
192+
assertions.add(
193+
satisfies(
194+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
195+
listAssert ->
196+
listAssert
197+
.isNotEmpty()
198+
.allSatisfy(
199+
server ->
200+
org.assertj.core.api.Assertions.assertThat(server).isNotEmpty())));
201+
}
192202
if (testHeaders) {
193203
assertions.add(
194204
equalTo(
@@ -206,13 +216,23 @@ protected static List<AttributeAssertion> receiveAttributes(boolean testHeaders)
206216
equalTo(MESSAGING_SYSTEM, "kafka"),
207217
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
208218
equalTo(MESSAGING_OPERATION, "receive"),
209-
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
210219
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
211220
satisfies(MESSAGING_BATCH_MESSAGE_COUNT, AbstractLongAssert::isPositive)));
212221
// consumer group is not available in version 0.11
213222
if (Boolean.getBoolean("testLatestDeps")) {
214223
assertions.add(equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"));
215224
}
225+
if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) {
226+
assertions.add(
227+
satisfies(
228+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
229+
listAssert ->
230+
listAssert
231+
.isNotEmpty()
232+
.allSatisfy(
233+
server ->
234+
org.assertj.core.api.Assertions.assertThat(server).isNotEmpty())));
235+
}
216236
if (testHeaders) {
217237
assertions.add(
218238
equalTo(
@@ -231,7 +251,6 @@ protected static List<AttributeAssertion> processAttributes(
231251
equalTo(MESSAGING_SYSTEM, "kafka"),
232252
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
233253
equalTo(MESSAGING_OPERATION, "process"),
234-
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
235254
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
236255
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
237256
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
@@ -242,6 +261,17 @@ protected static List<AttributeAssertion> processAttributes(
242261
if (Boolean.getBoolean("testLatestDeps")) {
243262
assertions.add(equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"));
244263
}
264+
if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) {
265+
assertions.add(
266+
satisfies(
267+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
268+
listAssert ->
269+
listAssert
270+
.isNotEmpty()
271+
.allSatisfy(
272+
server ->
273+
org.assertj.core.api.Assertions.assertThat(server).isNotEmpty())));
274+
}
245275
if (messageKey != null) {
246276
assertions.add(equalTo(MESSAGING_KAFKA_MESSAGE_KEY, messageKey));
247277
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ <K, V> ConsumerRecords<K, V> addTracing(
213213
* @param record the producer record to inject span info.
214214
*/
215215
<K, V> void buildAndInjectSpan(
216-
ProducerRecord<K, V> record, String clientId, String bootstrapServers) {
216+
ProducerRecord<K, V> record, String clientId, List<String> bootstrapServers) {
217217
Context parentContext = Context.current();
218218

219219
KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId, bootstrapServers);
@@ -274,7 +274,7 @@ <K, V> Context buildAndFinishSpan(
274274
ConsumerRecords<K, V> records,
275275
String consumerGroup,
276276
String clientId,
277-
String bootstrapServers,
277+
List<String> bootstrapServers,
278278
Timer timer) {
279279
if (records.isEmpty()) {
280280
return null;

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import io.opentelemetry.instrumentation.api.internal.Timer;
1313
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext;
1414
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
15+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil;
16+
import java.util.List;
1517
import java.util.Map;
1618
import java.util.Objects;
1719
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -34,7 +36,7 @@ public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K,
3436
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
3537
.build();
3638

37-
private String bootstrapServers;
39+
private List<String> bootstrapServers;
3840
private String consumerGroup;
3941
private String clientId;
4042

@@ -61,7 +63,8 @@ public void close() {}
6163

6264
@Override
6365
public void configure(Map<String, ?> configs) {
64-
bootstrapServers = Objects.toString(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), null);
66+
bootstrapServers =
67+
KafkaUtil.parseBootstrapServers(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
6568
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null);
6669
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null);
6770

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import com.google.errorprone.annotations.CanIgnoreReturnValue;
99
import io.opentelemetry.api.GlobalOpenTelemetry;
10+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil;
11+
import java.util.List;
1012
import java.util.Map;
1113
import java.util.Objects;
1214
import javax.annotation.Nullable;
@@ -24,7 +26,7 @@ public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K,
2426

2527
private static final KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
2628

27-
@Nullable private String bootstrapServers;
29+
@Nullable private List<String> bootstrapServers;
2830

2931
@Nullable private String clientId;
3032

@@ -44,7 +46,8 @@ public void close() {}
4446
@Override
4547
public void configure(Map<String, ?> map) {
4648
clientId = Objects.toString(map.get(ProducerConfig.CLIENT_ID_CONFIG), null);
47-
bootstrapServers = Objects.toString(map.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), null);
49+
bootstrapServers =
50+
KafkaUtil.parseBootstrapServers(map.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
4851

4952
// TODO: support experimental attributes config
5053
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,6 @@ void assertTraces() {
3838
equalTo(MESSAGING_SYSTEM, "kafka"),
3939
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
4040
equalTo(MESSAGING_OPERATION, "publish"),
41-
satisfies(
42-
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
43-
AbstractStringAssert::isNotEmpty),
4441
satisfies(
4542
MESSAGING_CLIENT_ID,
4643
stringAssert -> stringAssert.startsWith("producer"))),
@@ -55,9 +52,6 @@ void assertTraces() {
5552
equalTo(
5653
MESSAGING_MESSAGE_BODY_SIZE,
5754
greeting.getBytes(StandardCharsets.UTF_8).length),
58-
satisfies(
59-
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
60-
AbstractStringAssert::isNotEmpty),
6155
satisfies(
6256
MESSAGING_DESTINATION_PARTITION_ID,
6357
AbstractStringAssert::isNotEmpty),

0 commit comments

Comments
 (0)