Skip to content

Commit 6abba22

Browse files
committed
different collectors based on sorting
1 parent 1230eb5 commit 6abba22

File tree

3 files changed

+59
-29
lines changed

3 files changed

+59
-29
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java

Lines changed: 4 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@
1616
import org.apache.lucene.search.ScoreDoc;
1717
import org.apache.lucene.search.ScoreMode;
1818
import org.apache.lucene.search.TopDocs;
19-
import org.apache.lucene.search.TopFieldCollector;
19+
import org.apache.lucene.search.TopDocsCollector;
2020
import org.apache.lucene.search.TopFieldCollectorManager;
2121
import org.elasticsearch.common.Strings;
2222
import org.elasticsearch.compute.data.BlockFactory;
@@ -176,7 +176,7 @@ private Page emit(boolean startEmitting) {
176176
assert isEmitting() == false : "offset=" + offset + " score_docs=" + Arrays.toString(scoreDocs);
177177
offset = 0;
178178
if (perShardCollector != null) {
179-
scoreDocs = perShardCollector.getTopDocs().scoreDocs;
179+
scoreDocs = perShardCollector.collector.topDocs().scoreDocs;
180180
} else {
181181
scoreDocs = new ScoreDoc[0];
182182
}
@@ -248,7 +248,7 @@ PerShardCollector newPerShardCollector(ShardContext shardContext, List<SortBuild
248248

249249
static class PerShardCollector {
250250
protected ShardContext shardContext;
251-
protected TopFieldCollector collector;
251+
protected TopDocsCollector<?> collector;
252252
private int leafIndex;
253253
private LeafCollector leafCollector;
254254
private Thread currentThread;
@@ -268,19 +268,12 @@ static class PerShardCollector {
268268

269269
LeafCollector getLeafCollector(LeafReaderContext leafReaderContext) throws IOException {
270270
if (currentThread != Thread.currentThread() || leafIndex != leafReaderContext.ord) {
271-
leafCollector = getCollector().getLeafCollector(leafReaderContext);
271+
leafCollector = collector.getLeafCollector(leafReaderContext);
272272
leafIndex = leafReaderContext.ord;
273273
currentThread = Thread.currentThread();
274274
}
275275
return leafCollector;
276276
}
277277

278-
Collector getCollector() {
279-
return collector;
280-
}
281-
282-
TopDocs getTopDocs() {
283-
return collector.topDocs();
284-
}
285278
}
286279
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ScoringLuceneTopNSourceOperator.java

Lines changed: 53 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -7,16 +7,19 @@
77

88
package org.elasticsearch.compute.lucene;
99

10-
import org.apache.lucene.search.Collector;
10+
import org.apache.lucene.index.LeafReaderContext;
1111
import org.apache.lucene.search.FieldDoc;
12+
import org.apache.lucene.search.LeafCollector;
1213
import org.apache.lucene.search.Query;
14+
import org.apache.lucene.search.Scorable;
1315
import org.apache.lucene.search.ScoreDoc;
1416
import org.apache.lucene.search.ScoreMode;
1517
import org.apache.lucene.search.Sort;
1618
import org.apache.lucene.search.SortField;
17-
import org.apache.lucene.search.TopDocs;
18-
import org.apache.lucene.search.TopFieldCollector;
19+
import org.apache.lucene.search.TopDocsCollector;
1920
import org.apache.lucene.search.TopFieldCollectorManager;
21+
import org.apache.lucene.search.TopScoreDocCollectorManager;
22+
import org.apache.lucene.util.PriorityQueue;
2023
import org.elasticsearch.common.Strings;
2124
import org.elasticsearch.compute.data.Block;
2225
import org.elasticsearch.compute.data.BlockFactory;
@@ -103,15 +106,18 @@ protected Page maybeAppendScore(Page page, DoubleVector.Builder currentScoresBui
103106
}
104107

105108
float getScore(ScoreDoc scoreDoc) {
106-
FieldDoc fieldDoc = (FieldDoc) scoreDoc;
107-
if (Float.isNaN(fieldDoc.score)) {
108-
if (sorts != null) {
109-
return (Float) fieldDoc.fields[sorts.size()];
109+
if (scoreDoc instanceof FieldDoc fieldDoc) {
110+
if (Float.isNaN(fieldDoc.score)) {
111+
if (sorts != null) {
112+
return (Float) fieldDoc.fields[sorts.size()];
113+
} else {
114+
return (Float) fieldDoc.fields[0];
115+
}
110116
} else {
111-
return (Float) fieldDoc.fields[0];
117+
return fieldDoc.score;
112118
}
113119
} else {
114-
return fieldDoc.score;
120+
return scoreDoc.score;
115121
}
116122
}
117123

@@ -124,32 +130,62 @@ PerShardCollector newPerShardCollector(ShardContext shardContext, List<SortBuild
124130
l.add(SortField.FIELD_SCORE);
125131
sort = new Sort(l.toArray(SortField[]::new));
126132
} else {
127-
sort = new Sort();
133+
sort = null;
128134
}
129135
return new ScoringPerShardCollector(shardContext, sort, limit);
130136
}
131137

132138
static class ScoringPerShardCollector extends PerShardCollector {
133139

134140
// TODO : make this configurable / inferrable?
135-
private final TopFieldCollector collector;
136141
private static final int MAX_HITS = 100_000;
137142
private static final int TOTAL_HITS_THRESHOLD = 100;
138143

139144
ScoringPerShardCollector(ShardContext shardContext, Sort sort, int limit) {
140145
this.shardContext = shardContext;
141-
this.collector = new TopFieldCollectorManager(sort, Math.min(limit, MAX_HITS), TOTAL_HITS_THRESHOLD).newCollector();
142-
// TODO : use TopScoreDocCollectorManager when SORT _score DESC
146+
if (sort == null) {
147+
this.collector = new UnsortedScoreCollector(new PriorityQueue<>(Math.min(limit, MAX_HITS)) {
148+
@Override
149+
protected boolean lessThan(ScoreDoc a, ScoreDoc b) {
150+
return a.doc > b.doc;
151+
}
152+
});
153+
} else if (sort.needsScores()) {
154+
this.collector = new TopScoreDocCollectorManager(Math.min(limit, MAX_HITS), TOTAL_HITS_THRESHOLD).newCollector();
155+
} else {
156+
this.collector = new TopFieldCollectorManager(sort, Math.min(limit, MAX_HITS), TOTAL_HITS_THRESHOLD).newCollector();
157+
}
158+
}
159+
}
160+
161+
private static class UnsortedScoreCollector extends TopDocsCollector<ScoreDoc> {
162+
163+
protected UnsortedScoreCollector(PriorityQueue<ScoreDoc> pq) {
164+
super(pq);
143165
}
144166

145167
@Override
146-
Collector getCollector() {
147-
return collector;
168+
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
169+
return new LeafCollector() {
170+
private Scorable scorable;
171+
172+
@Override
173+
public void setScorer(Scorable scorable) {
174+
this.scorable = scorable;
175+
}
176+
177+
@Override
178+
public void collect(int docID) throws IOException {
179+
float score = scorable.score();
180+
pq.add(new ScoreDoc(docID, score));
181+
totalHits++;
182+
}
183+
};
148184
}
149185

150186
@Override
151-
TopDocs getTopDocs() {
152-
return collector.topDocs();
187+
public ScoreMode scoreMode() {
188+
return ScoreMode.COMPLETE;
153189
}
154190
}
155191
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1617,7 +1617,8 @@ public void testScoreFarMatchClauses() {
16171617
assertEquals(
16181618
"1:105: `_score` manipulation between fulltext expressions",
16191619
error(
1620-
"from foo metadata _score | where first_name match \"a\" | eval fs = _score | where first_name match \"b\" | keep fs, _score"
1620+
"from foo metadata _score | where first_name match \"a\" "
1621+
+ "| eval fs = _score | where first_name match \"b\" | keep fs, _score"
16211622
)
16221623
);
16231624
}

0 commit comments

Comments
 (0)