Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
2a012ae
Initial plan
Copilot Oct 3, 2025
34d68ec
Initial plan for refactoring kafka interceptors
Copilot Oct 3, 2025
3b68915
Refactor kafka interceptors to be configurable
Copilot Oct 3, 2025
ce0eafc
Update README with interceptor configuration documentation
Copilot Oct 3, 2025
b187791
Follow OpenTelemetryMetricsReporter pattern for property validation
Copilot Oct 3, 2025
85ec089
Avoid changing KafkaTelemetry - move config keys to interceptors
Copilot Oct 3, 2025
5f6294b
Use OpenTelemetrySupplier instead of KafkaTelemetrySupplier
Copilot Oct 3, 2025
e5ca107
Clean up unused code and use imports instead of FQCN
Copilot Oct 3, 2025
e791fc8
Add helper methods to KafkaTelemetry for interceptor config
Copilot Oct 3, 2025
97cbf9d
Include INTERCEPTOR_CLASSES_CONFIG in helper methods
Copilot Oct 3, 2025
4f3d30d
Use KafkaTelemetrySupplier to pass exact KafkaTelemetry instance
Copilot Oct 3, 2025
a31210c
Remove experimental from config key, simplify fallback, add test
Copilot Oct 3, 2025
84e69ad
Refactor kafka-clients interceptors to be configurable
Copilot Oct 3, 2025
5419f03
Restore system property checks in fallback with TODO comment
Copilot Oct 3, 2025
f4e568a
Use import instead of FQCN for Supplier in test
Copilot Oct 3, 2025
82248c4
Apply suggestions from code review
trask Oct 3, 2025
f1e7dde
./gradlew spotlessApply
otelbot[bot] Oct 3, 2025
7703b1d
Create new OpenTelemetry interceptors, keep old Tracing ones for back…
Copilot Oct 8, 2025
95d1723
Mark TracingConsumerInterceptor and TracingProducerInterceptor as dep…
Copilot Oct 8, 2025
3fdd463
Update instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/…
trask Oct 8, 2025
f72ae47
Update instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/…
trask Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,41 @@ The Kafka clients API provides a way to "intercept" messages before they are sen
The OpenTelemetry instrumented Kafka library provides two interceptors to be configured to add tracing information automatically.
The interceptor class has to be set in the properties bag used to create the Kafka client.

##### Recommended approach: Configuring interceptors with OpenTelemetry

The recommended way to use interceptors is to configure them with an `OpenTelemetry` instance.
Interceptors will use system properties for additional configuration like captured headers and receive telemetry settings.

For the producer:

```java
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
props.put(TracingProducerInterceptor.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier(openTelemetry));

Producer<String, String> producer = new KafkaProducer<>(props);
```

For the consumer:

```java
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
props.put(TracingConsumerInterceptor.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier(openTelemetry));

Consumer<String, String> consumer = new KafkaConsumer<>(props);
```

##### Alternative: Using interceptors with global OpenTelemetry

If you don't explicitly configure the interceptors with a `KafkaTelemetry` instance, they will fall back to using
`GlobalOpenTelemetry.get()` and system properties for configuration.

Use the `TracingProducerInterceptor` for the producer in order to create a "send" span automatically, each time a message is sent.

```java
Expand All @@ -47,6 +82,10 @@ Use the `TracingConsumerInterceptor` for the consumer in order to create a "rece
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
```

The interceptors will use the following system properties for configuration:
- `otel.instrumentation.messaging.experimental.receive-telemetry.enabled` - Enable receive telemetry (default: false)
- `otel.instrumentation.messaging.experimental.capture-headers` - List of headers to capture as span attributes

#### Wrapping clients

