Skip to content

Commit 1b151ed

Browse files
authored
ESQL: Compute engine support for tagged queries (#128521)
Begins adding support for running "tagged queries" to the compute engine. Here, it's just the `LuceneSourceOperator` because that's useful and contained. Example time! Say you are running: ``` FROM foo | STATS MAX(v) BY ROUND_TO(g, 0, 100, 1000, 100000) ``` It's *often* faster to run this as four queries: * The docs that round to `0` * The docs that round to `100` * The docs that round to `1000` * The docs that round to `100000` This creates an ESQL operator that can run these queries, one after the other and attach those tags. Aggs uses this trick and it's *way* faster when it can push down count queries, but it's still faster when it pushes doc loading things. This implementation in `LuceneSourceOperator` is quite similar to the doc loading version in _search. I don't have performance measurements yet because I haven't plugged this into the language. In _search we call this `filter-by-filter` and enable it when each group averages to more than 5000 documents and when there isn't a `_doc_count` field. It's faster in those cases not to push. I expect we'll be pretty similar.
1 parent bf0dc6e commit 1b151ed

27 files changed

+431
-118
lines changed

docs/reference/query-languages/esql/_snippets/functions/appendix/values.md

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/reference/query-languages/esql/kibana/definition/functions/values.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/reference/query-languages/esql/kibana/docs/functions/values.md

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import org.apache.lucene.search.DocIdStream;
1111
import org.apache.lucene.search.LeafCollector;
12-
import org.apache.lucene.search.Query;
1312
import org.apache.lucene.search.Scorable;
1413
import org.apache.lucene.search.ScoreMode;
1514
import org.apache.lucene.search.Weight;
@@ -44,7 +43,7 @@ public static class Factory extends LuceneOperator.Factory {
4443

4544
public Factory(
4645
List<? extends ShardContext> contexts,
47-
Function<ShardContext, Query> queryFunction,
46+
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
4847
DataPartitioning dataPartitioning,
4948
int taskConcurrency,
5049
int limit
@@ -121,6 +120,9 @@ protected Page getCheckedOutput() throws IOException {
121120
if (scorer == null) {
122121
remainingDocs = 0;
123122
} else {
123+
if (scorer.tags().isEmpty() == false) {
124+
throw new UnsupportedOperationException("tags not supported by " + getClass());
125+
}
124126
Weight weight = scorer.weight();
125127
var leafReaderContext = scorer.leafReaderContext();
126128
// see org.apache.lucene.search.TotalHitCountCollector

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.apache.lucene.index.NumericDocValues;
1111
import org.apache.lucene.index.PointValues;
1212
import org.apache.lucene.index.SortedNumericDocValues;
13-
import org.apache.lucene.search.Query;
1413
import org.apache.lucene.search.ScoreMode;
1514
import org.apache.lucene.util.NumericUtils;
1615
import org.elasticsearch.compute.data.Block;
@@ -114,7 +113,7 @@ public final long evaluate(long value1, long value2) {
114113

115114
public LuceneMaxFactory(
116115
List<? extends ShardContext> contexts,
117-
Function<ShardContext, Query> queryFunction,
116+
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
118117
DataPartitioning dataPartitioning,
119118
int taskConcurrency,
120119
String fieldName,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.apache.lucene.index.NumericDocValues;
1111
import org.apache.lucene.index.PointValues;
1212
import org.apache.lucene.index.SortedNumericDocValues;
13-
import org.apache.lucene.search.Query;
1413
import org.apache.lucene.search.ScoreMode;
1514
import org.apache.lucene.util.NumericUtils;
1615
import org.elasticsearch.compute.data.Block;
@@ -114,7 +113,7 @@ public final long evaluate(long value1, long value2) {
114113

115114
public LuceneMinFactory(
116115
List<? extends ShardContext> contexts,
117-
Function<ShardContext, Query> queryFunction,
116+
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
118117
DataPartitioning dataPartitioning,
119118
int taskConcurrency,
120119
String fieldName,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ public Page getCheckedOutput() throws IOException {
102102
if (scorer == null) {
103103
remainingDocs = 0;
104104
} else {
105+
if (scorer.tags().isEmpty() == false) {
106+
throw new UnsupportedOperationException("tags not supported by " + getClass());
107+
}
105108
final LeafReader reader = scorer.leafReaderContext().reader();
106109
final Query query = scorer.weight().getQuery();
107110
if (query == null || query instanceof MatchAllDocsQuery) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public abstract static class Factory implements SourceOperator.SourceOperatorFac
9797
*/
9898
protected Factory(
9999
List<? extends ShardContext> contexts,
100-
Function<ShardContext, Query> queryFunction,
100+
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
101101
DataPartitioning dataPartitioning,
102102
Function<Query, LuceneSliceQueue.PartitioningStrategy> autoStrategy,
103103
int taskConcurrency,
@@ -155,10 +155,13 @@ LuceneScorer getCurrentOrLoadNextScorer() {
155155
final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++);
156156
logger.trace("Starting {}", partialLeaf);
157157
final LeafReaderContext leaf = partialLeaf.leafReaderContext();
158-
if (currentScorer == null || currentScorer.leafReaderContext() != leaf) {
158+
if (currentScorer == null // First time
159+
|| currentScorer.leafReaderContext() != leaf // Moved to a new leaf
160+
|| currentScorer.weight != currentSlice.weight() // Moved to a new query
161+
) {
159162
final Weight weight = currentSlice.weight();
160163
processedQueries.add(weight.getQuery());
161-
currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, leaf);
164+
currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.tags(), leaf);
162165
}
163166
assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
164167
currentScorer.maxPosition = partialLeaf.maxDoc();
@@ -177,15 +180,17 @@ static final class LuceneScorer {
177180
private final ShardContext shardContext;
178181
private final Weight weight;
179182
private final LeafReaderContext leafReaderContext;
183+
private final List<Object> tags;
180184

181185
private BulkScorer bulkScorer;
182186
private int position;
183187
private int maxPosition;
184188
private Thread executingThread;
185189

186-
LuceneScorer(ShardContext shardContext, Weight weight, LeafReaderContext leafReaderContext) {
190+
LuceneScorer(ShardContext shardContext, Weight weight, List<Object> tags, LeafReaderContext leafReaderContext) {
187191
this.shardContext = shardContext;
188192
this.weight = weight;
193+
this.tags = tags;
189194
this.leafReaderContext = leafReaderContext;
190195
reinitialize();
191196
}
@@ -230,6 +235,13 @@ Weight weight() {
230235
int position() {
231236
return position;
232237
}
238+
239+
/**
240+
* Tags to add to the data returned by this query.
241+
*/
242+
List<Object> tags() {
243+
return tags;
244+
}
233245
}
234246

235247
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
/**
1515
* Holds a list of multiple partial Lucene segments
1616
*/
17-
public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight) {
17+
public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight, List<Object> tags) {
1818
int numLeaves() {
1919
return leaves.size();
2020
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java

Lines changed: 63 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,47 @@
3232

3333
/**
3434
* Shared Lucene slices between Lucene operators.
35+
* <p>
36+
* Each shard is {@link #create built} with a list of queries to run and
37+
* tags to add to the queries ({@code List<QueryAndTags>}). Some examples:
38+
* </p>
39+
* <ul>
40+
* <li>
41+
* For queries like {@code FROM foo} we'll use a one element list
42+
* containing {@code match_all, []}. It loads all documents in the
43+
* index and appends no extra fields to the loaded documents.
44+
* </li>
45+
* <li>
46+
* For queries like {@code FROM foo | WHERE a > 10} we'll use a one
47+
* element list containing {@code +single_value(a) +(a > 10), []}.
48+
* It loads all documents where {@code a} is single valued and
49+
* greater than 10.
50+
* </li>
51+
* <li>
52+
* For queries like {@code FROM foo | STATS MAX(b) BY ROUND_TO(a, 0, 100)}
53+
* we'll use a two element list containing
54+
* <ul>
55+
* <li>{@code +single_value(a) +(a < 100), [0]}</li>
56+
* <li>{@code +single_value(a) +(a >= 100), [100]}</li>
57+
* </ul>
58+
* It loads all documents in the index where {@code a} is single
59+
* valued and adds a constant {@code 0} to the documents where
60+
* {@code a < 100} and the constant {@code 100} to the documents
61+
* where {@code a >= 100}.
62+
* </li>
63+
* </ul>
64+
* <p>
65+
* IMPORTANT: Runners make no effort to deduplicate the results from multiple
66+
* queries. If you need to only see each document one time then make sure the
67+
* queries are mutually exclusive.
68+
* </p>
3569
*/
3670
public final class LuceneSliceQueue {
71+
/**
72+
* Query to run and tags to add to the results.
73+
*/
74+
public record QueryAndTags(Query query, List<Object> tags) {}
75+
3776
public static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher
3877
public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher
3978

@@ -69,7 +108,7 @@ public Collection<String> remainingShardsIdentifiers() {
69108

70109
public static LuceneSliceQueue create(
71110
List<? extends ShardContext> contexts,
72-
Function<ShardContext, Query> queryFunction,
111+
Function<ShardContext, List<QueryAndTags>> queryFunction,
73112
DataPartitioning dataPartitioning,
74113
Function<Query, PartitioningStrategy> autoStrategy,
75114
int taskConcurrency,
@@ -78,27 +117,29 @@ public static LuceneSliceQueue create(
78117
List<LuceneSlice> slices = new ArrayList<>();
79118
Map<String, PartitioningStrategy> partitioningStrategies = new HashMap<>(contexts.size());
80119
for (ShardContext ctx : contexts) {
81-
Query query = queryFunction.apply(ctx);
82-
query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
83-
/*
84-
* Rewrite the query on the local index so things like fully
85-
* overlapping range queries become match all. It's important
86-
* to do this before picking the partitioning strategy so we
87-
* can pick more aggressive strategies when the query rewrites
88-
* into MatchAll.
89-
*/
90-
try {
91-
query = ctx.searcher().rewrite(query);
92-
} catch (IOException e) {
93-
throw new UncheckedIOException(e);
94-
}
95-
PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
96-
partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
97-
List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
98-
Weight weight = weight(ctx, query, scoreMode);
99-
for (List<PartialLeafReaderContext> group : groups) {
100-
if (group.isEmpty() == false) {
101-
slices.add(new LuceneSlice(ctx, group, weight));
120+
for (QueryAndTags queryAndExtra : queryFunction.apply(ctx)) {
121+
Query query = queryAndExtra.query;
122+
query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
123+
/*
124+
* Rewrite the query on the local index so things like fully
125+
* overlapping range queries become match all. It's important
126+
* to do this before picking the partitioning strategy so we
127+
* can pick more aggressive strategies when the query rewrites
128+
* into MatchAll.
129+
*/
130+
try {
131+
query = ctx.searcher().rewrite(query);
132+
} catch (IOException e) {
133+
throw new UncheckedIOException(e);
134+
}
135+
PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
136+
partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
137+
List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
138+
Weight weight = weight(ctx, query, scoreMode);
139+
for (List<PartialLeafReaderContext> group : groups) {
140+
if (group.isEmpty() == false) {
141+
slices.add(new LuceneSlice(ctx, group, weight, queryAndExtra.tags));
142+
}
102143
}
103144
}
104145
}

0 commit comments

Comments
 (0)