Skip to content

Commit 5f6294b

Browse files
Copilottrask
andcommitted
Use OpenTelemetrySupplier instead of KafkaTelemetrySupplier
Co-authored-by: trask <[email protected]>
1 parent 85ec089 commit 5f6294b

File tree

5 files changed

+60
-122
lines changed

5 files changed

+60
-122
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,42 +35,32 @@ The Kafka clients API provides a way to "intercept" messages before they are sen
3535
The OpenTelemetry instrumented Kafka library provides two interceptors to be configured to add tracing information automatically.
3636
The interceptor class has to be set in the properties bag used to create the Kafka client.
3737

38-
##### Recommended approach: Configuring interceptors with KafkaTelemetry
38+
##### Recommended approach: Configuring interceptors with OpenTelemetry
3939

40-
The recommended way to use interceptors is to configure them with a `KafkaTelemetry` instance.
41-
This gives you full control over the configuration, including which `OpenTelemetry` instance to use,
42-
whether to enable receive telemetry, and which headers to capture.
40+
The recommended way to use interceptors is to configure them with an `OpenTelemetry` instance.
41+
Interceptors will use system properties for additional configuration like captured headers and receive telemetry settings.
4342

44-
For the producer, configure the interceptor with the KafkaTelemetry instance:
43+
For the producer:
4544

4645
```java
47-
KafkaTelemetry telemetry = KafkaTelemetry.builder(openTelemetry)
48-
.setCapturedHeaders(Arrays.asList("custom-header"))
49-
.build();
50-
5146
Map<String, Object> props = new HashMap<>();
5247
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
5348
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
54-
props.put(TracingProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
55-
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaTelemetrySupplier(telemetry));
49+
props.put(TracingProducerInterceptor.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
50+
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier(openTelemetry));
5651

5752
Producer<String, String> producer = new KafkaProducer<>(props);
5853
```
5954

60-
For the consumer, configure the interceptor with the KafkaTelemetry instance:
55+
For the consumer:
6156

6257
```java
63-
KafkaTelemetry telemetry = KafkaTelemetry.builder(openTelemetry)
64-
.setMessagingReceiveInstrumentationEnabled(true)
65-
.setCapturedHeaders(Arrays.asList("custom-header"))
66-
.build();
67-
6858
Map<String, Object> props = new HashMap<>();
6959
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
7060
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
7161
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
72-
props.put(TracingConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
73-
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaTelemetrySupplier(telemetry));
62+
props.put(TracingConsumerInterceptor.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
63+
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier(openTelemetry));
7464

7565
Consumer<String, String> consumer = new KafkaConsumer<>(props);
7666
```

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@
99

1010
import com.google.errorprone.annotations.CanIgnoreReturnValue;
1111
import io.opentelemetry.api.GlobalOpenTelemetry;
12+
import io.opentelemetry.api.OpenTelemetry;
1213
import io.opentelemetry.context.Context;
1314
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
1415
import io.opentelemetry.instrumentation.api.internal.Timer;
1516
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext;
1617
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
18+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier;
1719
import java.util.Map;
1820
import java.util.Objects;
19-
import java.util.function.Supplier;
2021
import javax.annotation.Nullable;
2122
import org.apache.kafka.clients.consumer.ConsumerConfig;
2223
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
@@ -31,8 +32,7 @@
3132
*/
3233
public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
3334

34-
public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
35-
"opentelemetry.experimental.kafka-telemetry.supplier";
35+
public static final String CONFIG_KEY_OPENTELEMETRY_SUPPLIER = "opentelemetry.supplier";
3636

3737
@Nullable private KafkaTelemetry telemetry;
3838
private String consumerGroup;
@@ -66,37 +66,29 @@ public void configure(Map<String, ?> configs) {
6666
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null);
6767
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null);
6868

