diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts index 17dadd3bc579..eb5a22e48c86 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts @@ -36,6 +36,7 @@ tasks { excludeTestsMatching("WrapperSuppressReceiveSpansTest") } jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + jvmArgs("-Dotel.instrumentation.messaging.experimental.capture-headers=baggage") } check { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java index 368767c1022a..8f60b07e6a88 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.kafkaclients.v2_6; +import static java.util.Collections.emptyList; + import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.context.Context; @@ -32,6 +34,9 @@ public class TracingConsumerInterceptor implements ConsumerInterceptor implements ProducerInterceptor { - private static final KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get()); + private static final KafkaTelemetry telemetry = + KafkaTelemetry.builder(GlobalOpenTelemetry.get()) + .setCapturedHeaders( + ConfigPropertiesUtil.getList( + "otel.instrumentation.messaging.experimental.capture-headers", emptyList())) + .build(); @Nullable private String clientId; diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java index 8f4f65dc2262..85dbb2cfe60c 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.kafkaclients.v2_6; +import static io.opentelemetry.api.common.AttributeKey.stringArrayKey; import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; @@ -16,12 +17,15 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.sdk.trace.data.LinkData; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; import org.assertj.core.api.AbstractLongAssert; import org.assertj.core.api.AbstractStringAssert; @@ -42,6 +46,11 @@ void assertTraces() { .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( + equalTo( + stringArrayKey("messaging.header.baggage"), + asList( + "test-baggage-key-1=test-baggage-value-1", + "test-baggage-key-2=test-baggage-value-2")), equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "publish"), @@ -64,6 +73,11 @@ void assertTraces() { .hasNoParent() .hasLinksSatisfying(links -> assertThat(links).isEmpty()) .hasAttributesSatisfyingExactly( + equalTo( + AttributeKey.stringArrayKey("messaging.header.baggage"), + Arrays.asList( + "test-baggage-key-1=test-baggage-value-1", + "test-baggage-key-2=test-baggage-value-2")), equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "receive"), @@ -78,6 +92,11 @@ void assertTraces() { .hasParent(trace.getSpan(0)) .hasLinks(LinkData.create(producerSpanContext.get())) .hasAttributesSatisfyingExactly( + equalTo( + AttributeKey.stringArrayKey("messaging.header.baggage"), + Arrays.asList( + "test-baggage-key-1=test-baggage-value-1", + "test-baggage-key-2=test-baggage-value-2")), equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "process"), diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml index 5898d2e01872..652cea4da3f3 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml @@ -1,2 +1,13 @@ description: > - This instrumentation provides a library integeration that enables messaging spans and metrics for Apache Kafka 2.6+ clients. + This instrumentation provides a library integrations that enables messaging spans and metrics for Apache Kafka 2.6+ clients. +configurations: + - name: otel.instrumentation.messaging.experimental.capture-headers + description: A comma-separated list of header names to capture as span attributes. + type: list + default: '' + - name: otel.instrumentation.messaging.experimental.receive-telemetry.enabled + description: > + Enables experimental receive telemetry, which will cause consumers to start a new trace, with + only a span link connecting it to the producer trace. + type: boolean + default: false