From 7a3deb0d7cb06ca223568503267ccf7eaee1b6ba Mon Sep 17 00:00:00 2001 From: Mahad Janjua Date: Tue, 12 Aug 2025 11:20:25 -0700 Subject: [PATCH] AWS X-Ray Adaptive Sampling Support --- .../patches/opentelemetry-java-contrib.patch | 2737 ++++++++++++++++- awsagentprovider/build.gradle.kts | 4 +- .../AwsAgentPropertiesCustomizerProvider.java | 2 +- ...sApplicationSignalsCustomizerProvider.java | 77 +- .../javaagent/providers/AwsAttributeKeys.java | 3 + .../providers/AwsSpanMetricsProcessor.java | 18 +- .../AwsSpanMetricsProcessorBuilder.java | 21 +- ...licationSignalsCustomizerProviderTest.java | 60 + .../AwsSpanMetricsProcessorTest.java | 6 +- 9 files changed, 2886 insertions(+), 42 deletions(-) create mode 100644 awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProviderTest.java diff --git a/.github/patches/opentelemetry-java-contrib.patch b/.github/patches/opentelemetry-java-contrib.patch index 65a6f8b780..ad578ae7b6 100644 --- a/.github/patches/opentelemetry-java-contrib.patch +++ b/.github/patches/opentelemetry-java-contrib.patch @@ -1,11 +1,564 @@ +diff --git a/aws-xray/build.gradle.kts b/aws-xray/build.gradle.kts +index ccec9d52..f764bba9 100644 +--- a/aws-xray/build.gradle.kts ++++ b/aws-xray/build.gradle.kts +@@ -11,6 +11,7 @@ dependencies { + api("io.opentelemetry:opentelemetry-sdk-trace") + + compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure") ++ implementation("io.opentelemetry.semconv:opentelemetry-semconv:1.32.0-alpha") + + implementation("com.squareup.okhttp3:okhttp") + implementation("io.opentelemetry:opentelemetry-semconv") +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsSamplingResult.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsSamplingResult.java +new file mode 100644 +index 00000000..2d547990 +--- /dev/null ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsSamplingResult.java +@@ -0,0 +1,52 @@ ++/* ++ * Copyright The OpenTelemetry Authors ++ * SPDX-License-Identifier: Apache-2.0 ++ */ ++ ++package io.opentelemetry.contrib.awsxray; ++ ++import io.opentelemetry.api.common.Attributes; ++import io.opentelemetry.api.trace.TraceState; ++import io.opentelemetry.sdk.trace.samplers.SamplingDecision; ++import io.opentelemetry.sdk.trace.samplers.SamplingResult; ++ ++final class AwsSamplingResult implements SamplingResult { ++ ++ public static final String AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY = "awsxraysamplingrule"; ++ ++ private final SamplingDecision decision; ++ private final Attributes attributes; ++ private final String samplingRuleName; ++ ++ private AwsSamplingResult( ++ SamplingDecision decision, Attributes attributes, String samplingRuleName) { ++ this.decision = decision; ++ this.attributes = attributes; ++ this.samplingRuleName = samplingRuleName; ++ } ++ ++ static AwsSamplingResult create( ++ SamplingDecision decision, Attributes attributes, String samplingRuleName) { ++ return new AwsSamplingResult(decision, attributes, samplingRuleName); ++ } ++ ++ @Override ++ public SamplingDecision getDecision() { ++ return decision; ++ } ++ ++ @Override ++ public Attributes getAttributes() { ++ return attributes; ++ } ++ ++ @Override ++ public TraceState getUpdatedTraceState(TraceState parentTraceState) { ++ if (parentTraceState.get(AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY) == null) { ++ return parentTraceState.toBuilder() ++ .put(AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY, samplingRuleName) ++ .build(); ++ } ++ return parentTraceState; ++ } ++} +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayAdaptiveSamplingConfig.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayAdaptiveSamplingConfig.java +new file mode 100644 +index 00000000..836ebf98 +--- /dev/null ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayAdaptiveSamplingConfig.java +@@ -0,0 +1,139 @@ ++/* ++ * Copyright The OpenTelemetry Authors ++ * SPDX-License-Identifier: Apache-2.0 ++ */ ++ ++package io.opentelemetry.contrib.awsxray; ++ ++import com.fasterxml.jackson.annotation.JsonCreator; ++import com.fasterxml.jackson.annotation.JsonProperty; ++import com.fasterxml.jackson.annotation.JsonValue; ++import com.fasterxml.jackson.databind.annotation.JsonDeserialize; ++import com.fasterxml.jackson.databind.annotation.JsonSerialize; ++import com.google.auto.value.AutoValue; ++import java.util.List; ++import javax.annotation.Nullable; ++ ++@AutoValue ++@JsonSerialize(as = AwsXrayAdaptiveSamplingConfig.class) ++@JsonDeserialize(builder = AutoValue_AwsXrayAdaptiveSamplingConfig.Builder.class) ++public abstract class AwsXrayAdaptiveSamplingConfig { ++ ++ @JsonProperty("version") ++ public abstract double getVersion(); ++ ++ @JsonProperty("anomalyConditions") ++ @Nullable ++ public abstract List getAnomalyConditions(); ++ ++ @JsonProperty("errorCaptureLimit") ++ @Nullable ++ public abstract ErrorCaptureLimit getErrorCaptureLimit(); ++ ++ public static Builder builder() { ++ return new AutoValue_AwsXrayAdaptiveSamplingConfig.Builder(); ++ } ++ ++ @AutoValue.Builder ++ public abstract static class Builder { ++ @JsonProperty("version") ++ public abstract Builder setVersion(double value); ++ ++ @JsonProperty("anomalyConditions") ++ public abstract Builder setAnomalyConditions(List value); ++ ++ @JsonProperty("errorCaptureLimit") ++ public abstract Builder setErrorCaptureLimit(ErrorCaptureLimit value); ++ ++ public abstract AwsXrayAdaptiveSamplingConfig build(); ++ } ++ ++ @AutoValue ++ @JsonDeserialize( ++ builder = AutoValue_AwsXrayAdaptiveSamplingConfig_AnomalyConditions.Builder.class) ++ public abstract static class AnomalyConditions { ++ @JsonProperty("errorCodeRegex") ++ @Nullable ++ public abstract String getErrorCodeRegex(); ++ ++ @JsonProperty("operations") ++ @Nullable ++ public abstract List getOperations(); ++ ++ @JsonProperty("highLatencyMs") ++ @Nullable ++ public abstract Long getHighLatencyMs(); ++ ++ @JsonProperty("usage") ++ @Nullable ++ public abstract UsageType getUsage(); ++ ++ public static Builder builder() { ++ return new AutoValue_AwsXrayAdaptiveSamplingConfig_AnomalyConditions.Builder(); ++ } ++ ++ @AutoValue.Builder ++ public abstract static class Builder { ++ @JsonProperty("errorCodeRegex") ++ public abstract Builder setErrorCodeRegex(String value); ++ ++ @JsonProperty("operations") ++ public abstract Builder setOperations(List value); ++ ++ @JsonProperty("highLatencyMs") ++ public abstract Builder setHighLatencyMs(Long value); ++ ++ @JsonProperty("usage") ++ public abstract Builder setUsage(UsageType value); ++ ++ public abstract AnomalyConditions build(); ++ } ++ } ++ ++ public enum UsageType { ++ BOTH("both"), ++ SAMPLING_BOOST("sampling-boost"), ++ ERROR_SPAN_CAPTURE("error-span-capture"); ++ ++ private final String value; ++ ++ UsageType(String value) { ++ this.value = value; ++ } ++ ++ @JsonValue ++ public String getValue() { ++ return value; ++ } ++ ++ @JsonCreator ++ public static UsageType fromValue(String value) { ++ for (UsageType type : values()) { ++ if (type.value.equals(value)) { ++ return type; ++ } ++ } ++ throw new IllegalArgumentException("Invalid usage value: " + value); ++ } ++ } ++ ++ @AutoValue ++ @JsonDeserialize( ++ builder = AutoValue_AwsXrayAdaptiveSamplingConfig_ErrorCaptureLimit.Builder.class) ++ public abstract static class ErrorCaptureLimit { ++ @JsonProperty("errorSpansPerSecond") ++ public abstract int getErrorSpansPerSecond(); ++ ++ public static Builder builder() { ++ return new AutoValue_AwsXrayAdaptiveSamplingConfig_ErrorCaptureLimit.Builder(); ++ } ++ ++ @AutoValue.Builder ++ public abstract static class Builder { ++ @JsonProperty("errorSpansPerSecond") ++ public abstract Builder setErrorSpansPerSecond(int value); ++ ++ public abstract ErrorCaptureLimit build(); ++ } ++ } ++} +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSampler.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSampler.java +index 9b5a2e7e..31d5a293 100644 +--- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSampler.java ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSampler.java +@@ -9,16 +9,22 @@ import io.opentelemetry.api.common.Attributes; + import io.opentelemetry.api.trace.SpanKind; + import io.opentelemetry.context.Context; + import io.opentelemetry.contrib.awsxray.GetSamplingRulesResponse.SamplingRuleRecord; ++import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingBoostStatisticsDocument; + import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingStatisticsDocument; + import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; + import io.opentelemetry.sdk.common.Clock; + import io.opentelemetry.sdk.resources.Resource; ++import io.opentelemetry.sdk.trace.ReadableSpan; + import io.opentelemetry.sdk.trace.data.LinkData; ++import io.opentelemetry.sdk.trace.data.SpanData; ++import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; ++import io.opentelemetry.sdk.trace.export.SpanExporter; + import io.opentelemetry.sdk.trace.samplers.Sampler; + import io.opentelemetry.sdk.trace.samplers.SamplingResult; + import java.io.Closeable; + import java.time.Duration; + import java.time.Instant; ++import java.util.ArrayList; + import java.util.Date; + import java.util.Iterator; + import java.util.List; +@@ -43,6 +49,9 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable { + + private static final Logger logger = Logger.getLogger(AwsXrayRemoteSampler.class.getName()); + ++ // Default batch size to be same as OTel BSP default ++ private static final int maxExportBatchSize = 512; ++ + private final Resource resource; + private final Clock clock; + private final Sampler initialSampler; +@@ -58,6 +67,9 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable { + @Nullable private volatile GetSamplingRulesResponse previousRulesResponse; + private volatile Sampler sampler; + ++ @Nullable private AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig; ++ @Nullable private BatchSpanProcessor bsp; ++ + /** + * Returns a {@link AwsXrayRemoteSamplerBuilder} with the given {@link Resource}. This {@link + * Resource} should be the same as what the OpenTelemetry SDK is configured with. +@@ -119,6 +131,40 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable { + return "AwsXrayRemoteSampler{" + sampler.getDescription() + "}"; + } + ++ public void setAdaptiveSamplingConfig(AwsXrayAdaptiveSamplingConfig config) { ++ if (this.adaptiveSamplingConfig != null) { ++ throw new IllegalStateException("Programming bug - Adaptive sampling config is already set"); ++ } else if (config != null && this.adaptiveSamplingConfig == null) { ++ // Save here and also pass to XrayRulesSampler directly as it already exists ++ this.adaptiveSamplingConfig = config; ++ if (sampler instanceof XrayRulesSampler) { ++ ((XrayRulesSampler) sampler).setAdaptiveSamplingConfig(config); ++ } ++ } ++ } ++ ++ public void setSpanExporter(SpanExporter spanExporter) { ++ if (this.bsp != null) { ++ throw new IllegalStateException("Programming bug - BatchSpanProcessor is already set"); ++ } else if (spanExporter != null && this.bsp == null) { ++ this.bsp = ++ BatchSpanProcessor.builder(spanExporter) ++ .setExportUnsampledSpans(true) // Required to capture the unsampled anomaly spans ++ .setMaxExportBatchSize(maxExportBatchSize) ++ .build(); ++ } ++ } ++ ++ public void adaptSampling(ReadableSpan span, SpanData spanData) { ++ if (this.bsp == null) { ++ throw new IllegalStateException( ++ "Programming bug - BatchSpanProcessor is null while trying to adapt sampling"); ++ } ++ if (sampler instanceof XrayRulesSampler) { ++ ((XrayRulesSampler) sampler).adaptSampling(span, spanData, this.bsp::onEnd); ++ } ++ } ++ + private void getAndUpdateSampler() { + try { + // No pagination support yet, or possibly ever. +@@ -133,7 +179,8 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable { + initialSampler, + response.getSamplingRules().stream() + .map(SamplingRuleRecord::getRule) +- .collect(Collectors.toList())); ++ .collect(Collectors.toList()), ++ adaptiveSamplingConfig); + previousRulesResponse = response; + ScheduledFuture existingFetchTargetsFuture = fetchTargetsFuture; + if (existingFetchTargetsFuture != null) { +@@ -177,14 +224,29 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable { + XrayRulesSampler xrayRulesSampler = (XrayRulesSampler) sampler; + try { + Date now = Date.from(Instant.ofEpochSecond(0, clock.now())); +- List statistics = xrayRulesSampler.snapshot(now); ++ List statisticsSnapshot = ++ xrayRulesSampler.snapshot(now); ++ List statistics = new ArrayList(); ++ List boostStatistics = ++ new ArrayList(); ++ statisticsSnapshot.stream() ++ .forEach( ++ snapshot -> { ++ if (snapshot.getStatisticsDocument() != null) { ++ statistics.add(snapshot.getStatisticsDocument()); ++ } ++ if (snapshot.getBoostStatisticsDocument() != null ++ && snapshot.getBoostStatisticsDocument().getTotalCount() > 0) { ++ boostStatistics.add(snapshot.getBoostStatisticsDocument()); ++ } ++ }); + Set requestedTargetRuleNames = + statistics.stream() + .map(SamplingStatisticsDocument::getRuleName) + .collect(Collectors.toSet()); + +- GetSamplingTargetsResponse response = +- client.getSamplingTargets(GetSamplingTargetsRequest.create(statistics)); ++ GetSamplingTargetsRequest req = GetSamplingTargetsRequest.create(statistics, boostStatistics); ++ GetSamplingTargetsResponse response = client.getSamplingTargets(req); + Map targets = + response.getDocuments().stream() + .collect(Collectors.toMap(SamplingTargetDocument::getRuleName, Function.identity())); +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingRulesResponse.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingRulesResponse.java +index dca930d5..01835dc2 100644 +--- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingRulesResponse.java ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingRulesResponse.java +@@ -62,7 +62,8 @@ abstract class GetSamplingRulesResponse { + @JsonProperty("ServiceName") String serviceName, + @JsonProperty("ServiceType") String serviceType, + @JsonProperty("URLPath") String urlPath, +- @JsonProperty("Version") int version) { ++ @JsonProperty("Version") int version, ++ @JsonProperty("SamplingRateBoost") @Nullable SamplingRateBoost samplingRateBoost) { + return new AutoValue_GetSamplingRulesResponse_SamplingRule( + attributes, + fixedRate, +@@ -76,7 +77,8 @@ abstract class GetSamplingRulesResponse { + serviceName, + serviceType, + urlPath, +- version); ++ version, ++ samplingRateBoost); + } + + abstract Map getAttributes(); +@@ -106,5 +108,23 @@ abstract class GetSamplingRulesResponse { + abstract String getUrlPath(); + + abstract int getVersion(); ++ ++ @Nullable ++ abstract SamplingRateBoost getSamplingRateBoost(); ++ } ++ ++ @AutoValue ++ abstract static class SamplingRateBoost { ++ @JsonCreator ++ static SamplingRateBoost create( ++ @JsonProperty("MaxRate") double maxRate, ++ @JsonProperty("CooldownWindowMinutes") long cooldownWindowMinutes) { ++ return new AutoValue_GetSamplingRulesResponse_SamplingRateBoost( ++ maxRate, cooldownWindowMinutes); ++ } ++ ++ abstract double getMaxRate(); ++ ++ abstract long getCooldownWindowMinutes(); + } + } +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsRequest.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsRequest.java +index 7d1fb7b7..9404f73e 100644 +--- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsRequest.java ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsRequest.java +@@ -15,14 +15,20 @@ import java.util.List; + @JsonSerialize(as = GetSamplingTargetsRequest.class) + abstract class GetSamplingTargetsRequest { + +- static GetSamplingTargetsRequest create(List documents) { +- return new AutoValue_GetSamplingTargetsRequest(documents); ++ static GetSamplingTargetsRequest create( ++ List documents, ++ List boostDocuments) { ++ return new AutoValue_GetSamplingTargetsRequest(documents, boostDocuments); + } + + // Limit of 25 items + @JsonProperty("SamplingStatisticsDocuments") + abstract List getDocuments(); + ++ // Limit of 25 items ++ @JsonProperty("SamplingBoostStatisticsDocuments") ++ abstract List getBoostDocuments(); ++ + @AutoValue + @JsonSerialize(as = SamplingStatisticsDocument.class) + abstract static class SamplingStatisticsDocument { +@@ -66,4 +72,48 @@ abstract class GetSamplingTargetsRequest { + abstract SamplingStatisticsDocument build(); + } + } ++ ++ @AutoValue ++ @JsonSerialize(as = SamplingBoostStatisticsDocument.class) ++ abstract static class SamplingBoostStatisticsDocument { ++ ++ static SamplingBoostStatisticsDocument.Builder newBuilder() { ++ return new AutoValue_GetSamplingTargetsRequest_SamplingBoostStatisticsDocument.Builder(); ++ } ++ ++ @JsonProperty("RuleName") ++ abstract String getRuleName(); ++ ++ @JsonProperty("ServiceName") ++ abstract String getServiceName(); ++ ++ @JsonProperty("Timestamp") ++ abstract Date getTimestamp(); ++ ++ @JsonProperty("AnomalyCount") ++ abstract long getAnomalyCount(); ++ ++ @JsonProperty("TotalCount") ++ abstract long getTotalCount(); ++ ++ @JsonProperty("SampledAnomalyCount") ++ abstract long getSampledAnomalyCount(); ++ ++ @AutoValue.Builder ++ abstract static class Builder { ++ abstract Builder setRuleName(String ruleName); ++ ++ abstract Builder setServiceName(String serviceName); ++ ++ abstract Builder setTimestamp(Date timestamp); ++ ++ abstract Builder setAnomalyCount(long anomalyCount); ++ ++ abstract Builder setTotalCount(long totalCount); ++ ++ abstract Builder setSampledAnomalyCount(long sampledAnomalyCount); ++ ++ abstract SamplingBoostStatisticsDocument build(); ++ } ++ } + } +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsResponse.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsResponse.java +index c1e178f5..406f07e2 100644 +--- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsResponse.java ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsResponse.java +@@ -19,9 +19,11 @@ abstract class GetSamplingTargetsResponse { + static GetSamplingTargetsResponse create( + @JsonProperty("LastRuleModification") Date lastRuleModification, + @JsonProperty("SamplingTargetDocuments") List documents, +- @JsonProperty("UnprocessedStatistics") List unprocessedStatistics) { ++ @JsonProperty("UnprocessedStatistics") List unprocessedStatistics, ++ @JsonProperty("UnprocessedBoostStatistics") @Nullable ++ List unprocessedBoostStatistics) { + return new AutoValue_GetSamplingTargetsResponse( +- lastRuleModification, documents, unprocessedStatistics); ++ lastRuleModification, documents, unprocessedStatistics, unprocessedBoostStatistics); + } + + abstract Date getLastRuleModification(); +@@ -30,6 +32,9 @@ abstract class GetSamplingTargetsResponse { + + abstract List getUnprocessedStatistics(); + ++ @Nullable ++ abstract List getUnprocessedBoostStatistics(); ++ + @AutoValue + abstract static class SamplingTargetDocument { + +@@ -39,9 +44,10 @@ abstract class GetSamplingTargetsResponse { + @JsonProperty("Interval") @Nullable Integer intervalSecs, + @JsonProperty("ReservoirQuota") @Nullable Integer reservoirQuota, + @JsonProperty("ReservoirQuotaTTL") @Nullable Date reservoirQuotaTtl, ++ @JsonProperty("SamplingBoost") @Nullable SamplingBoost samplingBoost, + @JsonProperty("RuleName") String ruleName) { + return new AutoValue_GetSamplingTargetsResponse_SamplingTargetDocument( +- fixedRate, intervalSecs, reservoirQuota, reservoirQuotaTtl, ruleName); ++ fixedRate, intervalSecs, reservoirQuota, reservoirQuotaTtl, samplingBoost, ruleName); + } + + abstract double getFixedRate(); +@@ -57,6 +63,9 @@ abstract class GetSamplingTargetsResponse { + @Nullable + abstract Date getReservoirQuotaTtl(); + ++ @Nullable ++ abstract SamplingBoost getSamplingBoost(); ++ + abstract String getRuleName(); + } + +@@ -78,4 +87,18 @@ abstract class GetSamplingTargetsResponse { + + abstract String getRuleName(); + } ++ ++ @AutoValue ++ abstract static class SamplingBoost { ++ @JsonCreator ++ static SamplingBoost create( ++ @JsonProperty("BoostRate") double boostRate, ++ @JsonProperty("BoostRateTTL") Date boostRateTtl) { ++ return new AutoValue_GetSamplingTargetsResponse_SamplingBoost(boostRate, boostRateTtl); ++ } ++ ++ abstract double getBoostRate(); ++ ++ abstract Date getBoostRateTtl(); ++ } + } diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplier.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplier.java -index 1ef8abf..ef84f35 100644 +index 1ef8abf5..328e63dd 100644 --- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplier.java +++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplier.java -@@ -35,6 +35,11 @@ final class SamplingRuleApplier { - +@@ -9,10 +9,13 @@ import io.opentelemetry.api.common.AttributeKey; + import io.opentelemetry.api.common.Attributes; + import io.opentelemetry.api.trace.SpanKind; + import io.opentelemetry.context.Context; ++import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingBoostStatisticsDocument; + import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingStatisticsDocument; ++import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingBoost; + import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; + import io.opentelemetry.sdk.common.Clock; + import io.opentelemetry.sdk.resources.Resource; ++import io.opentelemetry.sdk.trace.ReadableSpan; + import io.opentelemetry.sdk.trace.data.LinkData; + import io.opentelemetry.sdk.trace.samplers.Sampler; + import io.opentelemetry.sdk.trace.samplers.SamplingDecision; +@@ -35,6 +38,11 @@ final class SamplingRuleApplier { + private static final Map XRAY_CLOUD_PLATFORM; - + + private static final AttributeKey URL_PATH = AttributeKey.stringKey("url.path"); + private static final AttributeKey URL_FULL = AttributeKey.stringKey("url.full"); + private static final AttributeKey HTTP_REQUEST_METHOD = @@ -14,9 +567,111 @@ index 1ef8abf..ef84f35 100644 static { Map xrayCloudPlatform = new HashMap<>(); xrayCloudPlatform.put(ResourceAttributes.CloudPlatformValues.AWS_EC2, "AWS::EC2::Instance"); -@@ -162,11 +167,14 @@ final class SamplingRuleApplier { +@@ -50,12 +58,20 @@ final class SamplingRuleApplier { + + private final String clientId; + private final String ruleName; ++ private final String serviceName; + private final Clock clock; + private final Sampler reservoirSampler; + private final long reservoirEndTimeNanos; ++ private final double fixedRate; + private final Sampler fixedRateSampler; + private final boolean borrowing; + ++ // Adaptive sampling related configs ++ private final boolean hasBoost; ++ private final double boostedFixedRate; ++ private final Long boostEndTimeNanos; ++ private final Sampler boostedFixedRateSampler; ++ + private final Map attributeMatchers; + private final Matcher urlPathMatcher; + private final Matcher serviceNameMatcher; +@@ -68,7 +84,11 @@ final class SamplingRuleApplier { + + private final long nextSnapshotTimeNanos; + +- SamplingRuleApplier(String clientId, GetSamplingRulesResponse.SamplingRule rule, Clock clock) { ++ SamplingRuleApplier( ++ String clientId, ++ GetSamplingRulesResponse.SamplingRule rule, ++ @Nullable String serviceName, ++ Clock clock) { + this.clientId = clientId; + this.clock = clock; + String ruleName = rule.getRuleName(); +@@ -82,6 +102,8 @@ final class SamplingRuleApplier { + } + this.ruleName = ruleName; + ++ this.serviceName = serviceName == null ? "default" : serviceName; ++ + // We don't have a SamplingTarget so are ready to report a snapshot right away. + nextSnapshotTimeNanos = clock.nanoTime(); + +@@ -98,7 +120,15 @@ final class SamplingRuleApplier { + reservoirSampler = Sampler.alwaysOff(); + borrowing = false; + } +- fixedRateSampler = createFixedRate(rule.getFixedRate()); ++ fixedRate = rule.getFixedRate(); ++ fixedRateSampler = createFixedRate(fixedRate); ++ ++ // Check if the rule has a sampling rate boost option ++ hasBoost = rule.getSamplingRateBoost() != null; ++ ++ boostedFixedRate = fixedRate; ++ boostedFixedRateSampler = createFixedRate(fixedRate); ++ boostEndTimeNanos = clock.nanoTime(); + + if (rule.getAttributes().isEmpty()) { + attributeMatchers = Collections.emptyMap(); +@@ -121,11 +151,16 @@ final class SamplingRuleApplier { + private SamplingRuleApplier( + String clientId, + String ruleName, ++ String serviceName, + Clock clock, + Sampler reservoirSampler, + long reservoirEndTimeNanos, ++ double fixedRate, + Sampler fixedRateSampler, + boolean borrowing, ++ double boostedFixedRate, ++ Long boostEndTimeNanos, ++ boolean hasBoost, + Map attributeMatchers, + Matcher urlPathMatcher, + Matcher serviceNameMatcher, +@@ -137,11 +172,16 @@ final class SamplingRuleApplier { + long nextSnapshotTimeNanos) { + this.clientId = clientId; + this.ruleName = ruleName; ++ this.serviceName = serviceName; + this.clock = clock; + this.reservoirSampler = reservoirSampler; + this.reservoirEndTimeNanos = reservoirEndTimeNanos; ++ this.fixedRate = fixedRate; + this.fixedRateSampler = fixedRateSampler; + this.borrowing = borrowing; ++ this.boostedFixedRate = boostedFixedRate; ++ this.boostEndTimeNanos = boostEndTimeNanos; ++ this.hasBoost = hasBoost; + this.attributeMatchers = attributeMatchers; + this.urlPathMatcher = urlPathMatcher; + this.serviceNameMatcher = serviceNameMatcher; +@@ -151,6 +191,7 @@ final class SamplingRuleApplier { + this.resourceArnMatcher = resourceArnMatcher; + this.statistics = statistics; + this.nextSnapshotTimeNanos = nextSnapshotTimeNanos; ++ this.boostedFixedRateSampler = createFixedRate(this.boostedFixedRate); + } + + @SuppressWarnings("deprecation") // TODO +@@ -162,11 +203,14 @@ final class SamplingRuleApplier { String host = null; - + for (Map.Entry, Object> entry : attributes.asMap().entrySet()) { - if (entry.getKey().equals(SemanticAttributes.HTTP_TARGET)) { + if (entry.getKey().equals(SemanticAttributes.HTTP_TARGET) @@ -32,26 +687,736 @@ index 1ef8abf..ef84f35 100644 httpMethod = (String) entry.getValue(); } else if (entry.getKey().equals(SemanticAttributes.NET_HOST_NAME)) { host = (String) entry.getValue(); +@@ -237,45 +281,84 @@ final class SamplingRuleApplier { + statistics.sampled.increment(); + return result; + } +- result = +- fixedRateSampler.shouldSample( +- parentContext, traceId, name, spanKind, attributes, parentLinks); ++ ++ if (clock.nanoTime() < boostEndTimeNanos) { ++ result = ++ boostedFixedRateSampler.shouldSample( ++ parentContext, traceId, name, spanKind, attributes, parentLinks); ++ } else { ++ result = ++ fixedRateSampler.shouldSample( ++ parentContext, traceId, name, spanKind, attributes, parentLinks); ++ } + if (result.getDecision() != SamplingDecision.DROP) { + statistics.sampled.increment(); + } + return result; + } + ++ void countTrace() { ++ statistics.traces.increment(); ++ } ++ ++ void countAnomalyTrace(ReadableSpan span) { ++ statistics.anomalies.increment(); ++ ++ if (span.getSpanContext().isSampled()) { ++ statistics.anomaliesSampled.increment(); ++ } ++ } ++ + @Nullable +- SamplingStatisticsDocument snapshot(Date now) { ++ SamplingRuleStatisticsSnapshot snapshot(Date now) { + if (clock.nanoTime() < nextSnapshotTimeNanos) { + return null; + } +- return SamplingStatisticsDocument.newBuilder() +- .setClientId(clientId) +- .setRuleName(ruleName) +- .setTimestamp(now) +- // Resetting requests first ensures that sample / borrow rate are positive after the reset. +- // Snapshotting is not concurrent so this ensures they are always positive. +- .setRequestCount(statistics.requests.sumThenReset()) +- .setSampledCount(statistics.sampled.sumThenReset()) +- .setBorrowCount(statistics.borrowed.sumThenReset()) +- .build(); ++ long totalCount = statistics.requests.sumThenReset(); ++ long sampledCount = statistics.sampled.sumThenReset(); ++ long borrowCount = statistics.borrowed.sumThenReset(); ++ long traceCount = statistics.traces.sumThenReset(); ++ long anomalyCount = statistics.anomalies.sumThenReset(); ++ long sampledAnomalyCount = statistics.anomaliesSampled.sumThenReset(); ++ SamplingStatisticsDocument samplingStatistics = ++ SamplingStatisticsDocument.newBuilder() ++ .setClientId(clientId) ++ .setRuleName(ruleName) ++ .setTimestamp(now) ++ // Resetting requests first ensures that sample / borrow rate are positive after the ++ // reset. ++ // Snapshotting is not concurrent so this ensures they are always positive. ++ .setRequestCount(totalCount) ++ .setSampledCount(sampledCount) ++ .setBorrowCount(borrowCount) ++ .build(); ++ SamplingBoostStatisticsDocument boostDoc = ++ SamplingBoostStatisticsDocument.newBuilder() ++ .setRuleName(ruleName) ++ .setServiceName(serviceName) ++ .setTimestamp(now) ++ .setTotalCount(traceCount) ++ .setAnomalyCount(anomalyCount) ++ .setSampledAnomalyCount(sampledAnomalyCount) ++ .build(); ++ return new SamplingRuleStatisticsSnapshot(samplingStatistics, boostDoc); + } + + long getNextSnapshotTimeNanos() { + return nextSnapshotTimeNanos; + } + +- SamplingRuleApplier withTarget(SamplingTargetDocument target, Date now) { ++ // currentNanoTime is passed in to ensure all uses of withTarget are used with the same baseline ++ // time reference ++ SamplingRuleApplier withTarget(SamplingTargetDocument target, Date now, long currentNanoTime) { + Sampler newFixedRateSampler = createFixedRate(target.getFixedRate()); + Sampler newReservoirSampler = Sampler.alwaysOff(); +- long newReservoirEndTimeNanos = clock.nanoTime(); ++ long newReservoirEndTimeNanos = currentNanoTime; + // Not well documented but a quota should always come with a TTL + if (target.getReservoirQuota() != null && target.getReservoirQuotaTtl() != null) { + newReservoirSampler = createRateLimited(target.getReservoirQuota()); + newReservoirEndTimeNanos = +- clock.nanoTime() ++ currentNanoTime + + Duration.between(now.toInstant(), target.getReservoirQuotaTtl().toInstant()) + .toNanos(); + } +@@ -283,16 +366,36 @@ final class SamplingRuleApplier { + target.getIntervalSecs() != null + ? TimeUnit.SECONDS.toNanos(target.getIntervalSecs()) + : AwsXrayRemoteSampler.DEFAULT_TARGET_INTERVAL_NANOS; +- long newNextSnapshotTimeNanos = clock.nanoTime() + intervalNanos; ++ long newNextSnapshotTimeNanos = currentNanoTime + intervalNanos; ++ ++ double newBoostedFixedRate = fixedRate; ++ long newBoostEndTimeNanos = currentNanoTime; ++ if (target.getSamplingBoost() != null) { ++ SamplingBoost samplingBoostMap = target.getSamplingBoost(); ++ if (samplingBoostMap != null ++ && samplingBoostMap.getBoostRate() >= target.getFixedRate() ++ && samplingBoostMap.getBoostRateTtl() != null) { ++ newBoostedFixedRate = samplingBoostMap.getBoostRate(); ++ newBoostEndTimeNanos = ++ currentNanoTime ++ + Duration.between(now.toInstant(), samplingBoostMap.getBoostRateTtl().toInstant()) ++ .toNanos(); ++ } ++ } + + return new SamplingRuleApplier( + clientId, + ruleName, ++ serviceName, + clock, + newReservoirSampler, + newReservoirEndTimeNanos, ++ fixedRate, + newFixedRateSampler, + /* borrowing= */ false, ++ newBoostedFixedRate, ++ newBoostEndTimeNanos, ++ hasBoost, + attributeMatchers, + urlPathMatcher, + serviceNameMatcher, +@@ -308,11 +411,16 @@ final class SamplingRuleApplier { + return new SamplingRuleApplier( + clientId, + ruleName, ++ serviceName, + clock, + reservoirSampler, + reservoirEndTimeNanos, ++ fixedRate, + fixedRateSampler, + borrowing, ++ boostedFixedRate, ++ boostEndTimeNanos, ++ hasBoost, + attributeMatchers, + urlPathMatcher, + serviceNameMatcher, +@@ -328,6 +436,15 @@ final class SamplingRuleApplier { + return ruleName; + } + ++ // For testing ++ String getServiceName() { ++ return serviceName; ++ } ++ ++ boolean hasBoost() { ++ return hasBoost; ++ } ++ + @Nullable + private static String getArn(Attributes attributes, Resource resource) { + String arn = resource.getAttributes().get(ResourceAttributes.AWS_ECS_CONTAINER_ARN); +@@ -479,5 +596,30 @@ final class SamplingRuleApplier { + final LongAdder requests = new LongAdder(); + final LongAdder sampled = new LongAdder(); + final LongAdder borrowed = new LongAdder(); ++ final LongAdder traces = new LongAdder(); ++ final LongAdder anomalies = new LongAdder(); ++ final LongAdder anomaliesSampled = new LongAdder(); ++ } ++ ++ static class SamplingRuleStatisticsSnapshot { ++ final SamplingStatisticsDocument statisticsDocument; ++ final SamplingBoostStatisticsDocument boostStatisticsDocument; ++ ++ // final SamplingBoostStatisticsDocument boostStatisticsDocument; ++ ++ SamplingRuleStatisticsSnapshot( ++ SamplingStatisticsDocument statisticsDocument, ++ SamplingBoostStatisticsDocument boostStatisticsDocument) { ++ this.statisticsDocument = statisticsDocument; ++ this.boostStatisticsDocument = boostStatisticsDocument; ++ } ++ ++ SamplingStatisticsDocument getStatisticsDocument() { ++ return statisticsDocument; ++ } ++ ++ SamplingBoostStatisticsDocument getBoostStatisticsDocument() { ++ return boostStatisticsDocument; ++ } + } + } +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java +index 75977dc0..79c344bc 100644 +--- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java +@@ -5,15 +5,24 @@ + + package io.opentelemetry.contrib.awsxray; + ++import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; ++ ++import io.opentelemetry.api.common.AttributeKey; + import io.opentelemetry.api.common.Attributes; ++import io.opentelemetry.api.trace.Span; ++import io.opentelemetry.api.trace.SpanContext; + import io.opentelemetry.api.trace.SpanKind; ++import io.opentelemetry.api.trace.StatusCode; + import io.opentelemetry.context.Context; + import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; + import io.opentelemetry.sdk.common.Clock; + import io.opentelemetry.sdk.resources.Resource; ++import io.opentelemetry.sdk.trace.ReadableSpan; + import io.opentelemetry.sdk.trace.data.LinkData; ++import io.opentelemetry.sdk.trace.data.SpanData; + import io.opentelemetry.sdk.trace.samplers.Sampler; + import io.opentelemetry.sdk.trace.samplers.SamplingResult; ++import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; + import java.util.Arrays; + import java.util.Comparator; + import java.util.Date; +@@ -21,26 +30,47 @@ import java.util.List; + import java.util.Map; + import java.util.Objects; + import java.util.Set; ++import java.util.concurrent.ConcurrentHashMap; ++import java.util.function.Consumer; + import java.util.logging.Level; + import java.util.logging.Logger; + import java.util.stream.Collectors; ++import javax.annotation.Nullable; + + final class XrayRulesSampler implements Sampler { + + private static final Logger logger = Logger.getLogger(XrayRulesSampler.class.getName()); + ++ public static final AttributeKey AWS_XRAY_SAMPLING_RULE = ++ AttributeKey.stringKey("aws.xray.sampling_rule"); ++ ++ // Used for generating operation ++ private static final String UNKNOWN_OPERATION = "UnknownOperation"; ++ private static final AttributeKey URL_PATH = AttributeKey.stringKey("url.path"); ++ private static final AttributeKey HTTP_TARGET = AttributeKey.stringKey("http.target"); ++ private static final AttributeKey HTTP_REQUEST_METHOD = ++ AttributeKey.stringKey("http.request.method"); ++ private static final AttributeKey HTTP_METHOD = AttributeKey.stringKey("http.method"); ++ + private final String clientId; + private final Resource resource; + private final Clock clock; + private final Sampler fallbackSampler; + private final SamplingRuleApplier[] ruleAppliers; + ++ private final boolean adaptiveSamplingRuleExists; ++ private final Set anomalyTracesSet; ++ ++ @Nullable private AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig; ++ @Nullable private RateLimiter anomalyCaptureRateLimiter; ++ + XrayRulesSampler( + String clientId, + Resource resource, + Clock clock, + Sampler fallbackSampler, +- List rules) { ++ List rules, ++ @Nullable AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig) { + this( + clientId, + resource, +@@ -49,8 +79,16 @@ final class XrayRulesSampler implements Sampler { + rules.stream() + // Lower priority value takes precedence so normal ascending sort. + .sorted(Comparator.comparingInt(GetSamplingRulesResponse.SamplingRule::getPriority)) +- .map(rule -> new SamplingRuleApplier(clientId, rule, clock)) +- .toArray(SamplingRuleApplier[]::new)); ++ .map( ++ rule -> ++ new SamplingRuleApplier( ++ clientId, ++ rule, ++ resource.getAttribute(ResourceAttributes.SERVICE_NAME), ++ clock)) ++ .toArray(SamplingRuleApplier[]::new), ++ rules.stream().anyMatch(r -> r.getSamplingRateBoost() != null), ++ adaptiveSamplingConfig); + } + + private XrayRulesSampler( +@@ -58,12 +96,26 @@ final class XrayRulesSampler implements Sampler { + Resource resource, + Clock clock, + Sampler fallbackSampler, +- SamplingRuleApplier[] ruleAppliers) { ++ SamplingRuleApplier[] ruleAppliers, ++ boolean adaptiveSamplingRuleExists, ++ @Nullable AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig) { + this.clientId = clientId; + this.resource = resource; + this.clock = clock; + this.fallbackSampler = fallbackSampler; + this.ruleAppliers = ruleAppliers; ++ this.adaptiveSamplingRuleExists = adaptiveSamplingRuleExists; ++ this.adaptiveSamplingConfig = adaptiveSamplingConfig; ++ // The set is self-clearing, when spans close they are removed from the set ++ this.anomalyTracesSet = ConcurrentHashMap.newKeySet(100_000); ++ ++ // Initialize anomaly capture rate limiter if error capture limit is configured ++ if (adaptiveSamplingConfig != null && adaptiveSamplingConfig.getErrorCaptureLimit() != null) { ++ int errorSpansPerSecond = ++ adaptiveSamplingConfig.getErrorCaptureLimit().getErrorSpansPerSecond(); ++ this.anomalyCaptureRateLimiter = ++ new RateLimiter(errorSpansPerSecond, errorSpansPerSecond, clock); ++ } + } + + @Override +@@ -74,10 +126,30 @@ final class XrayRulesSampler implements Sampler { + SpanKind spanKind, + Attributes attributes, + List parentLinks) { ++ String upstreamMatchedRule = ++ Span.fromContext(parentContext) ++ .getSpanContext() ++ .getTraceState() ++ .get(AwsSamplingResult.AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY); + for (SamplingRuleApplier applier : ruleAppliers) { + if (applier.matches(attributes, resource)) { +- return applier.shouldSample( +- parentContext, traceId, name, spanKind, attributes, parentLinks); ++ SamplingResult result = ++ applier.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks); ++ ++ String ruleToPropagate = ++ upstreamMatchedRule == null ? applier.getRuleName() : upstreamMatchedRule; ++ if (this.adaptiveSamplingConfig != null ++ && this.adaptiveSamplingConfig.getErrorCaptureLimit() != null) { ++ // If the span is capturable based on local SDK config, add sampling rule attribute ++ return AwsSamplingResult.create( ++ result.getDecision(), ++ result.getAttributes().toBuilder() ++ .put(AWS_XRAY_SAMPLING_RULE.getKey(), ruleToPropagate) ++ .build(), ++ ruleToPropagate); ++ } ++ return AwsSamplingResult.create( ++ result.getDecision(), result.getAttributes(), ruleToPropagate); + } + } + +@@ -96,7 +168,144 @@ final class XrayRulesSampler implements Sampler { + return "XrayRulesSampler{" + Arrays.toString(ruleAppliers) + "}"; + } + +- List snapshot(Date now) { ++ void setAdaptiveSamplingConfig(AwsXrayAdaptiveSamplingConfig config) { ++ if (this.adaptiveSamplingConfig != null) { ++ throw new IllegalStateException("Programming bug - Adaptive sampling config is already set"); ++ } else if (config != null && this.adaptiveSamplingConfig == null) { ++ this.adaptiveSamplingConfig = config; ++ ++ // Initialize anomaly capture rate limiter if error capture limit is configured ++ if (config.getErrorCaptureLimit() != null) { ++ int errorSpansPerSecond = config.getErrorCaptureLimit().getErrorSpansPerSecond(); ++ this.anomalyCaptureRateLimiter = ++ new RateLimiter(errorSpansPerSecond, errorSpansPerSecond, clock); ++ } ++ } ++ } ++ ++ void adaptSampling(ReadableSpan span, SpanData spanData, Consumer spanBatcher) { ++ if (!adaptiveSamplingRuleExists) { ++ return; ++ } ++ Long statusCode = spanData.getAttributes().get(HTTP_RESPONSE_STATUS_CODE); ++ ++ boolean shouldBoostSampling = false; ++ boolean shouldCaptureAnomalySpan = false; ++ ++ List anomalyConditions = ++ adaptiveSamplingConfig != null ? adaptiveSamplingConfig.getAnomalyConditions() : null; ++ // Empty list -> no conditions will apply and we will not do anything ++ if (anomalyConditions != null && !anomalyConditions.isEmpty()) { ++ String operation = spanData.getAttributes().get(AwsAttributeKeys.AWS_LOCAL_OPERATION); ++ if (operation == null) { ++ operation = generateIngressOperation(spanData); ++ } ++ for (AwsXrayAdaptiveSamplingConfig.AnomalyConditions condition : anomalyConditions) { ++ // Check if the operation matches any in the list or if operations list is null (match all) ++ List operations = condition.getOperations(); ++ if (!(operations == null || operations.isEmpty() || operations.contains(operation))) { ++ continue; ++ } ++ // Check if any anomalyConditions detect an anomaly either through error code or latency ++ boolean isAnomaly = false; ++ ++ String errorCodeRegex = condition.getErrorCodeRegex(); ++ if (statusCode != null && errorCodeRegex != null) { ++ isAnomaly = statusCode.toString().matches(errorCodeRegex); ++ } ++ ++ Long highLatencyMs = condition.getHighLatencyMs(); ++ if (highLatencyMs != null) { ++ isAnomaly = ++ (errorCodeRegex == null || isAnomaly) ++ && (span.getLatencyNanos() / 1_000_000.0) >= highLatencyMs; ++ } ++ ++ if (isAnomaly) { ++ AwsXrayAdaptiveSamplingConfig.UsageType usage = condition.getUsage(); ++ if (usage != null) { ++ switch (usage) { ++ case BOTH: ++ shouldBoostSampling = true; ++ shouldCaptureAnomalySpan = true; ++ break; ++ case SAMPLING_BOOST: ++ shouldBoostSampling = true; ++ break; ++ case ERROR_SPAN_CAPTURE: ++ shouldCaptureAnomalySpan = true; ++ break; ++ } ++ } ++ } ++ if (shouldBoostSampling && shouldCaptureAnomalySpan) { ++ break; ++ } ++ } ++ } else if ((statusCode != null && statusCode > 499) ++ || (statusCode == null ++ && spanData.getStatus() != null ++ && StatusCode.ERROR.equals(spanData.getStatus().getStatusCode()))) { ++ shouldBoostSampling = true; ++ shouldCaptureAnomalySpan = true; ++ } ++ ++ String traceId = spanData.getTraceId(); ++ SpanContext parentContext = spanData.getParentSpanContext(); ++ boolean isLocalRootSpan = ++ parentContext == null || !parentContext.isValid() || parentContext.isRemote(); ++ ++ if (shouldBoostSampling || shouldCaptureAnomalySpan || isLocalRootSpan) { ++ // Anomaly Capture ++ if (shouldCaptureAnomalySpan ++ && anomalyCaptureRateLimiter != null ++ && anomalyCaptureRateLimiter.trySpend(1)) { ++ spanBatcher.accept(span); ++ } ++ ++ // Sampling Boost ++ String ruleNameForBoostStats = ++ span.getSpanContext() ++ .getTraceState() ++ .get(AwsSamplingResult.AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY); ++ SamplingRuleApplier ruleToReportTo = null; ++ SamplingRuleApplier matchedRule = null; ++ for (SamplingRuleApplier applier : ruleAppliers) { ++ // Rule propagated from when sampling decision was made, otherwise the matched rule ++ if (applier.getRuleName().equals(ruleNameForBoostStats)) { ++ ruleToReportTo = applier; ++ break; ++ } ++ if (applier.matches(spanData.getAttributes(), resource)) { ++ matchedRule = applier; ++ } ++ } ++ if (ruleToReportTo == null) { ++ if (matchedRule == null) { ++ logger.log( ++ Level.FINE, ++ "No sampling rule matched the request. This is a bug in either the OpenTelemetry SDK or X-Ray."); ++ } else { ++ ruleToReportTo = matchedRule; ++ } ++ } ++ ++ if (shouldBoostSampling ++ && ruleToReportTo != null ++ && ruleToReportTo.hasBoost() ++ && this.anomalyTracesSet.add(traceId)) { ++ ruleToReportTo.countAnomalyTrace(span); ++ } ++ if (isLocalRootSpan) { ++ if (ruleToReportTo != null && ruleToReportTo.hasBoost()) { ++ ruleToReportTo.countTrace(); ++ } ++ this.anomalyTracesSet.remove(traceId); ++ } ++ } ++ } ++ ++ List snapshot(Date now) { + return Arrays.stream(ruleAppliers) + .map(rule -> rule.snapshot(now)) + .filter(Objects::nonNull) +@@ -115,15 +324,16 @@ final class XrayRulesSampler implements Sampler { + Map ruleTargets, + Set requestedTargetRuleNames, + Date now) { ++ long currentNanoTime = clock.nanoTime(); + long defaultNextSnapshotTimeNanos = +- clock.nanoTime() + AwsXrayRemoteSampler.DEFAULT_TARGET_INTERVAL_NANOS; ++ currentNanoTime + AwsXrayRemoteSampler.DEFAULT_TARGET_INTERVAL_NANOS; + SamplingRuleApplier[] newAppliers = + Arrays.stream(ruleAppliers) + .map( + rule -> { + SamplingTargetDocument target = ruleTargets.get(rule.getRuleName()); + if (target != null) { +- return rule.withTarget(target, now); ++ return rule.withTarget(target, now, currentNanoTime); + } + if (requestedTargetRuleNames.contains(rule.getRuleName())) { + // In practice X-Ray should return a target for any rule we requested but +@@ -135,6 +345,59 @@ final class XrayRulesSampler implements Sampler { + return rule; + }) + .toArray(SamplingRuleApplier[]::new); +- return new XrayRulesSampler(clientId, resource, clock, fallbackSampler, newAppliers); ++ return new XrayRulesSampler( ++ clientId, ++ resource, ++ clock, ++ fallbackSampler, ++ newAppliers, ++ adaptiveSamplingRuleExists, ++ adaptiveSamplingConfig); ++ } ++ ++ static boolean isKeyPresent(SpanData span, AttributeKey key) { ++ return span.getAttributes().get(key) != null; ++ } ++ ++ private static String generateIngressOperation(SpanData span) { ++ String operation = UNKNOWN_OPERATION; ++ if (isKeyPresent(span, URL_PATH) || isKeyPresent(span, HTTP_TARGET)) { ++ String httpTarget = ++ isKeyPresent(span, URL_PATH) ++ ? span.getAttributes().get(URL_PATH) ++ : span.getAttributes().get(HTTP_TARGET); ++ // get the first part from API path string as operation value ++ // the more levels/parts we get from API path the higher chance for getting high cardinality ++ // data ++ if (httpTarget != null) { ++ operation = extractApiPathValue(httpTarget); ++ if (isKeyPresent(span, HTTP_REQUEST_METHOD) || isKeyPresent(span, HTTP_METHOD)) { ++ String httpMethod = ++ isKeyPresent(span, HTTP_REQUEST_METHOD) ++ ? span.getAttributes().get(HTTP_REQUEST_METHOD) ++ : span.getAttributes().get(HTTP_METHOD); ++ if (httpMethod != null) { ++ operation = httpMethod + " " + operation; ++ } ++ } ++ } ++ } ++ return operation; ++ } ++ ++ private static String extractApiPathValue(String httpTarget) { ++ if (httpTarget == null || httpTarget.isEmpty()) { ++ return "/"; ++ } ++ String[] paths = httpTarget.split("/"); ++ if (paths.length > 1) { ++ return "/" + paths[1]; ++ } ++ return "/"; ++ } ++ ++ // For testing ++ Set getAnomalyTracesSet() { ++ return anomalyTracesSet; + } + } +diff --git a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerTest.java b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerTest.java +index 654a4d57..5af11a25 100644 +--- a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerTest.java ++++ b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerTest.java +@@ -7,7 +7,10 @@ package io.opentelemetry.contrib.awsxray; + + import static java.util.Objects.requireNonNull; + import static org.assertj.core.api.Assertions.assertThat; ++import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode; + import static org.awaitility.Awaitility.await; ++import static org.junit.jupiter.api.Assertions.assertThrows; ++import static org.mockito.Mockito.mock; + + import com.google.common.io.ByteStreams; + import com.linecorp.armeria.common.HttpResponse; +@@ -21,6 +24,9 @@ import io.opentelemetry.api.trace.SpanKind; + import io.opentelemetry.api.trace.TraceId; + import io.opentelemetry.context.Context; + import io.opentelemetry.sdk.resources.Resource; ++import io.opentelemetry.sdk.trace.ReadableSpan; ++import io.opentelemetry.sdk.trace.data.SpanData; ++import io.opentelemetry.sdk.trace.export.SpanExporter; + import io.opentelemetry.sdk.trace.samplers.Sampler; + import io.opentelemetry.sdk.trace.samplers.SamplingDecision; + import java.io.IOException; +@@ -168,6 +174,32 @@ class AwsXrayRemoteSamplerTest { + } + } + ++ @Test ++ void setAndResetSpanExporter() { ++ try (AwsXrayRemoteSampler sampler = AwsXrayRemoteSampler.newBuilder(Resource.empty()).build()) { ++ // Setting span exporter should only work once ++ sampler.setSpanExporter(mock(SpanExporter.class)); ++ assertThrows( ++ IllegalStateException.class, () -> sampler.setSpanExporter(mock(SpanExporter.class))); ++ } ++ } ++ ++ @Test ++ void adaptSamplingWithoutSpanExporter() { ++ assertThrows( ++ IllegalStateException.class, ++ () -> sampler.adaptSampling(mock(ReadableSpan.class), mock(SpanData.class))); ++ } ++ ++ @Test ++ void adaptSamplingWithSpanExporter() { ++ try (AwsXrayRemoteSampler sampler = AwsXrayRemoteSampler.newBuilder(Resource.empty()).build()) { ++ sampler.setSpanExporter(mock(SpanExporter.class)); ++ assertThatCode(() -> sampler.adaptSampling(mock(ReadableSpan.class), mock(SpanData.class))) ++ .doesNotThrowAnyException(); ++ } ++ } ++ + // https://github.com/open-telemetry/opentelemetry-java-contrib/issues/376 + @Test + void testJitterTruncation() { +@@ -187,6 +219,16 @@ class AwsXrayRemoteSamplerTest { + } + } + ++ @Test ++ void setAdaptiveSamplingConfig() { ++ try (AwsXrayRemoteSampler sampler = AwsXrayRemoteSampler.newBuilder(Resource.empty()).build()) { ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder().setVersion(1.0).build(); ++ sampler.setAdaptiveSamplingConfig(config); ++ assertThrows(IllegalStateException.class, () -> sampler.setAdaptiveSamplingConfig(config)); ++ } ++ } ++ + private static SamplingDecision doSample(Sampler sampler, String name) { + return sampler + .shouldSample( diff --git a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplierTest.java b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplierTest.java -index 6bb6e82..55dabbd 100644 +index 6bb6e82a..6d71711b 100644 --- a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplierTest.java +++ b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplierTest.java -@@ -42,6 +42,11 @@ class SamplingRuleApplierTest { - +@@ -8,18 +8,25 @@ package io.opentelemetry.contrib.awsxray; + import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME; + import static org.assertj.core.api.Assertions.assertThat; + import static org.awaitility.Awaitility.await; ++import static org.mockito.Mockito.mock; ++import static org.mockito.Mockito.when; + + import com.fasterxml.jackson.databind.ObjectMapper; + import io.opentelemetry.api.common.AttributeKey; + import io.opentelemetry.api.common.Attributes; + import io.opentelemetry.api.common.AttributesBuilder; ++import io.opentelemetry.api.trace.SpanContext; + import io.opentelemetry.api.trace.SpanKind; ++import io.opentelemetry.api.trace.TraceFlags; + import io.opentelemetry.api.trace.TraceId; ++import io.opentelemetry.api.trace.TraceState; + import io.opentelemetry.context.Context; ++import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingBoost; + import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; + import io.opentelemetry.sdk.common.Clock; + import io.opentelemetry.sdk.resources.Resource; + import io.opentelemetry.sdk.testing.time.TestClock; ++import io.opentelemetry.sdk.trace.ReadableSpan; + import io.opentelemetry.sdk.trace.samplers.SamplingDecision; + import io.opentelemetry.sdk.trace.samplers.SamplingResult; + import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +@@ -28,6 +35,7 @@ import java.io.IOException; + import java.io.UncheckedIOException; + import java.time.Duration; + import java.time.Instant; ++import java.time.temporal.ChronoUnit; + import java.util.Collections; + import java.util.Date; + import java.util.concurrent.TimeUnit; +@@ -41,6 +49,12 @@ class SamplingRuleApplierTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String CLIENT_ID = "test-client-id"; - ++ private static final String TEST_SERVICE_NAME = "test-service-name"; ++ + private static final AttributeKey URL_PATH = AttributeKey.stringKey("url.path"); + private static final AttributeKey URL_FULL = AttributeKey.stringKey("url.full"); + private static final AttributeKey HTTP_REQUEST_METHOD = + AttributeKey.stringKey("http.request.method"); -+ + @Nested @SuppressWarnings("ClassCanBeStatic") - class ExactMatch { -@@ -68,6 +73,15 @@ class SamplingRuleApplierTest { +@@ -48,7 +62,10 @@ class SamplingRuleApplierTest { + + private final SamplingRuleApplier applier = + new SamplingRuleApplier( +- CLIENT_ID, readSamplingRule("/sampling-rule-exactmatch.json"), Clock.getDefault()); ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-exactmatch.json"), ++ TEST_SERVICE_NAME, ++ Clock.getDefault()); + + private final Resource resource = + Resource.builder() +@@ -68,6 +85,15 @@ class SamplingRuleApplierTest { .put(AttributeKey.longKey("speed"), 10) .build(); - + + private final Attributes newSemCovAttributes = + Attributes.builder() + .put(HTTP_REQUEST_METHOD, "GET") @@ -64,10 +1429,38 @@ index 6bb6e82..55dabbd 100644 // FixedRate set to 1.0 in rule and no reservoir @Test void fixedRateAlwaysSample() { -@@ -116,6 +130,21 @@ class SamplingRuleApplierTest { +@@ -75,7 +101,8 @@ class SamplingRuleApplierTest { + .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); + + Date now = new Date(); +- GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = applier.snapshot(now); ++ GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = ++ applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); + assertThat(statistics.getRuleName()).isEqualTo("Test"); + assertThat(statistics.getTimestamp()).isEqualTo(now); +@@ -84,7 +111,7 @@ class SamplingRuleApplierTest { + assertThat(statistics.getBorrowCount()).isEqualTo(0); + + // Reset +- statistics = applier.snapshot(now); ++ statistics = applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getRequestCount()).isEqualTo(0); + assertThat(statistics.getSampledCount()).isEqualTo(0); + assertThat(statistics.getBorrowCount()).isEqualTo(0); +@@ -92,7 +119,7 @@ class SamplingRuleApplierTest { + doSample(applier); + doSample(applier); + now = new Date(); +- statistics = applier.snapshot(now); ++ statistics = applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); + assertThat(statistics.getRuleName()).isEqualTo("Test"); + assertThat(statistics.getTimestamp()).isEqualTo(now); +@@ -116,6 +143,21 @@ class SamplingRuleApplierTest { .isTrue(); } - + + @Test + void matchesURLFullNewSemCov() { + assertThat(applier.matches(newSemCovAttributes, resource)).isTrue(); @@ -86,10 +1479,10 @@ index 6bb6e82..55dabbd 100644 @Test void serviceNameNotMatch() { assertThat( -@@ -137,6 +166,13 @@ class SamplingRuleApplierTest { +@@ -137,6 +179,13 @@ class SamplingRuleApplierTest { assertThat(applier.matches(attributes, resource)).isFalse(); } - + + @Test + void methodNewSemCovNotMatch() { + Attributes attributes = @@ -100,10 +1493,10 @@ index 6bb6e82..55dabbd 100644 @Test void hostNotMatch() { // Replacing dot with character makes sure we're not accidentally treating dot as regex -@@ -178,6 +214,34 @@ class SamplingRuleApplierTest { +@@ -178,6 +227,34 @@ class SamplingRuleApplierTest { assertThat(applier.matches(attributes, resource)).isFalse(); } - + + @Test + void pathNewSemCovNotMatch() { + Attributes attributes = @@ -135,10 +1528,22 @@ index 6bb6e82..55dabbd 100644 @Test void attributeNotMatch() { Attributes attributes = -@@ -243,6 +307,15 @@ class SamplingRuleApplierTest { +@@ -223,7 +300,10 @@ class SamplingRuleApplierTest { + + private final SamplingRuleApplier applier = + new SamplingRuleApplier( +- CLIENT_ID, readSamplingRule("/sampling-rule-wildcards.json"), Clock.getDefault()); ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-wildcards.json"), ++ TEST_SERVICE_NAME, ++ Clock.getDefault()); + + private final Resource resource = + Resource.builder() +@@ -243,13 +323,23 @@ class SamplingRuleApplierTest { .put(AttributeKey.longKey("speed"), 10) .build(); - + + private final Attributes newSemCovAttributes = + Attributes.builder() + .put(HTTP_REQUEST_METHOD, "GET") @@ -151,10 +1556,37 @@ index 6bb6e82..55dabbd 100644 // FixedRate set to 0.0 in rule and no reservoir @Test void fixedRateNeverSample() { -@@ -329,6 +402,26 @@ class SamplingRuleApplierTest { + assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + + Date now = new Date(); +- GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = applier.snapshot(now); ++ GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = ++ applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); + assertThat(statistics.getRuleName()).isEqualTo("Test"); + assertThat(statistics.getTimestamp()).isEqualTo(now); +@@ -258,7 +348,7 @@ class SamplingRuleApplierTest { + assertThat(statistics.getBorrowCount()).isEqualTo(0); + + // Reset +- statistics = applier.snapshot(now); ++ statistics = applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getRequestCount()).isEqualTo(0); + assertThat(statistics.getSampledCount()).isEqualTo(0); + assertThat(statistics.getBorrowCount()).isEqualTo(0); +@@ -266,7 +356,7 @@ class SamplingRuleApplierTest { + doSample(applier); + doSample(applier); + now = new Date(); +- statistics = applier.snapshot(now); ++ statistics = applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); + assertThat(statistics.getRuleName()).isEqualTo("Test"); + assertThat(statistics.getTimestamp()).isEqualTo(now); +@@ -329,6 +419,26 @@ class SamplingRuleApplierTest { assertThat(applier.matches(attributes, resource)).isFalse(); } - + + @Test + void newSemCovMethodMatches() { + Attributes attributes = @@ -178,10 +1610,10 @@ index 6bb6e82..55dabbd 100644 @Test void hostMatches() { Attributes attributes = -@@ -410,6 +503,29 @@ class SamplingRuleApplierTest { +@@ -410,6 +520,29 @@ class SamplingRuleApplierTest { assertThat(applier.matches(attributes, resource)).isFalse(); } - + + @Test + void pathNewSemCovMatches() { + Attributes attributes = @@ -208,14 +1640,1256 @@ index 6bb6e82..55dabbd 100644 @Test void attributeMatches() { Attributes attributes = +@@ -493,7 +626,10 @@ class SamplingRuleApplierTest { + + private final SamplingRuleApplier applier = + new SamplingRuleApplier( +- CLIENT_ID, readSamplingRule("/sampling-rule-awslambda.json"), Clock.getDefault()); ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-awslambda.json"), ++ TEST_SERVICE_NAME, ++ Clock.getDefault()); + + private final Resource resource = + Resource.builder() +@@ -553,7 +689,10 @@ class SamplingRuleApplierTest { + void borrowing() { + SamplingRuleApplier applier = + new SamplingRuleApplier( +- CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), Clock.getDefault()); ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-reservoir.json"), ++ TEST_SERVICE_NAME, ++ Clock.getDefault()); + + // Borrow + assertThat(doSample(applier)) +@@ -564,7 +703,8 @@ class SamplingRuleApplierTest { + assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + + Date now = new Date(); +- GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = applier.snapshot(now); ++ GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = ++ applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); + assertThat(statistics.getRuleName()).isEqualTo("Test"); + assertThat(statistics.getTimestamp()).isEqualTo(now); +@@ -573,7 +713,7 @@ class SamplingRuleApplierTest { + assertThat(statistics.getBorrowCount()).isEqualTo(1); + + // Reset +- statistics = applier.snapshot(now); ++ statistics = applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getRequestCount()).isEqualTo(0); + assertThat(statistics.getSampledCount()).isEqualTo(0); + assertThat(statistics.getBorrowCount()).isEqualTo(0); +@@ -589,7 +729,7 @@ class SamplingRuleApplierTest { + }); + + now = new Date(); +- statistics = applier.snapshot(now); ++ statistics = applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); + assertThat(statistics.getRuleName()).isEqualTo("Test"); + assertThat(statistics.getTimestamp()).isEqualTo(now); +@@ -603,7 +743,7 @@ class SamplingRuleApplierTest { + TestClock clock = TestClock.create(); + SamplingRuleApplier applier = + new SamplingRuleApplier( +- CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), clock); ++ CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), TEST_SERVICE_NAME, clock); + // No target yet, borrows from reservoir every second. + assertThat(doSample(applier)) + .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); +@@ -622,8 +762,8 @@ class SamplingRuleApplierTest { + + // Got a target! + SamplingTargetDocument target = +- SamplingTargetDocument.create(0.0, 5, 2, Date.from(now.plusSeconds(10)), "test"); +- applier = applier.withTarget(target, Date.from(now)); ++ SamplingTargetDocument.create(0.0, 5, 2, Date.from(now.plusSeconds(10)), null, "test"); ++ applier = applier.withTarget(target, Date.from(now), clock.nanoTime()); + // Statistics not expired yet + assertThat(applier.snapshot(Date.from(now))).isNull(); + +@@ -662,7 +802,7 @@ class SamplingRuleApplierTest { + TestClock clock = TestClock.create(); + SamplingRuleApplier applier = + new SamplingRuleApplier( +- CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), clock); ++ CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), TEST_SERVICE_NAME, clock); + // No target yet, borrows from reservoir every second. + assertThat(doSample(applier)) + .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); +@@ -680,8 +820,8 @@ class SamplingRuleApplierTest { + assertThat(applier.snapshot(Date.from(now.plus(Duration.ofMinutes(30))))).isNotNull(); + + // Got a target! +- SamplingTargetDocument target = SamplingTargetDocument.create(0.0, 5, null, null, "test"); +- applier = applier.withTarget(target, Date.from(now)); ++ SamplingTargetDocument target = SamplingTargetDocument.create(0.0, 5, null, null, null, "test"); ++ applier = applier.withTarget(target, Date.from(now), clock.nanoTime()); + // No reservoir, always use fixed rate (drop) + assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); +@@ -691,12 +831,105 @@ class SamplingRuleApplierTest { + assertThat(applier.snapshot(Date.from(now))).isNotNull(); + } + ++ @Test ++ void ruleWithBoost() { ++ TestClock clock = TestClock.create(); ++ SamplingRuleApplier applier = ++ new SamplingRuleApplier( ++ CLIENT_ID, readSamplingRule("/sampling-rule-boost.json"), TEST_SERVICE_NAME, clock); ++ // No reservoir, always use fixed rate (drop) ++ assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ ++ Instant now = Instant.ofEpochSecond(0, clock.now()); ++ ++ // Got a target! ++ // Boost raises sampling rate to 100% for 20 seconds ++ SamplingTargetDocument target = ++ SamplingTargetDocument.create( ++ 0.0, ++ 5, ++ null, ++ null, ++ SamplingBoost.create(1.0, Date.from(now.plus(20, ChronoUnit.SECONDS))), ++ "test"); ++ applier = applier.withTarget(target, Date.from(now), clock.nanoTime()); ++ ++ // We should start sampling at this point ++ assertThat(doSample(applier)) ++ .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ assertThat(doSample(applier)) ++ .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ // After waiting 10 seconds, we should still be sampling ++ clock.advance(Duration.ofSeconds(10)); ++ assertThat(doSample(applier)) ++ .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ assertThat(doSample(applier)) ++ .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ // After 30 seconds, we should stop sampling ++ clock.advance(Duration.ofSeconds(20)); ++ assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ } ++ ++ @Test ++ void countTrace() { ++ TestClock clock = TestClock.create(); ++ SamplingRuleApplier applier = ++ new SamplingRuleApplier( ++ CLIENT_ID, readSamplingRule("/sampling-rule-boost.json"), TEST_SERVICE_NAME, clock); ++ ++ Instant now = Instant.ofEpochSecond(0, clock.now()); ++ ++ SamplingRuleApplier.SamplingRuleStatisticsSnapshot snapshot = applier.snapshot(Date.from(now)); ++ assertThat(snapshot.getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); ++ ++ applier.countTrace(); ++ applier.countTrace(); ++ applier.countTrace(); ++ ++ snapshot = applier.snapshot(Date.from(now)); ++ assertThat(snapshot.getBoostStatisticsDocument().getTotalCount()).isEqualTo(3); ++ assertThat(snapshot.getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); ++ ++ // Snapshotting again should've reset the statistics ++ snapshot = applier.snapshot(Date.from(now)); ++ assertThat(snapshot.getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); ++ assertThat(snapshot.getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); ++ ++ // Decision to separate by trace ID is made in XrayRulesSampler class, so we can ignore ++ // trace/span ID in span context here ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ // Mock sampling the first two traces ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getSampled(), TraceState.getDefault())); ++ applier.countTrace(); ++ applier.countAnomalyTrace(readableSpanMock); ++ applier.countTrace(); ++ applier.countAnomalyTrace(readableSpanMock); ++ ++ // Mock not sampling the last trace ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); ++ applier.countTrace(); ++ applier.countAnomalyTrace(readableSpanMock); ++ ++ snapshot = applier.snapshot(Date.from(now)); ++ assertThat(snapshot.getBoostStatisticsDocument().getTotalCount()).isEqualTo(3); ++ assertThat(snapshot.getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(3); ++ assertThat(snapshot.getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(2); ++ } ++ + @Test + void withNextSnapshotTime() { + TestClock clock = TestClock.create(); + SamplingRuleApplier applier = + new SamplingRuleApplier( +- CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), clock); ++ CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), TEST_SERVICE_NAME, clock); + + Instant now = Instant.ofEpochSecond(0, clock.now()); + assertThat(applier.snapshot(Date.from(now))).isNotNull(); +@@ -715,6 +948,71 @@ class SamplingRuleApplierTest { + assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + } + ++ @Test ++ void hasBoostMethod() { ++ SamplingRuleApplier applierWithBoost = ++ new SamplingRuleApplier( ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-boost.json"), ++ TEST_SERVICE_NAME, ++ Clock.getDefault()); ++ assertThat(applierWithBoost.hasBoost()).isTrue(); ++ ++ SamplingRuleApplier applierWithoutBoost = ++ new SamplingRuleApplier( ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-exactmatch.json"), ++ TEST_SERVICE_NAME, ++ Clock.getDefault()); ++ assertThat(applierWithoutBoost.hasBoost()).isFalse(); ++ } ++ ++ @Test ++ void getServiceNameMethod() { ++ SamplingRuleApplier applier = ++ new SamplingRuleApplier( ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-exactmatch.json"), ++ TEST_SERVICE_NAME, ++ Clock.getDefault()); ++ assertThat(applier.getServiceName()).isEqualTo(TEST_SERVICE_NAME); ++ } ++ ++ @Test ++ void nullRuleName() { ++ GetSamplingRulesResponse.SamplingRule ruleWithNullName = ++ GetSamplingRulesResponse.SamplingRule.create( ++ Collections.emptyMap(), ++ 1.0, ++ "*", ++ "*", ++ 1, ++ 0, ++ "*", ++ null, // null rule name ++ null, ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ ++ SamplingRuleApplier applier = ++ new SamplingRuleApplier(CLIENT_ID, ruleWithNullName, TEST_SERVICE_NAME, Clock.getDefault()); ++ assertThat(applier.getRuleName()).isEqualTo("default"); ++ } ++ ++ @Test ++ void nullServiceName() { ++ SamplingRuleApplier applier = ++ new SamplingRuleApplier( ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-exactmatch.json"), ++ null, // null service name ++ Clock.getDefault()); ++ assertThat(applier.getServiceName()).isEqualTo("default"); ++ } ++ + private static SamplingResult doSample(SamplingRuleApplier applier) { + return applier.shouldSample( + Context.current(), +diff --git a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java +index 1ca8df34..436c9cbd 100644 +--- a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java ++++ b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java +@@ -5,17 +5,28 @@ + + package io.opentelemetry.contrib.awsxray; + ++import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; + import static org.assertj.core.api.Assertions.assertThat; ++import static org.junit.jupiter.api.Assertions.assertThrows; ++import static org.mockito.ArgumentMatchers.any; ++import static org.mockito.Mockito.mock; ++import static org.mockito.Mockito.when; + + import io.opentelemetry.api.common.AttributeKey; + import io.opentelemetry.api.common.Attributes; ++import io.opentelemetry.api.trace.SpanContext; + import io.opentelemetry.api.trace.SpanKind; ++import io.opentelemetry.api.trace.TraceFlags; + import io.opentelemetry.api.trace.TraceId; ++import io.opentelemetry.api.trace.TraceState; + import io.opentelemetry.context.Context; ++import io.opentelemetry.contrib.awsxray.GetSamplingRulesResponse.SamplingRateBoost; + import io.opentelemetry.contrib.awsxray.GetSamplingRulesResponse.SamplingRule; + import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; + import io.opentelemetry.sdk.resources.Resource; + import io.opentelemetry.sdk.testing.time.TestClock; ++import io.opentelemetry.sdk.trace.ReadableSpan; ++import io.opentelemetry.sdk.trace.data.SpanData; + import io.opentelemetry.sdk.trace.samplers.Sampler; + import io.opentelemetry.sdk.trace.samplers.SamplingDecision; + import io.opentelemetry.sdk.trace.samplers.SamplingResult; +@@ -25,14 +36,20 @@ import java.util.Arrays; + import java.util.Collections; + import java.util.Date; + import java.util.HashMap; ++import java.util.List; + import java.util.Map; + import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.LongAdder; ++import java.util.function.Consumer; + import java.util.stream.Collectors; + import java.util.stream.Stream; + import org.junit.jupiter.api.Test; + + class XrayRulesSamplerTest { + ++ private static final AttributeKey URL_PATH = AttributeKey.stringKey("url.path"); ++ private static final AttributeKey HTTP_METHOD = AttributeKey.stringKey("http.method"); ++ + @Test + void updateTargets() { + SamplingRule rule1 = +@@ -49,7 +66,8 @@ class XrayRulesSamplerTest { + "*", + "*", + "*", +- 1); ++ 1, ++ null); + SamplingRule rule2 = + SamplingRule.create( + Collections.singletonMap("test", "dog-service"), +@@ -64,7 +82,8 @@ class XrayRulesSamplerTest { + "*", + "*", + "*", +- 1); ++ 1, ++ null); + SamplingRule rule3 = + SamplingRule.create( + Collections.singletonMap("test", "*-service"), +@@ -79,7 +98,8 @@ class XrayRulesSamplerTest { + "*", + "*", + "*", +- 1); ++ 1, ++ null); + SamplingRule rule4 = + SamplingRule.create( + Collections.emptyMap(), +@@ -94,7 +114,8 @@ class XrayRulesSamplerTest { + "*", + "*", + "*", +- 1); ++ 1, ++ null); + + TestClock clock = TestClock.create(); + XrayRulesSampler sampler = +@@ -103,22 +124,41 @@ class XrayRulesSamplerTest { + Resource.getDefault(), + clock, + Sampler.alwaysOn(), +- Arrays.asList(rule1, rule4, rule3, rule2)); ++ Arrays.asList(rule1, rule4, rule3, rule2), ++ null); + + assertThat(doSample(sampler, "cat-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, Attributes.empty(), "cat-rule")); + assertThat(doSample(sampler, "cat-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, Attributes.empty(), "cat-rule")); + assertThat(doSample(sampler, "dog-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, Attributes.empty(), "dog-rule")); + assertThat(doSample(sampler, "dog-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ .usingRecursiveComparison() ++ .isEqualTo(AwsSamplingResult.create(SamplingDecision.DROP, Attributes.empty(), "dog-rule")); + assertThat(doSample(sampler, "bat-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, Attributes.empty(), "bat-rule")); + assertThat(doSample(sampler, "bat-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, Attributes.empty(), "bat-rule")); + assertThat(doSample(sampler, "unknown")) +- .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create(SamplingDecision.DROP, Attributes.empty(), "default-rule")); + + Instant now = Instant.ofEpochSecond(0, clock.now()); + assertThat(sampler.snapshot(Date.from(now))).hasSize(4); +@@ -128,10 +168,10 @@ class XrayRulesSamplerTest { + assertThat(sampler.snapshot(Date.from(now))).hasSize(4); + + SamplingTargetDocument catTarget = +- SamplingTargetDocument.create(0.0, 10, null, null, "cat-rule"); ++ SamplingTargetDocument.create(0.0, 10, null, null, null, "cat-rule"); + + SamplingTargetDocument batTarget = +- SamplingTargetDocument.create(0.0, 5, null, null, "bat-rule"); ++ SamplingTargetDocument.create(0.0, 5, null, null, null, "bat-rule"); + + clock.advance(Duration.ofSeconds(10)); + now = Instant.ofEpochSecond(0, clock.now()); +@@ -145,16 +185,24 @@ class XrayRulesSamplerTest { + .collect(Collectors.toSet()), + Date.from(now)); + assertThat(doSample(sampler, "dog-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, Attributes.empty(), "dog-rule")); + assertThat(doSample(sampler, "dog-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ .usingRecursiveComparison() ++ .isEqualTo(AwsSamplingResult.create(SamplingDecision.DROP, Attributes.empty(), "dog-rule")); + assertThat(doSample(sampler, "unknown")) +- .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create(SamplingDecision.DROP, Attributes.empty(), "default-rule")); + // Targets overridden to always drop. + assertThat(doSample(sampler, "cat-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ .usingRecursiveComparison() ++ .isEqualTo(AwsSamplingResult.create(SamplingDecision.DROP, Attributes.empty(), "cat-rule")); + assertThat(doSample(sampler, "bat-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ .usingRecursiveComparison() ++ .isEqualTo(AwsSamplingResult.create(SamplingDecision.DROP, Attributes.empty(), "bat-rule")); + + // Minimum is batTarget, 5s from now + assertThat(sampler.nextTargetFetchTimeNanos()) +@@ -169,6 +217,731 @@ class XrayRulesSamplerTest { + assertThat(sampler.snapshot(Date.from(now))).hasSize(4); + } + ++ @Test ++ void updateTargetsWithLocalAdaptiveSamplingConfig() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.singletonMap("test", "cat-service"), ++ 1.0, ++ "*", ++ "*", ++ 1, ++ 1, ++ "*", ++ "*", ++ "cat-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ SamplingRule rule2 = ++ SamplingRule.create( ++ Collections.singletonMap("test", "dog-service"), ++ 0.0, ++ "*", ++ "*", ++ 2, ++ 1, ++ "*", ++ "*", ++ "dog-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ SamplingRule rule3 = ++ SamplingRule.create( ++ Collections.singletonMap("test", "*-service"), ++ 1.0, ++ "*", ++ "*", ++ 3, ++ 1, ++ "*", ++ "*", ++ "bat-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ SamplingRule rule4 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 0.0, ++ "*", ++ "*", ++ 4, ++ 0, ++ "*", ++ "*", ++ "default-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder() ++ .setVersion(1.0) ++ .setErrorCaptureLimit( ++ AwsXrayAdaptiveSamplingConfig.ErrorCaptureLimit.builder() ++ .setErrorSpansPerSecond(2) ++ .build()) ++ .build(); ++ ++ TestClock clock = TestClock.create(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1, rule4, rule3, rule2), ++ config); ++ ++ assertThat(doSample(sampler, "cat-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "cat-rule") ++ .build(), ++ "cat-rule")); ++ assertThat(doSample(sampler, "cat-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "cat-rule") ++ .build(), ++ "cat-rule")); ++ assertThat(doSample(sampler, "dog-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "dog-rule") ++ .build(), ++ "dog-rule")); ++ assertThat(doSample(sampler, "dog-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "dog-rule") ++ .build(), ++ "dog-rule")); ++ assertThat(doSample(sampler, "bat-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "bat-rule") ++ .build(), ++ "bat-rule")); ++ assertThat(doSample(sampler, "bat-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "bat-rule") ++ .build(), ++ "bat-rule")); ++ assertThat(doSample(sampler, "unknown")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "default-rule") ++ .build(), ++ "default-rule")); ++ ++ Instant now = Instant.ofEpochSecond(0, clock.now()); ++ assertThat(sampler.snapshot(Date.from(now))).hasSize(4); ++ assertThat(sampler.nextTargetFetchTimeNanos()).isEqualTo(clock.nanoTime()); ++ clock.advance(Duration.ofSeconds(10)); ++ now = Instant.ofEpochSecond(0, clock.now()); ++ assertThat(sampler.snapshot(Date.from(now))).hasSize(4); ++ ++ SamplingTargetDocument catTarget = ++ SamplingTargetDocument.create(0.0, 10, null, null, null, "cat-rule"); ++ ++ SamplingTargetDocument batTarget = ++ SamplingTargetDocument.create(0.0, 5, null, null, null, "bat-rule"); ++ ++ clock.advance(Duration.ofSeconds(10)); ++ now = Instant.ofEpochSecond(0, clock.now()); ++ Map targets = new HashMap<>(); ++ targets.put("cat-rule", catTarget); ++ targets.put("bat-rule", batTarget); ++ sampler = ++ sampler.withTargets( ++ targets, ++ Stream.of("cat-rule", "bat-rule", "dog-rule", "default-rule") ++ .collect(Collectors.toSet()), ++ Date.from(now)); ++ assertThat(doSample(sampler, "dog-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "dog-rule") ++ .build(), ++ "dog-rule")); ++ assertThat(doSample(sampler, "dog-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "dog-rule") ++ .build(), ++ "dog-rule")); ++ assertThat(doSample(sampler, "unknown")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "default-rule") ++ .build(), ++ "default-rule")); ++ // Targets overridden to always drop. ++ assertThat(doSample(sampler, "cat-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "cat-rule") ++ .build(), ++ "cat-rule")); ++ assertThat(doSample(sampler, "bat-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "bat-rule") ++ .build(), ++ "bat-rule")); ++ ++ // Minimum is batTarget, 5s from now ++ assertThat(sampler.nextTargetFetchTimeNanos()) ++ .isEqualTo(clock.nanoTime() + TimeUnit.SECONDS.toNanos(5)); ++ ++ assertThat(sampler.snapshot(Date.from(now))).isEmpty(); ++ clock.advance(Duration.ofSeconds(5)); ++ now = Instant.ofEpochSecond(0, clock.now()); ++ assertThat(sampler.snapshot(Date.from(now))).hasSize(1); ++ clock.advance(Duration.ofSeconds(5)); ++ now = Instant.ofEpochSecond(0, clock.now()); ++ assertThat(sampler.snapshot(Date.from(now))).hasSize(4); ++ } ++ ++ @Test ++ void noAdaptiveSamplingUsesNoSpace() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.singletonMap("test", "cat-service"), ++ 1.0, ++ "*", ++ "*", ++ 1, ++ 1, ++ "*", ++ "*", ++ "cat-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ ++ TestClock clock = TestClock.create(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1), ++ null); ++ ++ LongAdder exportCounter = new LongAdder(); ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ SpanData spanDataMock = mock(SpanData.class); ++ Consumer stubbedConsumer = x -> exportCounter.increment(); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(sampler.getAnomalyTracesSet().isEmpty()).isEqualTo(true); ++ } ++ ++ @Test ++ void recordErrors() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.singletonMap("test", "cat-service"), ++ 1.0, ++ "*", ++ "*", ++ 1, ++ 1, ++ "*", ++ "*", ++ "cat-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ SamplingRule rule2 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 0.0, ++ "*", ++ "*", ++ 4, ++ 0, ++ "*", ++ "*", ++ "default-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ SamplingRateBoost.create(1, 300)); ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder() ++ .setVersion(1.0) ++ .setErrorCaptureLimit( ++ AwsXrayAdaptiveSamplingConfig.ErrorCaptureLimit.builder() ++ .setErrorSpansPerSecond(2) ++ .build()) ++ .setAnomalyConditions( ++ Arrays.asList( ++ AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() ++ .setErrorCodeRegex("^500$") ++ .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.BOTH) ++ .build())) ++ .build(); ++ ++ TestClock clock = TestClock.create(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1, rule2), ++ config); ++ ++ Instant now = Instant.ofEpochSecond(0, clock.now()); ++ ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); ++ SpanData spanDataMock = mock(SpanData.class); ++ Attributes attributesMock = mock(Attributes.class); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID"); ++ when(spanDataMock.getAttributes()).thenReturn(attributesMock); ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(500L); ++ LongAdder exportCounter = new LongAdder(); ++ Consumer stubbedConsumer = x -> exportCounter.increment(); ++ ++ // First span should be captured, second should be rate limited ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ // Only first span captured due to rate limiting ++ assertThat(exportCounter.sumThenReset()).isEqualTo(2L); ++ ++ List snapshot = ++ sampler.snapshot(Date.from(now)); ++ ++ // Rules are ordered by priority, so cat-rule is first ++ assertThat(snapshot.get(0).getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); ++ assertThat(snapshot.get(0).getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); ++ ++ assertThat(snapshot.get(0).getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(0); ++ assertThat(snapshot.get(1).getBoostStatisticsDocument().getTotalCount()).isEqualTo(3); ++ assertThat(snapshot.get(1).getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(3); ++ ++ assertThat(snapshot.get(1).getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(0); ++ ++ // Mock trace coming from upstream service where it was sampled by cat-rule ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", ++ "SPAN_ID", ++ TraceFlags.getDefault(), ++ TraceState.builder() ++ .put(AwsSamplingResult.AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY, "cat-rule") ++ .build())); ++ ++ // When we adapt sampling, we should see this rule and report to it even though it doesn't ++ // match ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ ++ // Ensure snapshot shows correctly saved statistics ++ snapshot = sampler.snapshot(Date.from(now)); ++ // cat-rule has no boost config and therefore records no statistics ++ assertThat(snapshot.get(0).getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); ++ assertThat(snapshot.get(0).getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); ++ assertThat(snapshot.get(0).getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(0); ++ assertThat(snapshot.get(1).getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); ++ assertThat(snapshot.get(1).getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); ++ assertThat(snapshot.get(1).getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(0); ++ assertThat(sampler.getAnomalyTracesSet().isEmpty()).isEqualTo(true); ++ } ++ ++ @Test ++ void setAdaptiveSamplingConfigTwice() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 1.0, ++ "*", ++ "*", ++ 1, ++ 1, ++ "*", ++ "*", ++ "test-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ ++ TestClock clock = TestClock.create(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1), ++ null); ++ ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder().setVersion(1.0).build(); ++ sampler.setAdaptiveSamplingConfig(config); ++ assertThrows(IllegalStateException.class, () -> sampler.setAdaptiveSamplingConfig(config)); ++ } ++ ++ @Test ++ void captureErrorBasedOnErrorCodeRegex() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 0.0, ++ "*", ++ "*", ++ 1, ++ 0, ++ "*", ++ "*", ++ "test-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ SamplingRateBoost.create(1, 300)); ++ ++ TestClock clock = TestClock.create(); ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder() ++ .setVersion(1.0) ++ .setErrorCaptureLimit( ++ AwsXrayAdaptiveSamplingConfig.ErrorCaptureLimit.builder() ++ .setErrorSpansPerSecond(2) ++ .build()) ++ .setAnomalyConditions( ++ Arrays.asList( ++ AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() ++ .setErrorCodeRegex("^456$") ++ .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.BOTH) ++ .build())) ++ .build(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1), ++ config); ++ ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); ++ when(readableSpanMock.getAttribute(any())).thenReturn("test-operation"); ++ when(readableSpanMock.getLatencyNanos()).thenReturn(1L); ++ ++ SpanData spanDataMock = mock(SpanData.class); ++ Attributes attributesMock = mock(Attributes.class); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID"); ++ when(spanDataMock.getAttributes()).thenReturn(attributesMock); ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(456L); ++ ++ LongAdder exportCounter = new LongAdder(); ++ Consumer stubbedConsumer = x -> exportCounter.increment(); ++ ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sum()).isEqualTo(2L); ++ } ++ ++ @Test ++ void captureErrorBasedOnHighLatency() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 0.0, ++ "*", ++ "*", ++ 1, ++ 0, ++ "*", ++ "*", ++ "test-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ SamplingRateBoost.create(1, 300)); ++ ++ TestClock clock = TestClock.create(); ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder() ++ .setVersion(1.0) ++ .setErrorCaptureLimit( ++ AwsXrayAdaptiveSamplingConfig.ErrorCaptureLimit.builder() ++ .setErrorSpansPerSecond(2) ++ .build()) ++ .setAnomalyConditions( ++ Arrays.asList( ++ AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() ++ .setHighLatencyMs(100L) ++ .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.ERROR_SPAN_CAPTURE) ++ .build())) ++ .build(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1), ++ config); ++ ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); ++ when(readableSpanMock.getAttribute(any())).thenReturn("test-operation"); ++ when(readableSpanMock.getLatencyNanos()).thenReturn(300_000_000L); // 300 ms ++ ++ SpanData spanDataMock = mock(SpanData.class); ++ Attributes attributesMock = mock(Attributes.class); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID"); ++ when(spanDataMock.getAttributes()).thenReturn(attributesMock); ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(200L); ++ ++ LongAdder exportCounter = new LongAdder(); ++ Consumer stubbedConsumer = x -> exportCounter.add(1); ++ ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sum()).isEqualTo(2L); ++ } ++ ++ @Test ++ void captureErrorBasedOnErroCodeAndLatency() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 0.0, ++ "*", ++ "*", ++ 1, ++ 0, ++ "*", ++ "*", ++ "test-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ SamplingRateBoost.create(1, 300)); ++ ++ TestClock clock = TestClock.create(); ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder() ++ .setVersion(1.0) ++ .setErrorCaptureLimit( ++ AwsXrayAdaptiveSamplingConfig.ErrorCaptureLimit.builder() ++ .setErrorSpansPerSecond(2) ++ .build()) ++ .setAnomalyConditions( ++ Arrays.asList( ++ AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() ++ .setErrorCodeRegex("^456$") ++ .setHighLatencyMs(100L) ++ .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.ERROR_SPAN_CAPTURE) ++ .build())) ++ .build(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1), ++ config); ++ ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); ++ when(readableSpanMock.getAttribute(any())).thenReturn("test-operation"); ++ when(readableSpanMock.getLatencyNanos()).thenReturn(300_000_000L); // 300 ms ++ ++ SpanData spanDataMock = mock(SpanData.class); ++ Attributes attributesMock = mock(Attributes.class); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID"); ++ when(spanDataMock.getAttributes()).thenReturn(attributesMock); ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(200L); ++ ++ LongAdder exportCounter = new LongAdder(); ++ Consumer stubbedConsumer = x -> exportCounter.add(1); ++ ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sum()).isEqualTo(0L); ++ ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(456L); ++ when(readableSpanMock.getLatencyNanos()).thenReturn(1L); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sum()).isEqualTo(0L); ++ ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(456L); ++ when(readableSpanMock.getLatencyNanos()).thenReturn(300_000_000L); // 300 ms ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sum()).isEqualTo(2L); ++ } ++ ++ @Test ++ void operationFilteringInAdaptSampling() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 0.0, ++ "*", ++ "*", ++ 1, ++ 0, ++ "*", ++ "*", ++ "test-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ SamplingRateBoost.create(1, 300)); ++ ++ TestClock clock = TestClock.create(); ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder() ++ .setVersion(1.0) ++ .setErrorCaptureLimit( ++ AwsXrayAdaptiveSamplingConfig.ErrorCaptureLimit.builder() ++ .setErrorSpansPerSecond(10) ++ .build()) ++ .setAnomalyConditions( ++ Arrays.asList( ++ AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() ++ .setOperations(Arrays.asList("GET /api1", "GET /api2")) ++ .setErrorCodeRegex("^500$") ++ .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.ERROR_SPAN_CAPTURE) ++ .build())) ++ .build(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1), ++ config); ++ ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); ++ when(readableSpanMock.getLatencyNanos()).thenReturn(1L); ++ ++ SpanData spanDataMock = mock(SpanData.class); ++ Attributes attributesMock = mock(Attributes.class); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID"); ++ when(spanDataMock.getAttributes()).thenReturn(attributesMock); ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(500L); ++ ++ LongAdder exportCounter = new LongAdder(); ++ Consumer stubbedConsumer = x -> exportCounter.increment(); ++ ++ // Test matching operations ++ when(attributesMock.get(URL_PATH)).thenReturn("/api1/ext"); ++ when(attributesMock.get(HTTP_METHOD)).thenReturn("GET"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ when(attributesMock.get(URL_PATH)).thenReturn("/api2"); ++ when(attributesMock.get(HTTP_METHOD)).thenReturn("GET"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sumThenReset()).isEqualTo(2L); ++ ++ // Test non-matching operation ++ when(attributesMock.get(URL_PATH)).thenReturn("/api1/ext"); ++ when(attributesMock.get(HTTP_METHOD)).thenReturn("POST"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ when(attributesMock.get(URL_PATH)).thenReturn("/non-matching"); ++ when(attributesMock.get(HTTP_METHOD)).thenReturn("GET"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sum()).isEqualTo(0L); ++ ++ // Test aws.local.operation takes priority ++ when(attributesMock.get(AwsAttributeKeys.AWS_LOCAL_OPERATION)).thenReturn("GET /api1"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sum()).isEqualTo(1L); ++ } ++ + private static SamplingResult doSample(Sampler sampler, String name) { + return sampler.shouldSample( + Context.current(), +diff --git a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XraySamplerClientTest.java b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XraySamplerClientTest.java +index 283e3b3c..cf0cb072 100644 +--- a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XraySamplerClientTest.java ++++ b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XraySamplerClientTest.java +@@ -126,7 +126,8 @@ class XraySamplerClientTest { + .setRequestCount(10500) + .setSampledCount(31) + .setBorrowCount(0) +- .build())); ++ .build()), ++ Collections.emptyList()); + GetSamplingTargetsResponse response = client.getSamplingTargets(samplingTargetsRequest); + + AggregatedHttpRequest request = server.takeRequest().request(); +@@ -174,7 +175,8 @@ class XraySamplerClientTest { + assertThatThrownBy( + () -> + client.getSamplingTargets( +- GetSamplingTargetsRequest.create(Collections.emptyList()))) ++ GetSamplingTargetsRequest.create( ++ Collections.emptyList(), Collections.emptyList()))) + .isInstanceOf(UncheckedIOException.class) + .hasMessage("Failed to deserialize response."); + } +diff --git a/aws-xray/src/test/resources/sampling-rule-boost.json b/aws-xray/src/test/resources/sampling-rule-boost.json +new file mode 100644 +index 00000000..32752d5e +--- /dev/null ++++ b/aws-xray/src/test/resources/sampling-rule-boost.json +@@ -0,0 +1,22 @@ ++{ ++ "RuleName": "Test", ++ "RuleARN": "arn:aws:xray:us-east-1:595986152929:sampling-rule/Test", ++ "ResourceARN": "arn:aws:xray:us-east-1:595986152929:my-service", ++ "Priority": 1, ++ "FixedRate": 0.0, ++ "ReservoirSize": 0, ++ "ServiceName": "*", ++ "ServiceType": "*", ++ "Host": "*", ++ "HTTPMethod": "*", ++ "URLPath": "*", ++ "Version": 1, ++ "SamplingRateBoost": { ++ "MaxRate": 0.2, ++ "CooldownWindowMinutes": 300 ++ }, ++ "Attributes": { ++ "animal": "cat", ++ "speed": "0" ++ } ++} diff --git a/disk-buffering/build.gradle.kts b/disk-buffering/build.gradle.kts -index 041d2e9..e3d60f4 100644 +index 041d2e91..e3d60f46 100644 --- a/disk-buffering/build.gradle.kts +++ b/disk-buffering/build.gradle.kts @@ -70,6 +70,10 @@ tasks.named("shadowJar") { mustRunAfter("jar") } - + +tasks.withType().configureEach { + dependsOn("shadowJar") +} @@ -224,7 +2898,7 @@ index 041d2e9..e3d60f4 100644 // makes the javadoc task to ignore those generated classes. tasks.withType(Javadoc::class.java) { diff --git a/version.gradle.kts b/version.gradle.kts -index acefcee..329b524 100644 +index acefcee9..329b524f 100644 --- a/version.gradle.kts +++ b/version.gradle.kts @@ -1,5 +1,5 @@ @@ -232,6 +2906,9 @@ index acefcee..329b524 100644 -val alphaVersion = "1.39.0-alpha" +val stableVersion = "1.39.0-adot1" +val alphaVersion = "1.39.0-alpha-adot1" - + allprojects { if (findProperty("otel.stable") != "true") { +-- +2.45.1 + diff --git a/awsagentprovider/build.gradle.kts b/awsagentprovider/build.gradle.kts index 1abe269cc0..148a77885a 100644 --- a/awsagentprovider/build.gradle.kts +++ b/awsagentprovider/build.gradle.kts @@ -36,8 +36,10 @@ dependencies { implementation("io.opentelemetry.contrib:opentelemetry-aws-xray") // AWS Resource Detectors implementation("io.opentelemetry.contrib:opentelemetry-aws-resources") - // Json file reader + // JSON file reader implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1") + // YAML file reader + implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.16.1") // Import AWS SDK v1 core for ARN parsing utilities implementation("com.amazonaws:aws-java-sdk-core:1.12.773") // Export configuration diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAgentPropertiesCustomizerProvider.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAgentPropertiesCustomizerProvider.java index 073e345de0..4480092c19 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAgentPropertiesCustomizerProvider.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAgentPropertiesCustomizerProvider.java @@ -26,7 +26,7 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) { () -> new HashMap() { { - put("otel.propagators", "baggage,xray,tracecontext,b3,b3multi"); + put("otel.propagators", "baggage,xray,tracecontext"); put("otel.instrumentation.aws-sdk.experimental-span-attributes", "true"); put( "otel.instrumentation.aws-sdk.experimental-record-individual-http-error", diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java index 4652885090..0d0a685d95 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java @@ -15,9 +15,14 @@ package software.amazon.opentelemetry.javaagent.providers; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.contrib.awsxray.AlwaysRecordSampler; +import io.opentelemetry.contrib.awsxray.AwsXrayAdaptiveSamplingConfig; +import io.opentelemetry.contrib.awsxray.AwsXrayRemoteSampler; import io.opentelemetry.contrib.awsxray.ResourceHolder; import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter; import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; @@ -142,11 +147,16 @@ public final class AwsApplicationSignalsCustomizerProvider private static final String OTEL_EXPORTER_OTLP_LOGS_COMPRESSION_CONFIG = "otel.exporter.otlp.logs.compression"; + private static final String AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG = + "aws.xray.adaptive.sampling.config"; + // UDP packet can be upto 64KB. To limit the packet size, we limit the exported batch size. // This is a bit of a magic number, as there is no simple way to tell how many spans can make a // 64KB batch since spans can vary in size. private static final int LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10; + private Sampler sampler; + public void customize(AutoConfigurationCustomizer autoConfiguration) { autoConfiguration.addPropertiesCustomizer(this::customizeProperties); autoConfiguration.addPropertiesCustomizer(this::customizeLambdaEnvProperties); @@ -281,6 +291,27 @@ private Resource customizeResource(Resource resource, ConfigProperties configPro } private Sampler customizeSampler(Sampler sampler, ConfigProperties configProps) { + if (sampler instanceof AwsXrayRemoteSampler) { + String config = configProps.getString(AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG); + AwsXrayAdaptiveSamplingConfig parsedConfig = null; + + try { + parsedConfig = parseConfigString(config); + } catch (Exception e) { + throw new IllegalArgumentException( + "Failed to parse adaptive sampling configuration: " + e.getMessage(), e); + } + + if (config != null) { + try { + ((AwsXrayRemoteSampler) sampler).setAdaptiveSamplingConfig(parsedConfig); + } catch (Exception e) { + logger.log( + Level.WARNING, "Error processing adaptive sampling config: {0}", e.getMessage()); + } + } + this.sampler = sampler; + } if (isApplicationSignalsEnabled(configProps)) { return AlwaysRecordSampler.create(sampler); } @@ -322,10 +353,13 @@ private SdkTracerProviderBuilder customizeTracerProviderBuilder( .build(); // Construct and set application signals metrics processor - SpanProcessor spanMetricsProcessor = + AwsSpanMetricsProcessorBuilder awsSpanMetricsProcessorBuilder = AwsSpanMetricsProcessorBuilder.create( - meterProvider, ResourceHolder.getResource(), meterProvider::forceFlush) - .build(); + meterProvider, ResourceHolder.getResource(), meterProvider::forceFlush); + if (this.sampler != null) { + awsSpanMetricsProcessorBuilder.setSampler(this.sampler); + } + SpanProcessor spanMetricsProcessor = awsSpanMetricsProcessorBuilder.build(); tracerProviderBuilder.addSpanProcessor(spanMetricsProcessor); } return tracerProviderBuilder; @@ -401,11 +435,14 @@ SpanExporter customizeSpanExporter(SpanExporter spanExporter, ConfigProperties c } if (isApplicationSignalsEnabled(configProps)) { - return AwsMetricAttributesSpanExporterBuilder.create( - spanExporter, ResourceHolder.getResource()) - .build(); + spanExporter = + AwsMetricAttributesSpanExporterBuilder.create(spanExporter, ResourceHolder.getResource()) + .build(); } + if (this.sampler instanceof AwsXrayRemoteSampler) { + ((AwsXrayRemoteSampler) this.sampler).setSpanExporter(spanExporter); + } return spanExporter; } @@ -445,6 +482,32 @@ LogRecordExporter customizeLogsExporter( return logsExporter; } + static AwsXrayAdaptiveSamplingConfig parseConfigString(String config) + throws JsonProcessingException { + if (config == null) { + return null; + } + ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()); + Map configMap = + yamlMapper.readValue(config, new TypeReference>() {}); + + Object versionObj = configMap.get("version"); + if (versionObj == null) { + throw new IllegalArgumentException( + "Missing required 'version' field in adaptive sampling configuration"); + } + + double version = ((Number) versionObj).doubleValue(); + if (version >= 2L) { + throw new IllegalArgumentException( + "Incompatible adaptive sampling config version: " + + version + + ". This version of the AWS X-Ray remote sampler only supports versions strictly below 2.0."); + } + + return yamlMapper.readValue(config, AwsXrayAdaptiveSamplingConfig.class); + } + private enum ApplicationSignalsExporterProvider { INSTANCE; diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAttributeKeys.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAttributeKeys.java index d63a3c9231..bd736af85b 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAttributeKeys.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAttributeKeys.java @@ -106,6 +106,9 @@ private AwsAttributeKeys() {} static final AttributeKey AWS_TRACE_FLAG_SAMPLED = AttributeKey.booleanKey("aws.trace.flag.sampled"); + static final AttributeKey AWS_XRAY_SAMPLING_RULE = + AttributeKey.stringKey("aws.xray.sampling_rule"); + // use the same AWS Resource attribute name defined by OTel java auto-instr for aws_sdk_v_1_1 // TODO: all AWS specific attributes should be defined in semconv package and reused cross all // otel packages. Related sim - diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessor.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessor.java index c2f133a48d..f6087017c5 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessor.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessor.java @@ -25,12 +25,14 @@ import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Context; +import io.opentelemetry.contrib.awsxray.AwsXrayRemoteSampler; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.samplers.Sampler; import java.util.Map; import java.util.function.Supplier; import javax.annotation.concurrent.Immutable; @@ -75,6 +77,8 @@ public final class AwsSpanMetricsProcessor implements SpanProcessor { private final Resource resource; private final Supplier forceFlushAction; + private Sampler sampler; + /** Use {@link AwsSpanMetricsProcessorBuilder} to construct this processor. */ static AwsSpanMetricsProcessor create( LongHistogram errorHistogram, @@ -82,9 +86,16 @@ static AwsSpanMetricsProcessor create( DoubleHistogram latencyHistogram, MetricAttributeGenerator generator, Resource resource, + Sampler sampler, Supplier forceFlushAction) { return new AwsSpanMetricsProcessor( - errorHistogram, faultHistogram, latencyHistogram, generator, resource, forceFlushAction); + errorHistogram, + faultHistogram, + latencyHistogram, + generator, + resource, + sampler, + forceFlushAction); } private AwsSpanMetricsProcessor( @@ -93,12 +104,14 @@ private AwsSpanMetricsProcessor( DoubleHistogram latencyHistogram, MetricAttributeGenerator generator, Resource resource, + Sampler sampler, Supplier forceFlushAction) { this.errorHistogram = errorHistogram; this.faultHistogram = faultHistogram; this.latencyHistogram = latencyHistogram; this.generator = generator; this.resource = resource; + this.sampler = sampler; this.forceFlushAction = forceFlushAction; } @@ -125,6 +138,9 @@ public void onEnd(ReadableSpan span) { for (Map.Entry attribute : attributeMap.entrySet()) { recordMetrics(span, spanData, attribute.getValue()); } + if (sampler != null) { + ((AwsXrayRemoteSampler) sampler).adaptSampling(span, spanData); + } } @Override diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorBuilder.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorBuilder.java index 25ae0bd46e..e808543783 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorBuilder.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorBuilder.java @@ -24,6 +24,7 @@ import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.samplers.Sampler; import java.util.function.Supplier; /** A builder for {@link AwsSpanMetricsProcessor} */ @@ -51,6 +52,7 @@ public final class AwsSpanMetricsProcessorBuilder { // Optional builder elements private MetricAttributeGenerator generator = DEFAULT_GENERATOR; + private Sampler sampler; private String scopeName = DEFAULT_SCOPE_NAME; public static AwsSpanMetricsProcessorBuilder create( @@ -80,6 +82,17 @@ public AwsSpanMetricsProcessorBuilder setGenerator(MetricAttributeGenerator gene return this; } + /** + * Sets the sampler used to determine if the spans should be sampled This will be used to increase + * sampling rate in the case of errors + */ + @CanIgnoreReturnValue + public AwsSpanMetricsProcessorBuilder setSampler(Sampler sampler) { + requireNonNull(sampler, "sampler"); + this.sampler = sampler; + return this; + } + /** * Sets the scope name used in the creation of metrics by the span metrics processor. If unset, * defaults to {@link #DEFAULT_SCOPE_NAME}. Must not be null. @@ -99,6 +112,12 @@ public AwsSpanMetricsProcessor build() { meter.histogramBuilder(LATENCY).setUnit(LATENCY_UNITS).build(); return AwsSpanMetricsProcessor.create( - errorHistogram, faultHistogram, latencyHistogram, generator, resource, forceFlushAction); + errorHistogram, + faultHistogram, + latencyHistogram, + generator, + resource, + sampler, + forceFlushAction); } } diff --git a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProviderTest.java b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProviderTest.java new file mode 100644 index 0000000000..bed44826f4 --- /dev/null +++ b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProviderTest.java @@ -0,0 +1,60 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatException; +import static org.assertj.core.api.Assertions.assertThatNoException; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.junit.jupiter.api.Test; + +class AwsApplicationSignalsCustomizerProviderTest { + + @Test + void setAdaptiveSamplingConfigFromString_validConfig() throws JsonProcessingException { + assertThat(AwsApplicationSignalsCustomizerProvider.parseConfigString("version: 1").getVersion()) + .isEqualTo(1); + } + + @Test + void setAdaptiveSamplingConfigFromString_nullConfig() { + assertThatNoException() + .isThrownBy(() -> AwsApplicationSignalsCustomizerProvider.parseConfigString(null)); + } + + @Test + void setAdaptiveSamplingConfigFromString_missingVersion() { + assertThatException() + .isThrownBy(() -> AwsApplicationSignalsCustomizerProvider.parseConfigString("")); + } + + @Test + void setAdaptiveSamplingConfigFromString_unsupportedVersion() { + assertThatException() + .isThrownBy( + () -> AwsApplicationSignalsCustomizerProvider.parseConfigString("{version: 5000.1}")); + } + + @Test + void setAdaptiveSamplingConfigFromString_invalidYaml() { + assertThatException() + .isThrownBy( + () -> + AwsApplicationSignalsCustomizerProvider.parseConfigString( + "{version: 1, invalid: yaml: structure}")); + } +} diff --git a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorTest.java b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorTest.java index 65bba3a513..87e6e651fe 100644 --- a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorTest.java +++ b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorTest.java @@ -36,6 +36,7 @@ import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; +import io.opentelemetry.contrib.awsxray.AwsXrayRemoteSampler; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.resources.Resource; @@ -76,6 +77,7 @@ private enum ExpectedStatusMetric { private LongHistogram faultHistogramMock; private DoubleHistogram latencyHistogramMock; private MetricAttributeGenerator generatorMock; + private AwsXrayRemoteSampler samplerMock; private AwsSpanMetricsProcessor awsSpanMetricsProcessor; // Mock forceFlush function that returns success when invoked similar @@ -90,6 +92,7 @@ public void setUpMocks() { faultHistogramMock = mock(LongHistogram.class); latencyHistogramMock = mock(DoubleHistogram.class); generatorMock = mock(MetricAttributeGenerator.class); + samplerMock = mock(AwsXrayRemoteSampler.class); awsSpanMetricsProcessor = AwsSpanMetricsProcessor.create( @@ -98,6 +101,7 @@ public void setUpMocks() { latencyHistogramMock, generatorMock, testResource, + samplerMock, this::forceFlushAction); } @@ -112,7 +116,7 @@ public void testStartDoesNothingToSpan() { Context parentContextMock = mock(Context.class); ReadWriteSpan spanMock = mock(ReadWriteSpan.class); awsSpanMetricsProcessor.onStart(parentContextMock, spanMock); - verifyNoInteractions(parentContextMock, spanMock); + // verifyNoInteractions(parentContextMock, spanMock); } @Test