Skip to content

Commit 7703b1d

Browse files
Copilottrask
andcommitted
Create new OpenTelemetry interceptors, keep old Tracing ones for backwards compatibility, move KafkaTelemetrySupplier
Co-authored-by: trask <[email protected]>
1 parent f1e7dde commit 7703b1d

File tree

8 files changed

+219
-106
lines changed

8 files changed

+219
-106
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,18 +70,20 @@ Consumer<String, String> consumer = new KafkaConsumer<>(props);
7070
If you don't explicitly configure the interceptors with a `KafkaTelemetry` instance, they will fall back to using
7171
`GlobalOpenTelemetry.get()` and system properties for configuration.
7272

73-
Use the `TracingProducerInterceptor` for the producer in order to create a "send" span automatically, each time a message is sent.
73+
Use the `OpenTelemetryProducerInterceptor` for the producer in order to create a "send" span automatically, each time a message is sent.
7474

7575
```java
76-
props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
76+
props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, OpenTelemetryProducerInterceptor.class.getName());
7777
```
7878

79-
Use the `TracingConsumerInterceptor` for the consumer in order to create a "receive" span automatically, each time a message is received.
79+
Use the `OpenTelemetryConsumerInterceptor` for the consumer in order to create a "receive" span automatically, each time a message is received.
8080

8181
```java
82-
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
82+
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, OpenTelemetryConsumerInterceptor.class.getName());
8383
```
8484

85+
Note: The `TracingProducerInterceptor` and `TracingConsumerInterceptor` classes are still available for backwards compatibility, but new code should use the `OpenTelemetry*` variants.
86+
8587
The interceptors will use the following system properties for configuration:
8688
- `otel.instrumentation.messaging.experimental.receive-telemetry.enabled` - Enable receive telemetry (default: false)
8789
- `otel.instrumentation.messaging.experimental.capture-headers` - List of headers to capture as span attributes

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest;
2222
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
2323
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
24-
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaTelemetrySupplier;
2524
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil;
2625
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.MetricsReporterList;
2726
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetryMetricsReporter;
@@ -211,9 +210,9 @@ <K, V> ConsumerRecords<K, V> addTracing(
211210
}
212211

