Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
2a012ae
Initial plan
Copilot Oct 3, 2025
34d68ec
Initial plan for refactoring kafka interceptors
Copilot Oct 3, 2025
3b68915
Refactor kafka interceptors to be configurable
Copilot Oct 3, 2025
ce0eafc
Update README with interceptor configuration documentation
Copilot Oct 3, 2025
b187791
Follow OpenTelemetryMetricsReporter pattern for property validation
Copilot Oct 3, 2025
85ec089
Avoid changing KafkaTelemetry - move config keys to interceptors
Copilot Oct 3, 2025
5f6294b
Use OpenTelemetrySupplier instead of KafkaTelemetrySupplier
Copilot Oct 3, 2025
e5ca107
Clean up unused code and use imports instead of FQCN
Copilot Oct 3, 2025
e791fc8
Add helper methods to KafkaTelemetry for interceptor config
Copilot Oct 3, 2025
97cbf9d
Include INTERCEPTOR_CLASSES_CONFIG in helper methods
Copilot Oct 3, 2025
4f3d30d
Use KafkaTelemetrySupplier to pass exact KafkaTelemetry instance
Copilot Oct 3, 2025
a31210c
Remove experimental from config key, simplify fallback, add test
Copilot Oct 3, 2025
84e69ad
Refactor kafka-clients interceptors to be configurable
Copilot Oct 3, 2025
5419f03
Restore system property checks in fallback with TODO comment
Copilot Oct 3, 2025
f4e568a
Use import instead of FQCN for Supplier in test
Copilot Oct 3, 2025
82248c4
Apply suggestions from code review
trask Oct 3, 2025
f1e7dde
./gradlew spotlessApply
otelbot[bot] Oct 3, 2025
7703b1d
Create new OpenTelemetry interceptors, keep old Tracing ones for back…
Copilot Oct 8, 2025
95d1723
Mark TracingConsumerInterceptor and TracingProducerInterceptor as dep…
Copilot Oct 8, 2025
3fdd463
Update instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/…
trask Oct 8, 2025
f72ae47
Update instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/…
trask Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,59 @@ The Kafka clients API provides a way to "intercept" messages before they are sen
The OpenTelemetry instrumented Kafka library provides two interceptors to be configured to add tracing information automatically.
The interceptor class has to be set in the properties bag used to create the Kafka client.

Use the `TracingProducerInterceptor` for the producer in order to create a "send" span automatically, each time a message is sent.
##### Recommended approach: Configuring interceptors with KafkaTelemetry
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot update the doc, only need to mention the new approach, no need to mention the now deprecated approach (Alternative: Using interceptors with global OpenTelemetry)


The recommended way to use interceptors is to configure them with a `KafkaTelemetry` instance.
Interceptors will use system properties for additional configuration like captured headers and receive telemetry settings.

For the producer:

```java
KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);

Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.putAll(telemetry.producerInterceptorConfigProperties());

Producer<String, String> producer = new KafkaProducer<>(props);
```

For the consumer:

```java
KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.putAll(telemetry.consumerInterceptorConfigProperties());

Consumer<String, String> consumer = new KafkaConsumer<>(props);
```

##### Alternative: Using interceptors with global OpenTelemetry

If you don't explicitly configure the interceptors with a `KafkaTelemetry` instance, they will fall back to using
`GlobalOpenTelemetry.get()` and system properties for configuration.

Use the `OpenTelemetryProducerInterceptor` for the producer in order to create a "send" span automatically, each time a message is sent.

```java
props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, OpenTelemetryProducerInterceptor.class.getName());
```

Use the `TracingConsumerInterceptor` for the consumer in order to create a "receive" span automatically, each time a message is received.
Use the `OpenTelemetryConsumerInterceptor` for the consumer in order to create a "receive" span automatically, each time a message is received.

```java
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, OpenTelemetryConsumerInterceptor.class.getName());
```

Note: The `TracingProducerInterceptor` and `TracingConsumerInterceptor` classes are still available for backwards compatibility, but new code should use the `OpenTelemetry*` variants.

The interceptors will use the following system properties for configuration:
- `otel.instrumentation.messaging.experimental.receive-telemetry.enabled` - Enable receive telemetry (default: false)
- `otel.instrumentation.messaging.experimental.capture-headers` - List of headers to capture as span attributes

#### Wrapping clients

The other way is by wrapping the Kafka client with a tracing enabled Kafka client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@
import java.util.logging.Logger;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -207,6 +209,62 @@ <K, V> ConsumerRecords<K, V> addTracing(
return Collections.unmodifiableMap(config);
}

/**
* Returns configuration properties that can be used to enable OpenTelemetry instrumentation via
* {@code OpenTelemetryProducerInterceptor}. Add these resulting properties to the configuration
* map used to initialize a {@link org.apache.kafka.clients.producer.KafkaProducer}.
*
* <p>Example usage:
*
* <pre>{@code
* // KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
* // Map<String, Object> config = new HashMap<>();
* // config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
* // config.putAll(telemetry.producerInterceptorConfigProperties());
* // try (KafkaProducer<?, ?> producer = new KafkaProducer<>(config)) { ... }
* }</pre>
*
* @return the kafka producer interceptor config properties
*/
public Map<String, ?> producerInterceptorConfigProperties() {
Map<String, Object> config = new HashMap<>();
config.put(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
OpenTelemetryProducerInterceptor.class.getName());
config.put(
OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
new KafkaTelemetrySupplier(this));
return Collections.unmodifiableMap(config);
}

