diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java index b784017a9987..98e3a8d43553 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java @@ -6,7 +6,7 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; -import static org.assertj.core.api.Assertions.assertThat; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaClientBaseTest; @@ -204,4 +204,74 @@ void testRecordsWithTopicPartitionKafkaConsume() .hasAttributesSatisfyingExactly( processAttributes(null, greeting, false, false)))); } + + @DisplayName("test kafka null header") + @Test + void testKafkaHeaderNull() throws Exception { + String greeting = "Hello Kafka with null header!"; + testing.runWithSpan( + "parent", + () -> { + ProducerRecord producerRecord = + new ProducerRecord<>(SHARED_TOPIC, 10, greeting); + producerRecord.headers().add("test-message-header", null); + producer + .send( + producerRecord, + (meta, ex) -> { + if (ex == null) { + testing.runWithSpan("producer callback", () -> {}); + } else { + testing.runWithSpan("producer exception: " + ex, () -> {}); + } + }) + .get(5, TimeUnit.SECONDS); + }); + + awaitUntilConsumerIsReady(); + ConsumerRecords records = poll(Duration.ofSeconds(5)); + assertThat(records.count()).isEqualTo(1); + + for (ConsumerRecord record : records) { + testing.runWithSpan( + "processing", + () -> { + assertThat(record.key()).isEqualTo(10); + assertThat(record.value()).isEqualTo(greeting); + assertThat(record.headers().lastHeader("test-message-header").value()).isNull(); + }); + } + AtomicReference producerSpan = new AtomicReference<>(); + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(SHARED_TOPIC + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes("10", greeting, false)), + span -> + span.hasName("producer callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0))); + producerSpan.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(SHARED_TOPIC + " receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly(receiveAttributes(false)), + span -> + span.hasName(SHARED_TOPIC + " process") + .hasKind(SpanKind.CONSUMER) + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + processAttributes("10", greeting, false, false)), + span -> span.hasName("processing").hasParent(trace.getSpan(1)))); + } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerAttributesGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerAttributesGetter.java index 0be1bca37c6a..1340f6c3505c 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerAttributesGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerAttributesGetter.java @@ -81,6 +81,7 @@ public Long getBatchMessageCount(KafkaProcessRequest request, @Nullable Void unu @Override public List getMessageHeader(KafkaProcessRequest request, String name) { return StreamSupport.stream(request.getRecord().headers().headers(name).spliterator(), false) + .filter(header -> header.value() != null) .map(header -> new String(header.value(), StandardCharsets.UTF_8)) .collect(Collectors.toList()); } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerRecordGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerRecordGetter.java index a9dd6187e44c..cb0ab35940ad 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerRecordGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerRecordGetter.java @@ -40,6 +40,7 @@ public String get(@Nullable KafkaProcessRequest carrier, String key) { @Override public Iterator getAll(@Nullable KafkaProcessRequest carrier, String key) { return StreamSupport.stream(carrier.getRecord().headers().headers(key).spliterator(), false) + .filter(header -> header.value() != null) .map(header -> new String(header.value(), StandardCharsets.UTF_8)) .iterator(); } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerAttributesGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerAttributesGetter.java index feff5150f338..b36840b8085a 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerAttributesGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerAttributesGetter.java @@ -88,6 +88,7 @@ public Long getBatchMessageCount( @Override public List getMessageHeader(KafkaProducerRequest request, String name) { return StreamSupport.stream(request.getRecord().headers().headers(name).spliterator(), false) + .filter(header -> header.value() != null) .map(header -> new String(header.value(), StandardCharsets.UTF_8)) .collect(Collectors.toList()); } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaReceiveAttributesGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaReceiveAttributesGetter.java index a36f7dba11c8..21e7efb0fbc4 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaReceiveAttributesGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaReceiveAttributesGetter.java @@ -90,6 +90,7 @@ public List getMessageHeader(KafkaReceiveRequest request, String name) { .flatMap( consumerRecord -> StreamSupport.stream(consumerRecord.headers().headers(name).spliterator(), false)) + .filter(header -> header.value() != null) .map(header -> new String(header.value(), StandardCharsets.UTF_8)) .collect(Collectors.toList()); }