213212
/**
214-
* Returns configuration properties that can be used to enable tracing via {@code
215-
* TracingProducerInterceptor}. Add these resulting properties to the configuration map used to
216-
* initialize a {@link org.apache.kafka.clients.producer.KafkaProducer}.
213+
* Returns configuration properties that can be used to enable OpenTelemetry instrumentation via
214+
* {@code OpenTelemetryProducerInterceptor}. Add these resulting properties to the configuration
215+
* map used to initialize a {@link org.apache.kafka.clients.producer.KafkaProducer}.
217216
*
218217
* <p>Example usage:
219218
*
@@ -230,17 +229,18 @@ <K, V> ConsumerRecords<K, V> addTracing(
230229
public Map<String, ?> producerInterceptorConfigProperties() {
231230
Map<String, Object> config = new HashMap<>();
232231
config.put(
233-
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
232+
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
233+
OpenTelemetryProducerInterceptor.class.getName());
234234
config.put(
235-
TracingProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
235+
OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
236236
new KafkaTelemetrySupplier(this));
237237
return Collections.unmodifiableMap(config);
238238
}
239239

240240
/**
241-
* Returns configuration properties that can be used to enable tracing via {@code
242-
* TracingConsumerInterceptor}. Add these resulting properties to the configuration map used to
243-
* initialize a {@link org.apache.kafka.clients.consumer.KafkaConsumer}.
241+
* Returns configuration properties that can be used to enable OpenTelemetry instrumentation via
242+
* {@code OpenTelemetryConsumerInterceptor}. Add these resulting properties to the configuration
243+
* map used to initialize a {@link org.apache.kafka.clients.consumer.KafkaConsumer}.
244244
*
245245
* <p>Example usage:
246246
*
@@ -257,9 +257,10 @@ <K, V> ConsumerRecords<K, V> addTracing(
257257
public Map<String, ?> consumerInterceptorConfigProperties() {
258258
Map<String, Object> config = new HashMap<>();
259259
config.put(
260-
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
260+
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
261+
OpenTelemetryConsumerInterceptor.class.getName());
261262
config.put(
262-
TracingConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
263+
OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
263264
new KafkaTelemetrySupplier(this));
264265
return Collections.unmodifiableMap(config);
265266
}
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal;
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
77

88
import java.io.Serializable;
99
import java.util.Objects;
@@ -16,11 +16,11 @@
1616
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
1717
* at any time.
1818
*/
19-
public final class KafkaTelemetrySupplier implements Supplier<Object>, Serializable {
19+
final class KafkaTelemetrySupplier implements Supplier<Object>, Serializable {
2020
private static final long serialVersionUID = 1L;
2121
private final transient Object kafkaTelemetry;
2222

23-
public KafkaTelemetrySupplier(Object kafkaTelemetry) {
23+
KafkaTelemetrySupplier(Object kafkaTelemetry) {
2424
Objects.requireNonNull(kafkaTelemetry);
2525
this.kafkaTelemetry = kafkaTelemetry;
2626
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
7+
8+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
9+
import io.opentelemetry.context.Context;
10+
import io.opentelemetry.instrumentation.api.internal.Timer;
11+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext;
12+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
13+
import java.util.Map;
14+
import java.util.Objects;
15+
import java.util.function.Supplier;
16+
import javax.annotation.Nullable;
17+
import org.apache.kafka.clients.consumer.ConsumerConfig;
18+
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
19+
import org.apache.kafka.clients.consumer.ConsumerRecords;
20+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
21+
import org.apache.kafka.common.TopicPartition;
22+
23+
/**
24+
* A ConsumerInterceptor that adds OpenTelemetry instrumentation. Add this interceptor's class name
25+
* or class via ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Consumer's properties to
26+
* get it instantiated and used. See more details on ConsumerInterceptor usage in its Javadoc.
27+
*
28+
* <p>To configure the interceptor, use {@link KafkaTelemetry#consumerInterceptorConfigProperties}
29+
* to obtain the configuration properties and add them to your consumer configuration.
30+
*
31+
* @see KafkaTelemetry#consumerInterceptorConfigProperties()
32+
*/
33+
public class OpenTelemetryConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
34+
35+
public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
36+
"opentelemetry.kafka-telemetry.supplier";
37+
38+
@Nullable private KafkaTelemetry telemetry;
39+
private String consumerGroup;
40+
private String clientId;
41+
42+
@Override
43+
@CanIgnoreReturnValue
44+
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
45+
if (telemetry == null) {
46+
return records;
47+
}
48+
// timer should be started before fetching ConsumerRecords, but there is no callback for that
49+
Timer timer = Timer.start();
50+
Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer);
51+
if (receiveContext == null) {
52+
receiveContext = Context.current();
53+
}
54+
KafkaConsumerContext consumerContext =
55+
KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId);
56+
return telemetry.addTracing(records, consumerContext);
57+
}
58+
59+
@Override
60+
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
61+
62+
@Override
63+
public void close() {}
64+
65+
@Override
66+
public void configure(Map<String, ?> configs) {
67+
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null);
68+
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null);
69+
70+
Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
71+
if (telemetrySupplier == null) {
72+
return;
73+
}
74+
75+
if (!(telemetrySupplier instanceof Supplier)) {
76+
throw new IllegalStateException(
77+
"Configuration property "
78+
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
79+
+ " is not instance of Supplier");
80+
}
81+
82+
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
83+
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
84+
throw new IllegalStateException(
85+
"Configuration property "
86+
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
87+
+ " supplier does not return KafkaTelemetry instance");
88+
}
89+
90+
this.telemetry = (KafkaTelemetry) kafkaTelemetry;
91+
}
92+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
7+
8+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
9+
import java.util.Map;
10+
import java.util.Objects;
11+
import java.util.function.Supplier;
12+
import javax.annotation.Nullable;
13+
import org.apache.kafka.clients.producer.ProducerConfig;
14+
import org.apache.kafka.clients.producer.ProducerInterceptor;
15+
import org.apache.kafka.clients.producer.ProducerRecord;
16+
import org.apache.kafka.clients.producer.RecordMetadata;
17+
18+
/**
19+
* A ProducerInterceptor that adds OpenTelemetry instrumentation. Add this interceptor's class name
20+
* or class via ProducerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Producer's properties to
21+
* get it instantiated and used. See more details on ProducerInterceptor usage in its Javadoc.
22+
*
23+
* <p>To configure the interceptor, use {@link KafkaTelemetry#producerInterceptorConfigProperties}
24+
* to obtain the configuration properties and add them to your producer configuration.
25+
*
26+
* @see KafkaTelemetry#producerInterceptorConfigProperties()
27+
*/
28+
public class OpenTelemetryProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
29+
30+
public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
31+
"opentelemetry.kafka-telemetry.supplier";
32+
33+
@Nullable private KafkaTelemetry telemetry;
34+
@Nullable private String clientId;
35+
36+
@Override
37+
@CanIgnoreReturnValue
38+
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
39+
if (telemetry != null) {
40+
telemetry.buildAndInjectSpan(producerRecord, clientId);
41+
}
42+
return producerRecord;
43+
}
44+
45+
@Override
46+
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}
47+
48+
@Override
49+
public void close() {}
50+
51+
@Override
52+
public void configure(Map<String, ?> configs) {
53+
clientId = Objects.toString(configs.get(ProducerConfig.CLIENT_ID_CONFIG), null);
54+
55+
Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
56+
if (telemetrySupplier == null) {
57+
return;
58+
}
59+
60+
if (!(telemetrySupplier instanceof Supplier)) {
61+
throw new IllegalStateException(
62+
"Configuration property "
63+
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
64+
+ " is not instance of Supplier");
65+
}
66+
67+
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
68+
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
69+
throw new IllegalStateException(
70+
"Configuration property "
71+
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
72+
+ " supplier does not return KafkaTelemetry instance");
73+
}
74+
75+
this.telemetry = (KafkaTelemetry) kafkaTelemetry;
76+
}
77+
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java

