Skip to content
Open
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
2a012ae
Initial plan
Copilot Oct 3, 2025
34d68ec
Initial plan for refactoring kafka interceptors
Copilot Oct 3, 2025
3b68915
Refactor kafka interceptors to be configurable
Copilot Oct 3, 2025
ce0eafc
Update README with interceptor configuration documentation
Copilot Oct 3, 2025
b187791
Follow OpenTelemetryMetricsReporter pattern for property validation
Copilot Oct 3, 2025
85ec089
Avoid changing KafkaTelemetry - move config keys to interceptors
Copilot Oct 3, 2025
5f6294b
Use OpenTelemetrySupplier instead of KafkaTelemetrySupplier
Copilot Oct 3, 2025
e5ca107
Clean up unused code and use imports instead of FQCN
Copilot Oct 3, 2025
e791fc8
Add helper methods to KafkaTelemetry for interceptor config
Copilot Oct 3, 2025
97cbf9d
Include INTERCEPTOR_CLASSES_CONFIG in helper methods
Copilot Oct 3, 2025
4f3d30d
Use KafkaTelemetrySupplier to pass exact KafkaTelemetry instance
Copilot Oct 3, 2025
a31210c
Remove experimental from config key, simplify fallback, add test
Copilot Oct 3, 2025
84e69ad
Refactor kafka-clients interceptors to be configurable
Copilot Oct 3, 2025
5419f03
Restore system property checks in fallback with TODO comment
Copilot Oct 3, 2025
f4e568a
Use import instead of FQCN for Supplier in test
Copilot Oct 3, 2025
82248c4
Apply suggestions from code review
trask Oct 3, 2025
f1e7dde
./gradlew spotlessApply
otelbot[bot] Oct 3, 2025
7703b1d
Create new OpenTelemetry interceptors, keep old Tracing ones for back…
Copilot Oct 8, 2025
95d1723
Mark TracingConsumerInterceptor and TracingProducerInterceptor as dep…
Copilot Oct 8, 2025
3fdd463
Update instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/…
trask Oct 8, 2025
f72ae47
Update instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/…
trask Oct 8, 2025
2bc5492
Test deprecated classes until they're removed
trask Oct 9, 2025
f24a0a4
fix
trask Oct 9, 2025
45c7510
fix
trask Oct 9, 2025
85aaaaa
fix
trask Oct 9, 2025
9b9b075
./gradlew spotlessApply
otelbot[bot] Oct 9, 2025
6aa6306
fix
trask Oct 9, 2025
a541ee6
fix
trask Oct 9, 2025
06184d1
docs
trask Oct 9, 2025
30bc6dd
internal
trask Oct 10, 2025
b207b25
simplify
trask Oct 10, 2025
a2b44cc
Apply suggestions from code review
trask Oct 10, 2025
d861b88
KafkaHelper
trask Oct 10, 2025
3c4f97f
fixes
trask Oct 10, 2025
ff96054
split
trask Oct 10, 2025
61813b5
split2
trask Oct 10, 2025
41c767c
split test
trask Oct 10, 2025
810c222
more
trask Oct 10, 2025
a2286d8
up
trask Oct 10, 2025
536c1cd
up
trask Oct 10, 2025
bbd21cb
fix
trask Oct 10, 2025
385bc15
up
trask Oct 10, 2025
84c9212
up
trask Oct 10, 2025
686de0c
up
trask Oct 10, 2025
eb7ecf9
up
trask Oct 10, 2025
3fe741b
up
trask Oct 10, 2025
4f58c8d
revert
trask Oct 11, 2025
45c3e11
more
trask Oct 11, 2025
96bc1c9
./gradlew spotlessApply
otelbot[bot] Oct 11, 2025
44975a6
javadoc
trask Oct 11, 2025
d31e213
Merge remote-tracking branch 'upstream/main' into kafka-config
trask Oct 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,59 @@ The Kafka clients API provides a way to "intercept" messages before they are sen
The OpenTelemetry instrumented Kafka library provides two interceptors to be configured to add tracing information automatically.
The interceptor class has to be set in the properties bag used to create the Kafka client.

Use the `TracingProducerInterceptor` for the producer in order to create a "send" span automatically, each time a message is sent.
##### Recommended approach: Configuring interceptors with KafkaTelemetry

