Skip to content

Commit 9decdbe

Browse files
authored
Fix NPE when consuming kafka messages with null header (#14332)
1 parent 112f2a3 commit 9decdbe

File tree

5 files changed

+75
-1
lines changed

5 files changed

+75
-1
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11;
77

88
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
9-
import static org.assertj.core.api.Assertions.assertThat;
9+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
1010

1111
import io.opentelemetry.api.trace.SpanKind;
1212
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaClientBaseTest;
@@ -204,4 +204,74 @@ void testRecordsWithTopicPartitionKafkaConsume()
204204
.hasAttributesSatisfyingExactly(
205205
processAttributes(null, greeting, false, false))));
206206
}
207+
208+
@DisplayName("test kafka null header")
209+
@Test
210+
void testKafkaHeaderNull() throws Exception {
211+
String greeting = "Hello Kafka with null header!";
212+
testing.runWithSpan(
213+
"parent",
214+
() -> {
215+
ProducerRecord<Integer, String> producerRecord =
216+
new ProducerRecord<>(SHARED_TOPIC, 10, greeting);
217+
producerRecord.headers().add("test-message-header", null);
218+
producer
219+
.send(
220+
producerRecord,
221+
(meta, ex) -> {
222+
if (ex == null) {
223+
testing.runWithSpan("producer callback", () -> {});
224+
} else {
225+
testing.runWithSpan("producer exception: " + ex, () -> {});
226+
}
227+
})
228+
.get(5, TimeUnit.SECONDS);
229+
});
230+
231+
awaitUntilConsumerIsReady();
232+
ConsumerRecords<?, ?> records = poll(Duration.ofSeconds(5));
233+
assertThat(records.count()).isEqualTo(1);
234+
235+
for (ConsumerRecord<?, ?> record : records) {
236+
testing.runWithSpan(
237+
"processing",
238+
() -> {
239+
assertThat(record.key()).isEqualTo(10);
240+
assertThat(record.value()).isEqualTo(greeting);
241+
assertThat(record.headers().lastHeader("test-message-header").value()).isNull();
242+
});
243+
}
244+
AtomicReference<SpanData> producerSpan = new AtomicReference<>();
245+
testing.waitAndAssertSortedTraces(
246+
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
247+
trace -> {
248+
trace.hasSpansSatisfyingExactly(
249+
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
250+
span ->
251+
span.hasName(SHARED_TOPIC + " publish")
252+
.hasKind(SpanKind.PRODUCER)
253+
.hasParent(trace.getSpan(0))
254+
.hasAttributesSatisfyingExactly(sendAttributes("10", greeting, false)),
255+
span ->
256+
span.hasName("producer callback")
257+
.hasKind(SpanKind.INTERNAL)
258+
.hasParent(trace.getSpan(0)));
259+
producerSpan.set(trace.getSpan(1));
260+
},
261+
trace ->
262+
trace.hasSpansSatisfyingExactly(
263+
span ->
264+
span.hasName(SHARED_TOPIC + " receive")
265+
.hasKind(SpanKind.CONSUMER)
266+
.hasNoParent()
267+
.hasAttributesSatisfyingExactly(receiveAttributes(false)),
268+
span ->
269+
span.hasName(SHARED_TOPIC + " process")
270+
.hasKind(SpanKind.CONSUMER)
271+
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
272+
.hasParent(trace.getSpan(0))
273+
.hasAttributesSatisfyingExactly(
274+
processAttributes("10", greeting, false, false)),
275+
span -> span.hasName("processing").hasParent(trace.getSpan(1))));
276+
}
207277
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerAttributesGetter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public Long getBatchMessageCount(KafkaProcessRequest request, @Nullable Void unu
8181
@Override
8282
public List<String> getMessageHeader(KafkaProcessRequest request, String name) {
8383
return StreamSupport.stream(request.getRecord().headers().headers(name).spliterator(), false)
84+
.filter(header -> header.value() != null)
8485
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
8586
.collect(Collectors.toList());
8687
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerRecordGetter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public String get(@Nullable KafkaProcessRequest carrier, String key) {
4040
@Override
4141
public Iterator<String> getAll(@Nullable KafkaProcessRequest carrier, String key) {
4242
return StreamSupport.stream(carrier.getRecord().headers().headers(key).spliterator(), false)
43+
.filter(header -> header.value() != null)
4344
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
4445
.iterator();
4546
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerAttributesGetter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public Long getBatchMessageCount(
8888
@Override
8989
public List<String> getMessageHeader(KafkaProducerRequest request, String name) {
9090
return StreamSupport.stream(request.getRecord().headers().headers(name).spliterator(), false)
91+
.filter(header -> header.value() != null)
9192
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
9293
.collect(Collectors.toList());
9394
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaReceiveAttributesGetter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public List<String> getMessageHeader(KafkaReceiveRequest request, String name) {
9090
.flatMap(
9191
consumerRecord ->
9292
StreamSupport.stream(consumerRecord.headers().headers(name).spliterator(), false))
93+
.filter(header -> header.value() != null)
9394
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
9495
.collect(Collectors.toList());
9596
}

0 commit comments

Comments
 (0)