Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
2a012ae
Initial plan
Copilot Oct 3, 2025
34d68ec
Initial plan for refactoring kafka interceptors
Copilot Oct 3, 2025
3b68915
Refactor kafka interceptors to be configurable
Copilot Oct 3, 2025
ce0eafc
Update README with interceptor configuration documentation
Copilot Oct 3, 2025
b187791
Follow OpenTelemetryMetricsReporter pattern for property validation
Copilot Oct 3, 2025
85ec089
Avoid changing KafkaTelemetry - move config keys to interceptors
Copilot Oct 3, 2025
5f6294b
Use OpenTelemetrySupplier instead of KafkaTelemetrySupplier
Copilot Oct 3, 2025
e5ca107
Clean up unused code and use imports instead of FQCN
Copilot Oct 3, 2025
e791fc8
Add helper methods to KafkaTelemetry for interceptor config
Copilot Oct 3, 2025
97cbf9d
Include INTERCEPTOR_CLASSES_CONFIG in helper methods
Copilot Oct 3, 2025
4f3d30d
Use KafkaTelemetrySupplier to pass exact KafkaTelemetry instance
Copilot Oct 3, 2025
a31210c
Remove experimental from config key, simplify fallback, add test
Copilot Oct 3, 2025
84e69ad
Refactor kafka-clients interceptors to be configurable
Copilot Oct 3, 2025
5419f03
Restore system property checks in fallback with TODO comment
Copilot Oct 3, 2025
f4e568a
Use import instead of FQCN for Supplier in test
Copilot Oct 3, 2025
82248c4
Apply suggestions from code review
trask Oct 3, 2025
f1e7dde
./gradlew spotlessApply
otelbot[bot] Oct 3, 2025
7703b1d
Create new OpenTelemetry interceptors, keep old Tracing ones for back…
Copilot Oct 8, 2025
95d1723
Mark TracingConsumerInterceptor and TracingProducerInterceptor as dep…
Copilot Oct 8, 2025
3fdd463
Update instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/…
trask Oct 8, 2025
f72ae47
Update instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/…
trask Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,51 @@ The Kafka clients API provides a way to "intercept" messages before they are sen
The OpenTelemetry instrumented Kafka library provides two interceptors to be configured to add tracing information automatically.
The interceptor class has to be set in the properties bag used to create the Kafka client.

##### Recommended approach: Configuring interceptors with KafkaTelemetry
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot update the doc, only need to mention the new approach, no need to mention the now deprecated approach (Alternative: Using interceptors with global OpenTelemetry)


The recommended way to use interceptors is to configure them with a `KafkaTelemetry` instance.
This gives you full control over the configuration, including which `OpenTelemetry` instance to use,
whether to enable receive telemetry, and which headers to capture.

For the producer, configure the interceptor with the KafkaTelemetry instance:

```java
KafkaTelemetry telemetry = KafkaTelemetry.builder(openTelemetry)
.setCapturedHeaders(Arrays.asList("custom-header"))
.build();

Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
props.put(TracingProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaTelemetrySupplier(telemetry));

Producer<String, String> producer = new KafkaProducer<>(props);
```

For the consumer, configure the interceptor with the KafkaTelemetry instance:

```java
KafkaTelemetry telemetry = KafkaTelemetry.builder(openTelemetry)
.setMessagingReceiveInstrumentationEnabled(true)
.setCapturedHeaders(Arrays.asList("custom-header"))
.build();

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
props.put(TracingConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaTelemetrySupplier(telemetry));

Consumer<String, String> consumer = new KafkaConsumer<>(props);
```

##### Alternative: Using interceptors with global OpenTelemetry

If you don't explicitly configure the interceptors with a `KafkaTelemetry` instance, they will fall back to using
`GlobalOpenTelemetry.get()` and system properties for configuration.

Use the `TracingProducerInterceptor` for the producer in order to create a "send" span automatically, each time a message is sent.

```java
Expand All @@ -47,6 +92,10 @@ Use the `TracingConsumerInterceptor` for the consumer in order to create a "rece
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
```

The interceptors will use the following system properties for configuration:
- `otel.instrumentation.messaging.experimental.receive-telemetry.enabled` - Enable receive telemetry (default: false)
- `otel.instrumentation.messaging.experimental.capture-headers` - List of headers to capture as span attributes

#### Wrapping clients

