Skip to content

Commit f5f0070

Browse files
committed
feat(kafka-clients): add messaging.kafka.bootstrap.servers attribute
- Add VirtualFieldStore for managing bootstrap servers storage - Instrument KafkaConsumer and KafkaProducer constructors - Include bootstrap.servers attribute in all relevant spans Addresses part of #14031 #10647
1 parent 70588cd commit f5f0070

File tree

17 files changed

+250
-41
lines changed

17 files changed

+250
-41
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
99
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.consumerReceiveInstrumenter;
10+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
1011
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
1112
import static net.bytebuddy.matcher.ElementMatchers.named;
1213
import static net.bytebuddy.matcher.ElementMatchers.returns;
@@ -16,16 +17,20 @@
1617
import io.opentelemetry.context.Context;
1718
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
1819
import io.opentelemetry.instrumentation.api.internal.Timer;
20+
import io.opentelemetry.instrumentation.api.util.VirtualField;
1921
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
2022
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
2123
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
2224
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2325
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
2426
import java.time.Duration;
27+
import java.util.Map;
28+
import java.util.Properties;
2529
import net.bytebuddy.asm.Advice;
2630
import net.bytebuddy.description.type.TypeDescription;
2731
import net.bytebuddy.matcher.ElementMatcher;
2832
import org.apache.kafka.clients.consumer.Consumer;
33+
import org.apache.kafka.clients.consumer.ConsumerConfig;
2934
import org.apache.kafka.clients.consumer.ConsumerRecord;
3035
import org.apache.kafka.clients.consumer.ConsumerRecords;
3136

