Skip to content

Commit f11639e

Browse files
authored
NPE fix for null keys in consumer records (#219)
1 parent 6e1c16f commit f11639e

File tree

1 file changed

+4
-1
lines changed

1 file changed

+4
-1
lines changed

opentelemetry/opentelemetry-module/src/main/java/ru/tinkoff/kora/opentelemetry/module/kafka/consumer/OpentelemetryKafkaConsumerTracer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.HashMap;
1919
import java.util.HashSet;
2020
import java.util.Map;
21+
import java.util.Objects;
2122

2223
public class OpentelemetryKafkaConsumerTracer implements KafkaConsumerTracer {
2324
private final Tracer tracer;
@@ -83,9 +84,11 @@ public KafkaConsumerRecordSpan get(ConsumerRecord<?, ?> record) {
8384
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "kafka")
8485
.setAttribute(SemanticAttributes.MESSAGING_SOURCE_NAME, record.topic())
8586
.setAttribute(SemanticAttributes.MESSAGING_SOURCE_KIND, "topic")
86-
.setAttribute(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, record.key().toString())
8787
.setAttribute(SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION, (long) record.partition())
8888
.setAttribute(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, record.offset());
89+
try {
90+
recordSpanBuilder.setAttribute(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, Objects.toString(record.key()));
91+
} catch (Exception ignore) {}
8992
var recordSpan = recordSpanBuilder.startSpan();
9093
OpentelemetryContext.set(Context.current(), this.rootCtx.add(recordSpan));
9194

0 commit comments

Comments
 (0)