/**
* Returns configuration properties that can be used to enable OpenTelemetry instrumentation via
* {@code OpenTelemetryConsumerInterceptor}. Add these resulting properties to the configuration
* map used to initialize a {@link org.apache.kafka.clients.consumer.KafkaConsumer}.
*
* <p>Example usage:
*
* <pre>{@code
* // KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
* // Map<String, Object> config = new HashMap<>();
* // config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
* // config.putAll(telemetry.consumerInterceptorConfigProperties());
* // try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(config)) { ... }
* }</pre>
*
* @return the kafka consumer interceptor config properties
*/
public Map<String, ?> consumerInterceptorConfigProperties() {
Map<String, Object> config = new HashMap<>();
config.put(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
OpenTelemetryConsumerInterceptor.class.getName());
config.put(
OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
new KafkaTelemetrySupplier(this));
return Collections.unmodifiableMap(config);
}

/**
* Build and inject span into record.
*
Expand Down
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot looks like this class is only used in kafka-client-2.6:library module? if so can we move it there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved KafkaTelemetrySupplier from kafka-clients-common-0.11/library/...internal/ to kafka-clients-2.6/library/.../v2_6/internal/ since it's only used in the kafka-clients-2.6 module.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.v2_6;

import java.io.Serializable;
import java.util.Objects;
import java.util.function.Supplier;

/**
* Wrapper for KafkaTelemetry that can be injected into kafka configuration without breaking
* serialization.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
final class KafkaTelemetrySupplier implements Supplier<Object>, Serializable {
private static final long serialVersionUID = 1L;
private final transient Object kafkaTelemetry;

KafkaTelemetrySupplier(Object kafkaTelemetry) {
Objects.requireNonNull(kafkaTelemetry);
this.kafkaTelemetry = kafkaTelemetry;
}

@Override
public Object get() {
return kafkaTelemetry;
}

private Object writeReplace() {
// serialize this object to null
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.v2_6;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
* A ConsumerInterceptor that adds OpenTelemetry instrumentation. Add this interceptor's class name
* or class via ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Consumer's properties to
* get it instantiated and used. See more details on ConsumerInterceptor usage in its Javadoc.
*
* <p>To configure the interceptor, use {@link KafkaTelemetry#consumerInterceptorConfigProperties}
* to obtain the configuration properties and add them to your consumer configuration.
*
* @see KafkaTelemetry#consumerInterceptorConfigProperties()
*/
public class OpenTelemetryConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot mark TracingConsumerIntercepter and TracingProducerInterceptor as deprecated and recommend to use these new classes instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Marked both TracingConsumerInterceptor and TracingProducerInterceptor as @Deprecated with javadoc comments recommending the new OpenTelemetry* classes. See commit [pending].


public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
"opentelemetry.kafka-telemetry.supplier";

@Nullable private KafkaTelemetry telemetry;
private String consumerGroup;
private String clientId;

@Override
@CanIgnoreReturnValue
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
if (telemetry == null) {
return records;
}
// timer should be started before fetching ConsumerRecords, but there is no callback for that
Timer timer = Timer.start();
Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer);
if (receiveContext == null) {
receiveContext = Context.current();
}
KafkaConsumerContext consumerContext =
KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId);
return telemetry.addTracing(records, consumerContext);
}

@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null);
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null);

Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
if (telemetrySupplier == null) {
return;
}

if (!(telemetrySupplier instanceof Supplier)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
+ " is not instance of Supplier");
}

Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
+ " supplier does not return KafkaTelemetry instance");
}

this.telemetry = (KafkaTelemetry) kafkaTelemetry;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.v2_6;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
* A ProducerInterceptor that adds OpenTelemetry instrumentation. Add this interceptor's class name
* or class via ProducerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Producer's properties to
* get it instantiated and used. See more details on ProducerInterceptor usage in its Javadoc.
*
* <p>To configure the interceptor, use {@link KafkaTelemetry#producerInterceptorConfigProperties}
* to obtain the configuration properties and add them to your producer configuration.
*
* @see KafkaTelemetry#producerInterceptorConfigProperties()
*/
public class OpenTelemetryProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {

public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
"opentelemetry.kafka-telemetry.supplier";

@Nullable private KafkaTelemetry telemetry;
@Nullable private String clientId;

@Override
@CanIgnoreReturnValue
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
if (telemetry != null) {
telemetry.buildAndInjectSpan(producerRecord, clientId);
}
return producerRecord;
}

@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {
clientId = Objects.toString(configs.get(ProducerConfig.CLIENT_ID_CONFIG), null);

Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
if (telemetrySupplier == null) {
return;
}

if (!(telemetrySupplier instanceof Supplier)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
+ " is not instance of Supplier");
}

Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
+ " supplier does not return KafkaTelemetry instance");
}

this.telemetry = (KafkaTelemetry) kafkaTelemetry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
* A ConsumerInterceptor that adds tracing capability. Add this interceptor's class name or class
* via ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Consumer's properties to get it
* instantiated and used. See more details on ConsumerInterceptor usage in its Javadoc.
*
* @deprecated Use {@link OpenTelemetryConsumerInterceptor} instead.
*/
@Deprecated
public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {

private static final KafkaTelemetry telemetry =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
* A ProducerInterceptor that adds tracing capability. Add this interceptor's class name or class
* via ProducerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Producer's properties to get it
* instantiated and used. See more details on ProducerInterceptor usage in its Javadoc.
*
* @deprecated Use {@link OpenTelemetryProducerInterceptor} instead.
*/
@Deprecated
public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {

private static final KafkaTelemetry telemetry =
Expand Down
Loading
Loading