diff --git a/.github/patches/opentelemetry-java-contrib.patch b/.github/patches/opentelemetry-java-contrib.patch new file mode 100644 index 0000000000..718fa85de4 --- /dev/null +++ b/.github/patches/opentelemetry-java-contrib.patch @@ -0,0 +1,3116 @@ +diff --git a/.github/renovate.json5 b/.github/renovate.json5 +index 4f7743a3..9e2082ed 100644 +--- a/.github/renovate.json5 ++++ b/.github/renovate.json5 +@@ -176,5 +176,27 @@ + 'npx (?[^@]+)@(?[^\\s]+)', + ], + }, ++ { ++ customType: 'regex', ++ datasourceTemplate: 'java-version', ++ managerFilePatterns: [ ++ '.github/workflows/**', ++ ], ++ matchStrings: [ ++ '(?\\d+) # renovate: datasource=java-version', ++ ], ++ depNameTemplate: 'java', ++ extractVersionTemplate: '^(?\\d+)', ++ }, ++ { ++ customType: 'regex', ++ datasourceTemplate: 'github-releases', ++ managerFilePatterns: [ ++ '**/build.gradle.kts', ++ ], ++ matchStrings: [ ++ '"https://github.com/(?[^/]+/[^/]+)/zipball/(?.+?)"', ++ ], ++ }, + ], + } +diff --git a/aws-xray/build.gradle.kts b/aws-xray/build.gradle.kts +index 54dabba7..d56b12bd 100644 +--- a/aws-xray/build.gradle.kts ++++ b/aws-xray/build.gradle.kts +@@ -11,6 +11,7 @@ dependencies { + api("io.opentelemetry:opentelemetry-sdk-trace") + + compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure") ++ implementation("io.opentelemetry.semconv:opentelemetry-semconv:1.32.0-alpha") + + implementation("com.squareup.okhttp3:okhttp") + implementation("io.opentelemetry.semconv:opentelemetry-semconv") +@@ -25,6 +26,7 @@ dependencies { + + implementation("com.fasterxml.jackson.core:jackson-core") + implementation("com.fasterxml.jackson.core:jackson-databind") ++ implementation("com.github.ben-manes.caffeine:caffeine:2.9.3") + + testImplementation("com.linecorp.armeria:armeria-junit5") + testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure") +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsSamplingResult.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsSamplingResult.java +new file mode 100644 +index 00000000..41f22f90 +--- /dev/null ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsSamplingResult.java +@@ -0,0 +1,54 @@ ++/* ++ * Copyright The OpenTelemetry Authors ++ * SPDX-License-Identifier: Apache-2.0 ++ */ ++ ++package io.opentelemetry.contrib.awsxray; ++ ++import io.opentelemetry.api.common.Attributes; ++import io.opentelemetry.api.trace.TraceState; ++import io.opentelemetry.sdk.trace.samplers.SamplingDecision; ++import io.opentelemetry.sdk.trace.samplers.SamplingResult; ++ ++final class AwsSamplingResult implements SamplingResult { ++ ++ // OTel trace state is a space shared with other vendors with a 256 character limit ++ // We keep the key and values as short as possible while still identifiable ++ public static final String AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY = "xrsr"; ++ ++ private final SamplingDecision decision; ++ private final Attributes attributes; ++ private final String samplingRuleName; ++ ++ private AwsSamplingResult( ++ SamplingDecision decision, Attributes attributes, String samplingRuleName) { ++ this.decision = decision; ++ this.attributes = attributes; ++ this.samplingRuleName = samplingRuleName; ++ } ++ ++ static AwsSamplingResult create( ++ SamplingDecision decision, Attributes attributes, String samplingRuleName) { ++ return new AwsSamplingResult(decision, attributes, samplingRuleName); ++ } ++ ++ @Override ++ public SamplingDecision getDecision() { ++ return decision; ++ } ++ ++ @Override ++ public Attributes getAttributes() { ++ return attributes; ++ } ++ ++ @Override ++ public TraceState getUpdatedTraceState(TraceState parentTraceState) { ++ if (parentTraceState.get(AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY) == null) { ++ return parentTraceState.toBuilder() ++ .put(AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY, samplingRuleName) ++ .build(); ++ } ++ return parentTraceState; ++ } ++} +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayAdaptiveSamplingConfig.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayAdaptiveSamplingConfig.java +new file mode 100644 +index 00000000..dc5b7a01 +--- /dev/null ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayAdaptiveSamplingConfig.java +@@ -0,0 +1,148 @@ ++/* ++ * Copyright The OpenTelemetry Authors ++ * SPDX-License-Identifier: Apache-2.0 ++ */ ++ ++package io.opentelemetry.contrib.awsxray; ++ ++import com.fasterxml.jackson.annotation.JsonCreator; ++import com.fasterxml.jackson.annotation.JsonProperty; ++import com.fasterxml.jackson.annotation.JsonValue; ++import com.fasterxml.jackson.databind.annotation.JsonDeserialize; ++import com.fasterxml.jackson.databind.annotation.JsonSerialize; ++import com.google.auto.value.AutoValue; ++import java.util.List; ++import javax.annotation.Nullable; ++ ++@AutoValue ++@JsonSerialize(as = AwsXrayAdaptiveSamplingConfig.class) ++@JsonDeserialize(builder = AutoValue_AwsXrayAdaptiveSamplingConfig.Builder.class) ++public abstract class AwsXrayAdaptiveSamplingConfig { ++ ++ @JsonProperty("version") ++ public abstract double getVersion(); ++ ++ @JsonProperty("anomalyConditions") ++ @Nullable ++ public abstract List getAnomalyConditions(); ++ ++ @JsonProperty("anomalyCaptureLimit") ++ @Nullable ++ public abstract AnomalyCaptureLimit getAnomalyCaptureLimit(); ++ ++ public static Builder builder() { ++ return new AutoValue_AwsXrayAdaptiveSamplingConfig.Builder(); ++ } ++ ++ @AutoValue.Builder ++ public abstract static class Builder { ++ @JsonProperty("version") ++ public abstract Builder setVersion(double value); ++ ++ @JsonProperty("anomalyConditions") ++ public abstract Builder setAnomalyConditions(List value); ++ ++ @JsonProperty("anomalyCaptureLimit") ++ public abstract Builder setAnomalyCaptureLimit(AnomalyCaptureLimit value); ++ ++ public abstract AwsXrayAdaptiveSamplingConfig build(); ++ } ++ ++ @AutoValue ++ @JsonDeserialize( ++ builder = AutoValue_AwsXrayAdaptiveSamplingConfig_AnomalyConditions.Builder.class) ++ public abstract static class AnomalyConditions { ++ @JsonProperty("errorCodeRegex") ++ @Nullable ++ public abstract String getErrorCodeRegex(); ++ ++ @JsonProperty("operations") ++ @Nullable ++ public abstract List getOperations(); ++ ++ @JsonProperty("highLatencyMs") ++ @Nullable ++ public abstract Long getHighLatencyMs(); ++ ++ @JsonProperty("usage") ++ @Nullable ++ public abstract UsageType getUsage(); ++ ++ public static Builder builder() { ++ return new AutoValue_AwsXrayAdaptiveSamplingConfig_AnomalyConditions.Builder(); ++ } ++ ++ @AutoValue.Builder ++ public abstract static class Builder { ++ @JsonProperty("errorCodeRegex") ++ public abstract Builder setErrorCodeRegex(String value); ++ ++ @JsonProperty("operations") ++ public abstract Builder setOperations(List value); ++ ++ @JsonProperty("highLatencyMs") ++ public abstract Builder setHighLatencyMs(Long value); ++ ++ @JsonProperty("usage") ++ public abstract Builder setUsage(UsageType value); ++ ++ public abstract AnomalyConditions build(); ++ } ++ } ++ ++ public enum UsageType { ++ BOTH("both"), ++ SAMPLING_BOOST("sampling-boost"), ++ ANOMALY_TRACE_CAPTURE("anomaly-trace-capture"), ++ NEITHER("neither"); // Not meant to be used by customers ++ ++ private final String value; ++ ++ UsageType(String value) { ++ this.value = value; ++ } ++ ++ @JsonValue ++ public String getValue() { ++ return value; ++ } ++ ++ @JsonCreator ++ public static UsageType fromValue(String value) { ++ for (UsageType type : values()) { ++ if (type.value.equals(value)) { ++ return type; ++ } ++ } ++ throw new IllegalArgumentException("Invalid usage value: " + value); ++ } ++ ++ public static boolean isUsedForBoost(UsageType usage) { ++ return BOTH.equals(usage) || SAMPLING_BOOST.equals(usage); ++ } ++ ++ public static boolean isUsedForAnomalyTraceCapture(UsageType usage) { ++ return BOTH.equals(usage) || ANOMALY_TRACE_CAPTURE.equals(usage); ++ } ++ } ++ ++ @AutoValue ++ @JsonDeserialize( ++ builder = AutoValue_AwsXrayAdaptiveSamplingConfig_AnomalyCaptureLimit.Builder.class) ++ public abstract static class AnomalyCaptureLimit { ++ @JsonProperty("anomalyTracesPerSecond") ++ public abstract int getAnomalyTracesPerSecond(); ++ ++ public static Builder builder() { ++ return new AutoValue_AwsXrayAdaptiveSamplingConfig_AnomalyCaptureLimit.Builder(); ++ } ++ ++ @AutoValue.Builder ++ public abstract static class Builder { ++ @JsonProperty("anomalyTracesPerSecond") ++ public abstract Builder setAnomalyTracesPerSecond(int value); ++ ++ public abstract AnomalyCaptureLimit build(); ++ } ++ } ++} +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSampler.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSampler.java +index ad9b72a2..7864f358 100644 +--- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSampler.java ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSampler.java +@@ -9,16 +9,22 @@ import io.opentelemetry.api.common.Attributes; + import io.opentelemetry.api.trace.SpanKind; + import io.opentelemetry.context.Context; + import io.opentelemetry.contrib.awsxray.GetSamplingRulesResponse.SamplingRuleRecord; ++import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingBoostStatisticsDocument; + import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingStatisticsDocument; + import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; + import io.opentelemetry.sdk.common.Clock; + import io.opentelemetry.sdk.resources.Resource; ++import io.opentelemetry.sdk.trace.ReadableSpan; + import io.opentelemetry.sdk.trace.data.LinkData; ++import io.opentelemetry.sdk.trace.data.SpanData; ++import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; ++import io.opentelemetry.sdk.trace.export.SpanExporter; + import io.opentelemetry.sdk.trace.samplers.Sampler; + import io.opentelemetry.sdk.trace.samplers.SamplingResult; + import java.io.Closeable; + import java.time.Duration; + import java.time.Instant; ++import java.util.ArrayList; + import java.util.Date; + import java.util.Iterator; + import java.util.List; +@@ -43,6 +49,9 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable { + + private static final Logger logger = Logger.getLogger(AwsXrayRemoteSampler.class.getName()); + ++ // Default batch size to be same as OTel BSP default ++ private static final int maxExportBatchSize = 512; ++ + private final Resource resource; + private final Clock clock; + private final Sampler initialSampler; +@@ -59,6 +68,9 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable { + @Nullable private volatile XrayRulesSampler internalXrayRulesSampler; + private volatile Sampler sampler; + ++ @Nullable private AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig; ++ @Nullable private BatchSpanProcessor bsp; ++ + /** + * Returns a {@link AwsXrayRemoteSamplerBuilder} with the given {@link Resource}. This {@link + * Resource} should be the same as what the OpenTelemetry SDK is configured with. +@@ -120,6 +132,40 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable { + return "AwsXrayRemoteSampler{" + sampler.getDescription() + "}"; + } + ++ public void setAdaptiveSamplingConfig(AwsXrayAdaptiveSamplingConfig config) { ++ if (this.adaptiveSamplingConfig != null) { ++ throw new IllegalStateException("Programming bug - Adaptive sampling config is already set"); ++ } else if (config != null && this.adaptiveSamplingConfig == null) { ++ // Save here and also pass to XrayRulesSampler directly as it already exists ++ this.adaptiveSamplingConfig = config; ++ if (internalXrayRulesSampler != null) { ++ internalXrayRulesSampler.setAdaptiveSamplingConfig(config); ++ } ++ } ++ } ++ ++ public void setSpanExporter(SpanExporter spanExporter) { ++ if (this.bsp != null) { ++ throw new IllegalStateException("Programming bug - BatchSpanProcessor is already set"); ++ } else if (spanExporter != null && this.bsp == null) { ++ this.bsp = ++ BatchSpanProcessor.builder(spanExporter) ++ .setExportUnsampledSpans(true) // Required to capture the unsampled anomaly spans ++ .setMaxExportBatchSize(maxExportBatchSize) ++ .build(); ++ } ++ } ++ ++ public void adaptSampling(ReadableSpan span, SpanData spanData) { ++ if (this.bsp == null) { ++ throw new IllegalStateException( ++ "Programming bug - BatchSpanProcessor is null while trying to adapt sampling"); ++ } ++ if (internalXrayRulesSampler != null) { ++ internalXrayRulesSampler.adaptSampling(span, spanData, this.bsp::onEnd); ++ } ++ } ++ + private void getAndUpdateSampler() { + try { + // No pagination support yet, or possibly ever. +@@ -134,8 +180,8 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable { + initialSampler, + response.getSamplingRules().stream() + .map(SamplingRuleRecord::getRule) +- .collect(Collectors.toList()))); +- ++ .collect(Collectors.toList()), ++ adaptiveSamplingConfig)); + previousRulesResponse = response; + ScheduledFuture existingFetchTargetsFuture = fetchTargetsFuture; + if (existingFetchTargetsFuture != null) { +@@ -179,14 +225,29 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable { + XrayRulesSampler xrayRulesSampler = this.internalXrayRulesSampler; + try { + Date now = Date.from(Instant.ofEpochSecond(0, clock.now())); +- List statistics = xrayRulesSampler.snapshot(now); ++ List statisticsSnapshot = ++ xrayRulesSampler.snapshot(now); ++ List statistics = new ArrayList(); ++ List boostStatistics = ++ new ArrayList(); ++ statisticsSnapshot.stream() ++ .forEach( ++ snapshot -> { ++ if (snapshot.getStatisticsDocument() != null) { ++ statistics.add(snapshot.getStatisticsDocument()); ++ } ++ if (snapshot.getBoostStatisticsDocument() != null ++ && snapshot.getBoostStatisticsDocument().getTotalCount() > 0) { ++ boostStatistics.add(snapshot.getBoostStatisticsDocument()); ++ } ++ }); + Set requestedTargetRuleNames = + statistics.stream() + .map(SamplingStatisticsDocument::getRuleName) + .collect(Collectors.toSet()); + +- GetSamplingTargetsResponse response = +- client.getSamplingTargets(GetSamplingTargetsRequest.create(statistics)); ++ GetSamplingTargetsRequest req = GetSamplingTargetsRequest.create(statistics, boostStatistics); ++ GetSamplingTargetsResponse response = client.getSamplingTargets(req); + Map targets = + response.getDocuments().stream() + .collect(Collectors.toMap(SamplingTargetDocument::getRuleName, Function.identity())); +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingRulesResponse.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingRulesResponse.java +index dca930d5..01835dc2 100644 +--- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingRulesResponse.java ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingRulesResponse.java +@@ -62,7 +62,8 @@ abstract class GetSamplingRulesResponse { + @JsonProperty("ServiceName") String serviceName, + @JsonProperty("ServiceType") String serviceType, + @JsonProperty("URLPath") String urlPath, +- @JsonProperty("Version") int version) { ++ @JsonProperty("Version") int version, ++ @JsonProperty("SamplingRateBoost") @Nullable SamplingRateBoost samplingRateBoost) { + return new AutoValue_GetSamplingRulesResponse_SamplingRule( + attributes, + fixedRate, +@@ -76,7 +77,8 @@ abstract class GetSamplingRulesResponse { + serviceName, + serviceType, + urlPath, +- version); ++ version, ++ samplingRateBoost); + } + + abstract Map getAttributes(); +@@ -106,5 +108,23 @@ abstract class GetSamplingRulesResponse { + abstract String getUrlPath(); + + abstract int getVersion(); ++ ++ @Nullable ++ abstract SamplingRateBoost getSamplingRateBoost(); ++ } ++ ++ @AutoValue ++ abstract static class SamplingRateBoost { ++ @JsonCreator ++ static SamplingRateBoost create( ++ @JsonProperty("MaxRate") double maxRate, ++ @JsonProperty("CooldownWindowMinutes") long cooldownWindowMinutes) { ++ return new AutoValue_GetSamplingRulesResponse_SamplingRateBoost( ++ maxRate, cooldownWindowMinutes); ++ } ++ ++ abstract double getMaxRate(); ++ ++ abstract long getCooldownWindowMinutes(); + } + } +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsRequest.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsRequest.java +index 7d1fb7b7..9404f73e 100644 +--- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsRequest.java ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsRequest.java +@@ -15,14 +15,20 @@ import java.util.List; + @JsonSerialize(as = GetSamplingTargetsRequest.class) + abstract class GetSamplingTargetsRequest { + +- static GetSamplingTargetsRequest create(List documents) { +- return new AutoValue_GetSamplingTargetsRequest(documents); ++ static GetSamplingTargetsRequest create( ++ List documents, ++ List boostDocuments) { ++ return new AutoValue_GetSamplingTargetsRequest(documents, boostDocuments); + } + + // Limit of 25 items + @JsonProperty("SamplingStatisticsDocuments") + abstract List getDocuments(); + ++ // Limit of 25 items ++ @JsonProperty("SamplingBoostStatisticsDocuments") ++ abstract List getBoostDocuments(); ++ + @AutoValue + @JsonSerialize(as = SamplingStatisticsDocument.class) + abstract static class SamplingStatisticsDocument { +@@ -66,4 +72,48 @@ abstract class GetSamplingTargetsRequest { + abstract SamplingStatisticsDocument build(); + } + } ++ ++ @AutoValue ++ @JsonSerialize(as = SamplingBoostStatisticsDocument.class) ++ abstract static class SamplingBoostStatisticsDocument { ++ ++ static SamplingBoostStatisticsDocument.Builder newBuilder() { ++ return new AutoValue_GetSamplingTargetsRequest_SamplingBoostStatisticsDocument.Builder(); ++ } ++ ++ @JsonProperty("RuleName") ++ abstract String getRuleName(); ++ ++ @JsonProperty("ServiceName") ++ abstract String getServiceName(); ++ ++ @JsonProperty("Timestamp") ++ abstract Date getTimestamp(); ++ ++ @JsonProperty("AnomalyCount") ++ abstract long getAnomalyCount(); ++ ++ @JsonProperty("TotalCount") ++ abstract long getTotalCount(); ++ ++ @JsonProperty("SampledAnomalyCount") ++ abstract long getSampledAnomalyCount(); ++ ++ @AutoValue.Builder ++ abstract static class Builder { ++ abstract Builder setRuleName(String ruleName); ++ ++ abstract Builder setServiceName(String serviceName); ++ ++ abstract Builder setTimestamp(Date timestamp); ++ ++ abstract Builder setAnomalyCount(long anomalyCount); ++ ++ abstract Builder setTotalCount(long totalCount); ++ ++ abstract Builder setSampledAnomalyCount(long sampledAnomalyCount); ++ ++ abstract SamplingBoostStatisticsDocument build(); ++ } ++ } + } +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsResponse.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsResponse.java +index c1e178f5..406f07e2 100644 +--- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsResponse.java ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsResponse.java +@@ -19,9 +19,11 @@ abstract class GetSamplingTargetsResponse { + static GetSamplingTargetsResponse create( + @JsonProperty("LastRuleModification") Date lastRuleModification, + @JsonProperty("SamplingTargetDocuments") List documents, +- @JsonProperty("UnprocessedStatistics") List unprocessedStatistics) { ++ @JsonProperty("UnprocessedStatistics") List unprocessedStatistics, ++ @JsonProperty("UnprocessedBoostStatistics") @Nullable ++ List unprocessedBoostStatistics) { + return new AutoValue_GetSamplingTargetsResponse( +- lastRuleModification, documents, unprocessedStatistics); ++ lastRuleModification, documents, unprocessedStatistics, unprocessedBoostStatistics); + } + + abstract Date getLastRuleModification(); +@@ -30,6 +32,9 @@ abstract class GetSamplingTargetsResponse { + + abstract List getUnprocessedStatistics(); + ++ @Nullable ++ abstract List getUnprocessedBoostStatistics(); ++ + @AutoValue + abstract static class SamplingTargetDocument { + +@@ -39,9 +44,10 @@ abstract class GetSamplingTargetsResponse { + @JsonProperty("Interval") @Nullable Integer intervalSecs, + @JsonProperty("ReservoirQuota") @Nullable Integer reservoirQuota, + @JsonProperty("ReservoirQuotaTTL") @Nullable Date reservoirQuotaTtl, ++ @JsonProperty("SamplingBoost") @Nullable SamplingBoost samplingBoost, + @JsonProperty("RuleName") String ruleName) { + return new AutoValue_GetSamplingTargetsResponse_SamplingTargetDocument( +- fixedRate, intervalSecs, reservoirQuota, reservoirQuotaTtl, ruleName); ++ fixedRate, intervalSecs, reservoirQuota, reservoirQuotaTtl, samplingBoost, ruleName); + } + + abstract double getFixedRate(); +@@ -57,6 +63,9 @@ abstract class GetSamplingTargetsResponse { + @Nullable + abstract Date getReservoirQuotaTtl(); + ++ @Nullable ++ abstract SamplingBoost getSamplingBoost(); ++ + abstract String getRuleName(); + } + +@@ -78,4 +87,18 @@ abstract class GetSamplingTargetsResponse { + + abstract String getRuleName(); + } ++ ++ @AutoValue ++ abstract static class SamplingBoost { ++ @JsonCreator ++ static SamplingBoost create( ++ @JsonProperty("BoostRate") double boostRate, ++ @JsonProperty("BoostRateTTL") Date boostRateTtl) { ++ return new AutoValue_GetSamplingTargetsResponse_SamplingBoost(boostRate, boostRateTtl); ++ } ++ ++ abstract double getBoostRate(); ++ ++ abstract Date getBoostRateTtl(); ++ } + } +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplier.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplier.java +index 1d97c4ae..6462c7f3 100644 +--- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplier.java ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplier.java +@@ -11,10 +11,13 @@ import io.opentelemetry.api.common.AttributeKey; + import io.opentelemetry.api.common.Attributes; + import io.opentelemetry.api.trace.SpanKind; + import io.opentelemetry.context.Context; ++import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingBoostStatisticsDocument; + import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingStatisticsDocument; ++import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingBoost; + import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; + import io.opentelemetry.sdk.common.Clock; + import io.opentelemetry.sdk.resources.Resource; ++import io.opentelemetry.sdk.trace.ReadableSpan; + import io.opentelemetry.sdk.trace.data.LinkData; + import io.opentelemetry.sdk.trace.samplers.Sampler; + import io.opentelemetry.sdk.trace.samplers.SamplingDecision; +@@ -76,12 +79,20 @@ final class SamplingRuleApplier { + + private final String clientId; + private final String ruleName; ++ private final String serviceName; + private final Clock clock; + private final Sampler reservoirSampler; + private final long reservoirEndTimeNanos; ++ private final double fixedRate; + private final Sampler fixedRateSampler; + private final boolean borrowing; + ++ // Adaptive sampling related configs ++ private final boolean hasBoost; ++ private final double boostedFixedRate; ++ private final Long boostEndTimeNanos; ++ private final Sampler boostedFixedRateSampler; ++ + private final Map attributeMatchers; + private final Matcher urlPathMatcher; + private final Matcher serviceNameMatcher; +@@ -94,7 +105,11 @@ final class SamplingRuleApplier { + + private final long nextSnapshotTimeNanos; + +- SamplingRuleApplier(String clientId, GetSamplingRulesResponse.SamplingRule rule, Clock clock) { ++ SamplingRuleApplier( ++ String clientId, ++ GetSamplingRulesResponse.SamplingRule rule, ++ @Nullable String serviceName, ++ Clock clock) { + this.clientId = clientId; + this.clock = clock; + String ruleName = rule.getRuleName(); +@@ -108,6 +123,8 @@ final class SamplingRuleApplier { + } + this.ruleName = ruleName; + ++ this.serviceName = serviceName == null ? "default" : serviceName; ++ + // We don't have a SamplingTarget so are ready to report a snapshot right away. + nextSnapshotTimeNanos = clock.nanoTime(); + +@@ -124,7 +141,15 @@ final class SamplingRuleApplier { + reservoirSampler = Sampler.alwaysOff(); + borrowing = false; + } +- fixedRateSampler = createFixedRate(rule.getFixedRate()); ++ fixedRate = rule.getFixedRate(); ++ fixedRateSampler = createFixedRate(fixedRate); ++ ++ // Check if the rule has a sampling rate boost option ++ hasBoost = rule.getSamplingRateBoost() != null; ++ ++ boostedFixedRate = fixedRate; ++ boostedFixedRateSampler = createFixedRate(fixedRate); ++ boostEndTimeNanos = clock.nanoTime(); + + if (rule.getAttributes().isEmpty()) { + attributeMatchers = Collections.emptyMap(); +@@ -147,11 +172,16 @@ final class SamplingRuleApplier { + private SamplingRuleApplier( + String clientId, + String ruleName, ++ String serviceName, + Clock clock, + Sampler reservoirSampler, + long reservoirEndTimeNanos, ++ double fixedRate, + Sampler fixedRateSampler, + boolean borrowing, ++ double boostedFixedRate, ++ Long boostEndTimeNanos, ++ boolean hasBoost, + Map attributeMatchers, + Matcher urlPathMatcher, + Matcher serviceNameMatcher, +@@ -163,11 +193,16 @@ final class SamplingRuleApplier { + long nextSnapshotTimeNanos) { + this.clientId = clientId; + this.ruleName = ruleName; ++ this.serviceName = serviceName; + this.clock = clock; + this.reservoirSampler = reservoirSampler; + this.reservoirEndTimeNanos = reservoirEndTimeNanos; ++ this.fixedRate = fixedRate; + this.fixedRateSampler = fixedRateSampler; + this.borrowing = borrowing; ++ this.boostedFixedRate = boostedFixedRate; ++ this.boostEndTimeNanos = boostEndTimeNanos; ++ this.hasBoost = hasBoost; + this.attributeMatchers = attributeMatchers; + this.urlPathMatcher = urlPathMatcher; + this.serviceNameMatcher = serviceNameMatcher; +@@ -177,6 +212,7 @@ final class SamplingRuleApplier { + this.resourceArnMatcher = resourceArnMatcher; + this.statistics = statistics; + this.nextSnapshotTimeNanos = nextSnapshotTimeNanos; ++ this.boostedFixedRateSampler = createFixedRate(this.boostedFixedRate); + } + + @SuppressWarnings("deprecation") // TODO +@@ -273,45 +309,84 @@ final class SamplingRuleApplier { + statistics.sampled.increment(); + return result; + } +- result = +- fixedRateSampler.shouldSample( +- parentContext, traceId, name, spanKind, attributes, parentLinks); ++ ++ if (clock.nanoTime() < boostEndTimeNanos) { ++ result = ++ boostedFixedRateSampler.shouldSample( ++ parentContext, traceId, name, spanKind, attributes, parentLinks); ++ } else { ++ result = ++ fixedRateSampler.shouldSample( ++ parentContext, traceId, name, spanKind, attributes, parentLinks); ++ } + if (result.getDecision() != SamplingDecision.DROP) { + statistics.sampled.increment(); + } + return result; + } + ++ void countTrace() { ++ statistics.traces.increment(); ++ } ++ ++ void countAnomalyTrace(ReadableSpan span) { ++ statistics.anomalies.increment(); ++ ++ if (span.getSpanContext().isSampled()) { ++ statistics.anomaliesSampled.increment(); ++ } ++ } ++ + @Nullable +- SamplingStatisticsDocument snapshot(Date now) { ++ SamplingRuleStatisticsSnapshot snapshot(Date now) { + if (clock.nanoTime() < nextSnapshotTimeNanos) { + return null; + } +- return SamplingStatisticsDocument.newBuilder() +- .setClientId(clientId) +- .setRuleName(ruleName) +- .setTimestamp(now) +- // Resetting requests first ensures that sample / borrow rate are positive after the reset. +- // Snapshotting is not concurrent so this ensures they are always positive. +- .setRequestCount(statistics.requests.sumThenReset()) +- .setSampledCount(statistics.sampled.sumThenReset()) +- .setBorrowCount(statistics.borrowed.sumThenReset()) +- .build(); ++ long totalCount = statistics.requests.sumThenReset(); ++ long sampledCount = statistics.sampled.sumThenReset(); ++ long borrowCount = statistics.borrowed.sumThenReset(); ++ long traceCount = statistics.traces.sumThenReset(); ++ long anomalyCount = statistics.anomalies.sumThenReset(); ++ long sampledAnomalyCount = statistics.anomaliesSampled.sumThenReset(); ++ SamplingStatisticsDocument samplingStatistics = ++ SamplingStatisticsDocument.newBuilder() ++ .setClientId(clientId) ++ .setRuleName(ruleName) ++ .setTimestamp(now) ++ // Resetting requests first ensures that sample / borrow rate are positive after the ++ // reset. ++ // Snapshotting is not concurrent so this ensures they are always positive. ++ .setRequestCount(totalCount) ++ .setSampledCount(sampledCount) ++ .setBorrowCount(borrowCount) ++ .build(); ++ SamplingBoostStatisticsDocument boostDoc = ++ SamplingBoostStatisticsDocument.newBuilder() ++ .setRuleName(ruleName) ++ .setServiceName(serviceName) ++ .setTimestamp(now) ++ .setTotalCount(traceCount) ++ .setAnomalyCount(anomalyCount) ++ .setSampledAnomalyCount(sampledAnomalyCount) ++ .build(); ++ return new SamplingRuleStatisticsSnapshot(samplingStatistics, boostDoc); + } + + long getNextSnapshotTimeNanos() { + return nextSnapshotTimeNanos; + } + +- SamplingRuleApplier withTarget(SamplingTargetDocument target, Date now) { ++ // currentNanoTime is passed in to ensure all uses of withTarget are used with the same baseline ++ // time reference ++ SamplingRuleApplier withTarget(SamplingTargetDocument target, Date now, long currentNanoTime) { + Sampler newFixedRateSampler = createFixedRate(target.getFixedRate()); + Sampler newReservoirSampler = Sampler.alwaysOff(); +- long newReservoirEndTimeNanos = clock.nanoTime(); ++ long newReservoirEndTimeNanos = currentNanoTime; + // Not well documented but a quota should always come with a TTL + if (target.getReservoirQuota() != null && target.getReservoirQuotaTtl() != null) { + newReservoirSampler = createRateLimited(target.getReservoirQuota()); + newReservoirEndTimeNanos = +- clock.nanoTime() ++ currentNanoTime + + Duration.between(now.toInstant(), target.getReservoirQuotaTtl().toInstant()) + .toNanos(); + } +@@ -319,16 +394,36 @@ final class SamplingRuleApplier { + target.getIntervalSecs() != null + ? TimeUnit.SECONDS.toNanos(target.getIntervalSecs()) + : AwsXrayRemoteSampler.DEFAULT_TARGET_INTERVAL_NANOS; +- long newNextSnapshotTimeNanos = clock.nanoTime() + intervalNanos; ++ long newNextSnapshotTimeNanos = currentNanoTime + intervalNanos; ++ ++ double newBoostedFixedRate = fixedRate; ++ long newBoostEndTimeNanos = currentNanoTime; ++ if (target.getSamplingBoost() != null) { ++ SamplingBoost samplingBoostMap = target.getSamplingBoost(); ++ if (samplingBoostMap != null ++ && samplingBoostMap.getBoostRate() >= target.getFixedRate() ++ && samplingBoostMap.getBoostRateTtl() != null) { ++ newBoostedFixedRate = samplingBoostMap.getBoostRate(); ++ newBoostEndTimeNanos = ++ currentNanoTime ++ + Duration.between(now.toInstant(), samplingBoostMap.getBoostRateTtl().toInstant()) ++ .toNanos(); ++ } ++ } + + return new SamplingRuleApplier( + clientId, + ruleName, ++ serviceName, + clock, + newReservoirSampler, + newReservoirEndTimeNanos, ++ fixedRate, + newFixedRateSampler, + /* borrowing= */ false, ++ newBoostedFixedRate, ++ newBoostEndTimeNanos, ++ hasBoost, + attributeMatchers, + urlPathMatcher, + serviceNameMatcher, +@@ -344,11 +439,16 @@ final class SamplingRuleApplier { + return new SamplingRuleApplier( + clientId, + ruleName, ++ serviceName, + clock, + reservoirSampler, + reservoirEndTimeNanos, ++ fixedRate, + fixedRateSampler, + borrowing, ++ boostedFixedRate, ++ boostEndTimeNanos, ++ hasBoost, + attributeMatchers, + urlPathMatcher, + serviceNameMatcher, +@@ -364,6 +464,15 @@ final class SamplingRuleApplier { + return ruleName; + } + ++ // For testing ++ String getServiceName() { ++ return serviceName; ++ } ++ ++ boolean hasBoost() { ++ return hasBoost; ++ } ++ + @Nullable + private static String getArn(Attributes attributes, Resource resource) { + String arn = resource.getAttributes().get(AWS_ECS_CONTAINER_ARN); +@@ -515,5 +624,30 @@ final class SamplingRuleApplier { + final LongAdder requests = new LongAdder(); + final LongAdder sampled = new LongAdder(); + final LongAdder borrowed = new LongAdder(); ++ final LongAdder traces = new LongAdder(); ++ final LongAdder anomalies = new LongAdder(); ++ final LongAdder anomaliesSampled = new LongAdder(); ++ } ++ ++ static class SamplingRuleStatisticsSnapshot { ++ final SamplingStatisticsDocument statisticsDocument; ++ final SamplingBoostStatisticsDocument boostStatisticsDocument; ++ ++ // final SamplingBoostStatisticsDocument boostStatisticsDocument; ++ ++ SamplingRuleStatisticsSnapshot( ++ SamplingStatisticsDocument statisticsDocument, ++ SamplingBoostStatisticsDocument boostStatisticsDocument) { ++ this.statisticsDocument = statisticsDocument; ++ this.boostStatisticsDocument = boostStatisticsDocument; ++ } ++ ++ SamplingStatisticsDocument getStatisticsDocument() { ++ return statisticsDocument; ++ } ++ ++ SamplingBoostStatisticsDocument getBoostStatisticsDocument() { ++ return boostStatisticsDocument; ++ } + } + } +diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java +index 75977dc0..9620ba2b 100644 +--- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java ++++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java +@@ -5,42 +5,79 @@ + + package io.opentelemetry.contrib.awsxray; + ++import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; ++import static io.opentelemetry.semconv.ServiceAttributes.SERVICE_NAME; ++ ++import com.github.benmanes.caffeine.cache.Cache; ++import com.github.benmanes.caffeine.cache.Caffeine; ++import io.opentelemetry.api.common.AttributeKey; + import io.opentelemetry.api.common.Attributes; ++import io.opentelemetry.api.trace.Span; + import io.opentelemetry.api.trace.SpanKind; ++import io.opentelemetry.api.trace.StatusCode; + import io.opentelemetry.context.Context; + import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; + import io.opentelemetry.sdk.common.Clock; + import io.opentelemetry.sdk.resources.Resource; ++import io.opentelemetry.sdk.trace.ReadableSpan; + import io.opentelemetry.sdk.trace.data.LinkData; ++import io.opentelemetry.sdk.trace.data.SpanData; + import io.opentelemetry.sdk.trace.samplers.Sampler; + import io.opentelemetry.sdk.trace.samplers.SamplingResult; ++import java.nio.charset.StandardCharsets; ++import java.security.MessageDigest; ++import java.security.NoSuchAlgorithmException; ++import java.time.Duration; + import java.util.Arrays; + import java.util.Comparator; + import java.util.Date; ++import java.util.HashMap; + import java.util.List; + import java.util.Map; + import java.util.Objects; + import java.util.Set; ++import java.util.function.Consumer; + import java.util.logging.Level; + import java.util.logging.Logger; + import java.util.stream.Collectors; ++import javax.annotation.Nullable; + + final class XrayRulesSampler implements Sampler { + + private static final Logger logger = Logger.getLogger(XrayRulesSampler.class.getName()); + ++ public static final AttributeKey AWS_XRAY_SAMPLING_RULE = ++ AttributeKey.stringKey("aws.xray.sampling_rule"); ++ ++ // Used for generating operation ++ private static final String UNKNOWN_OPERATION = "UnknownOperation"; ++ private static final AttributeKey URL_PATH = AttributeKey.stringKey("url.path"); ++ private static final AttributeKey HTTP_TARGET = AttributeKey.stringKey("http.target"); ++ private static final AttributeKey HTTP_REQUEST_METHOD = ++ AttributeKey.stringKey("http.request.method"); ++ private static final AttributeKey HTTP_METHOD = AttributeKey.stringKey("http.method"); ++ + private final String clientId; + private final Resource resource; + private final Clock clock; + private final Sampler fallbackSampler; + private final SamplingRuleApplier[] ruleAppliers; ++ private final Map ruleToHashMap; ++ private final Map hashToRuleMap; ++ ++ private final boolean adaptiveSamplingRuleExists; ++ private final Cache traceUsageCache; ++ ++ @Nullable private AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig; ++ @Nullable private RateLimiter anomalyCaptureRateLimiter; + + XrayRulesSampler( + String clientId, + Resource resource, + Clock clock, + Sampler fallbackSampler, +- List rules) { ++ List rules, ++ @Nullable AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig) { + this( + clientId, + resource, +@@ -49,8 +86,19 @@ final class XrayRulesSampler implements Sampler { + rules.stream() + // Lower priority value takes precedence so normal ascending sort. + .sorted(Comparator.comparingInt(GetSamplingRulesResponse.SamplingRule::getPriority)) +- .map(rule -> new SamplingRuleApplier(clientId, rule, clock)) +- .toArray(SamplingRuleApplier[]::new)); ++ .map( ++ rule -> ++ new SamplingRuleApplier( ++ clientId, rule, resource.getAttribute(SERVICE_NAME), clock)) ++ .toArray(SamplingRuleApplier[]::new), ++ createRuleHashMaps(rules), ++ rules.stream().anyMatch(r -> r.getSamplingRateBoost() != null), ++ adaptiveSamplingConfig, ++ Caffeine.newBuilder() ++ .maximumSize(100_000) ++ .ticker(clock::nanoTime) ++ .expireAfterWrite(Duration.ofMinutes(10)) ++ .build()); + } + + private XrayRulesSampler( +@@ -58,12 +106,36 @@ final class XrayRulesSampler implements Sampler { + Resource resource, + Clock clock, + Sampler fallbackSampler, +- SamplingRuleApplier[] ruleAppliers) { ++ SamplingRuleApplier[] ruleAppliers, ++ Map ruleToHashMap, ++ boolean adaptiveSamplingRuleExists, ++ @Nullable AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig, ++ Cache traceUsageCache) { + this.clientId = clientId; + this.resource = resource; + this.clock = clock; + this.fallbackSampler = fallbackSampler; + this.ruleAppliers = ruleAppliers; ++ this.ruleToHashMap = ruleToHashMap; ++ this.hashToRuleMap = new HashMap<>(); ++ for (Map.Entry entry : ruleToHashMap.entrySet()) { ++ this.hashToRuleMap.put(entry.getValue(), entry.getKey()); ++ } ++ this.adaptiveSamplingRuleExists = adaptiveSamplingRuleExists; ++ this.adaptiveSamplingConfig = adaptiveSamplingConfig; ++ this.traceUsageCache = traceUsageCache; ++ ++ // Initialize anomaly capture rate limiter ++ if (this.adaptiveSamplingConfig != null ++ && this.adaptiveSamplingConfig.getAnomalyCaptureLimit() == null) { ++ this.anomalyCaptureRateLimiter = new RateLimiter(1, 1, clock); ++ } else if (adaptiveSamplingConfig != null ++ && adaptiveSamplingConfig.getAnomalyCaptureLimit() != null) { ++ int anomalyTracesPerSecond = ++ adaptiveSamplingConfig.getAnomalyCaptureLimit().getAnomalyTracesPerSecond(); ++ this.anomalyCaptureRateLimiter = ++ new RateLimiter(anomalyTracesPerSecond, anomalyTracesPerSecond, clock); ++ } + } + + @Override +@@ -74,10 +146,36 @@ final class XrayRulesSampler implements Sampler { + SpanKind spanKind, + Attributes attributes, + List parentLinks) { ++ String upstreamMatchedRule = ++ Span.fromContext(parentContext) ++ .getSpanContext() ++ .getTraceState() ++ .get(AwsSamplingResult.AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY); + for (SamplingRuleApplier applier : ruleAppliers) { + if (applier.matches(attributes, resource)) { +- return applier.shouldSample( +- parentContext, traceId, name, spanKind, attributes, parentLinks); ++ SamplingResult result = ++ applier.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks); ++ ++ // If the trace state has a sampling rule reference, propagate it ++ // Otherwise, encode and propagate the matched sampling rule using AwsSamplingResult ++ String ruleToPropagate; ++ if (upstreamMatchedRule != null) { ++ ruleToPropagate = hashToRuleMap.getOrDefault(upstreamMatchedRule, applier.getRuleName()); ++ } else { ++ ruleToPropagate = applier.getRuleName(); ++ } ++ String hashedRule = ruleToHashMap.getOrDefault(ruleToPropagate, ruleToPropagate); ++ if (this.adaptiveSamplingConfig != null ++ && this.adaptiveSamplingConfig.getAnomalyCaptureLimit() != null) { ++ // If the span is capturable based on local SDK config, add sampling rule attribute ++ return AwsSamplingResult.create( ++ result.getDecision(), ++ result.getAttributes().toBuilder() ++ .put(AWS_XRAY_SAMPLING_RULE.getKey(), ruleToPropagate) ++ .build(), ++ hashedRule); ++ } ++ return AwsSamplingResult.create(result.getDecision(), result.getAttributes(), hashedRule); + } + } + +@@ -96,7 +194,184 @@ final class XrayRulesSampler implements Sampler { + return "XrayRulesSampler{" + Arrays.toString(ruleAppliers) + "}"; + } + +- List snapshot(Date now) { ++ void setAdaptiveSamplingConfig(AwsXrayAdaptiveSamplingConfig config) { ++ if (this.adaptiveSamplingConfig != null) { ++ throw new IllegalStateException("Programming bug - Adaptive sampling config is already set"); ++ } else if (config != null && this.adaptiveSamplingConfig == null) { ++ this.adaptiveSamplingConfig = config; ++ ++ // Initialize anomaly capture rate limiter if error capture limit is configured ++ if (config.getAnomalyCaptureLimit() != null) { ++ int anomalyTracesPerSecond = config.getAnomalyCaptureLimit().getAnomalyTracesPerSecond(); ++ this.anomalyCaptureRateLimiter = ++ new RateLimiter(anomalyTracesPerSecond, anomalyTracesPerSecond, clock); ++ } ++ } ++ } ++ ++ void adaptSampling(ReadableSpan span, SpanData spanData, Consumer spanBatcher) { ++ if (!adaptiveSamplingRuleExists && this.adaptiveSamplingConfig == null) { ++ return; ++ } ++ Long statusCode = spanData.getAttributes().get(HTTP_RESPONSE_STATUS_CODE); ++ ++ boolean shouldBoostSampling = false; ++ boolean shouldCaptureAnomalySpan = false; ++ ++ List anomalyConditions = ++ adaptiveSamplingConfig != null ? adaptiveSamplingConfig.getAnomalyConditions() : null; ++ // Empty list -> no conditions will apply and we will not do anything ++ if (anomalyConditions != null && !anomalyConditions.isEmpty()) { ++ String operation = spanData.getAttributes().get(AwsAttributeKeys.AWS_LOCAL_OPERATION); ++ if (operation == null) { ++ operation = generateIngressOperation(spanData); ++ } ++ for (AwsXrayAdaptiveSamplingConfig.AnomalyConditions condition : anomalyConditions) { ++ // Skip condition if it would only re-apply action already being taken ++ if ((shouldBoostSampling ++ && AwsXrayAdaptiveSamplingConfig.UsageType.SAMPLING_BOOST.equals( ++ condition.getUsage())) ++ || (shouldCaptureAnomalySpan ++ && AwsXrayAdaptiveSamplingConfig.UsageType.ANOMALY_TRACE_CAPTURE.equals( ++ condition.getUsage()))) { ++ continue; ++ } ++ // Check if the operation matches any in the list or if operations list is null (match all) ++ List operations = condition.getOperations(); ++ if (!(operations == null || operations.isEmpty() || operations.contains(operation))) { ++ continue; ++ } ++ // Check if any anomalyConditions detect an anomaly either through error code or latency ++ boolean isAnomaly = false; ++ ++ String errorCodeRegex = condition.getErrorCodeRegex(); ++ if (statusCode != null && errorCodeRegex != null) { ++ isAnomaly = statusCode.toString().matches(errorCodeRegex); ++ } ++ ++ Long highLatencyMs = condition.getHighLatencyMs(); ++ if (highLatencyMs != null) { ++ isAnomaly = ++ (errorCodeRegex == null || isAnomaly) ++ && (span.getLatencyNanos() / 1_000_000.0) >= highLatencyMs; ++ } ++ ++ if (isAnomaly) { ++ AwsXrayAdaptiveSamplingConfig.UsageType usage = condition.getUsage(); ++ if (usage != null) { ++ switch (usage) { ++ case BOTH: ++ shouldBoostSampling = true; ++ shouldCaptureAnomalySpan = true; ++ break; ++ case SAMPLING_BOOST: ++ shouldBoostSampling = true; ++ break; ++ case ANOMALY_TRACE_CAPTURE: ++ shouldCaptureAnomalySpan = true; ++ break; ++ default: // do nothing ++ } ++ } else { ++ shouldBoostSampling = true; ++ shouldCaptureAnomalySpan = true; ++ } ++ } ++ if (shouldBoostSampling && shouldCaptureAnomalySpan) { ++ break; ++ } ++ } ++ } else if ((statusCode != null && statusCode > 499) ++ || (statusCode == null ++ && spanData.getStatus() != null ++ && StatusCode.ERROR.equals(spanData.getStatus().getStatusCode()))) { ++ shouldBoostSampling = true; ++ shouldCaptureAnomalySpan = true; ++ } ++ ++ String traceId = spanData.getTraceId(); ++ AwsXrayAdaptiveSamplingConfig.UsageType existingUsage = traceUsageCache.getIfPresent(traceId); ++ boolean isNewTrace = existingUsage == null; ++ ++ // Anomaly Capture ++ boolean isSpanCaptured = false; ++ if (AwsXrayAdaptiveSamplingConfig.UsageType.isUsedForAnomalyTraceCapture(existingUsage) ++ || (shouldCaptureAnomalySpan ++ && !span.getSpanContext().isSampled() ++ && anomalyCaptureRateLimiter != null ++ && anomalyCaptureRateLimiter.trySpend(1))) { ++ spanBatcher.accept(span); ++ isSpanCaptured = true; ++ } ++ ++ // Sampling Boost ++ boolean isCountedAsAnomalyForBoost = false; ++ if (shouldBoostSampling || isNewTrace) { ++ String traceStateValue = ++ span.getSpanContext() ++ .getTraceState() ++ .get(AwsSamplingResult.AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY); ++ String ruleNameForBoostStats = ++ traceStateValue != null ++ ? hashToRuleMap.getOrDefault(traceStateValue, traceStateValue) ++ : traceStateValue; ++ SamplingRuleApplier ruleToReportTo = null; ++ SamplingRuleApplier matchedRule = null; ++ for (SamplingRuleApplier applier : ruleAppliers) { ++ // Rule propagated from when sampling decision was made, otherwise the matched rule ++ if (applier.getRuleName().equals(ruleNameForBoostStats)) { ++ ruleToReportTo = applier; ++ break; ++ } ++ if (applier.matches(spanData.getAttributes(), resource)) { ++ matchedRule = applier; ++ } ++ } ++ if (ruleToReportTo == null) { ++ if (matchedRule == null) { ++ logger.log( ++ Level.FINE, ++ "No sampling rule matched the request. This is a bug in either the OpenTelemetry SDK or X-Ray."); ++ } else { ++ ruleToReportTo = matchedRule; ++ } ++ } ++ if (shouldBoostSampling ++ && ruleToReportTo != null ++ && ruleToReportTo.hasBoost() ++ && !AwsXrayAdaptiveSamplingConfig.UsageType.isUsedForBoost(existingUsage)) { ++ ruleToReportTo.countAnomalyTrace(span); ++ isCountedAsAnomalyForBoost = true; ++ } ++ if (isNewTrace && ruleToReportTo != null && ruleToReportTo.hasBoost()) { ++ ruleToReportTo.countTrace(); ++ } ++ } ++ ++ // Any interaction with a cache entry will reset the expiration timer of that entry ++ if (isSpanCaptured && isCountedAsAnomalyForBoost) { ++ this.traceUsageCache.put(traceId, AwsXrayAdaptiveSamplingConfig.UsageType.BOTH); ++ } else if (isSpanCaptured) { ++ if (AwsXrayAdaptiveSamplingConfig.UsageType.isUsedForBoost(existingUsage)) { ++ this.traceUsageCache.put(traceId, AwsXrayAdaptiveSamplingConfig.UsageType.BOTH); ++ } else { ++ this.traceUsageCache.put( ++ traceId, AwsXrayAdaptiveSamplingConfig.UsageType.ANOMALY_TRACE_CAPTURE); ++ } ++ } else if (isCountedAsAnomalyForBoost) { ++ if (AwsXrayAdaptiveSamplingConfig.UsageType.isUsedForAnomalyTraceCapture(existingUsage)) { ++ this.traceUsageCache.put(traceId, AwsXrayAdaptiveSamplingConfig.UsageType.BOTH); ++ } else { ++ this.traceUsageCache.put(traceId, AwsXrayAdaptiveSamplingConfig.UsageType.SAMPLING_BOOST); ++ } ++ } else if (existingUsage != null) { ++ this.traceUsageCache.put(traceId, existingUsage); ++ } else { ++ this.traceUsageCache.put(traceId, AwsXrayAdaptiveSamplingConfig.UsageType.NEITHER); ++ } ++ } ++ ++ List snapshot(Date now) { + return Arrays.stream(ruleAppliers) + .map(rule -> rule.snapshot(now)) + .filter(Objects::nonNull) +@@ -115,15 +390,16 @@ final class XrayRulesSampler implements Sampler { + Map ruleTargets, + Set requestedTargetRuleNames, + Date now) { ++ long currentNanoTime = clock.nanoTime(); + long defaultNextSnapshotTimeNanos = +- clock.nanoTime() + AwsXrayRemoteSampler.DEFAULT_TARGET_INTERVAL_NANOS; ++ currentNanoTime + AwsXrayRemoteSampler.DEFAULT_TARGET_INTERVAL_NANOS; + SamplingRuleApplier[] newAppliers = + Arrays.stream(ruleAppliers) + .map( + rule -> { + SamplingTargetDocument target = ruleTargets.get(rule.getRuleName()); + if (target != null) { +- return rule.withTarget(target, now); ++ return rule.withTarget(target, now, currentNanoTime); + } + if (requestedTargetRuleNames.contains(rule.getRuleName())) { + // In practice X-Ray should return a target for any rule we requested but +@@ -135,6 +411,92 @@ final class XrayRulesSampler implements Sampler { + return rule; + }) + .toArray(SamplingRuleApplier[]::new); +- return new XrayRulesSampler(clientId, resource, clock, fallbackSampler, newAppliers); ++ return new XrayRulesSampler( ++ clientId, ++ resource, ++ clock, ++ fallbackSampler, ++ newAppliers, ++ ruleToHashMap, ++ adaptiveSamplingRuleExists, ++ adaptiveSamplingConfig, ++ traceUsageCache); ++ } ++ ++ static boolean isKeyPresent(SpanData span, AttributeKey key) { ++ return span.getAttributes().get(key) != null; ++ } ++ ++ private static String generateIngressOperation(SpanData span) { ++ String operation = UNKNOWN_OPERATION; ++ if (isKeyPresent(span, URL_PATH) || isKeyPresent(span, HTTP_TARGET)) { ++ String httpTarget = ++ isKeyPresent(span, URL_PATH) ++ ? span.getAttributes().get(URL_PATH) ++ : span.getAttributes().get(HTTP_TARGET); ++ // get the first part from API path string as operation value ++ // the more levels/parts we get from API path the higher chance for getting high cardinality ++ // data ++ if (httpTarget != null) { ++ operation = extractApiPathValue(httpTarget); ++ if (isKeyPresent(span, HTTP_REQUEST_METHOD) || isKeyPresent(span, HTTP_METHOD)) { ++ String httpMethod = ++ isKeyPresent(span, HTTP_REQUEST_METHOD) ++ ? span.getAttributes().get(HTTP_REQUEST_METHOD) ++ : span.getAttributes().get(HTTP_METHOD); ++ if (httpMethod != null) { ++ operation = httpMethod + " " + operation; ++ } ++ } ++ } ++ } ++ return operation; ++ } ++ ++ private static String extractApiPathValue(String httpTarget) { ++ if (httpTarget == null || httpTarget.isEmpty()) { ++ return "/"; ++ } ++ String[] paths = httpTarget.split("/"); ++ if (paths.length > 1) { ++ return "/" + paths[1]; ++ } ++ return "/"; ++ } ++ ++ private static Map createRuleHashMaps( ++ List rules) { ++ Map ruleToHashMap = new HashMap<>(); ++ for (GetSamplingRulesResponse.SamplingRule rule : rules) { ++ String ruleName = rule.getRuleName(); ++ if (ruleName != null) { ++ ruleToHashMap.put(ruleName, hashRuleName(ruleName)); ++ } ++ } ++ return ruleToHashMap; ++ } ++ ++ static String hashRuleName(String ruleName) { ++ try { ++ MessageDigest digest = MessageDigest.getInstance("SHA-256"); ++ byte[] hash = digest.digest(ruleName.getBytes(StandardCharsets.UTF_8)); ++ StringBuilder hexString = new StringBuilder(); ++ for (int i = 0; i < Math.min(hash.length, 8); i++) { ++ String hex = Integer.toHexString(0xff & hash[i]); ++ if (hex.length() == 1) { ++ hexString.append('0'); ++ } ++ hexString.append(hex); ++ } ++ return hexString.toString(); ++ } catch (NoSuchAlgorithmException e) { ++ return ruleName; ++ } ++ } ++ ++ // For testing ++ Cache getTraceUsageCache() { ++ traceUsageCache.cleanUp(); ++ return traceUsageCache; + } + } +diff --git a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerTest.java b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerTest.java +index 4e5cd13b..ec256fe0 100644 +--- a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerTest.java ++++ b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerTest.java +@@ -7,7 +7,10 @@ package io.opentelemetry.contrib.awsxray; + + import static java.util.Objects.requireNonNull; + import static org.assertj.core.api.Assertions.assertThat; ++import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode; + import static org.awaitility.Awaitility.await; ++import static org.junit.jupiter.api.Assertions.assertThrows; ++import static org.mockito.Mockito.mock; + + import com.google.common.io.ByteStreams; + import com.linecorp.armeria.common.HttpResponse; +@@ -21,6 +24,9 @@ import io.opentelemetry.api.trace.SpanKind; + import io.opentelemetry.api.trace.TraceId; + import io.opentelemetry.context.Context; + import io.opentelemetry.sdk.resources.Resource; ++import io.opentelemetry.sdk.trace.ReadableSpan; ++import io.opentelemetry.sdk.trace.data.SpanData; ++import io.opentelemetry.sdk.trace.export.SpanExporter; + import io.opentelemetry.sdk.trace.samplers.Sampler; + import io.opentelemetry.sdk.trace.samplers.SamplingDecision; + import java.io.IOException; +@@ -187,6 +193,31 @@ class AwsXrayRemoteSamplerTest { + } + } + ++ void setAndResetSpanExporter() { ++ try (AwsXrayRemoteSampler sampler = AwsXrayRemoteSampler.newBuilder(Resource.empty()).build()) { ++ // Setting span exporter should only work once ++ sampler.setSpanExporter(mock(SpanExporter.class)); ++ assertThrows( ++ IllegalStateException.class, () -> sampler.setSpanExporter(mock(SpanExporter.class))); ++ } ++ } ++ ++ @Test ++ void adaptSamplingWithoutSpanExporter() { ++ assertThrows( ++ IllegalStateException.class, ++ () -> sampler.adaptSampling(mock(ReadableSpan.class), mock(SpanData.class))); ++ } ++ ++ @Test ++ void adaptSamplingWithSpanExporter() { ++ try (AwsXrayRemoteSampler sampler = AwsXrayRemoteSampler.newBuilder(Resource.empty()).build()) { ++ sampler.setSpanExporter(mock(SpanExporter.class)); ++ assertThatCode(() -> sampler.adaptSampling(mock(ReadableSpan.class), mock(SpanData.class))) ++ .doesNotThrowAnyException(); ++ } ++ } ++ + // https://github.com/open-telemetry/opentelemetry-java-contrib/issues/376 + @Test + void testJitterTruncation() { +@@ -206,6 +237,16 @@ class AwsXrayRemoteSamplerTest { + } + } + ++ @Test ++ void setAdaptiveSamplingConfig() { ++ try (AwsXrayRemoteSampler sampler = AwsXrayRemoteSampler.newBuilder(Resource.empty()).build()) { ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder().setVersion(1.0).build(); ++ sampler.setAdaptiveSamplingConfig(config); ++ assertThrows(IllegalStateException.class, () -> sampler.setAdaptiveSamplingConfig(config)); ++ } ++ } ++ + private static SamplingDecision doSample(Sampler sampler, String name) { + return sampler + .shouldSample( +diff --git a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplierTest.java b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplierTest.java +index 920a5ffd..dcc7118a 100644 +--- a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplierTest.java ++++ b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplierTest.java +@@ -15,18 +15,25 @@ import static io.opentelemetry.semconv.incubating.HttpIncubatingAttributes.HTTP_ + import static io.opentelemetry.semconv.incubating.NetIncubatingAttributes.NET_HOST_NAME; + import static org.assertj.core.api.Assertions.assertThat; + import static org.awaitility.Awaitility.await; ++import static org.mockito.Mockito.mock; ++import static org.mockito.Mockito.when; + + import com.fasterxml.jackson.databind.ObjectMapper; + import io.opentelemetry.api.common.AttributeKey; + import io.opentelemetry.api.common.Attributes; + import io.opentelemetry.api.common.AttributesBuilder; ++import io.opentelemetry.api.trace.SpanContext; + import io.opentelemetry.api.trace.SpanKind; ++import io.opentelemetry.api.trace.TraceFlags; + import io.opentelemetry.api.trace.TraceId; ++import io.opentelemetry.api.trace.TraceState; + import io.opentelemetry.context.Context; ++import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingBoost; + import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; + import io.opentelemetry.sdk.common.Clock; + import io.opentelemetry.sdk.resources.Resource; + import io.opentelemetry.sdk.testing.time.TestClock; ++import io.opentelemetry.sdk.trace.ReadableSpan; + import io.opentelemetry.sdk.trace.samplers.SamplingDecision; + import io.opentelemetry.sdk.trace.samplers.SamplingResult; + import io.opentelemetry.semconv.HttpAttributes; +@@ -37,6 +44,7 @@ import java.io.IOException; + import java.io.UncheckedIOException; + import java.time.Duration; + import java.time.Instant; ++import java.time.temporal.ChronoUnit; + import java.util.Collections; + import java.util.Date; + import java.util.concurrent.TimeUnit; +@@ -50,6 +58,7 @@ class SamplingRuleApplierTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String CLIENT_ID = "test-client-id"; ++ private static final String TEST_SERVICE_NAME = "test-service-name"; + + @Nested + @SuppressWarnings("ClassCanBeStatic") +@@ -57,7 +66,10 @@ class SamplingRuleApplierTest { + + private final SamplingRuleApplier applier = + new SamplingRuleApplier( +- CLIENT_ID, readSamplingRule("/sampling-rule-exactmatch.json"), Clock.getDefault()); ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-exactmatch.json"), ++ TEST_SERVICE_NAME, ++ Clock.getDefault()); + + private final Resource resource = + Resource.builder() +@@ -91,7 +103,8 @@ class SamplingRuleApplierTest { + .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); + + Date now = new Date(); +- GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = applier.snapshot(now); ++ GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = ++ applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); + assertThat(statistics.getRuleName()).isEqualTo("Test"); + assertThat(statistics.getTimestamp()).isEqualTo(now); +@@ -100,7 +113,7 @@ class SamplingRuleApplierTest { + assertThat(statistics.getBorrowCount()).isEqualTo(0); + + // Reset +- statistics = applier.snapshot(now); ++ statistics = applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getRequestCount()).isEqualTo(0); + assertThat(statistics.getSampledCount()).isEqualTo(0); + assertThat(statistics.getBorrowCount()).isEqualTo(0); +@@ -108,7 +121,7 @@ class SamplingRuleApplierTest { + doSample(applier); + doSample(applier); + now = new Date(); +- statistics = applier.snapshot(now); ++ statistics = applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); + assertThat(statistics.getRuleName()).isEqualTo("Test"); + assertThat(statistics.getTimestamp()).isEqualTo(now); +@@ -283,7 +296,10 @@ class SamplingRuleApplierTest { + + private final SamplingRuleApplier applier = + new SamplingRuleApplier( +- CLIENT_ID, readSamplingRule("/sampling-rule-wildcards.json"), Clock.getDefault()); ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-wildcards.json"), ++ TEST_SERVICE_NAME, ++ Clock.getDefault()); + + private final Resource resource = + Resource.builder() +@@ -316,7 +332,8 @@ class SamplingRuleApplierTest { + assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + + Date now = new Date(); +- GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = applier.snapshot(now); ++ GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = ++ applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); + assertThat(statistics.getRuleName()).isEqualTo("Test"); + assertThat(statistics.getTimestamp()).isEqualTo(now); +@@ -325,7 +342,7 @@ class SamplingRuleApplierTest { + assertThat(statistics.getBorrowCount()).isEqualTo(0); + + // Reset +- statistics = applier.snapshot(now); ++ statistics = applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getRequestCount()).isEqualTo(0); + assertThat(statistics.getSampledCount()).isEqualTo(0); + assertThat(statistics.getBorrowCount()).isEqualTo(0); +@@ -333,7 +350,7 @@ class SamplingRuleApplierTest { + doSample(applier); + doSample(applier); + now = new Date(); +- statistics = applier.snapshot(now); ++ statistics = applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); + assertThat(statistics.getRuleName()).isEqualTo("Test"); + assertThat(statistics.getTimestamp()).isEqualTo(now); +@@ -626,7 +643,10 @@ class SamplingRuleApplierTest { + + private final SamplingRuleApplier applier = + new SamplingRuleApplier( +- CLIENT_ID, readSamplingRule("/sampling-rule-awslambda.json"), Clock.getDefault()); ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-awslambda.json"), ++ TEST_SERVICE_NAME, ++ Clock.getDefault()); + + private final Resource resource = + Resource.builder() +@@ -677,7 +697,10 @@ class SamplingRuleApplierTest { + void borrowing() { + SamplingRuleApplier applier = + new SamplingRuleApplier( +- CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), Clock.getDefault()); ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-reservoir.json"), ++ TEST_SERVICE_NAME, ++ Clock.getDefault()); + + // Borrow + assertThat(doSample(applier)) +@@ -688,7 +711,8 @@ class SamplingRuleApplierTest { + assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + + Date now = new Date(); +- GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = applier.snapshot(now); ++ GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = ++ applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); + assertThat(statistics.getRuleName()).isEqualTo("Test"); + assertThat(statistics.getTimestamp()).isEqualTo(now); +@@ -697,7 +721,7 @@ class SamplingRuleApplierTest { + assertThat(statistics.getBorrowCount()).isEqualTo(1); + + // Reset +- statistics = applier.snapshot(now); ++ statistics = applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getRequestCount()).isEqualTo(0); + assertThat(statistics.getSampledCount()).isEqualTo(0); + assertThat(statistics.getBorrowCount()).isEqualTo(0); +@@ -713,7 +737,7 @@ class SamplingRuleApplierTest { + }); + + now = new Date(); +- statistics = applier.snapshot(now); ++ statistics = applier.snapshot(now).getStatisticsDocument(); + assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); + assertThat(statistics.getRuleName()).isEqualTo("Test"); + assertThat(statistics.getTimestamp()).isEqualTo(now); +@@ -727,7 +751,7 @@ class SamplingRuleApplierTest { + TestClock clock = TestClock.create(); + SamplingRuleApplier applier = + new SamplingRuleApplier( +- CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), clock); ++ CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), TEST_SERVICE_NAME, clock); + // No target yet, borrows from reservoir every second. + assertThat(doSample(applier)) + .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); +@@ -746,8 +770,8 @@ class SamplingRuleApplierTest { + + // Got a target! + SamplingTargetDocument target = +- SamplingTargetDocument.create(0.0, 5, 2, Date.from(now.plusSeconds(10)), "test"); +- applier = applier.withTarget(target, Date.from(now)); ++ SamplingTargetDocument.create(0.0, 5, 2, Date.from(now.plusSeconds(10)), null, "test"); ++ applier = applier.withTarget(target, Date.from(now), clock.nanoTime()); + // Statistics not expired yet + assertThat(applier.snapshot(Date.from(now))).isNull(); + +@@ -786,7 +810,7 @@ class SamplingRuleApplierTest { + TestClock clock = TestClock.create(); + SamplingRuleApplier applier = + new SamplingRuleApplier( +- CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), clock); ++ CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), TEST_SERVICE_NAME, clock); + // No target yet, borrows from reservoir every second. + assertThat(doSample(applier)) + .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); +@@ -804,8 +828,8 @@ class SamplingRuleApplierTest { + assertThat(applier.snapshot(Date.from(now.plus(Duration.ofMinutes(30))))).isNotNull(); + + // Got a target! +- SamplingTargetDocument target = SamplingTargetDocument.create(0.0, 5, null, null, "test"); +- applier = applier.withTarget(target, Date.from(now)); ++ SamplingTargetDocument target = SamplingTargetDocument.create(0.0, 5, null, null, null, "test"); ++ applier = applier.withTarget(target, Date.from(now), clock.nanoTime()); + // No reservoir, always use fixed rate (drop) + assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); +@@ -815,12 +839,105 @@ class SamplingRuleApplierTest { + assertThat(applier.snapshot(Date.from(now))).isNotNull(); + } + ++ @Test ++ void ruleWithBoost() { ++ TestClock clock = TestClock.create(); ++ SamplingRuleApplier applier = ++ new SamplingRuleApplier( ++ CLIENT_ID, readSamplingRule("/sampling-rule-boost.json"), TEST_SERVICE_NAME, clock); ++ // No reservoir, always use fixed rate (drop) ++ assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ ++ Instant now = Instant.ofEpochSecond(0, clock.now()); ++ ++ // Got a target! ++ // Boost raises sampling rate to 100% for 20 seconds ++ SamplingTargetDocument target = ++ SamplingTargetDocument.create( ++ 0.0, ++ 5, ++ null, ++ null, ++ SamplingBoost.create(1.0, Date.from(now.plus(20, ChronoUnit.SECONDS))), ++ "test"); ++ applier = applier.withTarget(target, Date.from(now), clock.nanoTime()); ++ ++ // We should start sampling at this point ++ assertThat(doSample(applier)) ++ .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ assertThat(doSample(applier)) ++ .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ // After waiting 10 seconds, we should still be sampling ++ clock.advance(Duration.ofSeconds(10)); ++ assertThat(doSample(applier)) ++ .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ assertThat(doSample(applier)) ++ .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ // After 30 seconds, we should stop sampling ++ clock.advance(Duration.ofSeconds(20)); ++ assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ } ++ ++ @Test ++ void countTrace() { ++ TestClock clock = TestClock.create(); ++ SamplingRuleApplier applier = ++ new SamplingRuleApplier( ++ CLIENT_ID, readSamplingRule("/sampling-rule-boost.json"), TEST_SERVICE_NAME, clock); ++ ++ Instant now = Instant.ofEpochSecond(0, clock.now()); ++ ++ SamplingRuleApplier.SamplingRuleStatisticsSnapshot snapshot = applier.snapshot(Date.from(now)); ++ assertThat(snapshot.getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); ++ ++ applier.countTrace(); ++ applier.countTrace(); ++ applier.countTrace(); ++ ++ snapshot = applier.snapshot(Date.from(now)); ++ assertThat(snapshot.getBoostStatisticsDocument().getTotalCount()).isEqualTo(3); ++ assertThat(snapshot.getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); ++ ++ // Snapshotting again should've reset the statistics ++ snapshot = applier.snapshot(Date.from(now)); ++ assertThat(snapshot.getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); ++ assertThat(snapshot.getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); ++ ++ // Decision to separate by trace ID is made in XrayRulesSampler class, so we can ignore ++ // trace/span ID in span context here ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ // Mock sampling the first two traces ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getSampled(), TraceState.getDefault())); ++ applier.countTrace(); ++ applier.countAnomalyTrace(readableSpanMock); ++ applier.countTrace(); ++ applier.countAnomalyTrace(readableSpanMock); ++ ++ // Mock not sampling the last trace ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); ++ applier.countTrace(); ++ applier.countAnomalyTrace(readableSpanMock); ++ ++ snapshot = applier.snapshot(Date.from(now)); ++ assertThat(snapshot.getBoostStatisticsDocument().getTotalCount()).isEqualTo(3); ++ assertThat(snapshot.getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(3); ++ assertThat(snapshot.getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(2); ++ } ++ + @Test + void withNextSnapshotTime() { + TestClock clock = TestClock.create(); + SamplingRuleApplier applier = + new SamplingRuleApplier( +- CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), clock); ++ CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), TEST_SERVICE_NAME, clock); + + Instant now = Instant.ofEpochSecond(0, clock.now()); + assertThat(applier.snapshot(Date.from(now))).isNotNull(); +@@ -839,6 +956,71 @@ class SamplingRuleApplierTest { + assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + } + ++ @Test ++ void hasBoostMethod() { ++ SamplingRuleApplier applierWithBoost = ++ new SamplingRuleApplier( ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-boost.json"), ++ TEST_SERVICE_NAME, ++ Clock.getDefault()); ++ assertThat(applierWithBoost.hasBoost()).isTrue(); ++ ++ SamplingRuleApplier applierWithoutBoost = ++ new SamplingRuleApplier( ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-exactmatch.json"), ++ TEST_SERVICE_NAME, ++ Clock.getDefault()); ++ assertThat(applierWithoutBoost.hasBoost()).isFalse(); ++ } ++ ++ @Test ++ void getServiceNameMethod() { ++ SamplingRuleApplier applier = ++ new SamplingRuleApplier( ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-exactmatch.json"), ++ TEST_SERVICE_NAME, ++ Clock.getDefault()); ++ assertThat(applier.getServiceName()).isEqualTo(TEST_SERVICE_NAME); ++ } ++ ++ @Test ++ void nullRuleName() { ++ GetSamplingRulesResponse.SamplingRule ruleWithNullName = ++ GetSamplingRulesResponse.SamplingRule.create( ++ Collections.emptyMap(), ++ 1.0, ++ "*", ++ "*", ++ 1, ++ 0, ++ "*", ++ null, // null rule name ++ null, ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ ++ SamplingRuleApplier applier = ++ new SamplingRuleApplier(CLIENT_ID, ruleWithNullName, TEST_SERVICE_NAME, Clock.getDefault()); ++ assertThat(applier.getRuleName()).isEqualTo("default"); ++ } ++ ++ @Test ++ void nullServiceName() { ++ SamplingRuleApplier applier = ++ new SamplingRuleApplier( ++ CLIENT_ID, ++ readSamplingRule("/sampling-rule-exactmatch.json"), ++ null, // null service name ++ Clock.getDefault()); ++ assertThat(applier.getServiceName()).isEqualTo("default"); ++ } ++ + private static SamplingResult doSample(SamplingRuleApplier applier) { + return applier.shouldSample( + Context.current(), +diff --git a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java +index 1ca8df34..72ec524b 100644 +--- a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java ++++ b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java +@@ -5,17 +5,28 @@ + + package io.opentelemetry.contrib.awsxray; + ++import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; + import static org.assertj.core.api.Assertions.assertThat; ++import static org.junit.jupiter.api.Assertions.assertThrows; ++import static org.mockito.ArgumentMatchers.any; ++import static org.mockito.Mockito.mock; ++import static org.mockito.Mockito.when; + + import io.opentelemetry.api.common.AttributeKey; + import io.opentelemetry.api.common.Attributes; ++import io.opentelemetry.api.trace.SpanContext; + import io.opentelemetry.api.trace.SpanKind; ++import io.opentelemetry.api.trace.TraceFlags; + import io.opentelemetry.api.trace.TraceId; ++import io.opentelemetry.api.trace.TraceState; + import io.opentelemetry.context.Context; ++import io.opentelemetry.contrib.awsxray.GetSamplingRulesResponse.SamplingRateBoost; + import io.opentelemetry.contrib.awsxray.GetSamplingRulesResponse.SamplingRule; + import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; + import io.opentelemetry.sdk.resources.Resource; + import io.opentelemetry.sdk.testing.time.TestClock; ++import io.opentelemetry.sdk.trace.ReadableSpan; ++import io.opentelemetry.sdk.trace.data.SpanData; + import io.opentelemetry.sdk.trace.samplers.Sampler; + import io.opentelemetry.sdk.trace.samplers.SamplingDecision; + import io.opentelemetry.sdk.trace.samplers.SamplingResult; +@@ -25,14 +36,20 @@ import java.util.Arrays; + import java.util.Collections; + import java.util.Date; + import java.util.HashMap; ++import java.util.List; + import java.util.Map; + import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.LongAdder; ++import java.util.function.Consumer; + import java.util.stream.Collectors; + import java.util.stream.Stream; + import org.junit.jupiter.api.Test; + + class XrayRulesSamplerTest { + ++ private static final AttributeKey URL_PATH = AttributeKey.stringKey("url.path"); ++ private static final AttributeKey HTTP_METHOD = AttributeKey.stringKey("http.method"); ++ + @Test + void updateTargets() { + SamplingRule rule1 = +@@ -49,7 +66,8 @@ class XrayRulesSamplerTest { + "*", + "*", + "*", +- 1); ++ 1, ++ null); + SamplingRule rule2 = + SamplingRule.create( + Collections.singletonMap("test", "dog-service"), +@@ -64,7 +82,8 @@ class XrayRulesSamplerTest { + "*", + "*", + "*", +- 1); ++ 1, ++ null); + SamplingRule rule3 = + SamplingRule.create( + Collections.singletonMap("test", "*-service"), +@@ -79,7 +98,8 @@ class XrayRulesSamplerTest { + "*", + "*", + "*", +- 1); ++ 1, ++ null); + SamplingRule rule4 = + SamplingRule.create( + Collections.emptyMap(), +@@ -94,7 +114,8 @@ class XrayRulesSamplerTest { + "*", + "*", + "*", +- 1); ++ 1, ++ null); + + TestClock clock = TestClock.create(); + XrayRulesSampler sampler = +@@ -103,22 +124,58 @@ class XrayRulesSamplerTest { + Resource.getDefault(), + clock, + Sampler.alwaysOn(), +- Arrays.asList(rule1, rule4, rule3, rule2)); ++ Arrays.asList(rule1, rule4, rule3, rule2), ++ null); + + assertThat(doSample(sampler, "cat-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.empty(), ++ XrayRulesSampler.hashRuleName("cat-rule"))); + assertThat(doSample(sampler, "cat-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.empty(), ++ XrayRulesSampler.hashRuleName("cat-rule"))); + assertThat(doSample(sampler, "dog-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.empty(), ++ XrayRulesSampler.hashRuleName("dog-rule"))); + assertThat(doSample(sampler, "dog-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.empty(), ++ XrayRulesSampler.hashRuleName("dog-rule"))); + assertThat(doSample(sampler, "bat-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.empty(), ++ XrayRulesSampler.hashRuleName("bat-rule"))); + assertThat(doSample(sampler, "bat-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.empty(), ++ XrayRulesSampler.hashRuleName("bat-rule"))); + assertThat(doSample(sampler, "unknown")) +- .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.empty(), ++ XrayRulesSampler.hashRuleName("default-rule"))); + + Instant now = Instant.ofEpochSecond(0, clock.now()); + assertThat(sampler.snapshot(Date.from(now))).hasSize(4); +@@ -128,10 +185,10 @@ class XrayRulesSamplerTest { + assertThat(sampler.snapshot(Date.from(now))).hasSize(4); + + SamplingTargetDocument catTarget = +- SamplingTargetDocument.create(0.0, 10, null, null, "cat-rule"); ++ SamplingTargetDocument.create(0.0, 10, null, null, null, "cat-rule"); + + SamplingTargetDocument batTarget = +- SamplingTargetDocument.create(0.0, 5, null, null, "bat-rule"); ++ SamplingTargetDocument.create(0.0, 5, null, null, null, "bat-rule"); + + clock.advance(Duration.ofSeconds(10)); + now = Instant.ofEpochSecond(0, clock.now()); +@@ -145,16 +202,41 @@ class XrayRulesSamplerTest { + .collect(Collectors.toSet()), + Date.from(now)); + assertThat(doSample(sampler, "dog-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.empty(), ++ XrayRulesSampler.hashRuleName("dog-rule"))); + assertThat(doSample(sampler, "dog-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.empty(), ++ XrayRulesSampler.hashRuleName("dog-rule"))); + assertThat(doSample(sampler, "unknown")) +- .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.empty(), ++ XrayRulesSampler.hashRuleName("default-rule"))); + // Targets overridden to always drop. + assertThat(doSample(sampler, "cat-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.empty(), ++ XrayRulesSampler.hashRuleName("cat-rule"))); + assertThat(doSample(sampler, "bat-service")) +- .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.empty(), ++ XrayRulesSampler.hashRuleName("bat-rule"))); + + // Minimum is batTarget, 5s from now + assertThat(sampler.nextTargetFetchTimeNanos()) +@@ -169,6 +251,867 @@ class XrayRulesSamplerTest { + assertThat(sampler.snapshot(Date.from(now))).hasSize(4); + } + ++ @Test ++ void updateTargetsWithLocalAdaptiveSamplingConfig() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.singletonMap("test", "cat-service"), ++ 1.0, ++ "*", ++ "*", ++ 1, ++ 1, ++ "*", ++ "*", ++ "cat-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ SamplingRule rule2 = ++ SamplingRule.create( ++ Collections.singletonMap("test", "dog-service"), ++ 0.0, ++ "*", ++ "*", ++ 2, ++ 1, ++ "*", ++ "*", ++ "dog-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ SamplingRule rule3 = ++ SamplingRule.create( ++ Collections.singletonMap("test", "*-service"), ++ 1.0, ++ "*", ++ "*", ++ 3, ++ 1, ++ "*", ++ "*", ++ "bat-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ SamplingRule rule4 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 0.0, ++ "*", ++ "*", ++ 4, ++ 0, ++ "*", ++ "*", ++ "default-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder() ++ .setVersion(1.0) ++ .setAnomalyCaptureLimit( ++ AwsXrayAdaptiveSamplingConfig.AnomalyCaptureLimit.builder() ++ .setAnomalyTracesPerSecond(2) ++ .build()) ++ .build(); ++ ++ TestClock clock = TestClock.create(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1, rule4, rule3, rule2), ++ config); ++ ++ assertThat(doSample(sampler, "cat-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "cat-rule") ++ .build(), ++ XrayRulesSampler.hashRuleName("cat-rule"))); ++ assertThat(doSample(sampler, "cat-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "cat-rule") ++ .build(), ++ XrayRulesSampler.hashRuleName("cat-rule"))); ++ assertThat(doSample(sampler, "dog-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "dog-rule") ++ .build(), ++ XrayRulesSampler.hashRuleName("dog-rule"))); ++ assertThat(doSample(sampler, "dog-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "dog-rule") ++ .build(), ++ XrayRulesSampler.hashRuleName("dog-rule"))); ++ assertThat(doSample(sampler, "bat-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "bat-rule") ++ .build(), ++ XrayRulesSampler.hashRuleName("bat-rule"))); ++ assertThat(doSample(sampler, "bat-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "bat-rule") ++ .build(), ++ XrayRulesSampler.hashRuleName("bat-rule"))); ++ assertThat(doSample(sampler, "unknown")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "default-rule") ++ .build(), ++ XrayRulesSampler.hashRuleName("default-rule"))); ++ ++ Instant now = Instant.ofEpochSecond(0, clock.now()); ++ assertThat(sampler.snapshot(Date.from(now))).hasSize(4); ++ assertThat(sampler.nextTargetFetchTimeNanos()).isEqualTo(clock.nanoTime()); ++ clock.advance(Duration.ofSeconds(10)); ++ now = Instant.ofEpochSecond(0, clock.now()); ++ assertThat(sampler.snapshot(Date.from(now))).hasSize(4); ++ ++ SamplingTargetDocument catTarget = ++ SamplingTargetDocument.create(0.0, 10, null, null, null, "cat-rule"); ++ ++ SamplingTargetDocument batTarget = ++ SamplingTargetDocument.create(0.0, 5, null, null, null, "bat-rule"); ++ ++ clock.advance(Duration.ofSeconds(10)); ++ now = Instant.ofEpochSecond(0, clock.now()); ++ Map targets = new HashMap<>(); ++ targets.put("cat-rule", catTarget); ++ targets.put("bat-rule", batTarget); ++ sampler = ++ sampler.withTargets( ++ targets, ++ Stream.of("cat-rule", "bat-rule", "dog-rule", "default-rule") ++ .collect(Collectors.toSet()), ++ Date.from(now)); ++ assertThat(doSample(sampler, "dog-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.RECORD_AND_SAMPLE, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "dog-rule") ++ .build(), ++ XrayRulesSampler.hashRuleName("dog-rule"))); ++ assertThat(doSample(sampler, "dog-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "dog-rule") ++ .build(), ++ XrayRulesSampler.hashRuleName("dog-rule"))); ++ assertThat(doSample(sampler, "unknown")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "default-rule") ++ .build(), ++ XrayRulesSampler.hashRuleName("default-rule"))); ++ // Targets overridden to always drop. ++ assertThat(doSample(sampler, "cat-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "cat-rule") ++ .build(), ++ XrayRulesSampler.hashRuleName("cat-rule"))); ++ assertThat(doSample(sampler, "bat-service")) ++ .usingRecursiveComparison() ++ .isEqualTo( ++ AwsSamplingResult.create( ++ SamplingDecision.DROP, ++ Attributes.builder() ++ .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "bat-rule") ++ .build(), ++ XrayRulesSampler.hashRuleName("bat-rule"))); ++ ++ // Minimum is batTarget, 5s from now ++ assertThat(sampler.nextTargetFetchTimeNanos()) ++ .isEqualTo(clock.nanoTime() + TimeUnit.SECONDS.toNanos(5)); ++ ++ assertThat(sampler.snapshot(Date.from(now))).isEmpty(); ++ clock.advance(Duration.ofSeconds(5)); ++ now = Instant.ofEpochSecond(0, clock.now()); ++ assertThat(sampler.snapshot(Date.from(now))).hasSize(1); ++ clock.advance(Duration.ofSeconds(5)); ++ now = Instant.ofEpochSecond(0, clock.now()); ++ assertThat(sampler.snapshot(Date.from(now))).hasSize(4); ++ } ++ ++ @Test ++ void noAdaptiveSamplingUsesNoSpace() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.singletonMap("test", "cat-service"), ++ 1.0, ++ "*", ++ "*", ++ 1, ++ 1, ++ "*", ++ "*", ++ "cat-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ ++ TestClock clock = TestClock.create(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1), ++ null); ++ ++ LongAdder exportCounter = new LongAdder(); ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ SpanData spanDataMock = mock(SpanData.class); ++ Consumer stubbedConsumer = x -> exportCounter.increment(); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(sampler.getTraceUsageCache().asMap().size()).isEqualTo(0); ++ } ++ ++ @Test ++ void recordErrors() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.singletonMap("test", "cat-service"), ++ 1.0, ++ "*", ++ "*", ++ 1, ++ 1, ++ "*", ++ "*", ++ "cat-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ SamplingRule rule2 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 0.0, ++ "*", ++ "*", ++ 4, ++ 0, ++ "*", ++ "*", ++ "default-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ SamplingRateBoost.create(1, 300)); ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder() ++ .setVersion(1.0) ++ .setAnomalyCaptureLimit( ++ AwsXrayAdaptiveSamplingConfig.AnomalyCaptureLimit.builder() ++ .setAnomalyTracesPerSecond(2) ++ .build()) ++ .setAnomalyConditions( ++ Arrays.asList( ++ AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() ++ .setErrorCodeRegex("^500$") ++ .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.BOTH) ++ .build())) ++ .build(); ++ ++ TestClock clock = TestClock.create(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1, rule2), ++ config); ++ ++ Instant now = Instant.ofEpochSecond(0, clock.now()); ++ ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); ++ SpanData spanDataMock = mock(SpanData.class); ++ Attributes attributesMock = mock(Attributes.class); ++ when(spanDataMock.getAttributes()).thenReturn(attributesMock); ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(500L); ++ LongAdder exportCounter = new LongAdder(); ++ Consumer stubbedConsumer = x -> exportCounter.increment(); ++ ++ // First span should be captured, second should be rate limited ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID3"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ // Only first span captured due to rate limiting ++ assertThat(exportCounter.sumThenReset()).isEqualTo(2L); ++ ++ List snapshot = ++ sampler.snapshot(Date.from(now)); ++ ++ // Rules are ordered by priority, so cat-rule is first ++ assertThat(snapshot.get(0).getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); ++ assertThat(snapshot.get(0).getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); ++ ++ assertThat(snapshot.get(0).getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(0); ++ assertThat(snapshot.get(1).getBoostStatisticsDocument().getTotalCount()).isEqualTo(3); ++ assertThat(snapshot.get(1).getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(3); ++ ++ assertThat(snapshot.get(1).getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(0); ++ ++ // Mock trace coming from upstream service where it was sampled by cat-rule ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID4"); ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID4", ++ "SPAN_ID", ++ TraceFlags.getDefault(), ++ TraceState.builder() ++ .put( ++ AwsSamplingResult.AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY, ++ XrayRulesSampler.hashRuleName("cat-rule")) ++ .build())); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ ++ // Ensure snapshot shows correctly saved statistics ++ snapshot = sampler.snapshot(Date.from(now)); ++ // cat-rule has no boost config and therefore records no statistics ++ assertThat(snapshot.get(0).getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); ++ assertThat(snapshot.get(0).getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); ++ assertThat(snapshot.get(0).getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(0); ++ assertThat(snapshot.get(1).getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); ++ assertThat(snapshot.get(1).getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); ++ assertThat(snapshot.get(1).getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(0); ++ ++ // Assert the trace ID cache is filled with appropriate data and is cleared after TTL passes ++ assertThat(sampler.getTraceUsageCache().asMap().size()).isEqualTo(4); ++ clock.advance(Duration.ofMinutes(100)); ++ assertThat(sampler.getTraceUsageCache().asMap().size()).isEqualTo(0); ++ } ++ ++ @Test ++ void setAdaptiveSamplingConfigTwice() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 1.0, ++ "*", ++ "*", ++ 1, ++ 1, ++ "*", ++ "*", ++ "test-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ null); ++ ++ TestClock clock = TestClock.create(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1), ++ null); ++ ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder().setVersion(1.0).build(); ++ sampler.setAdaptiveSamplingConfig(config); ++ assertThrows(IllegalStateException.class, () -> sampler.setAdaptiveSamplingConfig(config)); ++ } ++ ++ @Test ++ void captureErrorBasedOnErrorCodeRegex() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 0.0, ++ "*", ++ "*", ++ 1, ++ 0, ++ "*", ++ "*", ++ "test-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ SamplingRateBoost.create(1, 300)); ++ ++ TestClock clock = TestClock.create(); ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder() ++ .setVersion(1.0) ++ .setAnomalyCaptureLimit( ++ AwsXrayAdaptiveSamplingConfig.AnomalyCaptureLimit.builder() ++ .setAnomalyTracesPerSecond(2) ++ .build()) ++ .setAnomalyConditions( ++ Arrays.asList( ++ AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() ++ .setErrorCodeRegex("^456$") ++ .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.BOTH) ++ .build())) ++ .build(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1), ++ config); ++ ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); ++ when(readableSpanMock.getAttribute(any())).thenReturn("test-operation"); ++ when(readableSpanMock.getLatencyNanos()).thenReturn(1L); ++ ++ SpanData spanDataMock = mock(SpanData.class); ++ Attributes attributesMock = mock(Attributes.class); ++ when(spanDataMock.getAttributes()).thenReturn(attributesMock); ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(456L); ++ ++ LongAdder exportCounter = new LongAdder(); ++ Consumer stubbedConsumer = x -> exportCounter.increment(); ++ ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID3"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sum()).isEqualTo(2L); ++ } ++ ++ @Test ++ void captureErrorBasedOnHighLatency() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 0.0, ++ "*", ++ "*", ++ 1, ++ 0, ++ "*", ++ "*", ++ "test-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ SamplingRateBoost.create(1, 300)); ++ ++ TestClock clock = TestClock.create(); ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder() ++ .setVersion(1.0) ++ .setAnomalyCaptureLimit( ++ AwsXrayAdaptiveSamplingConfig.AnomalyCaptureLimit.builder() ++ .setAnomalyTracesPerSecond(2) ++ .build()) ++ .setAnomalyConditions( ++ Arrays.asList( ++ AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() ++ .setHighLatencyMs(100L) ++ .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.ANOMALY_TRACE_CAPTURE) ++ .build())) ++ .build(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1), ++ config); ++ ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); ++ when(readableSpanMock.getAttribute(any())).thenReturn("test-operation"); ++ when(readableSpanMock.getLatencyNanos()).thenReturn(300_000_000L); // 300 ms ++ ++ SpanData spanDataMock = mock(SpanData.class); ++ Attributes attributesMock = mock(Attributes.class); ++ when(spanDataMock.getAttributes()).thenReturn(attributesMock); ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(200L); ++ ++ LongAdder exportCounter = new LongAdder(); ++ Consumer stubbedConsumer = x -> exportCounter.add(1); ++ ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID3"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sum()).isEqualTo(2L); ++ } ++ ++ @Test ++ void captureErrorBasedOnErroCodeAndLatency() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 0.0, ++ "*", ++ "*", ++ 1, ++ 0, ++ "*", ++ "*", ++ "test-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ SamplingRateBoost.create(1, 300)); ++ ++ TestClock clock = TestClock.create(); ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder() ++ .setVersion(1.0) ++ .setAnomalyCaptureLimit( ++ AwsXrayAdaptiveSamplingConfig.AnomalyCaptureLimit.builder() ++ .setAnomalyTracesPerSecond(2) ++ .build()) ++ .setAnomalyConditions( ++ Arrays.asList( ++ AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() ++ .setErrorCodeRegex("^456$") ++ .setHighLatencyMs(100L) ++ .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.ANOMALY_TRACE_CAPTURE) ++ .build())) ++ .build(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1), ++ config); ++ ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); ++ when(readableSpanMock.getAttribute(any())).thenReturn("test-operation"); ++ when(readableSpanMock.getLatencyNanos()).thenReturn(300_000_000L); // 300 ms ++ ++ SpanData spanDataMock = mock(SpanData.class); ++ Attributes attributesMock = mock(Attributes.class); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID"); ++ when(spanDataMock.getAttributes()).thenReturn(attributesMock); ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(200L); ++ ++ LongAdder exportCounter = new LongAdder(); ++ Consumer stubbedConsumer = x -> exportCounter.add(1); ++ ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sum()).isEqualTo(0L); ++ ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(456L); ++ when(readableSpanMock.getLatencyNanos()).thenReturn(1L); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sum()).isEqualTo(0L); ++ ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(456L); ++ when(readableSpanMock.getLatencyNanos()).thenReturn(300_000_000L); // 300 ms ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID3"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID4"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID5"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sum()).isEqualTo(2L); ++ } ++ ++ @Test ++ void recordAndCaptureErrorBasedOnSeparateConditions() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 0.0, ++ "*", ++ "*", ++ 1, ++ 0, ++ "*", ++ "*", ++ "test-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ SamplingRateBoost.create(1, 300)); ++ ++ TestClock clock = TestClock.create(); ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder() ++ .setVersion(1.0) ++ .setAnomalyCaptureLimit( ++ AwsXrayAdaptiveSamplingConfig.AnomalyCaptureLimit.builder() ++ .setAnomalyTracesPerSecond(10) ++ .build()) ++ .setAnomalyConditions( ++ Arrays.asList( ++ AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() ++ .setErrorCodeRegex("^5\\d\\d$") ++ .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.SAMPLING_BOOST) ++ .build(), ++ AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() ++ .setErrorCodeRegex("^4\\d\\d$") ++ .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.ANOMALY_TRACE_CAPTURE) ++ .build())) ++ .build(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1), ++ config); ++ ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); ++ ++ SpanData spanDataMock = mock(SpanData.class); ++ Attributes attributesMock = mock(Attributes.class); ++ when(spanDataMock.getAttributes()).thenReturn(attributesMock); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID"); ++ LongAdder exportCounter = new LongAdder(); ++ Consumer stubbedConsumer = x -> exportCounter.add(1); ++ ++ // Boost condition triggered - count new trace + count anomaly ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(511L); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(sampler.getTraceUsageCache().getIfPresent("TRACE_ID")) ++ .isEqualTo(AwsXrayAdaptiveSamplingConfig.UsageType.SAMPLING_BOOST); ++ assertThat(exportCounter.sumThenReset()).isEqualTo(0L); ++ ++ // Anomaly capture triggered - capture and update cache value ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(411L); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(sampler.getTraceUsageCache().getIfPresent("TRACE_ID")) ++ .isEqualTo(AwsXrayAdaptiveSamplingConfig.UsageType.BOTH); ++ assertThat(exportCounter.sumThenReset()).isEqualTo(1L); ++ ++ // Boost condition triggered - capture span even though anomaly capture not included ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(511L); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(sampler.getTraceUsageCache().getIfPresent("TRACE_ID")) ++ .isEqualTo(AwsXrayAdaptiveSamplingConfig.UsageType.BOTH); ++ assertThat(exportCounter.sumThenReset()).isEqualTo(1L); ++ ++ // Non-anomaly span - should still be captured since trace is anomalous overall ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(200L); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(sampler.getTraceUsageCache().getIfPresent("TRACE_ID")) ++ .isEqualTo(AwsXrayAdaptiveSamplingConfig.UsageType.BOTH); ++ assertThat(exportCounter.sumThenReset()).isEqualTo(1L); ++ } ++ ++ @Test ++ void operationFilteringInAdaptSampling() { ++ SamplingRule rule1 = ++ SamplingRule.create( ++ Collections.emptyMap(), ++ 0.0, ++ "*", ++ "*", ++ 1, ++ 0, ++ "*", ++ "*", ++ "test-rule", ++ "*", ++ "*", ++ "*", ++ 1, ++ SamplingRateBoost.create(1, 300)); ++ ++ TestClock clock = TestClock.create(); ++ // Error span capture should default to 1/s ++ AwsXrayAdaptiveSamplingConfig config = ++ AwsXrayAdaptiveSamplingConfig.builder() ++ .setVersion(1.0) ++ .setAnomalyConditions( ++ Arrays.asList( ++ AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() ++ .setOperations(Arrays.asList("GET /api1", "GET /api2")) ++ .setErrorCodeRegex("^500$") ++ .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.ANOMALY_TRACE_CAPTURE) ++ .build())) ++ .build(); ++ XrayRulesSampler sampler = ++ new XrayRulesSampler( ++ "CLIENT_ID", ++ Resource.getDefault(), ++ clock, ++ Sampler.alwaysOn(), ++ Arrays.asList(rule1), ++ config); ++ ++ ReadableSpan readableSpanMock = mock(ReadableSpan.class); ++ when(readableSpanMock.getSpanContext()) ++ .thenReturn( ++ SpanContext.create( ++ "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); ++ when(readableSpanMock.getLatencyNanos()).thenReturn(1L); ++ ++ SpanData spanDataMock = mock(SpanData.class); ++ Attributes attributesMock = mock(Attributes.class); ++ when(spanDataMock.getAttributes()).thenReturn(attributesMock); ++ when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(500L); ++ ++ LongAdder exportCounter = new LongAdder(); ++ Consumer stubbedConsumer = x -> exportCounter.increment(); ++ ++ // Test matching operations ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1"); ++ when(attributesMock.get(URL_PATH)).thenReturn("/api1/ext"); ++ when(attributesMock.get(HTTP_METHOD)).thenReturn("GET"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ ++ clock.advance(Duration.ofSeconds(5)); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2"); ++ when(attributesMock.get(URL_PATH)).thenReturn("/api2"); ++ when(attributesMock.get(HTTP_METHOD)).thenReturn("GET"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sumThenReset()).isEqualTo(2L); ++ ++ // Not enough time elapsed, error rate limit was hit ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID3"); ++ when(attributesMock.get(URL_PATH)).thenReturn("/api2"); ++ when(attributesMock.get(HTTP_METHOD)).thenReturn("GET"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sumThenReset()).isEqualTo(0L); ++ ++ // Test non-matching operation ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID4"); ++ when(attributesMock.get(URL_PATH)).thenReturn("/api1/ext"); ++ when(attributesMock.get(HTTP_METHOD)).thenReturn("POST"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID5"); ++ when(attributesMock.get(URL_PATH)).thenReturn("/non-matching"); ++ when(attributesMock.get(HTTP_METHOD)).thenReturn("GET"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sumThenReset()).isEqualTo(0L); ++ ++ // Test aws.local.operation takes priority ++ clock.advance(Duration.ofSeconds(5)); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID6"); ++ when(attributesMock.get(AwsAttributeKeys.AWS_LOCAL_OPERATION)).thenReturn("GET /api1"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sumThenReset()).isEqualTo(1L); ++ ++ // Test sending previously matched traceIDs gets captured ++ clock.advance(Duration.ofSeconds(5)); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1"); ++ when(attributesMock.get(AwsAttributeKeys.AWS_LOCAL_OPERATION)).thenReturn("GET /non-matching"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2"); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sumThenReset()).isEqualTo(2L); ++ ++ // Test sending previously matched traceIDs gets captured as long as trace is active ++ clock.advance(Duration.ofSeconds(45)); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sumThenReset()).isEqualTo(1L); ++ clock.advance(Duration.ofSeconds(45)); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sumThenReset()).isEqualTo(1L); ++ clock.advance(Duration.ofSeconds(45)); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sumThenReset()).isEqualTo(1L); ++ ++ // Test sending non-matching trace after expire-time elapses ++ clock.advance(Duration.ofMinutes(100)); ++ sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); ++ assertThat(exportCounter.sumThenReset()).isEqualTo(0L); ++ } ++ + private static SamplingResult doSample(Sampler sampler, String name) { + return sampler.shouldSample( + Context.current(), +diff --git a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XraySamplerClientTest.java b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XraySamplerClientTest.java +index 283e3b3c..cf0cb072 100644 +--- a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XraySamplerClientTest.java ++++ b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XraySamplerClientTest.java +@@ -126,7 +126,8 @@ class XraySamplerClientTest { + .setRequestCount(10500) + .setSampledCount(31) + .setBorrowCount(0) +- .build())); ++ .build()), ++ Collections.emptyList()); + GetSamplingTargetsResponse response = client.getSamplingTargets(samplingTargetsRequest); + + AggregatedHttpRequest request = server.takeRequest().request(); +@@ -174,7 +175,8 @@ class XraySamplerClientTest { + assertThatThrownBy( + () -> + client.getSamplingTargets( +- GetSamplingTargetsRequest.create(Collections.emptyList()))) ++ GetSamplingTargetsRequest.create( ++ Collections.emptyList(), Collections.emptyList()))) + .isInstanceOf(UncheckedIOException.class) + .hasMessage("Failed to deserialize response."); + } +diff --git a/aws-xray/src/test/resources/sampling-rule-boost.json b/aws-xray/src/test/resources/sampling-rule-boost.json +new file mode 100644 +index 00000000..32752d5e +--- /dev/null ++++ b/aws-xray/src/test/resources/sampling-rule-boost.json +@@ -0,0 +1,22 @@ ++{ ++ "RuleName": "Test", ++ "RuleARN": "arn:aws:xray:us-east-1:595986152929:sampling-rule/Test", ++ "ResourceARN": "arn:aws:xray:us-east-1:595986152929:my-service", ++ "Priority": 1, ++ "FixedRate": 0.0, ++ "ReservoirSize": 0, ++ "ServiceName": "*", ++ "ServiceType": "*", ++ "Host": "*", ++ "HTTPMethod": "*", ++ "URLPath": "*", ++ "Version": 1, ++ "SamplingRateBoost": { ++ "MaxRate": 0.2, ++ "CooldownWindowMinutes": 300 ++ }, ++ "Attributes": { ++ "animal": "cat", ++ "speed": "0" ++ } ++} +diff --git a/disk-buffering/build.gradle.kts b/disk-buffering/build.gradle.kts +index 8250c1bd..74a1a24c 100644 +--- a/disk-buffering/build.gradle.kts ++++ b/disk-buffering/build.gradle.kts +@@ -77,6 +77,10 @@ tasks.named("shadowJar") { + mustRunAfter("jar") + } + ++tasks.withType().configureEach { ++ dependsOn("shadowJar") ++} ++ + // The javadoc from wire's generated classes has errors that make the task that generates the "javadoc" artifact to fail. This + // makes the javadoc task to ignore those generated classes. + tasks.withType(Javadoc::class.java) { +diff --git a/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java b/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java +index 4c2c9293..4dddd975 100644 +--- a/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java ++++ b/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java +@@ -44,7 +44,7 @@ abstract class KafkaIntegrationTest extends AbstractIntegrationTest { + + @Container + GenericContainer kafka = +- new GenericContainer<>("bitnami/kafka:2.8.1") ++ new GenericContainer<>("bitnamilegacy/kafka:2.8.1") + .withNetwork(Network.SHARED) + .withEnv("KAFKA_CFG_ZOOKEEPER_CONNECT", "zookeeper:2181") + .withEnv("ALLOW_PLAINTEXT_LISTENER", "yes") +@@ -80,7 +80,7 @@ abstract class KafkaIntegrationTest extends AbstractIntegrationTest { + }; + + protected GenericContainer kafkaProducerContainer() { +- return new GenericContainer<>("bitnami/kafka:2.8.1") ++ return new GenericContainer<>("bitnamilegacy/kafka:2.8.1") + .withNetwork(Network.SHARED) + .withEnv("KAFKA_CFG_ZOOKEEPER_CONNECT", "zookeeper:2181") + .withEnv("ALLOW_PLAINTEXT_LISTENER", "yes") +@@ -207,7 +207,7 @@ abstract class KafkaIntegrationTest extends AbstractIntegrationTest { + + @Container + GenericContainer consumer = +- new GenericContainer<>("bitnami/kafka:2.8.1") ++ new GenericContainer<>("bitnamilegacy/kafka:2.8.1") + .withNetwork(Network.SHARED) + .withEnv("KAFKA_CFG_ZOOKEEPER_CONNECT", "zookeeper:2181") + .withEnv("ALLOW_PLAINTEXT_LISTENER", "yes") +diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java +index 8eb9432a..e46ed07b 100644 +--- a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java ++++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java +@@ -12,7 +12,7 @@ import org.testcontainers.containers.wait.strategy.Wait; + public class KafkaContainerFactory { + private static final int KAFKA_PORT = 9092; + private static final String KAFKA_BROKER = "kafka:" + KAFKA_PORT; +- private static final String KAFKA_DOCKER_IMAGE = "bitnami/kafka:2.8.1"; ++ private static final String KAFKA_DOCKER_IMAGE = "bitnamilegacy/kafka:2.8.1"; + + private KafkaContainerFactory() {} + +diff --git a/opamp-client/build.gradle.kts b/opamp-client/build.gradle.kts +index e41d1fff..84a1d559 100644 +--- a/opamp-client/build.gradle.kts ++++ b/opamp-client/build.gradle.kts +@@ -1,6 +1,4 @@ + import de.undercouch.gradle.tasks.download.DownloadExtension +-import java.net.HttpURLConnection +-import java.net.URL + + plugins { + id("otel.java-conventions") +@@ -50,19 +48,7 @@ abstract class DownloadOpampProtos @Inject constructor( + + @TaskAction + fun execute() { +- // Get the latest release tag by following the redirect from GitHub's latest release URL +- val latestReleaseUrl = "https://github.com/open-telemetry/opamp-spec/releases/latest" +- val connection = URL(latestReleaseUrl).openConnection() as HttpURLConnection +- connection.instanceFollowRedirects = false +- connection.requestMethod = "HEAD" +- +- val redirectLocation = connection.getHeaderField("Location") +- connection.disconnect() +- +- // Extract tag from URL like: https://github.com/open-telemetry/opamp-spec/releases/tag/v0.12.0 +- val latestTag = redirectLocation.substringAfterLast("/") +- // Download the source code for the latest release +- val zipUrl = "https://github.com/open-telemetry/opamp-spec/zipball/$latestTag" ++ val zipUrl = "https://github.com/open-telemetry/opamp-spec/zipball/v0.14.0" + + download.run { + src(zipUrl) +diff --git a/version.gradle.kts b/version.gradle.kts +index f8358006..1f7c517f 100644 +--- a/version.gradle.kts ++++ b/version.gradle.kts +@@ -1,5 +1,5 @@ +-val stableVersion = "1.48.0" +-val alphaVersion = "1.48.0-alpha" ++val stableVersion = "1.48.0-adot1" ++val alphaVersion = "1.48.0-alpha-adot1" + + allprojects { + if (findProperty("otel.stable") != "true") { +-- +2.45.1 + diff --git a/.github/patches/versions b/.github/patches/versions index 03f4b3f51f..b1b741c455 100644 --- a/.github/patches/versions +++ b/.github/patches/versions @@ -1 +1,2 @@ OTEL_JAVA_INSTRUMENTATION_VERSION=v2.18.1 +OTEL_JAVA_CONTRIB_VERSION=v1.48.0 \ No newline at end of file diff --git a/awsagentprovider/build.gradle.kts b/awsagentprovider/build.gradle.kts index 3aeb79f935..5cc97b14fc 100644 --- a/awsagentprovider/build.gradle.kts +++ b/awsagentprovider/build.gradle.kts @@ -38,8 +38,10 @@ dependencies { implementation("io.opentelemetry.contrib:opentelemetry-aws-xray") // AWS Resource Detectors implementation("io.opentelemetry.contrib:opentelemetry-aws-resources") - // Json file reader + // JSON file reader implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1") + // YAML file reader + implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.16.1") // Import AWS SDK v1 core for ARN parsing utilities implementation("com.amazonaws:aws-java-sdk-core:1.12.773") // Export configuration diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AttributePropagatingSpanProcessor.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AttributePropagatingSpanProcessor.java index 7b03ec068b..a5e59d0540 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AttributePropagatingSpanProcessor.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AttributePropagatingSpanProcessor.java @@ -112,6 +112,7 @@ public void onStart(Context parentContext, ReadWriteSpan span) { if (propagationData != null) { span.setAttribute(propagationDataKey, propagationData); } + span.setAttribute(AwsAttributeKeys.AWS_TRACE_FLAG_SAMPLED, span.getSpanContext().isSampled()); } private boolean isConsumerKind(ReadableSpan span) { diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAgentPropertiesCustomizerProvider.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAgentPropertiesCustomizerProvider.java index 073e345de0..4480092c19 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAgentPropertiesCustomizerProvider.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAgentPropertiesCustomizerProvider.java @@ -26,7 +26,7 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) { () -> new HashMap() { { - put("otel.propagators", "baggage,xray,tracecontext,b3,b3multi"); + put("otel.propagators", "baggage,xray,tracecontext"); put("otel.instrumentation.aws-sdk.experimental-span-attributes", "true"); put( "otel.instrumentation.aws-sdk.experimental-record-individual-http-error", diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java index 13cb4ddd81..1073b52bc3 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java @@ -15,9 +15,14 @@ package software.amazon.opentelemetry.javaagent.providers; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.contrib.awsxray.AlwaysRecordSampler; +import io.opentelemetry.contrib.awsxray.AwsXrayAdaptiveSamplingConfig; +import io.opentelemetry.contrib.awsxray.AwsXrayRemoteSampler; import io.opentelemetry.contrib.awsxray.ResourceHolder; import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter; import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; @@ -42,6 +47,11 @@ import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.export.SpanExporter; import io.opentelemetry.sdk.trace.samplers.Sampler; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -142,11 +152,16 @@ public final class AwsApplicationSignalsCustomizerProvider private static final String OTEL_EXPORTER_OTLP_LOGS_COMPRESSION_CONFIG = "otel.exporter.otlp.logs.compression"; + private static final String AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG = + "aws.xray.adaptive.sampling.config"; + // UDP packet can be upto 64KB. To limit the packet size, we limit the exported batch size. // This is a bit of a magic number, as there is no simple way to tell how many spans can make a // 64KB batch since spans can vary in size. private static final int LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10; + private Sampler sampler; + public void customize(AutoConfigurationCustomizer autoConfiguration) { autoConfiguration.addPropertiesCustomizer(this::customizeProperties); autoConfiguration.addPropertiesCustomizer(this::customizeLambdaEnvProperties); @@ -281,6 +296,27 @@ private Resource customizeResource(Resource resource, ConfigProperties configPro } private Sampler customizeSampler(Sampler sampler, ConfigProperties configProps) { + if (sampler instanceof AwsXrayRemoteSampler) { + String config = configProps.getString(AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG); + AwsXrayAdaptiveSamplingConfig parsedConfig = null; + + try { + parsedConfig = parseConfigString(config); + } catch (Exception e) { + logger.log( + Level.WARNING, "Failed to parse adaptive sampling configuration: {0}", e.getMessage()); + } + + if (parsedConfig != null) { + try { + ((AwsXrayRemoteSampler) sampler).setAdaptiveSamplingConfig(parsedConfig); + } catch (Exception e) { + logger.log( + Level.WARNING, "Error processing adaptive sampling config: {0}", e.getMessage()); + } + } + this.sampler = sampler; + } if (isApplicationSignalsEnabled(configProps)) { return AlwaysRecordSampler.create(sampler); } @@ -344,10 +380,13 @@ private SdkTracerProviderBuilder customizeTracerProviderBuilder( .build(); // Construct and set application signals metrics processor - SpanProcessor spanMetricsProcessor = + AwsSpanMetricsProcessorBuilder awsSpanMetricsProcessorBuilder = AwsSpanMetricsProcessorBuilder.create( - meterProvider, ResourceHolder.getResource(), meterProvider::forceFlush) - .build(); + meterProvider, ResourceHolder.getResource(), meterProvider::forceFlush); + if (this.sampler != null) { + awsSpanMetricsProcessorBuilder.setSampler(this.sampler); + } + SpanProcessor spanMetricsProcessor = awsSpanMetricsProcessorBuilder.build(); tracerProviderBuilder.addSpanProcessor(spanMetricsProcessor); } return tracerProviderBuilder; @@ -423,11 +462,14 @@ SpanExporter customizeSpanExporter(SpanExporter spanExporter, ConfigProperties c } if (isApplicationSignalsEnabled(configProps)) { - return AwsMetricAttributesSpanExporterBuilder.create( - spanExporter, ResourceHolder.getResource()) - .build(); + spanExporter = + AwsMetricAttributesSpanExporterBuilder.create(spanExporter, ResourceHolder.getResource()) + .build(); } + if (this.sampler instanceof AwsXrayRemoteSampler) { + ((AwsXrayRemoteSampler) this.sampler).setSpanExporter(spanExporter); + } return spanExporter; } @@ -467,6 +509,44 @@ LogRecordExporter customizeLogsExporter( return logsExporter; } + static AwsXrayAdaptiveSamplingConfig parseConfigString(String config) + throws JsonProcessingException { + if (config == null) { + return null; + } + + // Check if the config is a file path and the file exists + Path path = Paths.get(config); + if (Files.exists(path)) { + try { + config = String.join("\n", Files.readAllLines(path, StandardCharsets.UTF_8)); + } catch (IOException e) { + throw new IllegalArgumentException( + "Failed to read adaptive sampling configuration file: " + e.getMessage(), e); + } + } + + ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()); + Map configMap = + yamlMapper.readValue(config, new TypeReference>() {}); + + Object versionObj = configMap.get("version"); + if (versionObj == null) { + throw new IllegalArgumentException( + "Missing required 'version' field in adaptive sampling configuration"); + } + + double version = ((Number) versionObj).doubleValue(); + if (version >= 2L) { + throw new IllegalArgumentException( + "Incompatible adaptive sampling config version: " + + version + + ". This version of the AWS X-Ray remote sampler only supports versions strictly below 2.0."); + } + + return yamlMapper.readValue(config, AwsXrayAdaptiveSamplingConfig.class); + } + private enum ApplicationSignalsExporterProvider { INSTANCE; diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessor.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessor.java index 37436f5d3b..9dabe2c3fb 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessor.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessor.java @@ -25,12 +25,14 @@ import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Context; +import io.opentelemetry.contrib.awsxray.AwsXrayRemoteSampler; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.samplers.Sampler; import java.util.Map; import java.util.function.Supplier; import javax.annotation.concurrent.Immutable; @@ -75,6 +77,8 @@ public final class AwsSpanMetricsProcessor implements SpanProcessor { private final Resource resource; private final Supplier forceFlushAction; + private Sampler sampler; + /** Use {@link AwsSpanMetricsProcessorBuilder} to construct this processor. */ static AwsSpanMetricsProcessor create( LongHistogram errorHistogram, @@ -82,9 +86,16 @@ static AwsSpanMetricsProcessor create( DoubleHistogram latencyHistogram, MetricAttributeGenerator generator, Resource resource, + Sampler sampler, Supplier forceFlushAction) { return new AwsSpanMetricsProcessor( - errorHistogram, faultHistogram, latencyHistogram, generator, resource, forceFlushAction); + errorHistogram, + faultHistogram, + latencyHistogram, + generator, + resource, + sampler, + forceFlushAction); } private AwsSpanMetricsProcessor( @@ -93,12 +104,14 @@ private AwsSpanMetricsProcessor( DoubleHistogram latencyHistogram, MetricAttributeGenerator generator, Resource resource, + Sampler sampler, Supplier forceFlushAction) { this.errorHistogram = errorHistogram; this.faultHistogram = faultHistogram; this.latencyHistogram = latencyHistogram; this.generator = generator; this.resource = resource; + this.sampler = sampler; this.forceFlushAction = forceFlushAction; } @@ -125,6 +138,9 @@ public void onEnd(ReadableSpan span) { for (Map.Entry attribute : attributeMap.entrySet()) { recordMetrics(span, spanData, attribute.getValue()); } + if (sampler != null) { + ((AwsXrayRemoteSampler) sampler).adaptSampling(span, spanData); + } } @Override diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorBuilder.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorBuilder.java index 25ae0bd46e..e808543783 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorBuilder.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorBuilder.java @@ -24,6 +24,7 @@ import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.samplers.Sampler; import java.util.function.Supplier; /** A builder for {@link AwsSpanMetricsProcessor} */ @@ -51,6 +52,7 @@ public final class AwsSpanMetricsProcessorBuilder { // Optional builder elements private MetricAttributeGenerator generator = DEFAULT_GENERATOR; + private Sampler sampler; private String scopeName = DEFAULT_SCOPE_NAME; public static AwsSpanMetricsProcessorBuilder create( @@ -80,6 +82,17 @@ public AwsSpanMetricsProcessorBuilder setGenerator(MetricAttributeGenerator gene return this; } + /** + * Sets the sampler used to determine if the spans should be sampled This will be used to increase + * sampling rate in the case of errors + */ + @CanIgnoreReturnValue + public AwsSpanMetricsProcessorBuilder setSampler(Sampler sampler) { + requireNonNull(sampler, "sampler"); + this.sampler = sampler; + return this; + } + /** * Sets the scope name used in the creation of metrics by the span metrics processor. If unset, * defaults to {@link #DEFAULT_SCOPE_NAME}. Must not be null. @@ -99,6 +112,12 @@ public AwsSpanMetricsProcessor build() { meter.histogramBuilder(LATENCY).setUnit(LATENCY_UNITS).build(); return AwsSpanMetricsProcessor.create( - errorHistogram, faultHistogram, latencyHistogram, generator, resource, forceFlushAction); + errorHistogram, + faultHistogram, + latencyHistogram, + generator, + resource, + sampler, + forceFlushAction); } } diff --git a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProviderTest.java b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProviderTest.java new file mode 100644 index 0000000000..93d6a97f11 --- /dev/null +++ b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProviderTest.java @@ -0,0 +1,102 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatException; +import static org.assertj.core.api.Assertions.assertThatNoException; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.opentelemetry.contrib.awsxray.AwsXrayAdaptiveSamplingConfig; +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import org.junit.jupiter.api.Test; + +class AwsApplicationSignalsCustomizerProviderTest { + + @Test + void setAdaptiveSamplingConfigFromString_validConfig() throws JsonProcessingException { + assertThat(AwsApplicationSignalsCustomizerProvider.parseConfigString("version: 1").getVersion()) + .isEqualTo(1); + } + + @Test + void setAdaptiveSamplingConfigFromString_nullConfig() { + assertThatNoException() + .isThrownBy(() -> AwsApplicationSignalsCustomizerProvider.parseConfigString(null)); + } + + @Test + void setAdaptiveSamplingConfigFromString_missingVersion() { + assertThatException() + .isThrownBy(() -> AwsApplicationSignalsCustomizerProvider.parseConfigString("")); + } + + @Test + void setAdaptiveSamplingConfigFromString_unsupportedVersion() { + assertThatException() + .isThrownBy( + () -> AwsApplicationSignalsCustomizerProvider.parseConfigString("{version: 5000.1}")); + } + + @Test + void setAdaptiveSamplingConfigFromString_invalidYaml() { + assertThatException() + .isThrownBy( + () -> + AwsApplicationSignalsCustomizerProvider.parseConfigString( + "{version: 1, invalid: yaml: structure}")); + } + + @Test + void setAdaptiveSamplingConfigFromFile_validYaml() + throws JsonProcessingException, URISyntaxException { + // Get the resource file path + URL resourceUrl = + getClass().getClassLoader().getResource("adaptive-sampling-config-valid.yaml"); + assertThat(resourceUrl).isNotNull(); + + // Get the absolute file path + File configFile = new File(resourceUrl.toURI()); + String absolutePath = configFile.getAbsolutePath(); + + // Parse the config using the file path + AwsXrayAdaptiveSamplingConfig config = + AwsApplicationSignalsCustomizerProvider.parseConfigString(absolutePath); + + // Assert the configuration was parsed correctly + assertThat(config).isNotNull(); + assertThat(config.getVersion()).isEqualTo(1); + assertThat(config.getAnomalyCaptureLimit().getAnomalyTracesPerSecond()).isEqualTo(10); + } + + @Test + void setAdaptiveSamplingConfigFromFile_invalidYaml() throws URISyntaxException { + // Get the resource file path + URL resourceUrl = + getClass().getClassLoader().getResource("adaptive-sampling-config-invalid.yaml"); + assertThat(resourceUrl).isNotNull(); + + // Get the absolute file path + File configFile = new File(resourceUrl.toURI()); + String absolutePath = configFile.getAbsolutePath(); + + // Parse the config using the file path + assertThatException() + .isThrownBy(() -> AwsApplicationSignalsCustomizerProvider.parseConfigString(absolutePath)); + } +} diff --git a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorTest.java b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorTest.java index 28a628f526..ad436651c4 100644 --- a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorTest.java +++ b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorTest.java @@ -36,6 +36,7 @@ import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; +import io.opentelemetry.contrib.awsxray.AwsXrayRemoteSampler; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.resources.Resource; @@ -76,6 +77,7 @@ private enum ExpectedStatusMetric { private LongHistogram faultHistogramMock; private DoubleHistogram latencyHistogramMock; private MetricAttributeGenerator generatorMock; + private AwsXrayRemoteSampler samplerMock; private AwsSpanMetricsProcessor awsSpanMetricsProcessor; // Mock forceFlush function that returns success when invoked similar @@ -90,6 +92,7 @@ public void setUpMocks() { faultHistogramMock = mock(LongHistogram.class); latencyHistogramMock = mock(DoubleHistogram.class); generatorMock = mock(MetricAttributeGenerator.class); + samplerMock = mock(AwsXrayRemoteSampler.class); awsSpanMetricsProcessor = AwsSpanMetricsProcessor.create( @@ -98,6 +101,7 @@ public void setUpMocks() { latencyHistogramMock, generatorMock, testResource, + samplerMock, this::forceFlushAction); } diff --git a/awsagentprovider/src/test/resources/adaptive-sampling-config-invalid.yaml b/awsagentprovider/src/test/resources/adaptive-sampling-config-invalid.yaml new file mode 100644 index 0000000000..888ae7ee3e --- /dev/null +++ b/awsagentprovider/src/test/resources/adaptive-sampling-config-invalid.yaml @@ -0,0 +1,13 @@ +version: 1.0 +anomalyConditions: + - errorCodeRegex: "^5\\d\\d$" + usage: both + - errorCodeRegex: "^4\\d\\d$" + usage: both + - errorCodeRegex: "^3\\d\\d$" + usage: both + - errorCodeRegex: "^2\\d\\d$" + operations: invalid part of config + usage: both +anomalyCaptureLimit: + anomalyTracesPerSecond: 10 \ No newline at end of file diff --git a/awsagentprovider/src/test/resources/adaptive-sampling-config-valid.yaml b/awsagentprovider/src/test/resources/adaptive-sampling-config-valid.yaml new file mode 100644 index 0000000000..dcfd187628 --- /dev/null +++ b/awsagentprovider/src/test/resources/adaptive-sampling-config-valid.yaml @@ -0,0 +1,12 @@ +version: 1.0 +anomalyConditions: + - errorCodeRegex: "^5\\d\\d$" + usage: both + - errorCodeRegex: "^4\\d\\d$" + usage: both + - errorCodeRegex: "^3\\d\\d$" + usage: both + - errorCodeRegex: "^2\\d\\d$" + usage: both +anomalyCaptureLimit: + anomalyTracesPerSecond: 10 \ No newline at end of file diff --git a/dependencyManagement/build.gradle.kts b/dependencyManagement/build.gradle.kts index d186406009..b1f0bcf30b 100644 --- a/dependencyManagement/build.gradle.kts +++ b/dependencyManagement/build.gradle.kts @@ -76,7 +76,7 @@ val dependencyLists = listOf( "commons-logging:commons-logging:1.2", "com.sparkjava:spark-core:2.9.4", "com.squareup.okhttp3:okhttp:4.12.0", - "io.opentelemetry.contrib:opentelemetry-aws-xray:1.48.0", + "io.opentelemetry.contrib:opentelemetry-aws-xray:1.48.0-adot1", "io.opentelemetry.contrib:opentelemetry-aws-resources:1.48.0-alpha", "io.opentelemetry.proto:opentelemetry-proto:1.0.0-alpha", "io.opentelemetry.javaagent:opentelemetry-javaagent:$otelJavaAgentVersion", diff --git a/smoke-tests/runner/src/test/java/io/awsobservability/instrumentation/smoketests/runner/SpringBootSmokeTest.java b/smoke-tests/runner/src/test/java/io/awsobservability/instrumentation/smoketests/runner/SpringBootSmokeTest.java index f22b29cf03..2cc06551ee 100644 --- a/smoke-tests/runner/src/test/java/io/awsobservability/instrumentation/smoketests/runner/SpringBootSmokeTest.java +++ b/smoke-tests/runner/src/test/java/io/awsobservability/instrumentation/smoketests/runner/SpringBootSmokeTest.java @@ -165,11 +165,7 @@ void hello() { assertThat(response.status().isSuccess()).isTrue(); assertThat(response.headers()) .extracting(e -> e.getKey().toString()) - .contains( - "received-x-amzn-trace-id", - "received-b3", - "received-x-b3-traceid", - "received-traceparent"); + .contains("received-x-amzn-trace-id", "received-traceparent"); var exported = getExported(); assertThat(exported)