Skip to content

Commit 3215bed

Browse files
authored
Merge branch 'master' into sarahchen6/update-dd-octo-sts
2 parents b448d6a + 0f91094 commit 3215bed

File tree

8 files changed

+471
-5
lines changed

8 files changed

+471
-5
lines changed

.github/workflows/update-docker-build-image.yaml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,10 @@ jobs:
5050
echo "::notice::Using Docker build image tag: ${TAG}"
5151
- name: Update the Docker build image in GitLab CI config
5252
run: |
53-
sed -i '' -E 's|(BUILDER_IMAGE_VERSION_PREFIX:)[^#]*([#].*)|\1 "${{ steps.define-tag.outputs.tag }}-" \2|' .gitlab-ci.yml
54-
- name: Commit changes
55-
id: create-commit
53+
sed -i -E 's|(BUILDER_IMAGE_VERSION_PREFIX:)[^#]*([#].*)|\1 "${{ steps.define-tag.outputs.tag }}-" \2|' .gitlab-ci.yml
54+
- name: Commit and push changes
55+
env:
56+
GITHUB_TOKEN: ${{ steps.octo-sts.outputs.token }}
5657
run: |
5758
git config user.name "github-actions[bot]"
5859
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"

dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/utils/PortUtils.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ private static boolean isPortOpen(String host, int port) {
105105

106106
public static void waitForPortToOpen(
107107
final int port, final long timeout, final TimeUnit unit, final Process process) {
108-
final long waitUntil = System.currentTimeMillis() + unit.toMillis(timeout);
108+
final long startedAt = System.currentTimeMillis();
109+
final long waitUntil = startedAt + unit.toMillis(timeout);
109110

110111
while (System.currentTimeMillis() < waitUntil) {
111112
try {
@@ -133,7 +134,13 @@ public static void waitForPortToOpen(
133134
}
134135
}
135136

136-
throw new RuntimeException("Timed out waiting for port " + port + " to be opened");
137+
throw new RuntimeException(
138+
"Timed out waiting for port "
139+
+ port
140+
+ " to be opened, started to wait at: "
141+
+ startedAt
142+
+ ", timed out at: "
143+
+ System.currentTimeMillis());
137144
}
138145

139146
public static void waitForPortToOpen(String host, int port, long timeout, TimeUnit unit) {
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package utils
2+
3+
import datadog.trace.agent.test.AgentTestRunner
4+
import datadog.trace.agent.test.utils.PortUtils
5+
6+
import java.util.concurrent.TimeUnit
7+
8+
class PortUtilsTest extends AgentTestRunner {
9+
def "expect waitForPortToOpen succeed"() {
10+
given:
11+
int port = PortUtils.randomOpenPort()
12+
def socket = new ServerSocket(port) // Emulating port opened.
13+
14+
def process = Mock(Process)
15+
process.isAlive() >> true
16+
17+
when:
18+
PortUtils.waitForPortToOpen(port, 1, TimeUnit.SECONDS, process)
19+
20+
then:
21+
noExceptionThrown()
22+
23+
cleanup:
24+
socket.close()
25+
}
26+
27+
def "expect to handle port open timeout"() {
28+
given:
29+
int port = PortUtils.randomOpenPort()
30+
def process = Mock(Process)
31+
process.isAlive() >> true
32+
33+
when:
34+
PortUtils.waitForPortToOpen(port, 1, TimeUnit.SECONDS, process)
35+
36+
then:
37+
def ex = thrown(RuntimeException)
38+
ex.message.startsWith("Timed out waiting for port $port to be opened, started to wait at:")
39+
}
40+
41+
def "expect to handle process abnormal termination"() {
42+
given:
43+
int port = PortUtils.randomOpenPort()
44+
def process = Mock(Process)
45+
process.isAlive() >> false
46+
process.exitValue() >> 123
47+
48+
when:
49+
PortUtils.waitForPortToOpen(port, 1, TimeUnit.SECONDS, process)
50+
51+
then:
52+
def ex = thrown(RuntimeException)
53+
ex.message == "Process exited abnormally exitCode=123 before port=$port was opened"
54+
}
55+
56+
def "expect to handle process termination before port opened"() {
57+
given:
58+
int port = PortUtils.randomOpenPort()
59+
def process = Mock(Process)
60+
process.isAlive() >> false
61+
process.exitValue() >> 0
62+
63+
when:
64+
PortUtils.waitForPortToOpen(port, 1, TimeUnit.SECONDS, process)
65+
66+
then:
67+
def ex = thrown(RuntimeException)
68+
ex.message == "Process finished before port=$port was opened"
69+
}
70+
}

dd-trace-core/src/main/java/datadog/trace/core/baggage/BaggagePropagator.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import datadog.context.propagation.CarrierVisitor;
88
import datadog.context.propagation.Propagator;
99
import datadog.trace.api.Config;
10+
import datadog.trace.api.metrics.BaggageMetrics;
1011
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1112
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
1213
import datadog.trace.bootstrap.instrumentation.api.Baggage;
@@ -26,6 +27,7 @@
2627
public class BaggagePropagator implements Propagator {
2728
private static final Logger LOG = LoggerFactory.getLogger(BaggagePropagator.class);
2829
private static final PercentEscaper UTF_ESCAPER = PercentEscaper.create();
30+
private static final BaggageMetrics BAGGAGE_METRICS = BaggageMetrics.getInstance();
2931
static final String BAGGAGE_KEY = "baggage";
3032
private final boolean injectBaggage;
3133
private final boolean extractBaggage;
@@ -89,11 +91,13 @@ public <C> void inject(Context context, C carrier, CarrierSetter<C> setter) {
8991
processedItems++;
9092
// reached the max number of baggage items allowed
9193
if (processedItems == this.maxItems) {
94+
BAGGAGE_METRICS.onBaggageTruncatedByItemLimit();
9295
break;
9396
}
9497
// Drop newest k/v pair if adding it leads to exceeding the limit
9598
if (currentBytes + escapedKey.size + escapedVal.size + extraBytes > this.maxBytes) {
9699
baggageText.setLength(currentBytes);
100+
BAGGAGE_METRICS.onBaggageTruncatedByByteLimit();
97101
break;
98102
}
99103
currentBytes += escapedKey.size + escapedVal.size + extraBytes;
@@ -103,6 +107,9 @@ public <C> void inject(Context context, C carrier, CarrierSetter<C> setter) {
103107
// Save header as cache to re-inject it later if baggage did not change
104108
baggage.setW3cHeader(headerValue);
105109
setter.set(carrier, BAGGAGE_KEY, headerValue);
110+
111+
// Record successful baggage injection for telemetry
112+
BAGGAGE_METRICS.onBaggageInjected();
106113
}
107114

108115
@Override
@@ -117,6 +124,9 @@ public <C> Context extract(Context context, C carrier, CarrierVisitor<C> visitor
117124
return context;
118125
}
119126

127+
// Record successful baggage extraction for telemetry
128+
BAGGAGE_METRICS.onBaggageExtracted();
129+
120130
// TODO: consider a better way to link baggage with the extracted (legacy) TagContext
121131
AgentSpan extractedSpan = AgentSpan.fromContext(context);
122132
if (extractedSpan != null) {
@@ -158,12 +168,14 @@ private Map<String, String> parseBaggageHeaders(String input) {
158168
if (kvSeparatorInd > end) {
159169
LOG.debug(
160170
"Dropping baggage headers due to key with no value {}", input.substring(start, end));
171+
BAGGAGE_METRICS.onBaggageMalformed();
161172
return emptyMap();
162173
}
163174
String key = decode(input.substring(start, kvSeparatorInd).trim());
164175
String value = decode(input.substring(kvSeparatorInd + 1, end).trim());
165176
if (key.isEmpty() || value.isEmpty()) {
166177
LOG.debug("Dropping baggage headers due to empty k/v {}:{}", key, value);
178+
BAGGAGE_METRICS.onBaggageMalformed();
167179
return emptyMap();
168180
}
169181
baggage.put(key, value);
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package datadog.trace.core.baggage
2+
3+
import datadog.context.Context
4+
import datadog.trace.api.Config
5+
import datadog.trace.api.metrics.BaggageMetrics
6+
import datadog.trace.api.telemetry.CoreMetricCollector
7+
import spock.lang.Specification
8+
9+
class BaggagePropagatorTelemetryTest extends Specification {
10+
11+
def "should directly increment baggage metrics"() {
12+
given:
13+
def baggageMetrics = BaggageMetrics.getInstance()
14+
def collector = CoreMetricCollector.getInstance()
15+
16+
when:
17+
baggageMetrics.onBaggageInjected()
18+
collector.prepareMetrics()
19+
def metrics = collector.drain()
20+
21+
then:
22+
def baggageMetric = metrics.find { it.metricName == "context_header_style.injected" }
23+
baggageMetric != null
24+
baggageMetric.value >= 1
25+
baggageMetric.tags.contains("header_style:baggage")
26+
}
27+
28+
def "should increment telemetry counter when baggage is successfully extracted"() {
29+
given:
30+
def config = Mock(Config) {
31+
isBaggageExtract() >> true
32+
isBaggageInject() >> true
33+
getBaggageMaxItems() >> 64
34+
getBaggageMaxBytes() >> 8192
35+
}
36+
def propagator = new BaggagePropagator(config)
37+
def context = Context.root()
38+
def carrier = ["baggage": "key1=value1,key2=value2"]
39+
def visitor = { map, consumer ->
40+
map.each { k, v -> consumer.accept(k, v) }
41+
}
42+
def collector = CoreMetricCollector.getInstance()
43+
44+
when:
45+
propagator.extract(context, carrier, visitor)
46+
collector.prepareMetrics()
47+
def metrics = collector.drain()
48+
49+
then:
50+
def baggageMetric = metrics.find { it.metricName == "context_header_style.extracted" }
51+
baggageMetric != null
52+
baggageMetric.value >= 1
53+
baggageMetric.tags.contains("header_style:baggage")
54+
}
55+
56+
def "should directly increment all baggage metrics"() {
57+
given:
58+
def baggageMetrics = BaggageMetrics.getInstance()
59+
def collector = CoreMetricCollector.getInstance()
60+
61+
when:
62+
baggageMetrics.onBaggageInjected()
63+
baggageMetrics.onBaggageMalformed()
64+
baggageMetrics.onBaggageTruncatedByByteLimit()
65+
baggageMetrics.onBaggageTruncatedByItemLimit()
66+
collector.prepareMetrics()
67+
def metrics = collector.drain()
68+
69+
then:
70+
def injectedMetric = metrics.find { it.metricName == "context_header_style.injected" }
71+
injectedMetric != null
72+
injectedMetric.value == 1
73+
injectedMetric.tags.contains("header_style:baggage")
74+
75+
def malformedMetric = metrics.find { it.metricName == "context_header_style.malformed" }
76+
malformedMetric != null
77+
malformedMetric.value == 1
78+
malformedMetric.tags.contains("header_style:baggage")
79+
80+
def bytesTruncatedMetric = metrics.find {
81+
it.metricName == "context_header_style.truncated" &&
82+
it.tags.contains("truncation_reason:baggage_byte_count_exceeded")
83+
}
84+
bytesTruncatedMetric != null
85+
bytesTruncatedMetric.value == 1
86+
87+
def itemsTruncatedMetric = metrics.find {
88+
it.metricName == "context_header_style.truncated" &&
89+
it.tags.contains("truncation_reason:baggage_item_count_exceeded")
90+
}
91+
itemsTruncatedMetric != null
92+
itemsTruncatedMetric.value == 1
93+
}
94+
95+
def "should not increment telemetry counter when baggage extraction fails"() {
96+
given:
97+
def config = Mock(Config) {
98+
isBaggageExtract() >> true
99+
isBaggageInject() >> true
100+
getBaggageMaxItems() >> 64
101+
getBaggageMaxBytes() >> 8192
102+
}
103+
def propagator = new BaggagePropagator(config)
104+
def context = Context.root()
105+
def carrier = [:] // No baggage header
106+
def visitor = { map, consumer ->
107+
map.each { k, v -> consumer.accept(k, v) }
108+
}
109+
def collector = CoreMetricCollector.getInstance()
110+
111+
when:
112+
propagator.extract(context, carrier, visitor)
113+
collector.prepareMetrics()
114+
def metrics = collector.drain()
115+
116+
then:
117+
def foundMetrics = metrics.findAll { it.metricName.startsWith("context_header_style.") }
118+
foundMetrics.isEmpty() // No extraction occurred, so no metrics should be created
119+
}
120+
}

0 commit comments

Comments
 (0)