69-
Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
70-
if (telemetrySupplier == null) {
69+
OpenTelemetry openTelemetry;
70+
Object openTelemetrySupplier = configs.get(CONFIG_KEY_OPENTELEMETRY_SUPPLIER);
71+
if (openTelemetrySupplier == null) {
7172
// Fallback to GlobalOpenTelemetry if not configured
72-
this.telemetry =
73-
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
74-
.setMessagingReceiveInstrumentationEnabled(
75-
ConfigPropertiesUtil.getBoolean(
76-
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled",
77-
false))
78-
.setCapturedHeaders(
79-
ConfigPropertiesUtil.getList(
80-
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
81-
.build();
82-
return;
73+
openTelemetry = GlobalOpenTelemetry.get();
74+
} else {
75+
if (!(openTelemetrySupplier instanceof OpenTelemetrySupplier)) {
76+
throw new IllegalStateException(
77+
"Configuration property "
78+
+ CONFIG_KEY_OPENTELEMETRY_SUPPLIER
79+
+ " is not instance of OpenTelemetrySupplier");
80+
}
81+
openTelemetry = ((OpenTelemetrySupplier) openTelemetrySupplier).get();
8382
}
8483

85-
if (!(telemetrySupplier instanceof Supplier)) {
86-
throw new IllegalStateException(
87-
"Configuration property "
88-
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
89-
+ " is not instance of Supplier");
90-
}
91-
92-
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
93-
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
94-
throw new IllegalStateException(
95-
"Configuration property "
96-
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
97-
+ " supplier does not return KafkaTelemetry instance");
98-
}
99-
100-
this.telemetry = (KafkaTelemetry) kafkaTelemetry;
84+
this.telemetry =
85+
KafkaTelemetry.builder(openTelemetry)
86+
.setMessagingReceiveInstrumentationEnabled(
87+
ConfigPropertiesUtil.getBoolean(
88+
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
89+
.setCapturedHeaders(
90+
ConfigPropertiesUtil.getList(
91+
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
92+
.build();
10193
}
10294
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@
99

1010
import com.google.errorprone.annotations.CanIgnoreReturnValue;
1111
import io.opentelemetry.api.GlobalOpenTelemetry;
12+
import io.opentelemetry.api.OpenTelemetry;
1213
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
14+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier;
1315
import java.util.Map;
1416
import java.util.Objects;
15-
import java.util.function.Supplier;
1617
import javax.annotation.Nullable;
1718
import org.apache.kafka.clients.producer.ProducerConfig;
1819
import org.apache.kafka.clients.producer.ProducerInterceptor;
@@ -26,8 +27,7 @@
2627
*/
2728
public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
2829

29-
public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
30-
"opentelemetry.experimental.kafka-telemetry.supplier";
30+
public static final String CONFIG_KEY_OPENTELEMETRY_SUPPLIER = "opentelemetry.supplier";
3131

3232
@Nullable private KafkaTelemetry telemetry;
3333
@Nullable private String clientId;
@@ -51,33 +51,26 @@ public void close() {}
5151
public void configure(Map<String, ?> configs) {
5252
clientId = Objects.toString(configs.get(ProducerConfig.CLIENT_ID_CONFIG), null);
5353

54-
Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
55-
if (telemetrySupplier == null) {
54+
OpenTelemetry openTelemetry;
55+
Object openTelemetrySupplier = configs.get(CONFIG_KEY_OPENTELEMETRY_SUPPLIER);
56+
if (openTelemetrySupplier == null) {
5657
// Fallback to GlobalOpenTelemetry if not configured
57-
this.telemetry =
58-
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
59-
.setCapturedHeaders(
60-
ConfigPropertiesUtil.getList(
61-
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
62-
.build();
63-
return;
58+
openTelemetry = GlobalOpenTelemetry.get();
59+
} else {
60+
if (!(openTelemetrySupplier instanceof OpenTelemetrySupplier)) {
61+
throw new IllegalStateException(
62+
"Configuration property "
63+
+ CONFIG_KEY_OPENTELEMETRY_SUPPLIER
64+
+ " is not instance of OpenTelemetrySupplier");
65+
}
66+
openTelemetry = ((OpenTelemetrySupplier) openTelemetrySupplier).get();
6467
}
6568

66-
if (!(telemetrySupplier instanceof Supplier)) {
67-
throw new IllegalStateException(
68-
"Configuration property "
69-
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
70-
+ " is not instance of Supplier");
71-
}
72-
73-
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
74-
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
75-
throw new IllegalStateException(
76-
"Configuration property "
77-
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
78-
+ " supplier does not return KafkaTelemetry instance");
79-
}
80-
81-
this.telemetry = (KafkaTelemetry) kafkaTelemetry;
69+
this.telemetry =
70+
KafkaTelemetry.builder(openTelemetry)
71+
.setCapturedHeaders(
72+
ConfigPropertiesUtil.getList(
73+
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
74+
.build();
8275
}
8376
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ public Map<String, Object> producerProps() {
4646
props.put(
4747
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
4848
props.put(
49-
TracingProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
49+
TracingProducerInterceptor.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
5050
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal
51-
.KafkaTelemetrySupplier(
52-
kafkaTelemetry));
51+
.OpenTelemetrySupplier(
52+
testing.getOpenTelemetry()));
5353
return props;
5454
}
5555

@@ -62,10 +62,10 @@ public Map<String, Object> consumerProps() {
6262
props.put(
6363
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
6464
props.put(
65-
TracingConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
65+
TracingConsumerInterceptor.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
6666
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal
67-
.KafkaTelemetrySupplier(
68-
kafkaTelemetry));
67+
.OpenTelemetrySupplier(
68+
testing.getOpenTelemetry()));
6969
return props;
7070
}
7171

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaTelemetrySupplier.java

Lines changed: 0 additions & 37 deletions
This file was deleted.

0 commit comments

Comments
 (0)