Skip to content

Commit b971a9d

Browse files
committed
feat(dsm): Migrate DSM injection call to propagator API 2/3
1 parent 19be22c commit b971a9d

File tree

3 files changed

+27
-12
lines changed

3 files changed

+27
-12
lines changed

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package datadog.trace.instrumentation.kafka_clients;
22

3+
import static datadog.trace.api.datastreams.DataStreamsContext.fromKafka;
4+
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
35
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
46
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
57
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
@@ -19,7 +21,10 @@
1921
import static datadog.trace.instrumentation.kafka_common.Utils.computePayloadSizeBytes;
2022
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2123

24+
import datadog.context.propagation.Propagator;
25+
import datadog.context.propagation.Propagators;
2226
import datadog.trace.api.Config;
27+
import datadog.trace.api.datastreams.DataStreamsContext;
2328
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
2429
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
2530
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
@@ -116,9 +121,9 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
116121
// since the data received from the source may leave the topology on
117122
// some other instance of the application, breaking the context propagation
118123
// for DSM users
119-
propagate()
120-
.injectPathwayContext(
121-
span, val.headers(), SETTER, sortedTags, val.timestamp(), payloadSize);
124+
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
125+
DataStreamsContext dsmContext = fromKafka(sortedTags, val.timestamp(), payloadSize);
126+
dsmPropagator.inject(span.with(dsmContext), val.headers(), SETTER);
122127
}
123128
}
124129
} else {

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package datadog.trace.instrumentation.kafka_clients38;
22

3+
import static datadog.trace.api.datastreams.DataStreamsContext.fromKafka;
4+
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
35
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
46
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
57
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
@@ -14,7 +16,10 @@
1416
import static datadog.trace.instrumentation.kafka_clients38.TextMapInjectAdapter.SETTER;
1517
import static java.util.concurrent.TimeUnit.MILLISECONDS;
1618

19+
import datadog.context.propagation.Propagator;
20+
import datadog.context.propagation.Propagators;
1721
import datadog.trace.api.Config;
22+
import datadog.trace.api.datastreams.DataStreamsContext;
1823
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1924
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
2025
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
@@ -116,9 +121,9 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
116121
// since the data received from the source may leave the topology on
117122
// some other instance of the application, breaking the context propagation
118123
// for DSM users
119-
propagate()
120-
.injectPathwayContext(
121-
span, val.headers(), SETTER, sortedTags, val.timestamp(), payloadSize);
124+
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
125+
DataStreamsContext dsmContext = fromKafka(sortedTags, val.timestamp(), payloadSize);
126+
dsmPropagator.inject(span.with(dsmContext), val.headers(), SETTER);
122127
}
123128
}
124129
} else {

dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package datadog.trace.instrumentation.kafka_streams;
22

33
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static datadog.trace.api.datastreams.DataStreamsContext.fromKafka;
5+
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
46
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
57
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
68
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
@@ -29,9 +31,12 @@
2931
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
3032

3133
import com.google.auto.service.AutoService;
34+
import datadog.context.propagation.Propagator;
35+
import datadog.context.propagation.Propagators;
3236
import datadog.trace.agent.tooling.Instrumenter;
3337
import datadog.trace.agent.tooling.InstrumenterModule;
3438
import datadog.trace.api.Config;
39+
import datadog.trace.api.datastreams.DataStreamsContext;
3540
import datadog.trace.bootstrap.InstrumentationContext;
3641
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
3742
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
@@ -263,9 +268,9 @@ public static void start(
263268
.setCheckpoint(span, sortedTags, record.timestamp, payloadSize);
264269
} else {
265270
if (STREAMING_CONTEXT.isSourceTopic(record.topic())) {
266-
propagate()
267-
.injectPathwayContext(
268-
span, record, SR_SETTER, sortedTags, record.timestamp, payloadSize);
271+
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
272+
DataStreamsContext dsmContext = fromKafka(sortedTags, record.timestamp, payloadSize);
273+
dsmPropagator.inject(span.with(dsmContext), record, SR_SETTER);
269274
}
270275
}
271276
} else {
@@ -345,9 +350,9 @@ public static void start(
345350
.setCheckpoint(span, sortedTags, record.timestamp(), payloadSize);
346351
} else {
347352
if (STREAMING_CONTEXT.isSourceTopic(record.topic())) {
348-
propagate()
349-
.injectPathwayContext(
350-
span, record, PR_SETTER, sortedTags, record.timestamp(), payloadSize);
353+
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
354+
DataStreamsContext dsmContext = fromKafka(sortedTags, record.timestamp(), payloadSize);
355+
dsmPropagator.inject(span.with(dsmContext), record, PR_SETTER);
351356
}
352357
}
353358
} else {

0 commit comments

Comments
 (0)