
Commit 32c05de

🪞 10020 - Extract trace context from Kafka producer record headers (#10022)

* Extract trace context from Kafka producer record headers

  Allow Kafka producers to continue existing traces by extracting trace context from record headers and using it as the parent of the produce span. This enables distributed tracing when messages are forwarded between services with pre-existing context.

* Link the parent span to the extracted context for KafkaProducerCallback
* Port the change to Kafka Clients 3.8+
* Add TextMapExtractAdapter to Kafka producer instrumentation

---------

Co-authored-by: Tudor Plugaru <[email protected]>
1 parent 89639f7 · commit 32c05de
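
For context, the scenario this change enables looks roughly like the sketch below: a forwarding service that already holds trace identifiers for a message attaches the Datadog propagation headers to the record before calling send(), and the instrumented producer then continues that trace instead of starting a new root span. This is an illustration only, built from the header names exercised by the tests in this commit; the broker address, topic, and hard-coded ids are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class ForwardWithUpstreamContext {

  // Builds a record carrying pre-existing Datadog trace context in its headers.
  // The ids would normally come from the upstream message, not constants.
  static ProducerRecord<String, String> forwarded(
      String topic, String value, long upstreamTraceId, long upstreamSpanId) {
    RecordHeaders headers = new RecordHeaders();
    headers.add(new RecordHeader("x-datadog-trace-id",
        Long.toString(upstreamTraceId).getBytes(StandardCharsets.UTF_8)));
    headers.add(new RecordHeader("x-datadog-parent-id",
        Long.toString(upstreamSpanId).getBytes(StandardCharsets.UTF_8)));
    // Partition and key are left null; only the headers matter for context extraction.
    return new ProducerRecord<>(topic, null, null, value, headers);
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // With dd-java-agent attached, the kafka.produce span for this send becomes a
      // child of trace 1234567890123456 / span 9876543210987654 rather than a new root.
      producer.send(forwarded("my-topic", "payload", 1234567890123456L, 9876543210987654L));
    }
  }
}
```

When no such headers are present, behaviour is unchanged: the produce span parents to the locally active span, exactly as before this commit.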

5 files changed · +108 -6 lines changed

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 20 additions & 3 deletions

@@ -7,6 +7,7 @@
 import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
 import static datadog.trace.api.datastreams.DataStreamsTags.createWithClusterId;
 import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
+import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
 import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
 import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
 import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
@@ -33,6 +34,7 @@
 import datadog.trace.bootstrap.InstrumentationContext;
 import datadog.trace.bootstrap.instrumentation.api.AgentScope;
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
 import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
 import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
 import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
@@ -76,6 +78,7 @@ public String[] helperClassNames() {
       packageName + ".KafkaDecorator",
       packageName + ".TextMapInjectAdapterInterface",
       packageName + ".TextMapInjectAdapter",
+      packageName + ".TextMapExtractAdapter",
       packageName + ".NoopTextMapInjectAdapter",
       packageName + ".KafkaProducerCallback",
       "datadog.trace.instrumentation.kafka_common.StreamingContext",
@@ -125,12 +128,26 @@ public static AgentScope onEnter(
         ClusterIdHolder.set(clusterId);
       }
 
-      final AgentSpan parent = activeSpan();
-      final AgentSpan span = startSpan(KAFKA_PRODUCE);
+      // Try to extract existing trace context from record headers
+      final AgentSpanContext extractedContext =
+          extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
+
+      final AgentSpan localActiveSpan = activeSpan();
+
+      final AgentSpan span;
+      final AgentSpan callbackParentSpan;
+
+      if (extractedContext != null) {
+        span = startSpan(KAFKA_PRODUCE, extractedContext);
+        callbackParentSpan = span;
+      } else {
+        span = startSpan(KAFKA_PRODUCE);
+        callbackParentSpan = localActiveSpan;
+      }
       PRODUCER_DECORATE.afterStart(span);
       PRODUCER_DECORATE.onProduce(span, record, producerConfig);
 
-      callback = new KafkaProducerCallback(callback, parent, span, clusterId);
+      callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId);
 
       if (record.value() == null) {
         span.setTag(InstrumentationTags.TOMBSTONE, true);

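The TextMapExtractAdapter supplying GETTER above is registered as a helper class but is not among the five files shown in this diff. As a rough sketch only, assuming it follows the same AgentPropagation.ContextVisitor<Headers> shape as the adapters already used by the Kafka consumer instrumentation in this module, such an adapter would look roughly like this; the real class may differ in details such as base64-encoded header handling:

```java
// Sketch only: the real adapter lives alongside TextMapInjectAdapter in the
// kafka_clients instrumentation package and may cover more edge cases.
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;

public class TextMapExtractAdapter implements AgentPropagation.ContextVisitor<Headers> {

  public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter();

  @Override
  public void forEachKey(Headers carrier, AgentPropagation.KeyClassifier classifier) {
    for (Header header : carrier) {
      byte[] value = header.value();
      if (value == null) {
        continue; // header with no value, nothing to extract
      }
      // Feed each key/value pair to the tracer's extractor; stop early once the
      // classifier signals it has collected everything it needs.
      if (!classifier.accept(header.key(), new String(value, StandardCharsets.UTF_8))) {
        return;
      }
    }
  }
}
```

The extractContextAndGetSpanContext(...) call drives this visitor over the record headers and yields a parent AgentSpanContext when recognizable propagation headers are found, or null otherwise, which is exactly what the branch in the advice keys on.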
dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 34 additions & 0 deletions

@@ -25,7 +25,11 @@ import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.header.internals.RecordHeaders
 import org.apache.kafka.common.serialization.StringSerializer
+
+import java.nio.charset.StandardCharsets
 import org.junit.Rule
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory
 import org.springframework.kafka.core.DefaultKafkaProducerFactory
@@ -1013,6 +1017,36 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
     "true" | true
   }
 
+  def "test producer extracts and uses existing trace context from record headers"() {
+    setup:
+    def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
+    def producer = new KafkaProducer<>(senderProps)
+
+    def existingTraceId = 1234567890123456L
+    def existingSpanId = 9876543210987654L
+    def headers = new RecordHeaders()
+    headers.add(new RecordHeader("x-datadog-trace-id",
+      String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
+    headers.add(new RecordHeader("x-datadog-parent-id",
+      String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
+
+    when:
+    def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers)
+    producer.send(record).get()
+
+    then:
+    TEST_WRITER.waitForTraces(1)
+    def producedSpan = TEST_WRITER[0][0]
+    // Verify the span used the extracted context as parent
+    producedSpan.traceId.toLong() == existingTraceId
+    producedSpan.parentId == existingSpanId
+    // Verify a new span was created (not reusing the extracted span ID)
+    producedSpan.spanId != existingSpanId
+
+    cleanup:
+    producer?.close()
+  }
+
   def containerProperties() {
     try {
       // Different class names for test and latestDepTest.

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java

Lines changed: 1 addition & 0 deletions

@@ -38,6 +38,7 @@ public String[] helperClassNames() {
       packageName + ".KafkaDecorator",
       packageName + ".TextMapInjectAdapterInterface",
       packageName + ".TextMapInjectAdapter",
+      packageName + ".TextMapExtractAdapter",
       packageName + ".NoopTextMapInjectAdapter",
       packageName + ".KafkaProducerCallback",
       "datadog.trace.instrumentation.kafka_common.StreamingContext",

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java

Lines changed: 19 additions & 3 deletions

@@ -5,6 +5,7 @@
 import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
 import static datadog.trace.api.datastreams.DataStreamsTags.create;
 import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
+import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
 import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
 import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
 import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
@@ -21,6 +22,7 @@
 import datadog.trace.bootstrap.InstrumentationContext;
 import datadog.trace.bootstrap.instrumentation.api.AgentScope;
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
 import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
 import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
 import net.bytebuddy.asm.Advice;
@@ -46,12 +48,26 @@ public static AgentScope onEnter(
       ClusterIdHolder.set(clusterId);
     }
 
-    final AgentSpan parent = activeSpan();
-    final AgentSpan span = startSpan(KAFKA_PRODUCE);
+    // Try to extract existing trace context from record headers
+    final AgentSpanContext extractedContext =
+        extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
+
+    final AgentSpan localActiveSpan = activeSpan();
+
+    final AgentSpan span;
+    final AgentSpan callbackParentSpan;
+
+    if (extractedContext != null) {
+      span = startSpan(KAFKA_PRODUCE, extractedContext);
+      callbackParentSpan = span;
+    } else {
+      span = startSpan(KAFKA_PRODUCE);
+      callbackParentSpan = localActiveSpan;
+    }
     PRODUCER_DECORATE.afterStart(span);
     PRODUCER_DECORATE.onProduce(span, record, producerConfig);
 
-    callback = new KafkaProducerCallback(callback, parent, span, clusterId);
+    callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId);
 
     if (record.value() == null) {
       span.setTag(InstrumentationTags.TOMBSTONE, true);

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 34 additions & 0 deletions

@@ -18,7 +18,11 @@ import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.header.internals.RecordHeaders
 import org.apache.kafka.common.serialization.StringSerializer
+
+import java.nio.charset.StandardCharsets
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory
 import org.springframework.kafka.core.DefaultKafkaProducerFactory
 import org.springframework.kafka.core.KafkaTemplate
@@ -853,6 +857,36 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
     "true" | true
   }
 
+  def "test producer extracts and uses existing trace context from record headers"() {
+    setup:
+    def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
+    def producer = new KafkaProducer<>(senderProps)
+
+    def existingTraceId = 1234567890123456L
+    def existingSpanId = 9876543210987654L
+    def headers = new RecordHeaders()
+    headers.add(new RecordHeader("x-datadog-trace-id",
+      String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
+    headers.add(new RecordHeader("x-datadog-parent-id",
+      String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
+
+    when:
+    def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers)
+    producer.send(record).get()
+
+    then:
+    TEST_WRITER.waitForTraces(1)
+    def producedSpan = TEST_WRITER[0][0]
+    // Verify the span used the extracted context as parent
+    producedSpan.traceId.toLong() == existingTraceId
+    producedSpan.parentId == existingSpanId
+    // Verify a new span was created (not reusing the extracted span ID)
+    producedSpan.spanId != existingSpanId
+
+    cleanup:
+    producer?.close()
+  }
+
   def producerSpan(
     TraceAssert trace,
     Map<String, ?> config,

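Outside the test harness, one way to eyeball the end-to-end effect: the producer instrumentation still injects the produce span's context into the outgoing record headers via the existing TextMapInjectAdapter, so a downstream consumer of a forwarded record should observe the original trace id on the wire. The helper below is hypothetical and not part of this commit; it assumes Datadog-style header propagation is enabled and a broker at localhost:9092.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class HeaderInspector {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("group.id", "header-inspector");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-topic"));
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
      for (ConsumerRecord<String, String> record : records) {
        Header traceId = record.headers().lastHeader("x-datadog-trace-id");
        if (traceId != null) {
          // If the producer continued the upstream trace, this prints the original
          // trace id (e.g. 1234567890123456) rather than a freshly generated one.
          System.out.println("trace id on the wire: "
              + new String(traceId.value(), StandardCharsets.UTF_8));
        }
      }
    }
  }
}
```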