Skip to content

Commit 1078efd

Browse files
committed
filter null header value
1 parent 1115cda commit 1078efd

File tree

5 files changed

+77
-0
lines changed

5 files changed

+77
-0
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
99
import static org.assertj.core.api.Assertions.assertThat;
1010

11+
import io.opentelemetry.api.common.AttributeKey;
1112
import io.opentelemetry.api.trace.SpanKind;
1213
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaClientBaseTest;
1314
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaClientPropagationBaseTest;
@@ -204,4 +205,76 @@ void testRecordsWithTopicPartitionKafkaConsume()
204205
.hasAttributesSatisfyingExactly(
205206
processAttributes(null, greeting, false, false))));
206207
}
208+
209+
@DisplayName("test kafka null header")
210+
@Test
211+
void testKafkaHeaderNull() throws Exception {
212+
String greeting = "Hello Kafka with null header!";
213+
testing.runWithSpan(
214+
"parent",
215+
() -> {
216+
ProducerRecord<Integer, String> producerRecord =
217+
new ProducerRecord<>(SHARED_TOPIC, 10, greeting);
218+
producerRecord.headers().add("test-message-header", null);
219+
producer
220+
.send(
221+
producerRecord,
222+
(meta, ex) -> {
223+
if (ex == null) {
224+
testing.runWithSpan("producer callback", () -> {});
225+
} else {
226+
testing.runWithSpan("producer exception: " + ex, () -> {});
227+
}
228+
})
229+
.get(5, TimeUnit.SECONDS);
230+
});
231+
232+
awaitUntilConsumerIsReady();
233+
ConsumerRecords<?, ?> records = poll(Duration.ofSeconds(5));
234+
assertThat(records.count()).isEqualTo(1);
235+
236+
for (ConsumerRecord<?, ?> record : records) {
237+
testing.runWithSpan(
238+
"processing",
239+
() -> {
240+
assertThat(record.key()).isEqualTo(10);
241+
assertThat(record.value()).isEqualTo(greeting);
242+
assertThat(record.headers().lastHeader("test-message-header").value()).isNull();
243+
});
244+
}
245+
AtomicReference<SpanData> producerSpan = new AtomicReference<>();
246+
testing.waitAndAssertSortedTraces(
247+
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
248+
trace -> {
249+
trace.hasSpansSatisfyingExactly(
250+
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
251+
span ->
252+
span.hasName(SHARED_TOPIC + " publish")
253+
.hasKind(SpanKind.PRODUCER)
254+
.hasParent(trace.getSpan(0))
255+
.hasAttributesSatisfyingExactly(sendAttributes("10", greeting, false)),
256+
span ->
257+
span.hasName("producer callback")
258+
.hasKind(SpanKind.INTERNAL)
259+
.hasParent(trace.getSpan(0)));
260+
producerSpan.set(trace.getSpan(1));
261+
},
262+
trace ->
263+
trace.hasSpansSatisfyingExactly(
264+
span ->
265+
span.hasName(SHARED_TOPIC + " receive")
266+
.hasKind(SpanKind.CONSUMER)
267+
.hasNoParent()
268+
.hasAttributesSatisfyingExactly(receiveAttributes(false))
269+
.hasAttribute(AttributeKey.stringKey("test-message-header"), null),
270+
span ->
271+
span.hasName(SHARED_TOPIC + " process")
272+
.hasKind(SpanKind.CONSUMER)
273+
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
274+
.hasParent(trace.getSpan(0))
275+
.hasAttributesSatisfyingExactly(
276+
processAttributes("10", greeting, false, false))
277+
.hasAttribute(AttributeKey.stringKey("test-message-header"), null),
278+
span -> span.hasName("processing").hasParent(trace.getSpan(1))));
279+
}
207280
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerAttributesGetter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public Long getBatchMessageCount(KafkaProcessRequest request, @Nullable Void unu
8181
@Override
8282
public List<String> getMessageHeader(KafkaProcessRequest request, String name) {
8383
return StreamSupport.stream(request.getRecord().headers().headers(name).spliterator(), false)
84+
.filter(header -> header.value() != null)
8485
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
8586
.collect(Collectors.toList());
8687
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerRecordGetter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public String get(@Nullable KafkaProcessRequest carrier, String key) {
4040
@Override
4141
public Iterator<String> getAll(@Nullable KafkaProcessRequest carrier, String key) {
4242
return StreamSupport.stream(carrier.getRecord().headers().headers(key).spliterator(), false)
43+
.filter(header -> header.value() != null)
4344
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
4445
.iterator();
4546
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerAttributesGetter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public Long getBatchMessageCount(
8888
@Override
8989
public List<String> getMessageHeader(KafkaProducerRequest request, String name) {
9090
return StreamSupport.stream(request.getRecord().headers().headers(name).spliterator(), false)
91+
.filter(header -> header.value() != null)
9192
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
9293
.collect(Collectors.toList());
9394
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaReceiveAttributesGetter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public List<String> getMessageHeader(KafkaReceiveRequest request, String name) {
9090
.flatMap(
9191
consumerRecord ->
9292
StreamSupport.stream(consumerRecord.headers().headers(name).spliterator(), false))
93+
.filter(header -> header.value() != null)
9394
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
9495
.collect(Collectors.toList());
9596
}

0 commit comments

Comments
 (0)