99
1010package org .elasticsearch .search .aggregations .bucket .sampler .random ;
1111
12+ import org .apache .lucene .search .BooleanClause ;
13+ import org .apache .lucene .search .BooleanQuery ;
1214import org .apache .lucene .search .CollectionTerminatedException ;
1315import org .apache .lucene .search .DocIdSetIterator ;
16+ import org .apache .lucene .search .ScoreMode ;
1417import org .apache .lucene .search .Scorer ;
1518import org .apache .lucene .search .Weight ;
1619import org .apache .lucene .util .Bits ;
17- import org .elasticsearch .common .CheckedSupplier ;
1820import org .elasticsearch .search .aggregations .AggregationExecutionContext ;
1921import org .elasticsearch .search .aggregations .Aggregator ;
2022import org .elasticsearch .search .aggregations .AggregatorFactories ;
@@ -33,14 +35,13 @@ public class RandomSamplerAggregator extends BucketsAggregator implements Single
3335 private final int seed ;
3436 private final Integer shardSeed ;
3537 private final double probability ;
36- private final CheckedSupplier < Weight , IOException > weightSupplier ;
38+ private Weight weight ;
3739
3840 RandomSamplerAggregator (
3941 String name ,
4042 int seed ,
4143 Integer shardSeed ,
4244 double probability ,
43- CheckedSupplier <Weight , IOException > weightSupplier ,
4445 AggregatorFactories factories ,
4546 AggregationContext context ,
4647 Aggregator parent ,
@@ -55,10 +56,33 @@ public class RandomSamplerAggregator extends BucketsAggregator implements Single
5556 RandomSamplerAggregationBuilder .NAME + " aggregation [" + name + "] must have sub aggregations configured"
5657 );
5758 }
58- this .weightSupplier = weightSupplier ;
5959 this .shardSeed = shardSeed ;
6060 }
6161
62+ /**
63+ * This creates the query weight which will be used in the aggregator.
64+ *
65+ * This weight is a boolean query between {@link RandomSamplingQuery} and the configured top level query of the search. This allows
66+ * the aggregation to iterate the documents directly, thus sampling in the background instead of the foreground.
67+ * @return weight to be used, is cached for additional usages
68+ * @throws IOException when building the weight or queries fails;
69+ */
70+ private Weight getWeight () throws IOException {
71+ if (weight == null ) {
72+ RandomSamplingQuery query = new RandomSamplingQuery (
73+ probability ,
74+ seed ,
75+ shardSeed == null ? context .shardRandomSeed () : shardSeed
76+ );
77+ ScoreMode scoreMode = scoreMode ();
78+ BooleanQuery booleanQuery = new BooleanQuery .Builder ().add (query , BooleanClause .Occur .FILTER )
79+ .add (context .query (), scoreMode .needsScores () ? BooleanClause .Occur .MUST : BooleanClause .Occur .FILTER )
80+ .build ();
81+ weight = context .searcher ().createWeight (context .searcher ().rewrite (booleanQuery ), scoreMode , 1f );
82+ }
83+ return weight ;
84+ }
85+
6286 @ Override
6387 public InternalAggregation [] buildAggregations (long [] owningBucketOrds ) throws IOException {
6488 return buildAggregationsForSingleBucket (
@@ -111,7 +135,7 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
111135 };
112136 }
113137 // TODO know when sampling would be much slower and skip sampling: https://github.com/elastic/elasticsearch/issues/84353
114- Scorer scorer = weightSupplier . get ().scorer (aggCtx .getLeafReaderContext ());
138+ Scorer scorer = getWeight ().scorer (aggCtx .getLeafReaderContext ());
115139 // This means there are no docs to iterate, possibly due to the fields not existing
116140 if (scorer == null ) {
117141 return LeafBucketCollector .NO_OP_COLLECTOR ;
0 commit comments