Skip to content

Commit ee6947e

Browse files
Add baggage propagation telemetry (#9289)
* WIP: adding baggage propagation telemetry * suggestion of adding baggageMetrics as constant * adding tests * adding line at end of file
1 parent d7684b9 commit ee6947e

File tree

5 files changed

+388
-0
lines changed

5 files changed

+388
-0
lines changed

dd-trace-core/src/main/java/datadog/trace/core/baggage/BaggagePropagator.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import datadog.context.propagation.CarrierVisitor;
88
import datadog.context.propagation.Propagator;
99
import datadog.trace.api.Config;
10+
import datadog.trace.api.metrics.BaggageMetrics;
1011
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1112
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
1213
import datadog.trace.bootstrap.instrumentation.api.Baggage;
@@ -26,6 +27,7 @@
2627
public class BaggagePropagator implements Propagator {
2728
private static final Logger LOG = LoggerFactory.getLogger(BaggagePropagator.class);
2829
private static final PercentEscaper UTF_ESCAPER = PercentEscaper.create();
30+
private static final BaggageMetrics BAGGAGE_METRICS = BaggageMetrics.getInstance();
2931
static final String BAGGAGE_KEY = "baggage";
3032
private final boolean injectBaggage;
3133
private final boolean extractBaggage;
@@ -89,11 +91,13 @@ public <C> void inject(Context context, C carrier, CarrierSetter<C> setter) {
8991
processedItems++;
9092
// reached the max number of baggage items allowed
9193
if (processedItems == this.maxItems) {
94+
BAGGAGE_METRICS.onBaggageTruncatedByItemLimit();
9295
break;
9396
}
9497
// Drop newest k/v pair if adding it leads to exceeding the limit
9598
if (currentBytes + escapedKey.size + escapedVal.size + extraBytes > this.maxBytes) {
9699
baggageText.setLength(currentBytes);
100+
BAGGAGE_METRICS.onBaggageTruncatedByByteLimit();
97101
break;
98102
}
99103
currentBytes += escapedKey.size + escapedVal.size + extraBytes;
@@ -103,6 +107,9 @@ public <C> void inject(Context context, C carrier, CarrierSetter<C> setter) {
103107
// Save header as cache to re-inject it later if baggage did not change
104108
baggage.setW3cHeader(headerValue);
105109
setter.set(carrier, BAGGAGE_KEY, headerValue);
110+
111+
// Record successful baggage injection for telemetry
112+
BAGGAGE_METRICS.onBaggageInjected();
106113
}
107114

108115
@Override
@@ -117,6 +124,9 @@ public <C> Context extract(Context context, C carrier, CarrierVisitor<C> visitor
117124
return context;
118125
}
119126

127+
// Record successful baggage extraction for telemetry
128+
BAGGAGE_METRICS.onBaggageExtracted();
129+
120130
// TODO: consider a better way to link baggage with the extracted (legacy) TagContext
121131
AgentSpan extractedSpan = AgentSpan.fromContext(context);
122132
if (extractedSpan != null) {
@@ -158,12 +168,14 @@ private Map<String, String> parseBaggageHeaders(String input) {
158168
if (kvSeparatorInd > end) {
159169
LOG.debug(
160170
"Dropping baggage headers due to key with no value {}", input.substring(start, end));
171+
BAGGAGE_METRICS.onBaggageMalformed();
161172
return emptyMap();
162173
}
163174
String key = decode(input.substring(start, kvSeparatorInd).trim());
164175
String value = decode(input.substring(kvSeparatorInd + 1, end).trim());
165176
if (key.isEmpty() || value.isEmpty()) {
166177
LOG.debug("Dropping baggage headers due to empty k/v {}:{}", key, value);
178+
BAGGAGE_METRICS.onBaggageMalformed();
167179
return emptyMap();
168180
}
169181
baggage.put(key, value);
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package datadog.trace.core.baggage
2+
3+
import datadog.context.Context
4+
import datadog.trace.api.Config
5+
import datadog.trace.api.metrics.BaggageMetrics
6+
import datadog.trace.api.telemetry.CoreMetricCollector
7+
import spock.lang.Specification
8+
9+
class BaggagePropagatorTelemetryTest extends Specification {
10+
11+
def "should directly increment baggage metrics"() {
12+
given:
13+
def baggageMetrics = BaggageMetrics.getInstance()
14+
def collector = CoreMetricCollector.getInstance()
15+
16+
when:
17+
baggageMetrics.onBaggageInjected()
18+
collector.prepareMetrics()
19+
def metrics = collector.drain()
20+
21+
then:
22+
def baggageMetric = metrics.find { it.metricName == "context_header_style.injected" }
23+
baggageMetric != null
24+
baggageMetric.value >= 1
25+
baggageMetric.tags.contains("header_style:baggage")
26+
}
27+
28+
def "should increment telemetry counter when baggage is successfully extracted"() {
29+
given:
30+
def config = Mock(Config) {
31+
isBaggageExtract() >> true
32+
isBaggageInject() >> true
33+
getBaggageMaxItems() >> 64
34+
getBaggageMaxBytes() >> 8192
35+
}
36+
def propagator = new BaggagePropagator(config)
37+
def context = Context.root()
38+
def carrier = ["baggage": "key1=value1,key2=value2"]
39+
def visitor = { map, consumer ->
40+
map.each { k, v -> consumer.accept(k, v) }
41+
}
42+
def collector = CoreMetricCollector.getInstance()
43+
44+
when:
45+
propagator.extract(context, carrier, visitor)
46+
collector.prepareMetrics()
47+
def metrics = collector.drain()
48+
49+
then:
50+
def baggageMetric = metrics.find { it.metricName == "context_header_style.extracted" }
51+
baggageMetric != null
52+
baggageMetric.value >= 1
53+
baggageMetric.tags.contains("header_style:baggage")
54+
}
55+
56+
def "should directly increment all baggage metrics"() {
57+
given:
58+
def baggageMetrics = BaggageMetrics.getInstance()
59+
def collector = CoreMetricCollector.getInstance()
60+
61+
when:
62+
baggageMetrics.onBaggageInjected()
63+
baggageMetrics.onBaggageMalformed()
64+
baggageMetrics.onBaggageTruncatedByByteLimit()
65+
baggageMetrics.onBaggageTruncatedByItemLimit()
66+
collector.prepareMetrics()
67+
def metrics = collector.drain()
68+
69+
then:
70+
def injectedMetric = metrics.find { it.metricName == "context_header_style.injected" }
71+
injectedMetric != null
72+
injectedMetric.value == 1
73+
injectedMetric.tags.contains("header_style:baggage")
74+
75+
def malformedMetric = metrics.find { it.metricName == "context_header_style.malformed" }
76+
malformedMetric != null
77+
malformedMetric.value == 1
78+
malformedMetric.tags.contains("header_style:baggage")
79+
80+
def bytesTruncatedMetric = metrics.find {
81+
it.metricName == "context_header_style.truncated" &&
82+
it.tags.contains("truncation_reason:baggage_byte_count_exceeded")
83+
}
84+
bytesTruncatedMetric != null
85+
bytesTruncatedMetric.value == 1
86+
87+
def itemsTruncatedMetric = metrics.find {
88+
it.metricName == "context_header_style.truncated" &&
89+
it.tags.contains("truncation_reason:baggage_item_count_exceeded")
90+
}
91+
itemsTruncatedMetric != null
92+
itemsTruncatedMetric.value == 1
93+
}
94+
95+
def "should not increment telemetry counter when baggage extraction fails"() {
96+
given:
97+
def config = Mock(Config) {
98+
isBaggageExtract() >> true
99+
isBaggageInject() >> true
100+
getBaggageMaxItems() >> 64
101+
getBaggageMaxBytes() >> 8192
102+
}
103+
def propagator = new BaggagePropagator(config)
104+
def context = Context.root()
105+
def carrier = [:] // No baggage header
106+
def visitor = { map, consumer ->
107+
map.each { k, v -> consumer.accept(k, v) }
108+
}
109+
def collector = CoreMetricCollector.getInstance()
110+
111+
when:
112+
propagator.extract(context, carrier, visitor)
113+
collector.prepareMetrics()
114+
def metrics = collector.drain()
115+
116+
then:
117+
def foundMetrics = metrics.findAll { it.metricName.startsWith("context_header_style.") }
118+
foundMetrics.isEmpty() // No extraction occurred, so no metrics should be created
119+
}
120+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package datadog.trace.api.metrics;
2+
3+
import java.util.ArrayList;
4+
import java.util.Collection;
5+
import java.util.Collections;
6+
import java.util.List;
7+
import java.util.concurrent.atomic.AtomicLong;
8+
9+
/** Metrics for baggage propagation operations. */
10+
public class BaggageMetrics {
11+
private static final BaggageMetrics INSTANCE = new BaggageMetrics();
12+
private final AtomicLong extractedCounter = new AtomicLong(0);
13+
private final AtomicLong injectedCounter = new AtomicLong(0);
14+
private final AtomicLong malformedCounter = new AtomicLong(0);
15+
private final AtomicLong truncatedByteCounter = new AtomicLong(0);
16+
private final AtomicLong truncatedItemCounter = new AtomicLong(0);
17+
private final Collection<TaggedCounter> taggedCounters;
18+
19+
public static BaggageMetrics getInstance() {
20+
return INSTANCE;
21+
}
22+
23+
private BaggageMetrics() {
24+
List<TaggedCounter> counters = new ArrayList<>(5);
25+
counters.add(
26+
new TaggedCounter(
27+
"context_header_style.extracted", this.extractedCounter, "header_style:baggage"));
28+
counters.add(
29+
new TaggedCounter(
30+
"context_header_style.injected", this.injectedCounter, "header_style:baggage"));
31+
counters.add(
32+
new TaggedCounter(
33+
"context_header_style.malformed", this.malformedCounter, "header_style:baggage"));
34+
counters.add(
35+
new TaggedCounter(
36+
"context_header_style.truncated",
37+
this.truncatedByteCounter,
38+
"truncation_reason:baggage_byte_count_exceeded"));
39+
counters.add(
40+
new TaggedCounter(
41+
"context_header_style.truncated",
42+
this.truncatedItemCounter,
43+
"truncation_reason:baggage_item_count_exceeded"));
44+
this.taggedCounters = Collections.unmodifiableList(counters);
45+
}
46+
47+
public void onBaggageExtracted() {
48+
this.extractedCounter.incrementAndGet();
49+
}
50+
51+
public void onBaggageInjected() {
52+
this.injectedCounter.incrementAndGet();
53+
}
54+
55+
public void onBaggageMalformed() {
56+
this.malformedCounter.incrementAndGet();
57+
}
58+
59+
public void onBaggageTruncatedByByteLimit() {
60+
this.truncatedByteCounter.incrementAndGet();
61+
}
62+
63+
public void onBaggageTruncatedByItemLimit() {
64+
this.truncatedItemCounter.incrementAndGet();
65+
}
66+
67+
public Collection<TaggedCounter> getTaggedCounters() {
68+
return this.taggedCounters;
69+
}
70+
71+
public static class TaggedCounter implements CoreCounter {
72+
private final String name;
73+
private final AtomicLong counter;
74+
private final String tag;
75+
private long previousCount;
76+
77+
public TaggedCounter(String name, AtomicLong counter, String tag) {
78+
this.name = name;
79+
this.counter = counter;
80+
this.tag = tag;
81+
}
82+
83+
@Override
84+
public String getName() {
85+
return this.name;
86+
}
87+
88+
public String getTag() {
89+
return this.tag;
90+
}
91+
92+
@Override
93+
public long getValue() {
94+
return counter.get();
95+
}
96+
97+
@Override
98+
public long getValueAndReset() {
99+
long count = counter.get();
100+
long delta = count - previousCount;
101+
previousCount = count;
102+
return delta;
103+
}
104+
}
105+
}

internal-api/src/main/java/datadog/trace/api/telemetry/CoreMetricCollector.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.api.telemetry;
22

3+
import datadog.trace.api.metrics.BaggageMetrics;
34
import datadog.trace.api.metrics.CoreCounter;
45
import datadog.trace.api.metrics.SpanMetricRegistryImpl;
56
import datadog.trace.api.metrics.SpanMetricsImpl;
@@ -16,6 +17,7 @@ public class CoreMetricCollector implements MetricCollector<CoreMetricCollector.
1617
private static final String INTEGRATION_NAME_TAG = "integration_name:";
1718
private static final CoreMetricCollector INSTANCE = new CoreMetricCollector();
1819
private final SpanMetricRegistryImpl spanMetricRegistry = SpanMetricRegistryImpl.getInstance();
20+
private final BaggageMetrics baggageMetrics = BaggageMetrics.getInstance();
1921

2022
private final BlockingQueue<CoreMetric> metricsQueue;
2123

@@ -29,6 +31,7 @@ private CoreMetricCollector() {
2931

3032
@Override
3133
public void prepareMetrics() {
34+
// Collect span metrics
3235
for (SpanMetricsImpl spanMetrics : this.spanMetricRegistry.getSpanMetrics()) {
3336
String tag = INTEGRATION_NAME_TAG + spanMetrics.getInstrumentationName();
3437
for (CoreCounter counter : spanMetrics.getCounters()) {
@@ -45,6 +48,23 @@ public void prepareMetrics() {
4548
}
4649
}
4750
}
51+
52+
// Collect baggage metrics
53+
for (BaggageMetrics.TaggedCounter counter : this.baggageMetrics.getTaggedCounters()) {
54+
long value = counter.getValueAndReset();
55+
if (value == 0) {
56+
// Skip not updated counters
57+
continue;
58+
}
59+
// Use the specific tag for each baggage metric
60+
String tag = counter.getTag();
61+
CoreMetric metric =
62+
new CoreMetric(METRIC_NAMESPACE, true, counter.getName(), "count", value, tag);
63+
if (!this.metricsQueue.offer(metric)) {
64+
// Stop adding metrics if the queue is full
65+
break;
66+
}
67+
}
4868
}
4969

5070
@Override

0 commit comments

Comments
 (0)