The other way is by wrapping the Kafka client with a tracing enabled Kafka client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand All @@ -29,22 +32,18 @@
*/
public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {

private static final KafkaTelemetry telemetry =
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
.setMessagingReceiveInstrumentationEnabled(
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
.setCapturedHeaders(
ConfigPropertiesUtil.getList(
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
.build();
public static final String CONFIG_KEY_OPENTELEMETRY_SUPPLIER = "opentelemetry.supplier";

@Nullable private KafkaTelemetry telemetry;
private String consumerGroup;
private String clientId;

@Override
@CanIgnoreReturnValue
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
if (telemetry == null) {
return records;
}
// timer should be started before fetching ConsumerRecords, but there is no callback for that
Timer timer = Timer.start();
Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer);
Expand All @@ -67,6 +66,29 @@ public void configure(Map<String, ?> configs) {
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null);
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null);

// TODO: support experimental attributes config
OpenTelemetry openTelemetry;
Object openTelemetrySupplier = configs.get(CONFIG_KEY_OPENTELEMETRY_SUPPLIER);
if (openTelemetrySupplier == null) {
// Fallback to GlobalOpenTelemetry if not configured
openTelemetry = GlobalOpenTelemetry.get();
} else {
if (!(openTelemetrySupplier instanceof OpenTelemetrySupplier)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_OPENTELEMETRY_SUPPLIER
+ " is not instance of OpenTelemetrySupplier");
}
openTelemetry = ((OpenTelemetrySupplier) openTelemetrySupplier).get();
}

this.telemetry =
KafkaTelemetry.builder(openTelemetry)
.setMessagingReceiveInstrumentationEnabled(
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
.setCapturedHeaders(
ConfigPropertiesUtil.getList(
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
Expand All @@ -25,19 +27,17 @@
*/
public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {

private static final KafkaTelemetry telemetry =
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
.setCapturedHeaders(
ConfigPropertiesUtil.getList(
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
.build();
public static final String CONFIG_KEY_OPENTELEMETRY_SUPPLIER = "opentelemetry.supplier";

@Nullable private KafkaTelemetry telemetry;
@Nullable private String clientId;

@Override
@CanIgnoreReturnValue
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
telemetry.buildAndInjectSpan(producerRecord, clientId);
if (telemetry != null) {
telemetry.buildAndInjectSpan(producerRecord, clientId);
}
return producerRecord;
}

Expand All @@ -48,9 +48,29 @@ public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}
public void close() {}

@Override
public void configure(Map<String, ?> map) {
clientId = Objects.toString(map.get(ProducerConfig.CLIENT_ID_CONFIG), null);
public void configure(Map<String, ?> configs) {
clientId = Objects.toString(configs.get(ProducerConfig.CLIENT_ID_CONFIG), null);

// TODO: support experimental attributes config
OpenTelemetry openTelemetry;
Object openTelemetrySupplier = configs.get(CONFIG_KEY_OPENTELEMETRY_SUPPLIER);
if (openTelemetrySupplier == null) {
// Fallback to GlobalOpenTelemetry if not configured
openTelemetry = GlobalOpenTelemetry.get();
} else {
if (!(openTelemetrySupplier instanceof OpenTelemetrySupplier)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_OPENTELEMETRY_SUPPLIER
+ " is not instance of OpenTelemetrySupplier");
}
openTelemetry = ((OpenTelemetrySupplier) openTelemetrySupplier).get();
}

this.telemetry =
KafkaTelemetry.builder(openTelemetry)
.setCapturedHeaders(
ConfigPropertiesUtil.getList(
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
.build();
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot make a copy of this named AbstractDeprecatedInterceptorsTest

and that class should be the same as this class used to be (testing the now deprecated interceptors)

and create copies of the subclasses OldInterceptorsTest and OldInterceptorsSuppressReceiveSpansTest and have those extend AbstractOldInterceptorsTest

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot try again

Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,44 @@ abstract class AbstractInterceptorsTest extends KafkaClientBaseTest {

static final String greeting = "Hello Kafka!";

private static KafkaTelemetry kafkaTelemetry;

protected static KafkaTelemetry createKafkaTelemetry() {
return KafkaTelemetry.builder(testing.getOpenTelemetry())
.setCapturedHeaders(java.util.Collections.singletonList("Test-Message-Header"))
.setMessagingReceiveInstrumentationEnabled(true)
.build();
}

@Override
public Map<String, Object> producerProps() {
if (kafkaTelemetry == null) {
kafkaTelemetry = createKafkaTelemetry();
}
Map<String, Object> props = super.producerProps();
props.put(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
props.put(
TracingProducerInterceptor.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal
.OpenTelemetrySupplier(
testing.getOpenTelemetry()));
return props;
}

@Override
public Map<String, Object> consumerProps() {
if (kafkaTelemetry == null) {
kafkaTelemetry = createKafkaTelemetry();
}
Map<String, Object> props = super.consumerProps();
props.put(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
props.put(
TracingConsumerInterceptor.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal
.OpenTelemetrySupplier(
testing.getOpenTelemetry()));
return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@

class InterceptorsSuppressReceiveSpansTest extends AbstractInterceptorsTest {

protected static KafkaTelemetry createKafkaTelemetry() {
return KafkaTelemetry.builder(testing.getOpenTelemetry())
.setMessagingReceiveInstrumentationEnabled(false)
.build();
}

@SuppressWarnings("deprecation") // using deprecated semconv
@Override
void assertTraces() {
Expand Down