1616import org .elasticsearch .compute .data .DocBlock ;
1717import org .elasticsearch .compute .data .DocVector ;
1818import org .elasticsearch .compute .data .DoubleVector ;
19- import org .elasticsearch .compute .data .IntBlock ;
2019import org .elasticsearch .compute .data .IntVector ;
2120import org .elasticsearch .compute .data .Page ;
2221import org .elasticsearch .compute .operator .DriverContext ;
22+ import org .elasticsearch .compute .operator .Limiter ;
2323import org .elasticsearch .compute .operator .SourceOperator ;
2424import org .elasticsearch .core .Releasables ;
2525
@@ -37,6 +37,7 @@ public class LuceneSourceOperator extends LuceneOperator {
3737
3838 private int currentPagePos = 0 ;
3939 private int remainingDocs ;
40+ private final Limiter limiter ;
4041
4142 private IntVector .Builder docsBuilder ;
4243 private DoubleVector .Builder scoreBuilder ;
@@ -46,6 +47,7 @@ public class LuceneSourceOperator extends LuceneOperator {
4647 public static class Factory extends LuceneOperator .Factory {
4748
4849 private final int maxPageSize ;
50+ private final Limiter limiter ;
4951
5052 public Factory (
5153 List <? extends ShardContext > contexts ,
@@ -58,11 +60,13 @@ public Factory(
5860 ) {
5961 super (contexts , queryFunction , dataPartitioning , taskConcurrency , limit , scoring ? COMPLETE : COMPLETE_NO_SCORES );
6062 this .maxPageSize = maxPageSize ;
63+ // TODO: use a single limiter for multiple stage execution
64+ this .limiter = limit == NO_LIMIT ? Limiter .NO_LIMIT : new Limiter (limit );
6165 }
6266
6367 @ Override
6468 public SourceOperator get (DriverContext driverContext ) {
65- return new LuceneSourceOperator (driverContext .blockFactory (), maxPageSize , sliceQueue , limit , scoreMode );
69+ return new LuceneSourceOperator (driverContext .blockFactory (), maxPageSize , sliceQueue , limit , limiter , scoreMode );
6670 }
6771
6872 public int maxPageSize () {
@@ -84,10 +88,18 @@ public String describe() {
8488 }
8589
8690 @ SuppressWarnings ("this-escape" )
87- public LuceneSourceOperator (BlockFactory blockFactory , int maxPageSize , LuceneSliceQueue sliceQueue , int limit , ScoreMode scoreMode ) {
91+ public LuceneSourceOperator (
92+ BlockFactory blockFactory ,
93+ int maxPageSize ,
94+ LuceneSliceQueue sliceQueue ,
95+ int limit ,
96+ Limiter limiter ,
97+ ScoreMode scoreMode
98+ ) {
8899 super (blockFactory , maxPageSize , sliceQueue );
89100 this .minPageSize = Math .max (1 , maxPageSize / 2 );
90101 this .remainingDocs = limit ;
102+ this .limiter = limiter ;
91103 int estimatedSize = Math .min (limit , maxPageSize );
92104 boolean success = false ;
93105 try {
@@ -140,7 +152,7 @@ public void collect(int doc) throws IOException {
140152
141153 @ Override
142154 public boolean isFinished () {
143- return doneCollecting || remainingDocs <= 0 ;
155+ return doneCollecting || limiter . remaining () <= 0 ;
144156 }
145157
146158 @ Override
@@ -160,6 +172,7 @@ public Page getCheckedOutput() throws IOException {
160172 if (scorer == null ) {
161173 return null ;
162174 }
175+ final int remainingDocsStart = remainingDocs = limiter .remaining ();
163176 try {
164177 scorer .scoreNextRange (
165178 leafCollector ,
@@ -171,28 +184,32 @@ public Page getCheckedOutput() throws IOException {
171184 );
172185 } catch (CollectionTerminatedException ex ) {
173186 // The leaf collector terminated the execution
187+ doneCollecting = true ;
174188 scorer .markAsDone ();
175189 }
190+ final int collectedDocs = remainingDocsStart - remainingDocs ;
191+ final int discardedDocs = collectedDocs - limiter .tryAccumulateHits (collectedDocs );
176192 Page page = null ;
177- if (currentPagePos >= minPageSize || remainingDocs <= 0 || scorer . isDone () ) {
178- IntBlock shard = null ;
179- IntBlock leaf = null ;
193+ if (currentPagePos >= minPageSize || scorer . isDone () || limiter . remaining () == 0 ) {
194+ IntVector shard = null ;
195+ IntVector leaf = null ;
180196 IntVector docs = null ;
181197 DoubleVector scores = null ;
182198 DocBlock docBlock = null ;
199+ currentPagePos -= discardedDocs ;
183200 try {
184- shard = blockFactory .newConstantIntBlockWith (scorer .shardContext ().index (), currentPagePos );
185- leaf = blockFactory .newConstantIntBlockWith (scorer .leafReaderContext ().ord , currentPagePos );
186- docs = docsBuilder . build ( );
201+ shard = blockFactory .newConstantIntVector (scorer .shardContext ().index (), currentPagePos );
202+ leaf = blockFactory .newConstantIntVector (scorer .leafReaderContext ().ord , currentPagePos );
203+ docs = buildDocsVector ( currentPagePos );
187204 docsBuilder = blockFactory .newIntVectorBuilder (Math .min (remainingDocs , maxPageSize ));
188- docBlock = new DocVector (shard . asVector () , leaf . asVector () , docs , true ).asBlock ();
205+ docBlock = new DocVector (shard , leaf , docs , true ).asBlock ();
189206 shard = null ;
190207 leaf = null ;
191208 docs = null ;
192209 if (scoreBuilder == null ) {
193210 page = new Page (currentPagePos , docBlock );
194211 } else {
195- scores = scoreBuilder . build ( );
212+ scores = buildScoresVector ( currentPagePos );
196213 scoreBuilder = blockFactory .newDoubleVectorBuilder (Math .min (remainingDocs , maxPageSize ));
197214 page = new Page (currentPagePos , docBlock , scores .asBlock ());
198215 }
@@ -209,6 +226,36 @@ public Page getCheckedOutput() throws IOException {
209226 }
210227 }
211228
229+ private IntVector buildDocsVector (int upToPositions ) {
230+ final IntVector docs = docsBuilder .build ();
231+ assert docs .getPositionCount () >= upToPositions : docs .getPositionCount () + " < " + upToPositions ;
232+ if (docs .getPositionCount () == upToPositions ) {
233+ return docs ;
234+ }
235+ try (var slice = blockFactory .newIntVectorFixedBuilder (upToPositions )) {
236+ for (int i = 0 ; i < upToPositions ; i ++) {
237+ slice .appendInt (docs .getInt (i ));
238+ }
239+ docs .close ();
240+ return slice .build ();
241+ }
242+ }
243+
244+ private DoubleVector buildScoresVector (int upToPositions ) {
245+ final DoubleVector scores = scoreBuilder .build ();
246+ assert scores .getPositionCount () >= upToPositions : scores .getPositionCount () + " < " + upToPositions ;
247+ if (scores .getPositionCount () == upToPositions ) {
248+ return scores ;
249+ }
250+ try (var slice = blockFactory .newDoubleVectorBuilder (upToPositions )) {
251+ for (int i = 0 ; i < upToPositions ; i ++) {
252+ slice .appendDouble (scores .getDouble (i ));
253+ }
254+ scores .close ();
255+ return slice .build ();
256+ }
257+ }
258+
212259 @ Override
213260 public void close () {
214261 Releasables .close (docsBuilder , scoreBuilder );
0 commit comments