Skip to content

Commit 7aa80c6

Browse files
mznetlaurit
andauthored
add the headers capture feature to Kafka 2.6 interceptors (#14290)
Co-authored-by: Lauri Tulmin <[email protected]>
1 parent 4be9801 commit 7aa80c6

File tree

6 files changed

+39
-1
lines changed

6 files changed

+39
-1
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ tasks {
3636
excludeTestsMatching("WrapperSuppressReceiveSpansTest")
3737
}
3838
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
39+
systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "test-message-header")
3940
}
4041

4142
check {

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
77

8+
import static java.util.Collections.emptyList;
9+
810
import com.google.errorprone.annotations.CanIgnoreReturnValue;
911
import io.opentelemetry.api.GlobalOpenTelemetry;
1012
import io.opentelemetry.context.Context;
@@ -32,6 +34,9 @@ public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K,
3234
.setMessagingReceiveInstrumentationEnabled(
3335
ConfigPropertiesUtil.getBoolean(
3436
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
37+
.setCapturedHeaders(
38+
ConfigPropertiesUtil.getList(
39+
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
3540
.build();
3641

3742
private String consumerGroup;

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55

66
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
77

8+
import static java.util.Collections.emptyList;
9+
810
import com.google.errorprone.annotations.CanIgnoreReturnValue;
911
import io.opentelemetry.api.GlobalOpenTelemetry;
12+
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
1013
import java.util.Map;
1114
import java.util.Objects;
1215
import javax.annotation.Nullable;
@@ -22,7 +25,12 @@
2225
*/
2326
public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
2427

25-
private static final KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
28+
private static final KafkaTelemetry telemetry =
29+
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
30+
.setCapturedHeaders(
31+
ConfigPropertiesUtil.getList(
32+
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
33+
.build();
2634

2735
@Nullable private String clientId;
2836

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ void testInterceptors() throws InterruptedException {
5353
new ProducerRecord<>(SHARED_TOPIC, greeting);
5454
producerRecord
5555
.headers()
56+
// add header to test capturing header value as span attribute
57+
.add("test-message-header", "test".getBytes(StandardCharsets.UTF_8))
5658
// adding baggage header in w3c baggage format
5759
.add(
5860
"baggage",

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
77

8+
import static io.opentelemetry.api.common.AttributeKey.stringArrayKey;
89
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName;
910
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
1011
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
@@ -16,6 +17,7 @@
1617
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
1718
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
1819
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
20+
import static java.util.Collections.singletonList;
1921
import static org.assertj.core.api.Assertions.assertThat;
2022

2123
import io.opentelemetry.api.trace.SpanContext;
@@ -42,6 +44,9 @@ void assertTraces() {
4244
.hasKind(SpanKind.PRODUCER)
4345
.hasParent(trace.getSpan(0))
4446
.hasAttributesSatisfyingExactly(
47+
equalTo(
48+
stringArrayKey("messaging.header.test_message_header"),
49+
singletonList("test")),
4550
equalTo(MESSAGING_SYSTEM, "kafka"),
4651
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
4752
equalTo(MESSAGING_OPERATION, "publish"),
@@ -64,6 +69,9 @@ void assertTraces() {
6469
.hasNoParent()
6570
.hasLinksSatisfying(links -> assertThat(links).isEmpty())
6671
.hasAttributesSatisfyingExactly(
72+
equalTo(
73+
stringArrayKey("messaging.header.test_message_header"),
74+
singletonList("test")),
6775
equalTo(MESSAGING_SYSTEM, "kafka"),
6876
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
6977
equalTo(MESSAGING_OPERATION, "receive"),
@@ -78,6 +86,9 @@ void assertTraces() {
7886
.hasParent(trace.getSpan(0))
7987
.hasLinks(LinkData.create(producerSpanContext.get()))
8088
.hasAttributesSatisfyingExactly(
89+
equalTo(
90+
stringArrayKey("messaging.header.test_message_header"),
91+
singletonList("test")),
8192
equalTo(MESSAGING_SYSTEM, "kafka"),
8293
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
8394
equalTo(MESSAGING_OPERATION, "process"),
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,13 @@
11
description: >
22
This instrumentation provides a library integration that enables messaging spans and metrics for Apache Kafka 2.6+ clients.
3+
configurations:
4+
- name: otel.instrumentation.messaging.experimental.capture-headers
5+
description: A comma-separated list of header names to capture as span attributes.
6+
type: list
7+
default: ''
8+
- name: otel.instrumentation.messaging.experimental.receive-telemetry.enabled
9+
description: >
10+
Enables experimental receive telemetry, which will cause consumers to start a new trace, with
11+
only a span link connecting it to the producer trace.
12+
type: boolean
13+
default: false

0 commit comments

Comments
 (0)