Skip to content

Commit 92f0a5a

Browse files
committed
[AdaptiveSampling] Capture all spans in anomaly traces + use cache with TTL for saving traceIDs
1 parent 5ecad33 commit 92f0a5a

File tree

1 file changed

+90
-34
lines changed

1 file changed

+90
-34
lines changed

.github/patches/opentelemetry-java-contrib.patch

Lines changed: 90 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
diff --git a/aws-xray/build.gradle.kts b/aws-xray/build.gradle.kts
2-
index ccec9d52..f764bba9 100644
2+
index ccec9d52..fddbad18 100644
33
--- a/aws-xray/build.gradle.kts
44
+++ b/aws-xray/build.gradle.kts
55
@@ -11,6 +11,7 @@ dependencies {
@@ -10,6 +10,14 @@ index ccec9d52..f764bba9 100644
1010

1111
implementation("com.squareup.okhttp3:okhttp")
1212
implementation("io.opentelemetry:opentelemetry-semconv")
13+
@@ -24,6 +25,7 @@ dependencies {
14+
15+
implementation("com.fasterxml.jackson.core:jackson-core")
16+
implementation("com.fasterxml.jackson.core:jackson-databind")
17+
+ implementation("com.github.ben-manes.caffeine:caffeine:2.9.3")
18+
19+
testImplementation("com.linecorp.armeria:armeria-junit5")
20+
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
1321
diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsSamplingResult.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsSamplingResult.java
1422
new file mode 100644
1523
index 00000000..41f22f90
@@ -894,15 +902,17 @@ index 1ef8abf5..328e63dd 100644
894902
}
895903
}
896904
diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java
897-
index 75977dc0..a60fec96 100644
905+
index 75977dc0..7cd9bcc5 100644
898906
--- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java
899907
+++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java
900-
@@ -5,42 +5,78 @@
908+
@@ -5,42 +5,81 @@
901909

902910
package io.opentelemetry.contrib.awsxray;
903911

