|
7 | 7 |
|
8 | 8 | package org.elasticsearch.compute.lucene; |
9 | 9 |
|
10 | | -import org.apache.lucene.index.LeafReaderContext; |
11 | | -import org.apache.lucene.search.BulkScorer; |
12 | | -import org.apache.lucene.search.IndexSearcher; |
13 | | -import org.apache.lucene.search.LeafCollector; |
14 | 10 | import org.apache.lucene.search.Query; |
15 | 11 | import org.apache.lucene.search.Scorable; |
16 | 12 | import org.apache.lucene.search.ScoreMode; |
17 | | -import org.apache.lucene.search.Scorer; |
18 | | -import org.apache.lucene.search.Weight; |
19 | | -import org.apache.lucene.util.ArrayUtil; |
20 | | -import org.apache.lucene.util.Bits; |
21 | 13 | import org.elasticsearch.compute.data.Block; |
22 | 14 | import org.elasticsearch.compute.data.BlockFactory; |
23 | 15 | import org.elasticsearch.compute.data.BooleanVector; |
24 | | -import org.elasticsearch.compute.data.DocBlock; |
25 | | -import org.elasticsearch.compute.data.DocVector; |
26 | | -import org.elasticsearch.compute.data.IntVector; |
27 | 16 | import org.elasticsearch.compute.data.Page; |
| 17 | +import org.elasticsearch.compute.data.Vector; |
28 | 18 | import org.elasticsearch.compute.operator.DriverContext; |
29 | 19 | import org.elasticsearch.compute.operator.EvalOperator; |
30 | | -import org.elasticsearch.core.Releasable; |
31 | | -import org.elasticsearch.core.Releasables; |
32 | 20 |
|
33 | 21 | import java.io.IOException; |
34 | | -import java.io.UncheckedIOException; |
35 | 22 |
|
36 | 23 | /** |
37 | 24 | * {@link EvalOperator.ExpressionEvaluator} to run a Lucene {@link Query} during |
38 | 25 | * the compute engine's normal execution, yielding matches/does not match into |
39 | | - * a {@link BooleanVector}. It's much faster to push these to the |
40 | | - * {@link LuceneSourceOperator} or the like, but sometimes this isn't possible. So |
41 | | - * this evaluator is here to save the day. |
| 26 | + * a {@link BooleanVector}. |
| 27 | + * @see LuceneQueryScoreEvaluator |
42 | 28 | */ |
43 | | -public class LuceneQueryExpressionEvaluator implements EvalOperator.ExpressionEvaluator { |
44 | | - public record ShardConfig(Query query, IndexSearcher searcher) {} |
| 29 | +public class LuceneQueryExpressionEvaluator extends LuceneQueryEvaluator<BooleanVector.Builder> |
| 30 | + implements |
| 31 | + EvalOperator.ExpressionEvaluator { |
45 | 32 |
|
46 | | - private final BlockFactory blockFactory; |
47 | | - private final ShardConfig[] shards; |
48 | | - |
49 | | - private ShardState[] perShardState = EMPTY_SHARD_STATES; |
50 | | - |
51 | | - public LuceneQueryExpressionEvaluator(BlockFactory blockFactory, ShardConfig[] shards) { |
52 | | - this.blockFactory = blockFactory; |
53 | | - this.shards = shards; |
| 33 | + LuceneQueryExpressionEvaluator(BlockFactory blockFactory, ShardConfig[] shards) { |
| 34 | + super(blockFactory, shards); |
54 | 35 | } |
55 | 36 |
|
56 | 37 | @Override |
57 | 38 | public Block eval(Page page) { |
58 | | - // Lucene based operators retrieve DocVectors as first block |
59 | | - Block block = page.getBlock(0); |
60 | | - assert block instanceof DocBlock : "LuceneQueryExpressionEvaluator expects DocBlock as input"; |
61 | | - DocVector docs = (DocVector) block.asVector(); |
62 | | - try { |
63 | | - if (docs.singleSegmentNonDecreasing()) { |
64 | | - return evalSingleSegmentNonDecreasing(docs).asBlock(); |
65 | | - } else { |
66 | | - return evalSlow(docs).asBlock(); |
67 | | - } |
68 | | - } catch (IOException e) { |
69 | | - throw new UncheckedIOException(e); |
70 | | - } |
71 | | - } |
72 | | - |
73 | | - /** |
74 | | - * Evaluate {@link DocVector#singleSegmentNonDecreasing()} documents. |
75 | | - * <p> |
76 | | - * ESQL receives documents in DocVector, and they can be in one of two |
77 | | - * states. Either the DocVector contains documents from a single segment |
78 | | - * non-decreasing order, or it doesn't. that first case is much more like |
79 | | - * how Lucene likes to process documents. and it's much more common. So we |
80 | | - * optimize for it. |
81 | | - * <p> |
82 | | - * Vectors that are {@link DocVector#singleSegmentNonDecreasing()} |
83 | | - * represent many documents from a single Lucene segment. In Elasticsearch |
84 | | - * terms that's a segment in a single shard. And the document ids are in |
85 | | - * non-decreasing order. Probably just {@code 0, 1, 2, 3, 4, 5...}. |
86 | | - * But maybe something like {@code 0, 5, 6, 10, 10, 10}. Both of those are |
87 | | - * very like how lucene "natively" processes documents and this optimizes |
88 | | - * those accesses. |
89 | | - * </p> |
90 | | - * <p> |
91 | | - * If the documents are literally {@code 0, 1, ... n} then we use |
92 | | - * {@link BulkScorer#score(LeafCollector, Bits, int, int)} which processes |
93 | | - * a whole range. This should be quite common in the case where we don't |
94 | | - * have deleted documents because that's the order that |
95 | | - * {@link LuceneSourceOperator} produces them. |
96 | | - * </p> |
97 | | - * <p> |
98 | | - * If there are gaps in the sequence we use {@link Scorer} calls to |
99 | | - * score the sequence. This'll be less fast but isn't going be particularly |
100 | | - * common. |
101 | | - * </p> |
102 | | - */ |
103 | | - private BooleanVector evalSingleSegmentNonDecreasing(DocVector docs) throws IOException { |
104 | | - ShardState shardState = shardState(docs.shards().getInt(0)); |
105 | | - SegmentState segmentState = shardState.segmentState(docs.segments().getInt(0)); |
106 | | - int min = docs.docs().getInt(0); |
107 | | - int max = docs.docs().getInt(docs.getPositionCount() - 1); |
108 | | - int length = max - min + 1; |
109 | | - if (length == docs.getPositionCount() && length > 1) { |
110 | | - return segmentState.scoreDense(min, max); |
111 | | - } |
112 | | - return segmentState.scoreSparse(docs.docs()); |
113 | | - } |
114 | | - |
115 | | - /** |
116 | | - * Evaluate non-{@link DocVector#singleSegmentNonDecreasing()} documents. See |
117 | | - * {@link #evalSingleSegmentNonDecreasing} for the meaning of |
118 | | - * {@link DocVector#singleSegmentNonDecreasing()} and how we can efficiently |
119 | | - * evaluate those segments. |
120 | | - * <p> |
121 | | - * This processes the worst case blocks of documents. These can be from any |
122 | | - * number of shards and any number of segments and in any order. We do this |
123 | | - * by iterating the docs in {@code shard ASC, segment ASC, doc ASC} order. |
124 | | - * So, that's segment by segment, docs ascending. We build a boolean block |
125 | | - * out of that. Then we <strong>sort</strong> that to put the booleans in |
126 | | - * the order that the {@link DocVector} came in. |
127 | | - * </p> |
128 | | - */ |
129 | | - private BooleanVector evalSlow(DocVector docs) throws IOException { |
130 | | - int[] map = docs.shardSegmentDocMapForwards(); |
131 | | - // Clear any state flags from the previous run |
132 | | - int prevShard = -1; |
133 | | - int prevSegment = -1; |
134 | | - SegmentState segmentState = null; |
135 | | - try (BooleanVector.Builder builder = blockFactory.newBooleanVectorFixedBuilder(docs.getPositionCount())) { |
136 | | - for (int i = 0; i < docs.getPositionCount(); i++) { |
137 | | - int shard = docs.shards().getInt(docs.shards().getInt(map[i])); |
138 | | - int segment = docs.segments().getInt(map[i]); |
139 | | - if (segmentState == null || prevShard != shard || prevSegment != segment) { |
140 | | - segmentState = shardState(shard).segmentState(segment); |
141 | | - segmentState.initScorer(docs.docs().getInt(map[i])); |
142 | | - prevShard = shard; |
143 | | - prevSegment = segment; |
144 | | - } |
145 | | - if (segmentState.noMatch) { |
146 | | - builder.appendBoolean(false); |
147 | | - } else { |
148 | | - segmentState.scoreSingleDocWithScorer(builder, docs.docs().getInt(map[i])); |
149 | | - } |
150 | | - } |
151 | | - try (BooleanVector outOfOrder = builder.build()) { |
152 | | - return outOfOrder.filter(docs.shardSegmentDocMapBackwards()); |
153 | | - } |
154 | | - } |
| 39 | + return executeQuery(page); |
155 | 40 | } |
156 | 41 |
|
157 | 42 | @Override |
158 | | - public void close() { |
159 | | - |
| 43 | + protected ScoreMode scoreMode() { |
| 44 | + return ScoreMode.COMPLETE_NO_SCORES; |
160 | 45 | } |
161 | 46 |
|
162 | | - private ShardState shardState(int shard) throws IOException { |
163 | | - if (shard >= perShardState.length) { |
164 | | - perShardState = ArrayUtil.grow(perShardState, shard + 1); |
165 | | - } else if (perShardState[shard] != null) { |
166 | | - return perShardState[shard]; |
167 | | - } |
168 | | - perShardState[shard] = new ShardState(shards[shard]); |
169 | | - return perShardState[shard]; |
| 47 | + @Override |
| 48 | + protected Vector createNoMatchVector(BlockFactory blockFactory, int size) { |
| 49 | + return blockFactory.newConstantBooleanVector(false, size); |
170 | 50 | } |
171 | 51 |
|
172 | | - private class ShardState { |
173 | | - private final Weight weight; |
174 | | - private final IndexSearcher searcher; |
175 | | - private SegmentState[] perSegmentState = EMPTY_SEGMENT_STATES; |
176 | | - |
177 | | - ShardState(ShardConfig config) throws IOException { |
178 | | - weight = config.searcher.createWeight(config.query, ScoreMode.COMPLETE_NO_SCORES, 0.0f); |
179 | | - searcher = config.searcher; |
180 | | - } |
181 | | - |
182 | | - SegmentState segmentState(int segment) throws IOException { |
183 | | - if (segment >= perSegmentState.length) { |
184 | | - perSegmentState = ArrayUtil.grow(perSegmentState, segment + 1); |
185 | | - } else if (perSegmentState[segment] != null) { |
186 | | - return perSegmentState[segment]; |
187 | | - } |
188 | | - perSegmentState[segment] = new SegmentState(weight, searcher.getLeafContexts().get(segment)); |
189 | | - return perSegmentState[segment]; |
190 | | - } |
| 52 | + @Override |
| 53 | + protected BooleanVector.Builder createVectorBuilder(BlockFactory blockFactory, int size) { |
| 54 | + return blockFactory.newBooleanVectorFixedBuilder(size); |
191 | 55 | } |
192 | 56 |
|
193 | | - private class SegmentState { |
194 | | - private final Weight weight; |
195 | | - private final LeafReaderContext ctx; |
196 | | - |
197 | | - /** |
198 | | - * Lazily initialed {@link Scorer} for this. {@code null} here means uninitialized |
199 | | - * <strong>or</strong> that {@link #noMatch} is true. |
200 | | - */ |
201 | | - private Scorer scorer; |
202 | | - |
203 | | - /** |
204 | | - * Thread that initialized the {@link #scorer}. |
205 | | - */ |
206 | | - private Thread scorerThread; |
207 | | - |
208 | | - /** |
209 | | - * Lazily initialed {@link BulkScorer} for this. {@code null} here means uninitialized |
210 | | - * <strong>or</strong> that {@link #noMatch} is true. |
211 | | - */ |
212 | | - private BulkScorer bulkScorer; |
213 | | - |
214 | | - /** |
215 | | - * Thread that initialized the {@link #bulkScorer}. |
216 | | - */ |
217 | | - private Thread bulkScorerThread; |
218 | | - |
219 | | - /** |
220 | | - * Set to {@code true} if, in the process of building a {@link Scorer} or {@link BulkScorer}, |
221 | | - * the {@link Weight} tells us there aren't any matches. |
222 | | - */ |
223 | | - private boolean noMatch; |
224 | | - |
225 | | - private SegmentState(Weight weight, LeafReaderContext ctx) { |
226 | | - this.weight = weight; |
227 | | - this.ctx = ctx; |
228 | | - } |
229 | | - |
230 | | - /** |
231 | | - * Score a range using the {@link BulkScorer}. This should be faster |
232 | | - * than using {@link #scoreSparse} for dense doc ids. |
233 | | - */ |
234 | | - BooleanVector scoreDense(int min, int max) throws IOException { |
235 | | - int length = max - min + 1; |
236 | | - if (noMatch) { |
237 | | - return blockFactory.newConstantBooleanVector(false, length); |
238 | | - } |
239 | | - if (bulkScorer == null || // The bulkScorer wasn't initialized |
240 | | - Thread.currentThread() != bulkScorerThread // The bulkScorer was initialized on a different thread |
241 | | - ) { |
242 | | - bulkScorerThread = Thread.currentThread(); |
243 | | - bulkScorer = weight.bulkScorer(ctx); |
244 | | - if (bulkScorer == null) { |
245 | | - noMatch = true; |
246 | | - return blockFactory.newConstantBooleanVector(false, length); |
247 | | - } |
248 | | - } |
249 | | - try (DenseCollector collector = new DenseCollector(blockFactory, min, max)) { |
250 | | - bulkScorer.score(collector, ctx.reader().getLiveDocs(), min, max + 1); |
251 | | - return collector.build(); |
252 | | - } |
253 | | - } |
254 | | - |
255 | | - /** |
256 | | - * Score a vector of doc ids using {@link Scorer}. If you have a dense range of |
257 | | - * doc ids it'd be faster to use {@link #scoreDense}. |
258 | | - */ |
259 | | - BooleanVector scoreSparse(IntVector docs) throws IOException { |
260 | | - initScorer(docs.getInt(0)); |
261 | | - if (noMatch) { |
262 | | - return blockFactory.newConstantBooleanVector(false, docs.getPositionCount()); |
263 | | - } |
264 | | - try (BooleanVector.Builder builder = blockFactory.newBooleanVectorFixedBuilder(docs.getPositionCount())) { |
265 | | - for (int i = 0; i < docs.getPositionCount(); i++) { |
266 | | - scoreSingleDocWithScorer(builder, docs.getInt(i)); |
267 | | - } |
268 | | - return builder.build(); |
269 | | - } |
270 | | - } |
271 | | - |
272 | | - private void initScorer(int minDocId) throws IOException { |
273 | | - if (noMatch) { |
274 | | - return; |
275 | | - } |
276 | | - if (scorer == null || // Scorer not initialized |
277 | | - scorerThread != Thread.currentThread() || // Scorer initialized on a different thread |
278 | | - scorer.iterator().docID() > minDocId // The previous block came "after" this one |
279 | | - ) { |
280 | | - scorerThread = Thread.currentThread(); |
281 | | - scorer = weight.scorer(ctx); |
282 | | - if (scorer == null) { |
283 | | - noMatch = true; |
284 | | - } |
285 | | - } |
286 | | - } |
287 | | - |
288 | | - private void scoreSingleDocWithScorer(BooleanVector.Builder builder, int doc) throws IOException { |
289 | | - if (scorer.iterator().docID() == doc) { |
290 | | - builder.appendBoolean(true); |
291 | | - } else if (scorer.iterator().docID() > doc) { |
292 | | - builder.appendBoolean(false); |
293 | | - } else { |
294 | | - builder.appendBoolean(scorer.iterator().advance(doc) == doc); |
295 | | - } |
296 | | - } |
| 57 | + @Override |
| 58 | + protected void appendNoMatch(BooleanVector.Builder builder) { |
| 59 | + builder.appendBoolean(false); |
297 | 60 | } |
298 | 61 |
|
299 | | - private static final ShardState[] EMPTY_SHARD_STATES = new ShardState[0]; |
300 | | - private static final SegmentState[] EMPTY_SEGMENT_STATES = new SegmentState[0]; |
301 | | - |
302 | | - /** |
303 | | - * Collects matching information for dense range of doc ids. This assumes that |
304 | | - * doc ids are sent to {@link LeafCollector#collect(int)} in ascending order |
305 | | - * which isn't documented, but @jpountz swears is true. |
306 | | - */ |
307 | | - static class DenseCollector implements LeafCollector, Releasable { |
308 | | - private final BooleanVector.FixedBuilder builder; |
309 | | - private final int max; |
310 | | - |
311 | | - int next; |
312 | | - |
313 | | - DenseCollector(BlockFactory blockFactory, int min, int max) { |
314 | | - this.builder = blockFactory.newBooleanVectorFixedBuilder(max - min + 1); |
315 | | - this.max = max; |
316 | | - next = min; |
317 | | - } |
318 | | - |
319 | | - @Override |
320 | | - public void setScorer(Scorable scorable) {} |
321 | | - |
322 | | - @Override |
323 | | - public void collect(int doc) { |
324 | | - while (next++ < doc) { |
325 | | - builder.appendBoolean(false); |
326 | | - } |
327 | | - builder.appendBoolean(true); |
328 | | - } |
329 | | - |
330 | | - public BooleanVector build() { |
331 | | - return builder.build(); |
332 | | - } |
333 | | - |
334 | | - @Override |
335 | | - public void finish() { |
336 | | - while (next++ <= max) { |
337 | | - builder.appendBoolean(false); |
338 | | - } |
339 | | - } |
340 | | - |
341 | | - @Override |
342 | | - public void close() { |
343 | | - Releasables.closeExpectNoException(builder); |
344 | | - } |
| 62 | + @Override |
| 63 | + protected void appendMatch(BooleanVector.Builder builder, Scorable scorer) throws IOException { |
| 64 | + builder.appendBoolean(true); |
345 | 65 | } |
346 | 66 |
|
347 | | - public static class Factory implements EvalOperator.ExpressionEvaluator.Factory { |
348 | | - private final ShardConfig[] shardConfigs; |
349 | | - |
350 | | - public Factory(ShardConfig[] shardConfigs) { |
351 | | - this.shardConfigs = shardConfigs; |
352 | | - } |
353 | | - |
| 67 | + public record Factory(ShardConfig[] shardConfigs) implements EvalOperator.ExpressionEvaluator.Factory { |
354 | 68 | @Override |
355 | 69 | public EvalOperator.ExpressionEvaluator get(DriverContext context) { |
356 | 70 | return new LuceneQueryExpressionEvaluator(context.blockFactory(), shardConfigs); |
|
0 commit comments