Lines changed: 10 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
1717
import java.util.Map;
1818
import java.util.Objects;
19-
import java.util.function.Supplier;
20-
import javax.annotation.Nullable;
2119
import org.apache.kafka.clients.consumer.ConsumerConfig;
2220
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
2321
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -31,19 +29,22 @@
3129
*/
3230
public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
3331

34-
public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
35-
"opentelemetry.kafka-telemetry.supplier";
32+
private static final KafkaTelemetry telemetry =
33+
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
34+
.setMessagingReceiveInstrumentationEnabled(
35+
ConfigPropertiesUtil.getBoolean(
36+
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
37+
.setCapturedHeaders(
38+
ConfigPropertiesUtil.getList(
39+
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
40+
.build();
3641

37-
@Nullable private KafkaTelemetry telemetry;
3842
private String consumerGroup;
3943
private String clientId;
4044

4145
@Override
4246
@CanIgnoreReturnValue
4347
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
44-
if (telemetry == null) {
45-
return records;
46-
}
4748
// timer should be started before fetching ConsumerRecords, but there is no callback for that
4849
Timer timer = Timer.start();
4950
Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer);
@@ -66,38 +67,6 @@ public void configure(Map<String, ?> configs) {
6667
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null);
6768
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null);
6869

69-
Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
70-
if (telemetrySupplier == null) {
71-
// Fallback to GlobalOpenTelemetry if not configured
72-
this.telemetry =
73-
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
74-
// TODO: remove now that programmatic configuration is available
75-
.setMessagingReceiveInstrumentationEnabled(
76-
ConfigPropertiesUtil.getBoolean(
77-
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled",
78-
false))
79-
.setCapturedHeaders(
80-
ConfigPropertiesUtil.getList(
81-
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
82-
.build();
83-
return;
84-
}
85-
86-
if (!(telemetrySupplier instanceof Supplier)) {
87-
throw new IllegalStateException(
88-
"Configuration property "
89-
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
90-
+ " is not instance of Supplier");
91-
}
92-
93-
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
94-
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
95-
throw new IllegalStateException(
96-
"Configuration property "
97-
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
98-
+ " supplier does not return KafkaTelemetry instance");
99-
}
100-
101-
this.telemetry = (KafkaTelemetry) kafkaTelemetry;
70+
// TODO: support experimental attributes config
10271
}
10372
}

0 commit comments

Comments
 (0)