Skip to content
Open
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
2a012ae
Initial plan
Copilot Oct 3, 2025
34d68ec
Initial plan for refactoring kafka interceptors
Copilot Oct 3, 2025
3b68915
Refactor kafka interceptors to be configurable
Copilot Oct 3, 2025
ce0eafc
Update README with interceptor configuration documentation
Copilot Oct 3, 2025
b187791
Follow OpenTelemetryMetricsReporter pattern for property validation
Copilot Oct 3, 2025
85ec089
Avoid changing KafkaTelemetry - move config keys to interceptors
Copilot Oct 3, 2025
5f6294b
Use OpenTelemetrySupplier instead of KafkaTelemetrySupplier
Copilot Oct 3, 2025
e5ca107
Clean up unused code and use imports instead of FQCN
Copilot Oct 3, 2025
e791fc8
Add helper methods to KafkaTelemetry for interceptor config
Copilot Oct 3, 2025
97cbf9d
Include INTERCEPTOR_CLASSES_CONFIG in helper methods
Copilot Oct 3, 2025
4f3d30d
Use KafkaTelemetrySupplier to pass exact KafkaTelemetry instance
Copilot Oct 3, 2025
a31210c
Remove experimental from config key, simplify fallback, add test
Copilot Oct 3, 2025
84e69ad
Refactor kafka-clients interceptors to be configurable
Copilot Oct 3, 2025
5419f03
Restore system property checks in fallback with TODO comment
Copilot Oct 3, 2025
f4e568a
Use import instead of FQCN for Supplier in test
Copilot Oct 3, 2025
82248c4
Apply suggestions from code review
trask Oct 3, 2025
f1e7dde
./gradlew spotlessApply
otelbot[bot] Oct 3, 2025
7703b1d
Create new OpenTelemetry interceptors, keep old Tracing ones for back…
Copilot Oct 8, 2025
95d1723
Mark TracingConsumerInterceptor and TracingProducerInterceptor as dep…
Copilot Oct 8, 2025
3fdd463
Update instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/…
trask Oct 8, 2025
f72ae47
Update instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/…
trask Oct 8, 2025
2bc5492
Test deprecated classes until they're removed
trask Oct 9, 2025
f24a0a4
fix
trask Oct 9, 2025
45c7510
fix
trask Oct 9, 2025
85aaaaa
fix
trask Oct 9, 2025
9b9b075
./gradlew spotlessApply
otelbot[bot] Oct 9, 2025
6aa6306
fix
trask Oct 9, 2025
a541ee6
fix
trask Oct 9, 2025
06184d1
docs
trask Oct 9, 2025
30bc6dd
internal
trask Oct 10, 2025
b207b25
simplify
trask Oct 10, 2025
a2b44cc
Apply suggestions from code review
trask Oct 10, 2025
d861b88
KafkaHelper
trask Oct 10, 2025
3c4f97f
fixes
trask Oct 10, 2025
ff96054
split
trask Oct 10, 2025
61813b5
split2
trask Oct 10, 2025
41c767c
split test
trask Oct 10, 2025
810c222
more
trask Oct 10, 2025
a2286d8
up
trask Oct 10, 2025
536c1cd
up
trask Oct 10, 2025
bbd21cb
fix
trask Oct 10, 2025
385bc15
up
trask Oct 10, 2025
84c9212
up
trask Oct 10, 2025
686de0c
up
trask Oct 10, 2025
eb7ecf9
up
trask Oct 10, 2025
3fe741b
up
trask Oct 10, 2025
4f58c8d
revert
trask Oct 11, 2025
45c3e11
more
trask Oct 11, 2025
96bc1c9
./gradlew spotlessApply
otelbot[bot] Oct 11, 2025
44975a6
javadoc
trask Oct 11, 2025
d31e213
Merge remote-tracking branch 'upstream/main' into kafka-config
trask Oct 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,31 @@ There are two options for capturing traces, either using interceptors or wrappin

#### Using interceptors

The Kafka clients API provides a way to "intercept" messages before they are sent to the brokers as well as messages received from the broker before being passed to the application.
The OpenTelemetry instrumented Kafka library provides two interceptors to be configured to add tracing information automatically.
The interceptor class has to be set in the properties bag used to create the Kafka client.
The Kafka clients API provides a way to intercept messages before they are sent to the brokers as well as messages received from the broker before being passed to the application.

Use the `TracingProducerInterceptor` for the producer in order to create a "send" span automatically, each time a message is sent.
To intercept messages and emit telemetry for a `KafkaProducer`:

```java
props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);

Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.putAll(telemetry.producerInterceptorConfigProperties());

Producer<String, String> producer = new KafkaProducer<>(props);
```

Use the `TracingConsumerInterceptor` for the consumer in order to create a "receive" span automatically, each time a message is received.
To intercept messages and emit telemetry for a `KafkaConsumer`:

```java
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.putAll(telemetry.consumerInterceptorConfigProperties());

Consumer<String, String> consumer = new KafkaConsumer<>(props);
```

#### Wrapping clients
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,33 @@ tasks {
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
}

val testReceiveSpansDisabled by registering(Test::class) {
test {
filter {
excludeTestsMatching("*Deprecated*")
}
}

val testDeprecated by registering(Test::class) {
testClassesDirs = sourceSets.test.get().output.classesDirs
classpath = sourceSets.test.get().runtimeClasspath
filter {
includeTestsMatching("InterceptorsSuppressReceiveSpansTest")
includeTestsMatching("WrapperSuppressReceiveSpansTest")
includeTestsMatching("*DeprecatedInterceptorsTest")
}
include("**/InterceptorsSuppressReceiveSpansTest.*", "**/WrapperSuppressReceiveSpansTest.*")
forkEvery = 1 // to avoid system properties polluting other tests
systemProperty("otel.instrumentation.messaging.experimental.receive-telemetry.enabled", "true")
systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "Test-Message-Header")
}

test {
val testDeprecatedSuppressReceiveSpans by registering(Test::class) {
testClassesDirs = sourceSets.test.get().output.classesDirs
classpath = sourceSets.test.get().runtimeClasspath
filter {
excludeTestsMatching("InterceptorsSuppressReceiveSpansTest")
excludeTestsMatching("WrapperSuppressReceiveSpansTest")
includeTestsMatching("*DeprecatedInterceptorsSuppressReceiveSpansTest")
}
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "Test-Message-Header")
}

check {
dependsOn(testReceiveSpansDisabled)
dependsOn(testDeprecated, testDeprecatedSuppressReceiveSpans)
}
}

Expand Down
Loading
Loading