The recommended way to use interceptors is to configure them with a `KafkaTelemetry` instance.
Interceptors will use system properties for additional configuration like captured headers and receive telemetry settings.

For the producer:

```java
KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);

Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.putAll(telemetry.producerInterceptorConfigProperties());

Producer<String, String> producer = new KafkaProducer<>(props);
```

For the consumer:

```java
KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.putAll(telemetry.consumerInterceptorConfigProperties());

Consumer<String, String> consumer = new KafkaConsumer<>(props);
```

##### Alternative: Using interceptors with global OpenTelemetry

If you don't explicitly configure the interceptors with a `KafkaTelemetry` instance, they will fall back to using
`GlobalOpenTelemetry.get()` and system properties for configuration.

Use the `OpenTelemetryProducerInterceptor` for the producer in order to create a "send" span automatically, each time a message is sent.

```java
props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, OpenTelemetryProducerInterceptor.class.getName());
```

Use the `TracingConsumerInterceptor` for the consumer in order to create a "receive" span automatically, each time a message is received.
Use the `OpenTelemetryConsumerInterceptor` for the consumer in order to create a "receive" span automatically, each time a message is received.

```java
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, OpenTelemetryConsumerInterceptor.class.getName());
```

Note: The `TracingProducerInterceptor` and `TracingConsumerInterceptor` classes are still available for backwards compatibility, but new code should use the `OpenTelemetry*` variants.

The interceptors will use the following system properties for configuration:
- `otel.instrumentation.messaging.experimental.receive-telemetry.enabled` - Enable receive telemetry (default: false)
- `otel.instrumentation.messaging.experimental.capture-headers` - List of headers to capture as span attributes

#### Wrapping clients

The other way is by wrapping the Kafka client with a tracing enabled Kafka client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,33 @@ tasks {
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
}

val testReceiveSpansDisabled by registering(Test::class) {
test {
filter {
excludeTestsMatching("*Deprecated*")
}
}

val testDeprecated by registering(Test::class) {
testClassesDirs = sourceSets.test.get().output.classesDirs
classpath = sourceSets.test.get().runtimeClasspath
filter {
includeTestsMatching("InterceptorsSuppressReceiveSpansTest")
includeTestsMatching("WrapperSuppressReceiveSpansTest")
includeTestsMatching("*DeprecatedInterceptorsTest")
}
include("**/InterceptorsSuppressReceiveSpansTest.*", "**/WrapperSuppressReceiveSpansTest.*")
forkEvery = 1 // to avoid system properties polluting other tests
systemProperty("otel.instrumentation.messaging.experimental.receive-telemetry.enabled", "true")
systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "Test-Message-Header")
}

test {
val testDeprecatedSuppressReceiveSpans by registering(Test::class) {
testClassesDirs = sourceSets.test.get().output.classesDirs
classpath = sourceSets.test.get().runtimeClasspath
filter {
excludeTestsMatching("InterceptorsSuppressReceiveSpansTest")
excludeTestsMatching("WrapperSuppressReceiveSpansTest")
includeTestsMatching("*DeprecatedInterceptorsSuppressReceiveSpansTest")
}
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "Test-Message-Header")
}

