99import  io .opentelemetry .api .trace .SpanKind ;
1010import  io .opentelemetry .context .Context ;
1111import  io .opentelemetry .contrib .awsxray .GetSamplingRulesResponse .SamplingRuleRecord ;
12+ import  io .opentelemetry .contrib .awsxray .GetSamplingTargetsRequest .SamplingBoostStatisticsDocument ;
1213import  io .opentelemetry .contrib .awsxray .GetSamplingTargetsRequest .SamplingStatisticsDocument ;
1314import  io .opentelemetry .contrib .awsxray .GetSamplingTargetsResponse .SamplingTargetDocument ;
1415import  io .opentelemetry .sdk .common .Clock ;
1516import  io .opentelemetry .sdk .resources .Resource ;
17+ import  io .opentelemetry .sdk .trace .ReadableSpan ;
1618import  io .opentelemetry .sdk .trace .data .LinkData ;
19+ import  io .opentelemetry .sdk .trace .data .SpanData ;
20+ import  io .opentelemetry .sdk .trace .export .BatchSpanProcessor ;
21+ import  io .opentelemetry .sdk .trace .export .SpanExporter ;
1722import  io .opentelemetry .sdk .trace .samplers .Sampler ;
1823import  io .opentelemetry .sdk .trace .samplers .SamplingResult ;
1924import  java .io .Closeable ;
2025import  java .time .Duration ;
2126import  java .time .Instant ;
27+ import  java .util .ArrayList ;
2228import  java .util .Date ;
2329import  java .util .Iterator ;
2430import  java .util .List ;
@@ -43,6 +49,9 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable {
4349
4450  private  static  final  Logger  logger  = Logger .getLogger (AwsXrayRemoteSampler .class .getName ());
4551
52+   // Default batch size to be same as OTel BSP default 
53+   private  static  final  int  maxExportBatchSize  = 512 ;
54+ 
4655  private  final  Resource  resource ;
4756  private  final  Clock  clock ;
4857  private  final  Sampler  initialSampler ;
@@ -59,6 +68,9 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable {
5968  @ Nullable  private  volatile  XrayRulesSampler  internalXrayRulesSampler ;
6069  private  volatile  Sampler  sampler ;
6170
71+   @ Nullable  private  AwsXrayAdaptiveSamplingConfig  adaptiveSamplingConfig ;
72+   @ Nullable  private  BatchSpanProcessor  bsp ;
73+ 
6274  /** 
6375   * Returns a {@link AwsXrayRemoteSamplerBuilder} with the given {@link Resource}. This {@link 
6476   * Resource} should be the same as what the OpenTelemetry SDK is configured with. 
@@ -120,6 +132,40 @@ public String getDescription() {
120132    return  "AwsXrayRemoteSampler{"  + sampler .getDescription () + "}" ;
121133  }
122134
135+   public  void  setAdaptiveSamplingConfig (AwsXrayAdaptiveSamplingConfig  config ) {
136+     if  (this .adaptiveSamplingConfig  != null ) {
137+       throw  new  IllegalStateException ("Programming bug - Adaptive sampling config is already set" );
138+     } else  if  (config  != null  && this .adaptiveSamplingConfig  == null ) {
139+       // Save here and also pass to XrayRulesSampler directly as it already exists 
140+       this .adaptiveSamplingConfig  = config ;
141+       if  (sampler  instanceof  XrayRulesSampler ) {
142+         ((XrayRulesSampler ) sampler ).setAdaptiveSamplingConfig (config );
143+       }
144+     }
145+   }
146+ 
147+   public  void  setSpanExporter (SpanExporter  spanExporter ) {
148+     if  (this .bsp  != null ) {
149+       throw  new  IllegalStateException ("Programming bug - BatchSpanProcessor is already set" );
150+     } else  if  (spanExporter  != null  && this .bsp  == null ) {
151+       this .bsp  =
152+           BatchSpanProcessor .builder (spanExporter )
153+               .setExportUnsampledSpans (true ) // Required to capture the unsampled anomaly spans 
154+               .setMaxExportBatchSize (maxExportBatchSize )
155+               .build ();
156+     }
157+   }
158+ 
159+   public  void  adaptSampling (ReadableSpan  span , SpanData  spanData ) {
160+     if  (this .bsp  == null ) {
161+       throw  new  IllegalStateException (
162+           "Programming bug - BatchSpanProcessor is null while trying to adapt sampling" );
163+     }
164+     if  (sampler  instanceof  XrayRulesSampler ) {
165+       ((XrayRulesSampler ) sampler ).adaptSampling (span , spanData , this .bsp ::onEnd );
166+     }
167+   }
168+ 
123169  private  void  getAndUpdateSampler () {
124170    try  {
125171      // No pagination support yet, or possibly ever. 
@@ -134,8 +180,8 @@ private void getAndUpdateSampler() {
134180                initialSampler ,
135181                response .getSamplingRules ().stream ()
136182                    .map (SamplingRuleRecord ::getRule )
137-                     .collect (Collectors .toList ()))); 
138- 
183+                     .collect (Collectors .toList ()), 
184+                  adaptiveSamplingConfig )); 
139185        previousRulesResponse  = response ;
140186        ScheduledFuture <?> existingFetchTargetsFuture  = fetchTargetsFuture ;
141187        if  (existingFetchTargetsFuture  != null ) {
@@ -179,14 +225,29 @@ private void fetchTargets() {
179225    XrayRulesSampler  xrayRulesSampler  = this .internalXrayRulesSampler ;
180226    try  {
181227      Date  now  = Date .from (Instant .ofEpochSecond (0 , clock .now ()));
182-       List <SamplingStatisticsDocument > statistics  = xrayRulesSampler .snapshot (now );
228+       List <SamplingRuleApplier .SamplingRuleStatisticsSnapshot > statisticsSnapshot  =
229+           xrayRulesSampler .snapshot (now );
230+       List <SamplingStatisticsDocument > statistics  = new  ArrayList <SamplingStatisticsDocument >();
231+       List <SamplingBoostStatisticsDocument > boostStatistics  =
232+           new  ArrayList <SamplingBoostStatisticsDocument >();
233+       statisticsSnapshot .stream ()
234+           .forEach (
235+               snapshot  -> {
236+                 if  (snapshot .getStatisticsDocument () != null ) {
237+                   statistics .add (snapshot .getStatisticsDocument ());
238+                 }
239+                 if  (snapshot .getBoostStatisticsDocument () != null 
240+                     && snapshot .getBoostStatisticsDocument ().getTotalCount () > 0 ) {
241+                   boostStatistics .add (snapshot .getBoostStatisticsDocument ());
242+                 }
243+               });
183244      Set <String > requestedTargetRuleNames  =
184245          statistics .stream ()
185246              .map (SamplingStatisticsDocument ::getRuleName )
186247              .collect (Collectors .toSet ());
187248
188-       GetSamplingTargetsResponse   response  = 
189-            client .getSamplingTargets (GetSamplingTargetsRequest . create ( statistics ) );
249+       GetSamplingTargetsRequest   req  =  GetSamplingTargetsRequest . create ( statistics ,  boostStatistics ); 
250+       GetSamplingTargetsResponse   response  =  client .getSamplingTargets (req );
190251      Map <String , SamplingTargetDocument > targets  =
191252          response .getDocuments ().stream ()
192253              .collect (Collectors .toMap (SamplingTargetDocument ::getRuleName , Function .identity ()));
0 commit comments