The other way is by wrapping the Kafka client with a tracing enabled Kafka client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand All @@ -29,22 +31,19 @@
*/
public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {

private static final KafkaTelemetry telemetry =
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
.setMessagingReceiveInstrumentationEnabled(
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
.setCapturedHeaders(
ConfigPropertiesUtil.getList(
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
.build();
public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
"opentelemetry.experimental.kafka-telemetry.supplier";

@Nullable private KafkaTelemetry telemetry;
private String consumerGroup;
private String clientId;

@Override
@CanIgnoreReturnValue
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
if (telemetry == null) {
return records;
}
// timer should be started before fetching ConsumerRecords, but there is no callback for that
Timer timer = Timer.start();
Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer);
Expand All @@ -67,6 +66,37 @@ public void configure(Map<String, ?> configs) {
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null);
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null);

// TODO: support experimental attributes config
Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
if (telemetrySupplier == null) {
// Fallback to GlobalOpenTelemetry if not configured
this.telemetry =
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
.setMessagingReceiveInstrumentationEnabled(
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled",
false))
.setCapturedHeaders(
ConfigPropertiesUtil.getList(
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
.build();
return;
}

if (!(telemetrySupplier instanceof Supplier)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
+ " is not instance of Supplier");
}

Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
+ " supplier does not return KafkaTelemetry instance");
}

this.telemetry = (KafkaTelemetry) kafkaTelemetry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
Expand All @@ -25,19 +26,18 @@
*/
public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {

private static final KafkaTelemetry telemetry =
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
.setCapturedHeaders(
ConfigPropertiesUtil.getList(
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
.build();
public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
"opentelemetry.experimental.kafka-telemetry.supplier";

@Nullable private KafkaTelemetry telemetry;
@Nullable private String clientId;

@Override
@CanIgnoreReturnValue
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
telemetry.buildAndInjectSpan(producerRecord, clientId);
if (telemetry != null) {
telemetry.buildAndInjectSpan(producerRecord, clientId);
}
return producerRecord;
}

Expand All @@ -48,9 +48,36 @@ public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}
public void close() {}

@Override
public void configure(Map<String, ?> map) {
clientId = Objects.toString(map.get(ProducerConfig.CLIENT_ID_CONFIG), null);
public void configure(Map<String, ?> configs) {
clientId = Objects.toString(configs.get(ProducerConfig.CLIENT_ID_CONFIG), null);

// TODO: support experimental attributes config
Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
if (telemetrySupplier == null) {
// Fallback to GlobalOpenTelemetry if not configured
this.telemetry =
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
.setCapturedHeaders(
ConfigPropertiesUtil.getList(
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
.build();
return;
}

if (!(telemetrySupplier instanceof Supplier)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
+ " is not instance of Supplier");
}

Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
+ " supplier does not return KafkaTelemetry instance");
}

this.telemetry = (KafkaTelemetry) kafkaTelemetry;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot make a copy of this named AbstractDeprecatedInterceptorsTest

and that class should be the same as this class used to be (testing the now deprecated interceptors)

and create copies of the subclasses OldInterceptorsTest and OldInterceptorsSuppressReceiveSpansTest and have those extend AbstractOldInterceptorsTest

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot try again

Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,44 @@ abstract class AbstractInterceptorsTest extends KafkaClientBaseTest {

static final String greeting = "Hello Kafka!";

private static KafkaTelemetry kafkaTelemetry;

protected static KafkaTelemetry createKafkaTelemetry() {
return KafkaTelemetry.builder(testing.getOpenTelemetry())
.setCapturedHeaders(java.util.Collections.singletonList("Test-Message-Header"))
.setMessagingReceiveInstrumentationEnabled(true)
.build();
}

@Override
public Map<String, Object> producerProps() {
if (kafkaTelemetry == null) {
kafkaTelemetry = createKafkaTelemetry();
}
Map<String, Object> props = super.producerProps();
props.put(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
props.put(
TracingProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal
.KafkaTelemetrySupplier(
kafkaTelemetry));
return props;
}

@Override
public Map<String, Object> consumerProps() {
if (kafkaTelemetry == null) {
kafkaTelemetry = createKafkaTelemetry();
}
Map<String, Object> props = super.consumerProps();
props.put(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
props.put(
TracingConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal
.KafkaTelemetrySupplier(
kafkaTelemetry));
return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@

class InterceptorsSuppressReceiveSpansTest extends AbstractInterceptorsTest {

protected static KafkaTelemetry createKafkaTelemetry() {
return KafkaTelemetry.builder(testing.getOpenTelemetry())
.setMessagingReceiveInstrumentationEnabled(false)
.build();
}

@SuppressWarnings("deprecation") // using deprecated semconv
@Override
void assertTraces() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal;

import java.io.Serializable;
import java.util.Objects;
import java.util.function.Supplier;

/**
* Wrapper for KafkaTelemetry that can be injected into kafka configuration without breaking
* serialization. Used by kafka interceptors to get a configured KafkaTelemetry instance.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class KafkaTelemetrySupplier implements Supplier<Object>, Serializable {
private static final long serialVersionUID = 1L;
private final transient Object kafkaTelemetry;

public KafkaTelemetrySupplier(Object kafkaTelemetry) {
Objects.requireNonNull(kafkaTelemetry);
this.kafkaTelemetry = kafkaTelemetry;
}

@Override
public Object get() {
return kafkaTelemetry;
}

private Object writeReplace() {
// serialize this object to null
return null;
}
}