Skip to content

Commit 61d862b

Browse files
committed
feat(dsm): Implement DSM context injection
1 parent 46fd85c commit 61d862b

File tree

4 files changed

+174
-6
lines changed

4 files changed

+174
-6
lines changed

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamPropagator.java renamed to dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsPropagator.java

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,40 @@
11
package datadog.trace.core.datastreams;
22

3+
import static datadog.trace.api.DDTags.PATHWAY_HASH;
4+
import static datadog.trace.bootstrap.instrumentation.api.PathwayContext.PROPAGATION_KEY_BASE64;
5+
36
import datadog.context.Context;
47
import datadog.context.propagation.CarrierSetter;
58
import datadog.context.propagation.CarrierVisitor;
69
import datadog.context.propagation.Propagator;
710
import datadog.trace.api.TraceConfig;
11+
import datadog.trace.api.datastreams.DataStreamsContext;
812
import datadog.trace.api.time.TimeSource;
913
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1014
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
1115
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
1216
import datadog.trace.bootstrap.instrumentation.api.TagContext;
17+
import java.io.IOException;
1318
import java.util.function.Supplier;
1419
import javax.annotation.Nullable;
1520
import javax.annotation.ParametersAreNonnullByDefault;
1621

1722
// TODO Javadoc
1823
@ParametersAreNonnullByDefault
19-
public class DataStreamPropagator implements Propagator {
24+
public class DataStreamsPropagator implements Propagator {
25+
private final DataStreamsMonitoring dataStreamsMonitoring;
2026
private final Supplier<TraceConfig> traceConfigSupplier;
2127
private final TimeSource timeSource;
2228
private final long hashOfKnownTags;
2329
private final String serviceNameOverride;
2430

25-
public DataStreamPropagator(
31+
public DataStreamsPropagator(
32+
DataStreamsMonitoring dataStreamsMonitoring,
2633
Supplier<TraceConfig> traceConfigSupplier,
2734
TimeSource timeSource,
2835
long hashOfKnownTags,
2936
String serviceNameOverride) {
37+
this.dataStreamsMonitoring = dataStreamsMonitoring;
3038
this.traceConfigSupplier = traceConfigSupplier;
3139
this.timeSource = timeSource;
3240
this.hashOfKnownTags = hashOfKnownTags;
@@ -35,12 +43,49 @@ public DataStreamPropagator(
3543

3644
@Override
3745
public <C> void inject(Context context, C carrier, CarrierSetter<C> setter) {
38-
// TODO Still in CorePropagation, not migrated yet
46+
// TODO Pathway context needs to be stored into its own context element instead of span context
47+
AgentSpan span = AgentSpan.fromContext(context);
48+
DataStreamsContext dsmContext = DataStreamsContext.fromContext(context);
49+
PathwayContext pathwayContext;
50+
if (span == null
51+
|| dsmContext == null
52+
|| (pathwayContext = span.context().getPathwayContext()) == null
53+
|| (span.traceConfig() != null && !span.traceConfig().isDataStreamsEnabled())) {
54+
return;
55+
}
56+
57+
// TODO Allow set checkpoint to use DsmContext as parameter?
58+
pathwayContext.setCheckpoint(
59+
dsmContext.sortedTags(),
60+
dsmContext.sendCheckpoint() ? dataStreamsMonitoring::add : pathwayContext::saveStats,
61+
dsmContext.defaultTimestamp(),
62+
dsmContext.payloadSizeBytes());
63+
64+
boolean injected = injectPathwayContext(pathwayContext, carrier, setter);
65+
66+
if (injected && pathwayContext.getHash() != 0) {
67+
span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayContext.getHash()));
68+
}
69+
}
70+
71+
private <C> boolean injectPathwayContext(
72+
PathwayContext pathwayContext, C carrier, CarrierSetter<C> setter) {
73+
try {
74+
String encodedContext = pathwayContext.encode();
75+
if (encodedContext != null) {
76+
// LOGGER.debug("Injecting pathway context {}", pathwayContext);
77+
setter.set(carrier, PROPAGATION_KEY_BASE64, encodedContext);
78+
return true;
79+
}
80+
} catch (IOException e) {
81+
// LOGGER.debug("Unable to set encode pathway context", e);
82+
}
83+
return false;
3984
}
4085

4186
@Override
4287
public <C> Context extract(Context context, C carrier, CarrierVisitor<C> visitor) {
43-
// TODO Pathway context needs to be stored into its own context element
88+
// TODO Pathway context needs to be stored into its own context element instead of span context
4489
// Get span context to store pathway context into
4590
TagContext spanContext = getSpanContextOrNull(context);
4691
PathwayContext pathwayContext;

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,12 @@ public PathwayContext newPathwayContext() {
203203

204204
@Override
205205
public Propagator propagator() {
206-
return new DataStreamPropagator(
207-
this.traceConfigSupplier, this.timeSource, this.hashOfKnownTags, getThreadServiceName());
206+
return new DataStreamsPropagator(
207+
this,
208+
this.traceConfigSupplier,
209+
this.timeSource,
210+
this.hashOfKnownTags,
211+
getThreadServiceName());
208212
}
209213

210214
@Override
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package datadog.trace.api.datastreams;
2+
3+
import datadog.context.Context;
4+
import datadog.context.ContextKey;
5+
import datadog.context.ImplicitContextKeyed;
6+
import java.util.LinkedHashMap;
7+
8+
public class DataStreamsContext implements ImplicitContextKeyed {
9+
private static final ContextKey<DataStreamsContext> CONTEXT_KEY =
10+
ContextKey.named("dsm-context-key");
11+
12+
final LinkedHashMap<String, String> sortedTags;
13+
final long defaultTimestamp;
14+
final long payloadSizeBytes;
15+
final boolean sendCheckpoint;
16+
17+
public static DataStreamsContext fromContext(Context context) {
18+
return context.get(CONTEXT_KEY);
19+
}
20+
21+
public static DataStreamsContext fromTags(LinkedHashMap<String, String> sortedTags) {
22+
return new DataStreamsContext(sortedTags, 0, 0, true);
23+
}
24+
25+
public static DataStreamsContext fromKafka(
26+
LinkedHashMap<String, String> sortedTags, long defaultTimestamp, long payloadSizeBytes) {
27+
return new DataStreamsContext(sortedTags, defaultTimestamp, payloadSizeBytes, true);
28+
}
29+
30+
public static DataStreamsContext fromTagsWithoutCheckpoint(
31+
LinkedHashMap<String, String> sortedTags) {
32+
return new DataStreamsContext(sortedTags, 0, 0, false);
33+
}
34+
35+
// That's basically a record for now
36+
private DataStreamsContext(
37+
LinkedHashMap<String, String> sortedTags,
38+
long defaultTimestamp,
39+
long payloadSizeBytes,
40+
boolean sendCheckpoint) {
41+
this.sortedTags = sortedTags;
42+
this.defaultTimestamp = defaultTimestamp;
43+
this.payloadSizeBytes = payloadSizeBytes;
44+
this.sendCheckpoint = sendCheckpoint;
45+
}
46+
47+
public LinkedHashMap<String, String> sortedTags() {
48+
return this.sortedTags;
49+
}
50+
51+
public long defaultTimestamp() {
52+
return this.defaultTimestamp;
53+
}
54+
55+
public long payloadSizeBytes() {
56+
return this.payloadSizeBytes;
57+
}
58+
59+
public boolean sendCheckpoint() {
60+
return this.sendCheckpoint;
61+
}
62+
63+
@Override
64+
public Context storeInto(Context context) {
65+
return context.with(CONTEXT_KEY, this);
66+
}
67+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package datadog.trace.api.datastreams
2+
3+
import datadog.context.Context
4+
import spock.lang.Specification
5+
6+
class DataStreamsContextTest extends Specification {
7+
def 'test constructor'() {
8+
setup:
9+
def tags = new LinkedHashMap()
10+
11+
when:
12+
def dsmContext = DataStreamsContext.fromTags(tags)
13+
14+
then:
15+
dsmContext.sortedTags() == tags
16+
dsmContext.defaultTimestamp() == 0
17+
dsmContext.payloadSizeBytes() == 0
18+
dsmContext.sendCheckpoint()
19+
20+
when:
21+
dsmContext = DataStreamsContext.fromTagsWithoutCheckpoint(tags)
22+
23+
then:
24+
dsmContext.sortedTags() == tags
25+
dsmContext.defaultTimestamp() == 0
26+
dsmContext.payloadSizeBytes() == 0
27+
!dsmContext.sendCheckpoint()
28+
29+
when:
30+
def timestamp = 123L
31+
def payloadSize = 456L
32+
dsmContext = DataStreamsContext.fromKafka(tags, timestamp, payloadSize)
33+
34+
then:
35+
dsmContext.sortedTags() == tags
36+
dsmContext.defaultTimestamp() == timestamp
37+
dsmContext.payloadSizeBytes() == payloadSize
38+
dsmContext.sendCheckpoint()
39+
}
40+
41+
def 'test context store'() {
42+
setup:
43+
def tags = new LinkedHashMap()
44+
45+
when:
46+
def dsmContext = DataStreamsContext.fromTags(tags)
47+
def context = dsmContext.storeInto(Context.root())
48+
49+
then:
50+
DataStreamsContext.fromContext(context) == dsmContext
51+
}
52+
}

0 commit comments

Comments
 (0)