check {
dependsOn(testReceiveSpansDisabled)
dependsOn(testDeprecated, testDeprecatedSuppressReceiveSpans)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@
import java.util.logging.Logger;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -207,6 +209,62 @@ <K, V> ConsumerRecords<K, V> addTracing(
return Collections.unmodifiableMap(config);
}

/**
* Returns configuration properties that can be used to enable OpenTelemetry instrumentation via
* {@code OpenTelemetryProducerInterceptor}. Add these resulting properties to the configuration
* map used to initialize a {@link org.apache.kafka.clients.producer.KafkaProducer}.
*
* <p>Example usage:
*
* <pre>{@code
* // KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
* // Map<String, Object> config = new HashMap<>();
* // config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
* // config.putAll(telemetry.producerInterceptorConfigProperties());
* // try (KafkaProducer<?, ?> producer = new KafkaProducer<>(config)) { ... }
* }</pre>
*
* @return the kafka producer interceptor config properties
*/
public Map<String, ?> producerInterceptorConfigProperties() {
Map<String, Object> config = new HashMap<>();
config.put(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
OpenTelemetryProducerInterceptor.class.getName());
config.put(
OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
new KafkaTelemetrySupplier(this));
return Collections.unmodifiableMap(config);
}

/**
* Returns configuration properties that can be used to enable OpenTelemetry instrumentation via
* {@code OpenTelemetryConsumerInterceptor}. Add these resulting properties to the configuration
* map used to initialize a {@link org.apache.kafka.clients.consumer.KafkaConsumer}.
*
* <p>Example usage:
*
* <pre>{@code
* // KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
* // Map<String, Object> config = new HashMap<>();
* // config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
* // config.putAll(telemetry.consumerInterceptorConfigProperties());
* // try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(config)) { ... }
* }</pre>
*
* @return the kafka consumer interceptor config properties
*/
public Map<String, ?> consumerInterceptorConfigProperties() {
Map<String, Object> config = new HashMap<>();
config.put(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
OpenTelemetryConsumerInterceptor.class.getName());
config.put(
OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
new KafkaTelemetrySupplier(this));
return Collections.unmodifiableMap(config);
}

/**
* Build and inject span into record.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.v2_6;

import java.io.Serializable;
import java.util.Objects;
import java.util.function.Supplier;

/**
* Wrapper for KafkaTelemetry that can be injected into kafka configuration without breaking
* serialization.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
final class KafkaTelemetrySupplier implements Supplier<Object>, Serializable {
private static final long serialVersionUID = 1L;
private final transient Object kafkaTelemetry;

KafkaTelemetrySupplier(Object kafkaTelemetry) {
Objects.requireNonNull(kafkaTelemetry);
this.kafkaTelemetry = kafkaTelemetry;
}

@Override
public Object get() {
return kafkaTelemetry;
}

private Object writeReplace() {
// serialize this object to null
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.v2_6;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
* A ConsumerInterceptor that adds OpenTelemetry instrumentation. Add this interceptor's class name
* or class via ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Consumer's properties to
* get it instantiated and used. See more details on ConsumerInterceptor usage in its Javadoc.
*
* <p>To configure the interceptor, use {@link KafkaTelemetry#consumerInterceptorConfigProperties}
* to obtain the configuration properties and add them to your consumer configuration.
*
* @see KafkaTelemetry#consumerInterceptorConfigProperties()
*/
public class OpenTelemetryConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {

public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
"opentelemetry.kafka-telemetry.supplier";

@Nullable private KafkaTelemetry telemetry;
private String consumerGroup;
private String clientId;

@Override
@CanIgnoreReturnValue
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
if (telemetry == null) {
return records;
}
// timer should be started before fetching ConsumerRecords, but there is no callback for that
Timer timer = Timer.start();
Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer);
if (receiveContext == null) {
receiveContext = Context.current();
}
KafkaConsumerContext consumerContext =
KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId);
return telemetry.addTracing(records, consumerContext);
}

@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null);
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null);

Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
if (telemetrySupplier == null) {
return;
}

if (!(telemetrySupplier instanceof Supplier)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
+ " is not instance of Supplier");
}

Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
+ " supplier does not return KafkaTelemetry instance");
}

this.telemetry = (KafkaTelemetry) kafkaTelemetry;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.v2_6;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
* A ProducerInterceptor that adds OpenTelemetry instrumentation. Add this interceptor's class name
* or class via ProducerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Producer's properties to
* get it instantiated and used. See more details on ProducerInterceptor usage in its Javadoc.
*
* <p>To configure the interceptor, use {@link KafkaTelemetry#producerInterceptorConfigProperties}
* to obtain the configuration properties and add them to your producer configuration.
*
* @see KafkaTelemetry#producerInterceptorConfigProperties()
*/
public class OpenTelemetryProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {

public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
"opentelemetry.kafka-telemetry.supplier";

@Nullable private KafkaTelemetry telemetry;
@Nullable private String clientId;

@Override
@CanIgnoreReturnValue
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
if (telemetry != null) {
telemetry.buildAndInjectSpan(producerRecord, clientId);
}
return producerRecord;
}

@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {
clientId = Objects.toString(configs.get(ProducerConfig.CLIENT_ID_CONFIG), null);

Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
if (telemetrySupplier == null) {
return;
}

if (!(telemetrySupplier instanceof Supplier)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
+ " is not instance of Supplier");
}

Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
+ " supplier does not return KafkaTelemetry instance");
}

this.telemetry = (KafkaTelemetry) kafkaTelemetry;
}
}
Loading
Loading