Skip to content

Commit fec6bcd

Browse files
committed
Add AWS X-Ray Adaptive Sampling Support
1 parent d71dab4 commit fec6bcd

15 files changed

+2151
-83
lines changed

aws-xray/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ dependencies {
1111
api("io.opentelemetry:opentelemetry-sdk-trace")
1212

1313
compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
14+
implementation("io.opentelemetry.semconv:opentelemetry-semconv:1.32.0-alpha")
1415

1516
implementation("com.squareup.okhttp3:okhttp")
1617
implementation("io.opentelemetry.semconv:opentelemetry-semconv")
@@ -25,6 +26,7 @@ dependencies {
2526

2627
implementation("com.fasterxml.jackson.core:jackson-core")
2728
implementation("com.fasterxml.jackson.core:jackson-databind")
29+
implementation("com.github.ben-manes.caffeine:caffeine:2.9.3")
2830

2931
testImplementation("com.linecorp.armeria:armeria-junit5")
3032
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.awsxray;
7+
8+
import io.opentelemetry.api.common.Attributes;
9+
import io.opentelemetry.api.trace.TraceState;
10+
import io.opentelemetry.sdk.trace.samplers.SamplingDecision;
11+
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
12+
13+
final class AwsSamplingResult implements SamplingResult {
14+
15+
// OTel trace state is a space shared with other vendors with a 256 character limit
16+
// We keep the key and values as short as possible while still identifiable
17+
public static final String AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY = "xrsr";
18+
19+
private final SamplingDecision decision;
20+
private final Attributes attributes;
21+
private final String samplingRuleName;
22+
23+
private AwsSamplingResult(
24+
SamplingDecision decision, Attributes attributes, String samplingRuleName) {
25+
this.decision = decision;
26+
this.attributes = attributes;
27+
this.samplingRuleName = samplingRuleName;
28+
}
29+
30+
static AwsSamplingResult create(
31+
SamplingDecision decision, Attributes attributes, String samplingRuleName) {
32+
return new AwsSamplingResult(decision, attributes, samplingRuleName);
33+
}
34+
35+
@Override
36+
public SamplingDecision getDecision() {
37+
return decision;
38+
}
39+
40+
@Override
41+
public Attributes getAttributes() {
42+
return attributes;
43+
}
44+
45+
@Override
46+
public TraceState getUpdatedTraceState(TraceState parentTraceState) {
47+
if (parentTraceState.get(AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY) == null) {
48+
return parentTraceState.toBuilder()
49+
.put(AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY, samplingRuleName)
50+
.build();
51+
}
52+
return parentTraceState;
53+
}
54+
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.awsxray;
7+
8+
import com.fasterxml.jackson.annotation.JsonCreator;
9+
import com.fasterxml.jackson.annotation.JsonProperty;
10+
import com.fasterxml.jackson.annotation.JsonValue;
11+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
12+
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
13+
import com.google.auto.value.AutoValue;
14+
import java.util.List;
15+
import javax.annotation.Nullable;
16+
17+
@AutoValue
18+
@JsonSerialize(as = AwsXrayAdaptiveSamplingConfig.class)
19+
@JsonDeserialize(builder = AutoValue_AwsXrayAdaptiveSamplingConfig.Builder.class)
20+
public abstract class AwsXrayAdaptiveSamplingConfig {
21+
22+
@JsonProperty("version")
23+
public abstract double getVersion();
24+
25+
@JsonProperty("anomalyConditions")
26+
@Nullable
27+
public abstract List<AnomalyConditions> getAnomalyConditions();
28+
29+
@JsonProperty("anomalyCaptureLimit")
30+
@Nullable
31+
public abstract AnomalyCaptureLimit getAnomalyCaptureLimit();
32+
33+
public static Builder builder() {
34+
return new AutoValue_AwsXrayAdaptiveSamplingConfig.Builder();
35+
}
36+
37+
public Builder toBuilder() {
38+
Builder b = builder();
39+
b.setVersion(getVersion());
40+
if (getAnomalyConditions() != null) {
41+
b.setAnomalyConditions(getAnomalyConditions());
42+
}
43+
if (getAnomalyCaptureLimit() != null) {
44+
b.setAnomalyCaptureLimit(getAnomalyCaptureLimit());
45+
}
46+
return b;
47+
}
48+
49+
@AutoValue.Builder
50+
public abstract static class Builder {
51+
@JsonProperty("version")
52+
public abstract Builder setVersion(double value);
53+
54+
@JsonProperty("anomalyConditions")
55+
public abstract Builder setAnomalyConditions(List<AnomalyConditions> value);
56+
57+
@JsonProperty("anomalyCaptureLimit")
58+
public abstract Builder setAnomalyCaptureLimit(AnomalyCaptureLimit value);
59+
60+
public abstract AwsXrayAdaptiveSamplingConfig build();
61+
}
62+
63+
@AutoValue
64+
@JsonDeserialize(
65+
builder = AutoValue_AwsXrayAdaptiveSamplingConfig_AnomalyConditions.Builder.class)
66+
public abstract static class AnomalyConditions {
67+
@JsonProperty("errorCodeRegex")
68+
@Nullable
69+
public abstract String getErrorCodeRegex();
70+
71+
@JsonProperty("operations")
72+
@Nullable
73+
public abstract List<String> getOperations();
74+
75+
@JsonProperty("highLatencyMs")
76+
@Nullable
77+
public abstract Long getHighLatencyMs();
78+
79+
@JsonProperty("usage")
80+
@Nullable
81+
public abstract UsageType getUsage();
82+
83+
public static Builder builder() {
84+
return new AutoValue_AwsXrayAdaptiveSamplingConfig_AnomalyConditions.Builder();
85+
}
86+
87+
@AutoValue.Builder
88+
public abstract static class Builder {
89+
@JsonProperty("errorCodeRegex")
90+
public abstract Builder setErrorCodeRegex(String value);
91+
92+
@JsonProperty("operations")
93+
public abstract Builder setOperations(List<String> value);
94+
95+
@JsonProperty("highLatencyMs")
96+
public abstract Builder setHighLatencyMs(Long value);
97+
98+
@JsonProperty("usage")
99+
public abstract Builder setUsage(UsageType value);
100+
101+
public abstract AnomalyConditions build();
102+
}
103+
}
104+
105+
public enum UsageType {
106+
BOTH("both"),
107+
SAMPLING_BOOST("sampling-boost"),
108+
ANOMALY_TRACE_CAPTURE("anomaly-trace-capture"),
109+
NEITHER("neither"); // Not meant to be used by customers
110+
111+
private final String value;
112+
113+
UsageType(String value) {
114+
this.value = value;
115+
}
116+
117+
@JsonValue
118+
public String getValue() {
119+
return value;
120+
}
121+
122+
@JsonCreator
123+
public static UsageType fromValue(String value) {
124+
for (UsageType type : values()) {
125+
if (type.value.equals(value)) {
126+
return type;
127+
}
128+
}
129+
throw new IllegalArgumentException("Invalid usage value: " + value);
130+
}
131+
132+
public static boolean isUsedForBoost(UsageType usage) {
133+
return BOTH.equals(usage) || SAMPLING_BOOST.equals(usage);
134+
}
135+
136+
public static boolean isUsedForAnomalyTraceCapture(UsageType usage) {
137+
return BOTH.equals(usage) || ANOMALY_TRACE_CAPTURE.equals(usage);
138+
}
139+
}
140+
141+
@AutoValue
142+
@JsonDeserialize(
143+
builder = AutoValue_AwsXrayAdaptiveSamplingConfig_AnomalyCaptureLimit.Builder.class)
144+
public abstract static class AnomalyCaptureLimit {
145+
@JsonProperty("anomalyTracesPerSecond")
146+
public abstract int getAnomalyTracesPerSecond();
147+
148+
public static Builder builder() {
149+
return new AutoValue_AwsXrayAdaptiveSamplingConfig_AnomalyCaptureLimit.Builder();
150+
}
151+
152+
@AutoValue.Builder
153+
public abstract static class Builder {
154+
@JsonProperty("anomalyTracesPerSecond")
155+
public abstract Builder setAnomalyTracesPerSecond(int value);
156+
157+
public abstract AnomalyCaptureLimit build();
158+
}
159+
}
160+
}

aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSampler.java

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,22 @@
99
import io.opentelemetry.api.trace.SpanKind;
1010
import io.opentelemetry.context.Context;
1111
import io.opentelemetry.contrib.awsxray.GetSamplingRulesResponse.SamplingRuleRecord;
12+
import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingBoostStatisticsDocument;
1213
import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingStatisticsDocument;
1314
import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument;
1415
import io.opentelemetry.sdk.common.Clock;
1516
import io.opentelemetry.sdk.resources.Resource;
17+
import io.opentelemetry.sdk.trace.ReadableSpan;
1618
import io.opentelemetry.sdk.trace.data.LinkData;
19+
import io.opentelemetry.sdk.trace.data.SpanData;
20+
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
21+
import io.opentelemetry.sdk.trace.export.SpanExporter;
1722
import io.opentelemetry.sdk.trace.samplers.Sampler;
1823
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
1924
import java.io.Closeable;
2025
import java.time.Duration;
2126
import java.time.Instant;
27+
import java.util.ArrayList;
2228
import java.util.Date;
2329
import java.util.Iterator;
2430
import java.util.List;
@@ -43,6 +49,9 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable {
4349

4450
private static final Logger logger = Logger.getLogger(AwsXrayRemoteSampler.class.getName());
4551

52+
// Default batch size to be same as OTel BSP default
53+
private static final int maxExportBatchSize = 512;
54+
4655
private final Resource resource;
4756
private final Clock clock;
4857
private final Sampler initialSampler;
@@ -59,6 +68,9 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable {
5968
@Nullable private volatile XrayRulesSampler internalXrayRulesSampler;
6069
private volatile Sampler sampler;
6170

71+
@Nullable private AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig;
72+
@Nullable private BatchSpanProcessor bsp;
73+
6274
/**
6375
* Returns a {@link AwsXrayRemoteSamplerBuilder} with the given {@link Resource}. This {@link
6476
* Resource} should be the same as what the OpenTelemetry SDK is configured with.
@@ -120,6 +132,40 @@ public String getDescription() {
120132
return "AwsXrayRemoteSampler{" + sampler.getDescription() + "}";
121133
}
122134

135+
public void setAdaptiveSamplingConfig(AwsXrayAdaptiveSamplingConfig config) {
136+
if (this.adaptiveSamplingConfig != null) {
137+
throw new IllegalStateException("Programming bug - Adaptive sampling config is already set");
138+
} else if (config != null && this.adaptiveSamplingConfig == null) {
139+
// Save here and also pass to XrayRulesSampler directly as it already exists
140+
this.adaptiveSamplingConfig = config;
141+
if (internalXrayRulesSampler != null) {
142+
internalXrayRulesSampler.setAdaptiveSamplingConfig(config);
143+
}
144+
}
145+
}
146+
147+
public void setSpanExporter(SpanExporter spanExporter) {
148+
if (this.bsp != null) {
149+
throw new IllegalStateException("Programming bug - BatchSpanProcessor is already set");
150+
} else if (spanExporter != null && this.bsp == null) {
151+
this.bsp =
152+
BatchSpanProcessor.builder(spanExporter)
153+
.setExportUnsampledSpans(true) // Required to capture the unsampled anomaly spans
154+
.setMaxExportBatchSize(maxExportBatchSize)
155+
.build();
156+
}
157+
}
158+
159+
public void adaptSampling(ReadableSpan span, SpanData spanData) {
160+
if (this.bsp == null) {
161+
throw new IllegalStateException(
162+
"Programming bug - BatchSpanProcessor is null while trying to adapt sampling");
163+
}
164+
if (internalXrayRulesSampler != null) {
165+
internalXrayRulesSampler.adaptSampling(span, spanData, this.bsp::onEnd);
166+
}
167+
}
168+
123169
private void getAndUpdateSampler() {
124170
try {
125171
// No pagination support yet, or possibly ever.
@@ -134,8 +180,8 @@ private void getAndUpdateSampler() {
134180
initialSampler,
135181
response.getSamplingRules().stream()
136182
.map(SamplingRuleRecord::getRule)
137-
.collect(Collectors.toList())));
138-
183+
.collect(Collectors.toList()),
184+
adaptiveSamplingConfig));
139185
previousRulesResponse = response;
140186
ScheduledFuture<?> existingFetchTargetsFuture = fetchTargetsFuture;
141187
if (existingFetchTargetsFuture != null) {
@@ -179,14 +225,29 @@ private void fetchTargets() {
179225
XrayRulesSampler xrayRulesSampler = this.internalXrayRulesSampler;
180226
try {
181227
Date now = Date.from(Instant.ofEpochSecond(0, clock.now()));
182-
List<SamplingStatisticsDocument> statistics = xrayRulesSampler.snapshot(now);
228+
List<SamplingRuleApplier.SamplingRuleStatisticsSnapshot> statisticsSnapshot =
229+
xrayRulesSampler.snapshot(now);
230+
List<SamplingStatisticsDocument> statistics = new ArrayList<SamplingStatisticsDocument>();
231+
List<SamplingBoostStatisticsDocument> boostStatistics =
232+
new ArrayList<SamplingBoostStatisticsDocument>();
233+
statisticsSnapshot.stream()
234+
.forEach(
235+
snapshot -> {
236+
if (snapshot.getStatisticsDocument() != null) {
237+
statistics.add(snapshot.getStatisticsDocument());
238+
}
239+
if (snapshot.getBoostStatisticsDocument() != null
240+
&& snapshot.getBoostStatisticsDocument().getTotalCount() > 0) {
241+
boostStatistics.add(snapshot.getBoostStatisticsDocument());
242+
}
243+
});
183244
Set<String> requestedTargetRuleNames =
184245
statistics.stream()
185246
.map(SamplingStatisticsDocument::getRuleName)
186247
.collect(Collectors.toSet());
187248

188-
GetSamplingTargetsResponse response =
189-
client.getSamplingTargets(GetSamplingTargetsRequest.create(statistics));
249+
GetSamplingTargetsRequest req = GetSamplingTargetsRequest.create(statistics, boostStatistics);
250+
GetSamplingTargetsResponse response = client.getSamplingTargets(req);
190251
Map<String, SamplingTargetDocument> targets =
191252
response.getDocuments().stream()
192253
.collect(Collectors.toMap(SamplingTargetDocument::getRuleName, Function.identity()));

0 commit comments

Comments
 (0)