99import io .opentelemetry .api .trace .SpanKind ;
1010import io .opentelemetry .context .Context ;
1111import io .opentelemetry .contrib .awsxray .GetSamplingRulesResponse .SamplingRuleRecord ;
12+ import io .opentelemetry .contrib .awsxray .GetSamplingTargetsRequest .SamplingBoostStatisticsDocument ;
1213import io .opentelemetry .contrib .awsxray .GetSamplingTargetsRequest .SamplingStatisticsDocument ;
1314import io .opentelemetry .contrib .awsxray .GetSamplingTargetsResponse .SamplingTargetDocument ;
1415import io .opentelemetry .sdk .common .Clock ;
1516import io .opentelemetry .sdk .resources .Resource ;
17+ import io .opentelemetry .sdk .trace .ReadableSpan ;
1618import io .opentelemetry .sdk .trace .data .LinkData ;
19+ import io .opentelemetry .sdk .trace .data .SpanData ;
20+ import io .opentelemetry .sdk .trace .export .BatchSpanProcessor ;
21+ import io .opentelemetry .sdk .trace .export .SpanExporter ;
1722import io .opentelemetry .sdk .trace .samplers .Sampler ;
1823import io .opentelemetry .sdk .trace .samplers .SamplingResult ;
1924import java .io .Closeable ;
2025import java .time .Duration ;
2126import java .time .Instant ;
27+ import java .util .ArrayList ;
2228import java .util .Date ;
2329import java .util .Iterator ;
2430import java .util .List ;
@@ -43,6 +49,9 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable {
4349
4450 private static final Logger logger = Logger .getLogger (AwsXrayRemoteSampler .class .getName ());
4551
52+ // Default batch size to be same as OTel BSP default
53+ private static final int maxExportBatchSize = 512 ;
54+
4655 private final Resource resource ;
4756 private final Clock clock ;
4857 private final Sampler initialSampler ;
@@ -59,6 +68,9 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable {
5968 @ Nullable private volatile XrayRulesSampler internalXrayRulesSampler ;
6069 private volatile Sampler sampler ;
6170
71+ @ Nullable private AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig ;
72+ @ Nullable private BatchSpanProcessor bsp ;
73+
6274 /**
6375 * Returns a {@link AwsXrayRemoteSamplerBuilder} with the given {@link Resource}. This {@link
6476 * Resource} should be the same as what the OpenTelemetry SDK is configured with.
@@ -120,6 +132,40 @@ public String getDescription() {
120132 return "AwsXrayRemoteSampler{" + sampler .getDescription () + "}" ;
121133 }
122134
135+ public void setAdaptiveSamplingConfig (AwsXrayAdaptiveSamplingConfig config ) {
136+ if (this .adaptiveSamplingConfig != null ) {
137+ throw new IllegalStateException ("Programming bug - Adaptive sampling config is already set" );
138+ } else if (config != null && this .adaptiveSamplingConfig == null ) {
139+ // Save here and also pass to XrayRulesSampler directly as it already exists
140+ this .adaptiveSamplingConfig = config ;
141+ if (internalXrayRulesSampler != null ) {
142+ internalXrayRulesSampler .setAdaptiveSamplingConfig (config );
143+ }
144+ }
145+ }
146+
147+ public void setSpanExporter (SpanExporter spanExporter ) {
148+ if (this .bsp != null ) {
149+ throw new IllegalStateException ("Programming bug - BatchSpanProcessor is already set" );
150+ } else if (spanExporter != null && this .bsp == null ) {
151+ this .bsp =
152+ BatchSpanProcessor .builder (spanExporter )
153+ .setExportUnsampledSpans (true ) // Required to capture the unsampled anomaly spans
154+ .setMaxExportBatchSize (maxExportBatchSize )
155+ .build ();
156+ }
157+ }
158+
159+ public void adaptSampling (ReadableSpan span , SpanData spanData ) {
160+ if (this .bsp == null ) {
161+ throw new IllegalStateException (
162+ "Programming bug - BatchSpanProcessor is null while trying to adapt sampling" );
163+ }
164+ if (internalXrayRulesSampler != null ) {
165+ internalXrayRulesSampler .adaptSampling (span , spanData , this .bsp ::onEnd );
166+ }
167+ }
168+
123169 private void getAndUpdateSampler () {
124170 try {
125171 // No pagination support yet, or possibly ever.
@@ -134,8 +180,8 @@ private void getAndUpdateSampler() {
134180 initialSampler ,
135181 response .getSamplingRules ().stream ()
136182 .map (SamplingRuleRecord ::getRule )
137- .collect (Collectors .toList ())));
138-
183+ .collect (Collectors .toList ()),
184+ adaptiveSamplingConfig ));
139185 previousRulesResponse = response ;
140186 ScheduledFuture <?> existingFetchTargetsFuture = fetchTargetsFuture ;
141187 if (existingFetchTargetsFuture != null ) {
@@ -179,14 +225,29 @@ private void fetchTargets() {
179225 XrayRulesSampler xrayRulesSampler = this .internalXrayRulesSampler ;
180226 try {
181227 Date now = Date .from (Instant .ofEpochSecond (0 , clock .now ()));
182- List <SamplingStatisticsDocument > statistics = xrayRulesSampler .snapshot (now );
228+ List <SamplingRuleApplier .SamplingRuleStatisticsSnapshot > statisticsSnapshot =
229+ xrayRulesSampler .snapshot (now );
230+ List <SamplingStatisticsDocument > statistics = new ArrayList <SamplingStatisticsDocument >();
231+ List <SamplingBoostStatisticsDocument > boostStatistics =
232+ new ArrayList <SamplingBoostStatisticsDocument >();
233+ statisticsSnapshot .stream ()
234+ .forEach (
235+ snapshot -> {
236+ if (snapshot .getStatisticsDocument () != null ) {
237+ statistics .add (snapshot .getStatisticsDocument ());
238+ }
239+ if (snapshot .getBoostStatisticsDocument () != null
240+ && snapshot .getBoostStatisticsDocument ().getTotalCount () > 0 ) {
241+ boostStatistics .add (snapshot .getBoostStatisticsDocument ());
242+ }
243+ });
183244 Set <String > requestedTargetRuleNames =
184245 statistics .stream ()
185246 .map (SamplingStatisticsDocument ::getRuleName )
186247 .collect (Collectors .toSet ());
187248
188- GetSamplingTargetsResponse response =
189- client .getSamplingTargets (GetSamplingTargetsRequest . create ( statistics ) );
249+ GetSamplingTargetsRequest req = GetSamplingTargetsRequest . create ( statistics , boostStatistics );
250+ GetSamplingTargetsResponse response = client .getSamplingTargets (req );
190251 Map <String , SamplingTargetDocument > targets =
191252 response .getDocuments ().stream ()
192253 .collect (Collectors .toMap (SamplingTargetDocument ::getRuleName , Function .identity ()));
0 commit comments