1313import org .apache .lucene .search .Scorable ;
1414import org .apache .lucene .search .ScoreMode ;
1515import org .elasticsearch .compute .data .BlockFactory ;
16+ import org .elasticsearch .compute .data .DocBlock ;
1617import org .elasticsearch .compute .data .DocVector ;
18+ import org .elasticsearch .compute .data .DoubleVector ;
1719import org .elasticsearch .compute .data .IntBlock ;
1820import org .elasticsearch .compute .data .IntVector ;
1921import org .elasticsearch .compute .data .Page ;
2527import java .util .List ;
2628import java .util .function .Function ;
2729
30+ import static org .apache .lucene .search .ScoreMode .COMPLETE ;
31+ import static org .apache .lucene .search .ScoreMode .COMPLETE_NO_SCORES ;
32+
2833/**
2934 * Source operator that incrementally runs Lucene searches
3035 */
@@ -34,6 +39,7 @@ public class LuceneSourceOperator extends LuceneOperator {
3439 private int remainingDocs ;
3540
3641 private IntVector .Builder docsBuilder ;
42+ private DoubleVector .Builder scoreBuilder ;
3743 private final LeafCollector leafCollector ;
3844 private final int minPageSize ;
3945
@@ -47,15 +53,16 @@ public Factory(
4753 DataPartitioning dataPartitioning ,
4854 int taskConcurrency ,
4955 int maxPageSize ,
50- int limit
56+ int limit ,
57+ boolean scoring
5158 ) {
52- super (contexts , queryFunction , dataPartitioning , taskConcurrency , limit , ScoreMode . COMPLETE_NO_SCORES );
59+ super (contexts , queryFunction , dataPartitioning , taskConcurrency , limit , scoring ? COMPLETE : COMPLETE_NO_SCORES );
5360 this .maxPageSize = maxPageSize ;
5461 }
5562
5663 @ Override
5764 public SourceOperator get (DriverContext driverContext ) {
58- return new LuceneSourceOperator (driverContext .blockFactory (), maxPageSize , sliceQueue , limit );
65+ return new LuceneSourceOperator (driverContext .blockFactory (), maxPageSize , sliceQueue , limit , scoreMode );
5966 }
6067
6168 public int maxPageSize () {
@@ -70,32 +77,65 @@ public String describe() {
7077 + maxPageSize
7178 + ", limit = "
7279 + limit
80+ + ", scoreMode = "
81+ + scoreMode
7382 + "]" ;
7483 }
7584 }
7685
77- public LuceneSourceOperator (BlockFactory blockFactory , int maxPageSize , LuceneSliceQueue sliceQueue , int limit ) {
86+ @ SuppressWarnings ("this-escape" )
87+ public LuceneSourceOperator (BlockFactory blockFactory , int maxPageSize , LuceneSliceQueue sliceQueue , int limit , ScoreMode scoreMode ) {
7888 super (blockFactory , maxPageSize , sliceQueue );
7989 this .minPageSize = Math .max (1 , maxPageSize / 2 );
8090 this .remainingDocs = limit ;
81- this .docsBuilder = blockFactory .newIntVectorBuilder (Math .min (limit , maxPageSize ));
82- this .leafCollector = new LeafCollector () {
83- @ Override
84- public void setScorer (Scorable scorer ) {
85-
91+ int estimatedSize = Math .min (limit , maxPageSize );
92+ boolean success = false ;
93+ try {
94+ this .docsBuilder = blockFactory .newIntVectorBuilder (estimatedSize );
95+ if (scoreMode .needsScores ()) {
96+ scoreBuilder = blockFactory .newDoubleVectorBuilder (estimatedSize );
97+ this .leafCollector = new ScoringCollector ();
98+ } else {
99+ scoreBuilder = null ;
100+ this .leafCollector = new LimitingCollector ();
86101 }
102+ success = true ;
103+ } finally {
104+ if (success == false ) {
105+ close ();
106+ }
107+ }
108+ }
87109
88- @ Override
89- public void collect (int doc ) {
90- if (remainingDocs > 0 ) {
91- --remainingDocs ;
92- docsBuilder .appendInt (doc );
93- currentPagePos ++;
94- } else {
95- throw new CollectionTerminatedException ();
96- }
110+ class LimitingCollector implements LeafCollector {
111+ @ Override
112+ public void setScorer (Scorable scorer ) {}
113+
114+ @ Override
115+ public void collect (int doc ) throws IOException {
116+ if (remainingDocs > 0 ) {
117+ --remainingDocs ;
118+ docsBuilder .appendInt (doc );
119+ currentPagePos ++;
120+ } else {
121+ throw new CollectionTerminatedException ();
97122 }
98- };
123+ }
124+ }
125+
126+ final class ScoringCollector extends LuceneSourceOperator .LimitingCollector {
127+ private Scorable scorable ;
128+
129+ @ Override
130+ public void setScorer (Scorable scorer ) {
131+ this .scorable = scorer ;
132+ }
133+
134+ @ Override
135+ public void collect (int doc ) throws IOException {
136+ super .collect (doc );
137+ scoreBuilder .appendDouble (scorable .score ());
138+ }
99139 }
100140
101141 @ Override
@@ -139,15 +179,27 @@ public Page getCheckedOutput() throws IOException {
139179 IntBlock shard = null ;
140180 IntBlock leaf = null ;
141181 IntVector docs = null ;
182+ DoubleVector scores = null ;
183+ DocBlock docBlock = null ;
142184 try {
143185 shard = blockFactory .newConstantIntBlockWith (scorer .shardContext ().index (), currentPagePos );
144186 leaf = blockFactory .newConstantIntBlockWith (scorer .leafReaderContext ().ord , currentPagePos );
145187 docs = docsBuilder .build ();
146188 docsBuilder = blockFactory .newIntVectorBuilder (Math .min (remainingDocs , maxPageSize ));
147- page = new Page (currentPagePos , new DocVector (shard .asVector (), leaf .asVector (), docs , true ).asBlock ());
189+ docBlock = new DocVector (shard .asVector (), leaf .asVector (), docs , true ).asBlock ();
190+ shard = null ;
191+ leaf = null ;
192+ docs = null ;
193+ if (scoreBuilder == null ) {
194+ page = new Page (currentPagePos , docBlock );
195+ } else {
196+ scores = scoreBuilder .build ();
197+ scoreBuilder = blockFactory .newDoubleVectorBuilder (Math .min (remainingDocs , maxPageSize ));
198+ page = new Page (currentPagePos , docBlock , scores .asBlock ());
199+ }
148200 } finally {
149201 if (page == null ) {
150- Releasables .closeExpectNoException (shard , leaf , docs );
202+ Releasables .closeExpectNoException (shard , leaf , docs , docBlock , scores );
151203 }
152204 }
153205 currentPagePos = 0 ;
@@ -160,7 +212,7 @@ public Page getCheckedOutput() throws IOException {
160212
161213 @ Override
162214 public void close () {
163- docsBuilder .close ();
215+ Releasables .close (docsBuilder , scoreBuilder );
164216 }
165217
166218 @ Override
0 commit comments