33import static datadog .context .propagation .Propagators .defaultPropagator ;
44import static datadog .trace .agent .tooling .bytebuddy .matcher .ClassLoaderMatchers .hasClassNamed ;
55import static datadog .trace .agent .tooling .bytebuddy .matcher .NameMatchers .named ;
6+ import static datadog .trace .api .datastreams .DataStreamsContext .fromTagsWithoutCheckpoint ;
7+ import static datadog .trace .bootstrap .instrumentation .api .AgentPropagation .DSM_CONCERN ;
68import static datadog .trace .bootstrap .instrumentation .api .AgentTracer .activateSpan ;
79import static datadog .trace .bootstrap .instrumentation .api .AgentTracer .activeSpan ;
8- import static datadog .trace .bootstrap .instrumentation .api .AgentTracer .propagate ;
910import static datadog .trace .bootstrap .instrumentation .api .AgentTracer .startSpan ;
1011import static datadog .trace .core .datastreams .TagsProcessor .DIRECTION_OUT ;
1112import static datadog .trace .core .datastreams .TagsProcessor .DIRECTION_TAG ;
2425import static net .bytebuddy .matcher .ElementMatchers .takesArgument ;
2526
2627import com .google .auto .service .AutoService ;
28+ import datadog .context .propagation .Propagator ;
29+ import datadog .context .propagation .Propagators ;
2730import datadog .trace .agent .tooling .Instrumenter ;
2831import datadog .trace .agent .tooling .InstrumenterModule ;
2932import datadog .trace .api .Config ;
33+ import datadog .trace .api .datastreams .DataStreamsContext ;
3034import datadog .trace .bootstrap .InstrumentationContext ;
3135import datadog .trace .bootstrap .instrumentation .api .AgentScope ;
3236import datadog .trace .bootstrap .instrumentation .api .AgentSpan ;
@@ -155,8 +159,9 @@ public static AgentScope onEnter(
155159 // inject the context in the headers, but delay sending the stats until we know the
156160 // message size.
157161 // The stats are saved in the pathway context and sent in PayloadSizeAdvice.
158- propagate ()
159- .injectPathwayContextWithoutSendingStats (span , record .headers (), setter , sortedTags );
162+ Propagator dsmPropagator = Propagators .forConcern (DSM_CONCERN );
163+ DataStreamsContext dsmContext = fromTagsWithoutCheckpoint (sortedTags );
164+ dsmPropagator .inject (span .with (dsmContext ), record .headers (), setter );
160165 AvroSchemaExtractor .tryExtractProducer (record , span );
161166 }
162167 } catch (final IllegalStateException e ) {
@@ -173,8 +178,9 @@ record =
173178 defaultPropagator ().inject (span , record .headers (), setter );
174179 if (STREAMING_CONTEXT .isDisabledForTopic (record .topic ())
175180 || STREAMING_CONTEXT .isSinkTopic (record .topic ())) {
176- propagate ()
177- .injectPathwayContextWithoutSendingStats (span , record .headers (), setter , sortedTags );
181+ Propagator dsmPropagator = Propagators .forConcern (DSM_CONCERN );
182+ DataStreamsContext dsmContext = fromTagsWithoutCheckpoint (sortedTags );
183+ dsmPropagator .inject (span .with (dsmContext ), record .headers (), setter );
178184 AvroSchemaExtractor .tryExtractProducer (record , span );
179185 }
180186 }
0 commit comments