904912
+import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE;
905913
+
914+
+import com.github.benmanes.caffeine.cache.Cache;
915+
+import com.github.benmanes.caffeine.cache.Caffeine;
906916
+import io.opentelemetry.api.common.AttributeKey;
907917
import io.opentelemetry.api.common.Attributes;
908918
+import io.opentelemetry.api.trace.Span;
@@ -922,6 +932,7 @@ index 75977dc0..a60fec96 100644
922932
+import java.nio.charset.StandardCharsets;
923933
+import java.security.MessageDigest;
924934
+import java.security.NoSuchAlgorithmException;
935+
+import java.time.Duration;
925936
import java.util.Arrays;
926937
import java.util.Comparator;
927938
import java.util.Date;
@@ -930,7 +941,6 @@ index 75977dc0..a60fec96 100644
930941
import java.util.Map;
931942
import java.util.Objects;
932943
import java.util.Set;
933-
+import java.util.concurrent.ConcurrentHashMap;
934944
+import java.util.function.Consumer;
935945
import java.util.logging.Level;
936946
import java.util.logging.Logger;
@@ -961,7 +971,8 @@ index 75977dc0..a60fec96 100644
961971
+ private final Map<String, String> hashToRuleMap;
962972
+
963973
+ private final boolean adaptiveSamplingRuleExists;
964-
+ private final Set<String> anomalyTracesSet;
974+
+ private final Cache<String, Boolean> anomalyTracesCache;
975+
+ private final Cache<String, Boolean> capturedTraceIdCache;
965976
+
966977
+ @Nullable private AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig;
967978
+ @Nullable private RateLimiter anomalyCaptureRateLimiter;
@@ -977,7 +988,7 @@ index 75977dc0..a60fec96 100644
977988
this(
978989
clientId,
979990
resource,
980-
@@ -49,8 +85,17 @@ final class XrayRulesSampler implements Sampler {
991+
@@ -49,8 +88,17 @@ final class XrayRulesSampler implements Sampler {
981992
rules.stream()
982993
// Lower priority value takes precedence so normal ascending sort.
983994
.sorted(Comparator.comparingInt(GetSamplingRulesResponse.SamplingRule::getPriority))
@@ -997,7 +1008,7 @@ index 75977dc0..a60fec96 100644
9971008
}
9981009

9991010
private XrayRulesSampler(
1000-
@@ -58,12 +103,32 @@ final class XrayRulesSampler implements Sampler {
1011+
@@ -58,12 +106,42 @@ final class XrayRulesSampler implements Sampler {
10011012
Resource resource,
10021013
Clock clock,
10031014
Sampler fallbackSampler,
@@ -1018,8 +1029,18 @@ index 75977dc0..a60fec96 100644
10181029
+ }
10191030
+ this.adaptiveSamplingRuleExists = adaptiveSamplingRuleExists;
10201031
+ this.adaptiveSamplingConfig = adaptiveSamplingConfig;
1021-
+ // The set is self-clearing, when spans close they are removed from the set
1022-
+ this.anomalyTracesSet = ConcurrentHashMap.newKeySet(100_000);
1032+
+ this.anomalyTracesCache =
1033+
+ Caffeine.newBuilder()
1034+
+ .maximumSize(100_000)
1035+
+ .ticker(clock::nanoTime)
1036+
+ .expireAfterWrite(Duration.ofMinutes(10))
1037+
+ .build();
1038+
+ this.capturedTraceIdCache =
1039+
+ Caffeine.newBuilder()
1040+
+ .maximumSize(10_000)
1041+
+ .ticker(clock::nanoTime)
1042+
+ .expireAfterWrite(Duration.ofMinutes(1))
1043+
+ .build();
10231044
+
10241045
+ // Initialize anomaly capture rate limiter if error capture limit is configured
10251046
+ if (adaptiveSamplingConfig != null && adaptiveSamplingConfig.getErrorCaptureLimit() != null) {
@@ -1031,7 +1052,7 @@ index 75977dc0..a60fec96 100644
10311052
}
10321053

10331054
@Override
1034-
@@ -74,10 +139,36 @@ final class XrayRulesSampler implements Sampler {
1055+
@@ -74,10 +152,36 @@ final class XrayRulesSampler implements Sampler {
10351056
SpanKind spanKind,
10361057
Attributes attributes,
10371058
List<LinkData> parentLinks) {
@@ -1070,7 +1091,7 @@ index 75977dc0..a60fec96 100644
10701091
}
10711092
}
10721093

1073-
@@ -96,7 +187,161 @@ final class XrayRulesSampler implements Sampler {
1094+
@@ -96,7 +200,164 @@ final class XrayRulesSampler implements Sampler {
10741095
return "XrayRulesSampler{" + Arrays.toString(ruleAppliers) + "}";
10751096
}
10761097

@@ -1175,10 +1196,12 @@ index 75977dc0..a60fec96 100644
11751196
+ parentContext == null || !parentContext.isValid() || parentContext.isRemote();
11761197
+
11771198
+ // Anomaly Capture
1178-
+ if (shouldCaptureAnomalySpan
1179-
+ && !span.getSpanContext().isSampled()
1180-
+ && anomalyCaptureRateLimiter != null
1181-
+ && anomalyCaptureRateLimiter.trySpend(1)) {
1199+
+ if (capturedTraceIdCache.getIfPresent(traceId) != null
1200+
+ || (shouldCaptureAnomalySpan
1201+
+ && !span.getSpanContext().isSampled()
1202+
+ && anomalyCaptureRateLimiter != null
1203+
+ && anomalyCaptureRateLimiter.trySpend(1))) {
1204+
+ capturedTraceIdCache.put(traceId, true);
11821205
+ spanBatcher.accept(span);
11831206
+ }
11841207
+
@@ -1217,14 +1240,15 @@ index 75977dc0..a60fec96 100644
12171240
+ if (shouldBoostSampling
12181241
+ && ruleToReportTo != null
12191242
+ && ruleToReportTo.hasBoost()
1220-
+ && this.anomalyTracesSet.add(traceId)) {
1243+
+ && this.anomalyTracesCache.getIfPresent(traceId) == null) {
1244+
+ this.anomalyTracesCache.put(traceId, true);
12211245
+ ruleToReportTo.countAnomalyTrace(span);
12221246
+ }
12231247
+ if (isLocalRootSpan) {
12241248
+ if (ruleToReportTo != null && ruleToReportTo.hasBoost()) {
12251249
+ ruleToReportTo.countTrace();
12261250
+ }
1227-
+ this.anomalyTracesSet.remove(traceId);
1251+
+ this.anomalyTracesCache.invalidate(traceId);
12281252
+ }
12291253
+ }
12301254
+ }
@@ -1233,7 +1257,7 @@ index 75977dc0..a60fec96 100644
12331257
return Arrays.stream(ruleAppliers)
12341258
.map(rule -> rule.snapshot(now))
12351259
.filter(Objects::nonNull)
1236-
@@ -115,15 +360,16 @@ final class XrayRulesSampler implements Sampler {
1260+
@@ -115,15 +376,16 @@ final class XrayRulesSampler implements Sampler {
12371261
Map<String, SamplingTargetDocument> ruleTargets,
12381262
Set<String> requestedTargetRuleNames,
12391263
Date now) {
@@ -1252,7 +1276,7 @@ index 75977dc0..a60fec96 100644
12521276
}
12531277
if (requestedTargetRuleNames.contains(rule.getRuleName())) {
12541278
// In practice X-Ray should return a target for any rule we requested but
1255-
@@ -135,6 +381,90 @@ final class XrayRulesSampler implements Sampler {
1279+
@@ -135,6 +397,90 @@ final class XrayRulesSampler implements Sampler {
12561280
return rule;
12571281
})
12581282
.toArray(SamplingRuleApplier[]::new);
@@ -1340,8 +1364,8 @@ index 75977dc0..a60fec96 100644
13401364
+ }
13411365
+
13421366
+ // For testing
1343-
+ Set<String> getAnomalyTracesSet() {
1344-
+ return anomalyTracesSet;
1367+
+ Cache<String, Boolean> getAnomalyTracesCache() {
1368+
+ return anomalyTracesCache;
13451369
}
13461370
}
13471371
diff --git a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerTest.java b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerTest.java
@@ -1981,7 +2005,7 @@ index 6bb6e82a..6d71711b 100644
19812005
return applier.shouldSample(
19822006
Context.current(),
19832007
diff --git a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java
1984-
index 1ca8df34..3c1dfda6 100644
2008+
index 1ca8df34..f6a24af8 100644
19852009
--- a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java
19862010
+++ b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java
19872011
@@ -5,17 +5,28 @@
@@ -2201,7 +2225,7 @@ index 1ca8df34..3c1dfda6 100644
22012225

22022226
// Minimum is batTarget, 5s from now
22032227
assertThat(sampler.nextTargetFetchTimeNanos())
2204-
@@ -169,6 +251,731 @@ class XrayRulesSamplerTest {
2228+
@@ -169,6 +251,763 @@ class XrayRulesSamplerTest {
22052229
assertThat(sampler.snapshot(Date.from(now))).hasSize(4);
22062230
}
22072231

@@ -2474,7 +2498,7 @@ index 1ca8df34..3c1dfda6 100644
24742498
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
24752499
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
24762500
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2477-
+ assertThat(sampler.getAnomalyTracesSet().isEmpty()).isEqualTo(true);
2501+
+ assertThat(sampler.getAnomalyTracesCache().asMap().size()).isEqualTo(0);
24782502
+ }
24792503
+
24802504
+ @Test
@@ -2545,15 +2569,17 @@ index 1ca8df34..3c1dfda6 100644
25452569
+ "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault()));
25462570
+ SpanData spanDataMock = mock(SpanData.class);
25472571
+ Attributes attributesMock = mock(Attributes.class);
2548-
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID");
25492572
+ when(spanDataMock.getAttributes()).thenReturn(attributesMock);
25502573
+ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(500L);
25512574
+ LongAdder exportCounter = new LongAdder();
25522575
+ Consumer<ReadableSpan> stubbedConsumer = x -> exportCounter.increment();
25532576
+
25542577
+ // First two spans should be captured, the third should be rate limited
2578+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1");
25552579
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2580+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2");
25562581
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2582+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID3");
25572583
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
25582584
+ // Only two of the three spans are captured due to rate limiting
25592585
+ assertThat(exportCounter.sumThenReset()).isEqualTo(2L);
@@ -2595,7 +2621,7 @@ index 1ca8df34..3c1dfda6 100644
25952621
+ assertThat(snapshot.get(1).getBoostStatisticsDocument().getTotalCount()).isEqualTo(0);
25962622
+ assertThat(snapshot.get(1).getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0);
25972623
+ assertThat(snapshot.get(1).getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(0);
2598-
+ assertThat(sampler.getAnomalyTracesSet().isEmpty()).isEqualTo(true);
2624+
+ assertThat(sampler.getAnomalyTracesCache().asMap().size()).isEqualTo(0);
25992625
+ }
26002626
+
26012627
+ @Test
@@ -2686,15 +2712,17 @@ index 1ca8df34..3c1dfda6 100644
26862712
+
26872713
+ SpanData spanDataMock = mock(SpanData.class);
26882714
+ Attributes attributesMock = mock(Attributes.class);
2689-
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID");
26902715
+ when(spanDataMock.getAttributes()).thenReturn(attributesMock);
26912716
+ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(456L);
26922717
+
26932718
+ LongAdder exportCounter = new LongAdder();
26942719
+ Consumer<ReadableSpan> stubbedConsumer = x -> exportCounter.increment();
26952720
+
2721+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1");
26962722
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2723+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2");
26972724
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2725+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID3");
26982726
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
26992727
+ assertThat(exportCounter.sum()).isEqualTo(2L);
27002728
+ }
@@ -2752,15 +2780,17 @@ index 1ca8df34..3c1dfda6 100644
27522780
+
27532781
+ SpanData spanDataMock = mock(SpanData.class);
27542782
+ Attributes attributesMock = mock(Attributes.class);
2755-
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID");
27562783
+ when(spanDataMock.getAttributes()).thenReturn(attributesMock);
27572784
+ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(200L);
27582785
+
27592786
+ LongAdder exportCounter = new LongAdder();
27602787
+ Consumer<ReadableSpan> stubbedConsumer = x -> exportCounter.add(1);
27612788
+
2789+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1");
27622790
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2791+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2");
27632792
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2793+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID3");
27642794
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
27652795
+ assertThat(exportCounter.sum()).isEqualTo(2L);
27662796
+ }
@@ -2826,22 +2856,23 @@ index 1ca8df34..3c1dfda6 100644
28262856
+ LongAdder exportCounter = new LongAdder();
28272857
+ Consumer<ReadableSpan> stubbedConsumer = x -> exportCounter.add(1);
28282858
+
2829-
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2830-
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2859+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1");
28312860
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
28322861
+ assertThat(exportCounter.sum()).isEqualTo(0L);
28332862
+
28342863
+ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(456L);
28352864
+ when(readableSpanMock.getLatencyNanos()).thenReturn(1L);
2836-
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2837-
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2865+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2");
28382866
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
28392867
+ assertThat(exportCounter.sum()).isEqualTo(0L);
28402868
+
28412869
+ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(456L);
28422870
+ when(readableSpanMock.getLatencyNanos()).thenReturn(300_000_000L); // 300 ms
2871+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID3");
28432872
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2873+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID4");
28442874
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2875+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID5");
28452876
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
28462877
+ assertThat(exportCounter.sum()).isEqualTo(2L);
28472878
+ }
@@ -2916,18 +2947,43 @@ index 1ca8df34..3c1dfda6 100644
29162947
+ assertThat(exportCounter.sumThenReset()).isEqualTo(2L);
29172948
+
29182949
+ // Test non-matching operation
2950+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID_2");
29192951
+ when(attributesMock.get(URL_PATH)).thenReturn("/api1/ext");
29202952
+ when(attributesMock.get(HTTP_METHOD)).thenReturn("POST");
29212953
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
29222954
+ when(attributesMock.get(URL_PATH)).thenReturn("/non-matching");
29232955
+ when(attributesMock.get(HTTP_METHOD)).thenReturn("GET");
29242956
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2925-
+ assertThat(exportCounter.sum()).isEqualTo(0L);
2957+
+ assertThat(exportCounter.sumThenReset()).isEqualTo(0L);
29262958
+
29272959
+ // Test aws.local.operation takes priority
29282960
+ when(attributesMock.get(AwsAttributeKeys.AWS_LOCAL_OPERATION)).thenReturn("GET /api1");
29292961
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2930-
+ assertThat(exportCounter.sum()).isEqualTo(1L);
2962+
+ assertThat(exportCounter.sumThenReset()).isEqualTo(1L);
2963+
+
2964+
+ // Test that spans for previously captured traceIDs are still captured even when they no longer match
2965+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID");
2966+
+ when(attributesMock.get(AwsAttributeKeys.AWS_LOCAL_OPERATION)).thenReturn("GET /non-matching");
2967+
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2968+
+ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID_2");
2969+
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2970+
+ assertThat(exportCounter.sumThenReset()).isEqualTo(2L);
2971+
+
2972+
+ // Test that previously captured traceIDs stay captured while the trace is active: each capture re-puts the cache entry, restarting its 1-minute expiry
2973+
+ clock.advance(Duration.ofSeconds(45));
2974+
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2975+
+ assertThat(exportCounter.sumThenReset()).isEqualTo(1L);
2976+
+ clock.advance(Duration.ofSeconds(45));
2977+
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2978+
+ assertThat(exportCounter.sumThenReset()).isEqualTo(1L);
2979+
+ clock.advance(Duration.ofSeconds(45));
2980+
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2981+
+ assertThat(exportCounter.sumThenReset()).isEqualTo(1L);
2982+
+
2983+
+ // Test that a non-matching span is no longer captured once the cached traceID has expired
2984+
+ clock.advance(Duration.ofMinutes(100));
2985+
+ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer);
2986+
+ assertThat(exportCounter.sumThenReset()).isEqualTo(0L);
29312987
+ }
29322988
+
29332989
private static SamplingResult doSample(Sampler sampler, String name) {

0 commit comments

Comments
 (0)