@@ -38,6 +43,14 @@ public ElementMatcher<TypeDescription> typeMatcher() {
3843

3944
@Override
4045
public void transform(TypeTransformer transformer) {
46+
transformer.applyAdviceToMethod(
47+
isConstructor().and(takesArgument(0, Map.class)),
48+
this.getClass().getName() + "$ConstructorAdvice");
49+
50+
transformer.applyAdviceToMethod(
51+
isConstructor().and(takesArgument(0, Properties.class)),
52+
this.getClass().getName() + "$ConstructorAdvice");
53+
4154
transformer.applyAdviceToMethod(
4255
named("poll")
4356
.and(isPublic())
@@ -47,6 +60,34 @@ public void transform(TypeTransformer transformer) {
4760
this.getClass().getName() + "$PollAdvice");
4861
}
4962

63+
@SuppressWarnings("unused")
64+
public static class ConstructorAdvice {
65+
66+
@Advice.OnMethodExit(suppress = Throwable.class)
67+
public static void onExit(
68+
@Advice.This Consumer<?, ?> consumer, @Advice.Argument(0) Object configs) {
69+
70+
String bootstrapServers = null;
71+
if (configs instanceof Map) {
72+
Object servers = ((Map<?, ?>) configs).get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
73+
if (servers != null) {
74+
bootstrapServers = servers.toString();
75+
}
76+
} else if (configs instanceof Properties) {
77+
bootstrapServers =
78+
((Properties) configs).getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
79+
}
80+
81+
if (bootstrapServers != null) {
82+
VirtualField<Consumer<?, ?>, String> consumerStringVirtualField =
83+
VirtualField.find(Consumer.class, String.class);
84+
if (consumerStringVirtualField.get(consumer) == null) {
85+
consumerStringVirtualField.set(consumer, bootstrapServers);
86+
}
87+
}
88+
}
89+
}
90+
5091
@SuppressWarnings("unused")
5192
public static class PollAdvice {
5293
@Advice.OnMethodEnter(suppress = Throwable.class)

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,29 @@
66
package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11;
77

88
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.producerInstrumenter;
9+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
910
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
1011
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
1112
import static net.bytebuddy.matcher.ElementMatchers.named;
1213
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
1314

1415
import io.opentelemetry.context.Context;
1516
import io.opentelemetry.context.Scope;
17+
import io.opentelemetry.instrumentation.api.util.VirtualField;
1618
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
1719
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaPropagation;
1820
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
1921
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2022
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
23+
import java.util.Map;
24+
import java.util.Properties;
2125
import net.bytebuddy.asm.Advice;
2226
import net.bytebuddy.description.type.TypeDescription;
2327
import net.bytebuddy.matcher.ElementMatcher;
2428
import org.apache.kafka.clients.ApiVersions;
2529
import org.apache.kafka.clients.producer.Callback;
30+
import org.apache.kafka.clients.producer.Producer;
31+
import org.apache.kafka.clients.producer.ProducerConfig;
2632
import org.apache.kafka.clients.producer.ProducerRecord;
2733

2834
public class KafkaProducerInstrumentation implements TypeInstrumentation {
@@ -34,6 +40,14 @@ public ElementMatcher<TypeDescription> typeMatcher() {
3440

3541
@Override
3642
public void transform(TypeTransformer transformer) {
43+
transformer.applyAdviceToMethod(
44+
isConstructor().and(takesArgument(0, Map.class)),
45+
this.getClass().getName() + "$ConstructorAdvice");
46+
47+
transformer.applyAdviceToMethod(
48+
isConstructor().and(takesArgument(0, Properties.class)),
49+
this.getClass().getName() + "$ConstructorAdvice");
50+
3751
transformer.applyAdviceToMethod(
3852
isMethod()
3953
.and(isPublic())
@@ -43,19 +57,50 @@ public void transform(TypeTransformer transformer) {
4357
KafkaProducerInstrumentation.class.getName() + "$SendAdvice");
4458
}
4559

60+
@SuppressWarnings("unused")
61+
public static class ConstructorAdvice {
62+
63+
@Advice.OnMethodExit(suppress = Throwable.class)
64+
public static void onExit(
65+
@Advice.This Producer<?, ?> producer, @Advice.Argument(0) Object configs) {
66+
67+
String bootstrapServers = null;
68+
if (configs instanceof Map) {
69+
Object servers = ((Map<?, ?>) configs).get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
70+
if (servers != null) {
71+
bootstrapServers = servers.toString();
72+
}
73+
} else if (configs instanceof Properties) {
74+
bootstrapServers =
75+
((Properties) configs).getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
76+
}
77+
78+
if (bootstrapServers != null) {
79+
VirtualField<Producer<?, ?>, String> producerStringVirtualField =
80+
VirtualField.find(Producer.class, String.class);
81+
if (producerStringVirtualField.get(producer) == null) {
82+
producerStringVirtualField.set(producer, bootstrapServers);
83+
}
84+
}
85+
}
86+
}
87+
4688
@SuppressWarnings("unused")
4789
public static class SendAdvice {
4890

4991
@Advice.OnMethodEnter(suppress = Throwable.class)
5092
public static KafkaProducerRequest onEnter(
93+
@Advice.This Producer<?, ?> producer,
5194
@Advice.FieldValue("apiVersions") ApiVersions apiVersions,
5295
@Advice.FieldValue("clientId") String clientId,
5396
@Advice.Argument(value = 0, readOnly = false) ProducerRecord<?, ?> record,
5497
@Advice.Argument(value = 1, readOnly = false) Callback callback,
5598
@Advice.Local("otelContext") Context context,
5699
@Advice.Local("otelScope") Scope scope) {
57100

58-
KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
101+
String bootstrapServers = VirtualField.find(Producer.class, String.class).get(producer);
102+
KafkaProducerRequest request =
103+
KafkaProducerRequest.create(record, clientId, bootstrapServers);
59104
Context parentContext = Java8BytecodeBridge.currentContext();
60105
if (!producerInstrumenter().shouldStart(parentContext, request)) {
61106
return null;

instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ public abstract class KafkaClientBaseTest {
6666
protected static final String SHARED_TOPIC = "shared.topic";
6767
protected static final AttributeKey<String> MESSAGING_CLIENT_ID =
6868
AttributeKey.stringKey("messaging.client_id");
69+
protected static final AttributeKey<String> MESSAGING_KAFKA_BOOTSTRAP_SERVERS =
70+
AttributeKey.stringKey("messaging.kafka.bootstrap.servers");
6971

7072
private KafkaContainer kafka;
7173
protected Producer<Integer, String> producer;
@@ -177,6 +179,7 @@ protected static List<AttributeAssertion> sendAttributes(
177179
equalTo(MESSAGING_SYSTEM, "kafka"),
178180
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
179181
equalTo(MESSAGING_OPERATION, "publish"),
182+
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
180183
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")),
181184
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
182185
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative)));
@@ -203,6 +206,7 @@ protected static List<AttributeAssertion> receiveAttributes(boolean testHeaders)
203206
equalTo(MESSAGING_SYSTEM, "kafka"),
204207
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
205208
equalTo(MESSAGING_OPERATION, "receive"),
209+
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
206210
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
207211
satisfies(MESSAGING_BATCH_MESSAGE_COUNT, AbstractLongAssert::isPositive)));
208212
// consumer group is not available in version 0.11
@@ -227,6 +231,7 @@ protected static List<AttributeAssertion> processAttributes(
227231
equalTo(MESSAGING_SYSTEM, "kafka"),
228232
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
229233
equalTo(MESSAGING_OPERATION, "process"),
234+
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
230235
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
231236
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
232237
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -212,10 +212,11 @@ <K, V> ConsumerRecords<K, V> addTracing(
212212
*
213213
* @param record the producer record to inject span info.
214214
*/
215-
<K, V> void buildAndInjectSpan(ProducerRecord<K, V> record, String clientId) {
215+
<K, V> void buildAndInjectSpan(
216+
ProducerRecord<K, V> record, String clientId, String bootstrapServers) {
216217
Context parentContext = Context.current();
217218

218-
KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
219+
KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId, bootstrapServers);
219220
if (!producerInstrumenter.shouldStart(parentContext, request)) {
220221
return;
221222
}
@@ -262,16 +263,25 @@ <K, V> Future<RecordMetadata> buildAndInjectSpan(
262263
private <K, V> Context buildAndFinishSpan(
263264
ConsumerRecords<K, V> records, Consumer<K, V> consumer, Timer timer) {
264265
return buildAndFinishSpan(
265-
records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer), timer);
266+
records,
267+
KafkaUtil.getConsumerGroup(consumer),
268+
KafkaUtil.getClientId(consumer),
269+
KafkaUtil.getBootstrapServers(consumer),
270+
timer);
266271
}
267272

268273
<K, V> Context buildAndFinishSpan(
269-
ConsumerRecords<K, V> records, String consumerGroup, String clientId, Timer timer) {
274+
ConsumerRecords<K, V> records,
275+
String consumerGroup,
276+
String clientId,
277+
String bootstrapServers,
278+
Timer timer) {
270279
if (records.isEmpty()) {
271280
return null;
272281
}
273282
Context parentContext = Context.current();
274-
KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumerGroup, clientId);
283+
KafkaReceiveRequest request =
284+
KafkaReceiveRequest.create(records, consumerGroup, clientId, bootstrapServers);
275285
Context context = null;
276286
if (consumerReceiveInstrumenter.shouldStart(parentContext, request)) {
277287
context =

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K,
3434
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
3535
.build();
3636

37+
private String bootstrapServers;
3738
private String consumerGroup;
3839
private String clientId;
3940

@@ -42,12 +43,13 @@ public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K,
4243
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
4344
// timer should be started before fetching ConsumerRecords, but there is no callback for that
4445
Timer timer = Timer.start();
45-
Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer);
46+
Context receiveContext =
47+
telemetry.buildAndFinishSpan(records, consumerGroup, clientId, bootstrapServers, timer);
4648
if (receiveContext == null) {
4749
receiveContext = Context.current();
4850
}
4951
KafkaConsumerContext consumerContext =
50-
KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId);
52+
KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId, bootstrapServers);
5153
return telemetry.addTracing(records, consumerContext);
5254
}
5355

@@ -59,6 +61,7 @@ public void close() {}
5961

6062
@Override
6163
public void configure(Map<String, ?> configs) {
64+
bootstrapServers = Objects.toString(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), null);
6265
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null);
6366
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null);
6467

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@ public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K,
2424

2525
private static final KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
2626

27+
@Nullable private String bootstrapServers;
28+
2729
@Nullable private String clientId;
2830

2931
@Override
3032
@CanIgnoreReturnValue
3133
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
32-
telemetry.buildAndInjectSpan(producerRecord, clientId);
34+
telemetry.buildAndInjectSpan(producerRecord, clientId, bootstrapServers);
3335
return producerRecord;
3436
}
3537

@@ -42,6 +44,7 @@ public void close() {}
4244
@Override
4345
public void configure(Map<String, ?> map) {
4446
clientId = Objects.toString(map.get(ProducerConfig.CLIENT_ID_CONFIG), null);
47+
bootstrapServers = Objects.toString(map.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), null);
4548

4649
// TODO: support experimental attributes config
4750
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/AbstractKafkaConsumerRequest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ abstract class AbstractKafkaConsumerRequest {
1111

1212
@Nullable private final String consumerGroup;
1313
@Nullable private final String clientId;
14+
@Nullable private final String bootstrapServers;
1415

15-
AbstractKafkaConsumerRequest(String consumerGroup, String clientId) {
16+
AbstractKafkaConsumerRequest(String consumerGroup, String clientId, String bootstrapServers) {
1617
this.consumerGroup = consumerGroup;
1718
this.clientId = clientId;
19+
this.bootstrapServers = bootstrapServers;
1820
}
1921

2022
@Nullable
@@ -27,6 +29,11 @@ public String getClientId() {
2729
return clientId;
2830
}
2931

32+
@Nullable
33+
public String getBootstrapServers() {
34+
return bootstrapServers;
35+
}
36+
3037
@Nullable
3138
public String getConsumerId() {
3239
if (consumerGroup != null) {

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaBatchProcessSpanLinksExtractor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@ public void extract(
3131
singleRecordLinkExtractor.extract(
3232
spanLinks,
3333
Context.root(),
34-
KafkaProcessRequest.create(record, request.getConsumerGroup(), request.getClientId()));
34+
KafkaProcessRequest.create(
35+
record,
36+
request.getConsumerGroup(),
37+
request.getClientId(),
38+
request.getBootstrapServers()));
3539
}
3640
}
3741
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerAttributesExtractor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ final class KafkaConsumerAttributesExtractor
2727
AttributeKey.longKey("messaging.kafka.message.offset");
2828
private static final AttributeKey<Boolean> MESSAGING_KAFKA_MESSAGE_TOMBSTONE =
2929
AttributeKey.booleanKey("messaging.kafka.message.tombstone");
30+
private static final AttributeKey<String> MESSAGING_KAFKA_BOOTSTRAP_SERVERS =
31+
AttributeKey.stringKey("messaging.kafka.bootstrap.servers");
3032

3133
@Override
3234
public void onStart(
@@ -49,6 +51,11 @@ public void onStart(
4951
if (consumerGroup != null) {
5052
attributes.put(MESSAGING_KAFKA_CONSUMER_GROUP, consumerGroup);
5153
}
54+
55+
String bootstrapServers = request.getBootstrapServers();
56+
if (bootstrapServers != null) {
57+
attributes.put(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
58+
}
5259
}
5360

5461
private static boolean canSerialize(Class<?> keyClass) {

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerContext.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717
public abstract class KafkaConsumerContext {
1818

1919
static KafkaConsumerContext create(
20-
@Nullable Context context, @Nullable String consumerGroup, @Nullable String clientId) {
21-
return new AutoValue_KafkaConsumerContext(context, consumerGroup, clientId);
20+
@Nullable Context context,
21+
@Nullable String consumerGroup,
22+
@Nullable String clientId,
23+
@Nullable String bootstrapServers) {
24+
return new AutoValue_KafkaConsumerContext(context, consumerGroup, clientId, bootstrapServers);
2225
}
2326

2427
@Nullable
@@ -29,4 +32,7 @@ static KafkaConsumerContext create(
2932

3033
@Nullable
3134
abstract String getClientId();
35+
36+
@Nullable
37+
abstract String getBootstrapServers();
3238
}

0 commit comments

Comments
 (0)