From 1078efde0be44a4193349db8efbb6318bda06edf Mon Sep 17 00:00:00 2001 From: Minje Park Date: Fri, 25 Jul 2025 14:16:32 +0900 Subject: [PATCH 1/3] filter null header value --- .../v0_11/KafkaClientDefaultTest.java | 73 +++++++++++++++++++ .../KafkaConsumerAttributesGetter.java | 1 + .../internal/KafkaConsumerRecordGetter.java | 1 + .../KafkaProducerAttributesGetter.java | 1 + .../KafkaReceiveAttributesGetter.java | 1 + 5 files changed, 77 insertions(+) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java index b784017a9987..51bc193fc525 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java @@ -8,6 +8,7 @@ import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; import static org.assertj.core.api.Assertions.assertThat; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaClientBaseTest; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaClientPropagationBaseTest; @@ -204,4 +205,76 @@ void testRecordsWithTopicPartitionKafkaConsume() .hasAttributesSatisfyingExactly( processAttributes(null, greeting, false, false)))); } + + @DisplayName("test kafka null header") + @Test + void testKafkaHeaderNull() throws Exception { + String greeting = "Hello Kafka with null header!"; + testing.runWithSpan( + "parent", + () -> { + ProducerRecord producerRecord = + new ProducerRecord<>(SHARED_TOPIC, 10, greeting); + producerRecord.headers().add("test-message-header", null); + producer + .send( + producerRecord, + (meta, ex) -> { + if (ex == null) { + testing.runWithSpan("producer callback", () -> {}); + } else { + testing.runWithSpan("producer exception: " + ex, () -> {}); + } + }) + .get(5, TimeUnit.SECONDS); + }); + + awaitUntilConsumerIsReady(); + ConsumerRecords records = poll(Duration.ofSeconds(5)); + assertThat(records.count()).isEqualTo(1); + + for (ConsumerRecord record : records) { + testing.runWithSpan( + "processing", + () -> { + assertThat(record.key()).isEqualTo(10); + assertThat(record.value()).isEqualTo(greeting); + assertThat(record.headers().lastHeader("test-message-header").value()).isNull(); + }); + } + AtomicReference producerSpan = new AtomicReference<>(); + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(SHARED_TOPIC + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes("10", greeting, false)), + span -> + span.hasName("producer callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0))); + producerSpan.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(SHARED_TOPIC + " receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly(receiveAttributes(false)) + .hasAttribute(AttributeKey.stringKey("test-message-header"), null), + span -> + span.hasName(SHARED_TOPIC + " process") + .hasKind(SpanKind.CONSUMER) + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + processAttributes("10", greeting, false, false)) + .hasAttribute(AttributeKey.stringKey("test-message-header"), null), + span -> span.hasName("processing").hasParent(trace.getSpan(1)))); + } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerAttributesGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerAttributesGetter.java index 0be1bca37c6a..1340f6c3505c 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerAttributesGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerAttributesGetter.java @@ -81,6 +81,7 @@ public Long getBatchMessageCount(KafkaProcessRequest request, @Nullable Void unu @Override public List getMessageHeader(KafkaProcessRequest request, String name) { return StreamSupport.stream(request.getRecord().headers().headers(name).spliterator(), false) + .filter(header -> header.value() != null) .map(header -> new String(header.value(), StandardCharsets.UTF_8)) .collect(Collectors.toList()); } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerRecordGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerRecordGetter.java index a9dd6187e44c..cb0ab35940ad 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerRecordGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerRecordGetter.java @@ -40,6 +40,7 @@ public String get(@Nullable KafkaProcessRequest carrier, String key) { @Override public Iterator getAll(@Nullable KafkaProcessRequest carrier, String key) { return StreamSupport.stream(carrier.getRecord().headers().headers(key).spliterator(), false) + .filter(header -> header.value() != null) .map(header -> new String(header.value(), StandardCharsets.UTF_8)) .iterator(); } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerAttributesGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerAttributesGetter.java index feff5150f338..b36840b8085a 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerAttributesGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerAttributesGetter.java @@ -88,6 +88,7 @@ public Long getBatchMessageCount( @Override public List getMessageHeader(KafkaProducerRequest request, String name) { return StreamSupport.stream(request.getRecord().headers().headers(name).spliterator(), false) + .filter(header -> header.value() != null) .map(header -> new String(header.value(), StandardCharsets.UTF_8)) .collect(Collectors.toList()); } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaReceiveAttributesGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaReceiveAttributesGetter.java index a36f7dba11c8..21e7efb0fbc4 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaReceiveAttributesGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaReceiveAttributesGetter.java @@ -90,6 +90,7 @@ public List getMessageHeader(KafkaReceiveRequest request, String name) { .flatMap( consumerRecord -> StreamSupport.stream(consumerRecord.headers().headers(name).spliterator(), false)) + .filter(header -> header.value() != null) .map(header -> new String(header.value(), StandardCharsets.UTF_8)) .collect(Collectors.toList()); } From eab5c931725d3f5e87090bd59a74c082b309f190 Mon Sep 17 00:00:00 2001 From: Minje Park Date: Mon, 28 Jul 2025 16:55:19 +0900 Subject: [PATCH 2/3] fix testKafkaHeaderNull assertions --- .../kafkaclients/v0_11/KafkaClientDefaultTest.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java index 51bc193fc525..2321fe3105f6 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java @@ -6,7 +6,7 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; -import static org.assertj.core.api.Assertions.assertThat; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanKind; @@ -266,7 +266,11 @@ void testKafkaHeaderNull() throws Exception { .hasKind(SpanKind.CONSUMER) .hasNoParent() .hasAttributesSatisfyingExactly(receiveAttributes(false)) - .hasAttribute(AttributeKey.stringKey("test-message-header"), null), + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .doesNotContainKey( + AttributeKey.stringKey("test-message-header"))), span -> span.hasName(SHARED_TOPIC + " process") .hasKind(SpanKind.CONSUMER) @@ -274,7 +278,11 @@ void testKafkaHeaderNull() throws Exception { .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( processAttributes("10", greeting, false, false)) - .hasAttribute(AttributeKey.stringKey("test-message-header"), null), + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .doesNotContainKey( + AttributeKey.stringKey("test-message-header"))), span -> span.hasName("processing").hasParent(trace.getSpan(1)))); } } From 1e8fc0009e6d52dc43a8e1abb48f05f0c83cbffa Mon Sep 17 00:00:00 2001 From: Minje Park Date: Mon, 28 Jul 2025 22:21:14 +0900 Subject: [PATCH 3/3] doesNotContainKey is not needed in testKafkaHeaderNull --- .../v0_11/KafkaClientDefaultTest.java | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java index 2321fe3105f6..98e3a8d43553 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java @@ -8,7 +8,6 @@ import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; -import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaClientBaseTest; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaClientPropagationBaseTest; @@ -265,24 +264,14 @@ void testKafkaHeaderNull() throws Exception { span.hasName(SHARED_TOPIC + " receive") .hasKind(SpanKind.CONSUMER) .hasNoParent() - .hasAttributesSatisfyingExactly(receiveAttributes(false)) - .hasAttributesSatisfying( - attrs -> - assertThat(attrs) - .doesNotContainKey( - AttributeKey.stringKey("test-message-header"))), + .hasAttributesSatisfyingExactly(receiveAttributes(false)), span -> span.hasName(SHARED_TOPIC + " process") .hasKind(SpanKind.CONSUMER) .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( - processAttributes("10", greeting, false, false)) - .hasAttributesSatisfying( - attrs -> - assertThat(attrs) - .doesNotContainKey( - AttributeKey.stringKey("test-message-header"))), + processAttributes("10", greeting, false, false)), span -> span.hasName("processing").hasParent(trace.getSpan(1)))); } }