Skip to content

Commit 6fb106d

Browse files
authored
ESQL: Push more ==s on text fields to lucene (backport) (elastic#128156)
If you do: ``` | WHERE text_field == "cat" ``` we can't push to the text field because it's search index is for individual words. But most text fields have a `.keyword` sub field and we *can* query it's index. EXCEPT! It's normal for these fields to have `ignore_above` in their mapping. In that case we don't push to the field. Very sad. With this change we can push down `==`, but only when the right hand side is shorter than the `ignore_above`. This has pretty much infinite speed gain. An example using a million documents: ``` Before: "took" : 391, After: "took" : 4, ``` But this is going from totally un-indexed linear scans to totally indexed. You can make the "Before" number as high as you want by loading more data. Reenables `text ==` pushdown and adds support for `text !=` pushdown. It does so by making `TranslationAware#translatable` return something we can turn into a tri-valued function. It has these values: * `YES` * `NO` * `RECHECK` `YES` means the `Expression` is entirely pushable into Lucene. They will be pushed into Lucene and removed from the plan. `NO` means the `Expression` can't be pushed to Lucene at all and will stay in the plan. `RECHECK` mean the `Expression` can push a query that makes *candidate* matches but must be rechecked. Documents that don't match the query won't match the expression, but documents that match the query might not match the expression. These are pushed to Lucene *and* left in the plan. This is required because `txt != "b"` can build a *candidate* query against the `txt.keyword` subfield but it can't be sure of the match without loading the `_source` - which we do in the compute engine. I haven't plugged rally into this, but here's some basic performance tests: ``` Before: not text eq {"took":460,"documents_found":1000000} text eq {"took":432,"documents_found":1000000} After: text eq {"took":5,"documents_found":1} not text eq {"took":351,"documents_found":800000} ``` This comes from: ``` rm -f /tmp/bulk* for a in {1..1000}; do echo '{"index":{}}' >> /tmp/bulk echo '{"text":"text '$(printf $(($a % 5)))'"}' >> /tmp/bulk done ls -l /tmp/bulk* passwd="redacted" curl -sk -uelastic:$passwd -HContent-Type:application/json -XDELETE https://localhost:9200/test curl -sk -uelastic:$passwd -HContent-Type:application/json -XPUT https://localhost:9200/test -d'{ "settings": { "index.codec": "best_compression", "index.refresh_interval": -1 }, "mappings": { "properties": { "many": { "enabled": false } } } }' for a in {1..1000}; do printf %04d: $a curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST https://localhost:9200/test/_bulk?pretty --data-binary @/tmp/bulk | grep errors done curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST https://localhost:9200/test/_forcemerge?max_num_segments=1 curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST https://localhost:9200/test/_refresh echo curl -sk -uelastic:$passwd https://localhost:9200/_cat/indices?v text_eq() { echo -n " text eq " curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST 'https://localhost:9200/_query?pretty' -d'{ "query": "FROM test | WHERE text == \"text 1\" | STATS COUNT(*)", "pragma": { "data_partitioning": "shard" } }' | jq -c '{took, documents_found}' } not_text_eq() { echo -n "not text eq " curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST 'https://localhost:9200/_query?pretty' -d'{ "query": "FROM test | WHERE NOT text == \"text 1\" | STATS COUNT(*)", "pragma": { "data_partitioning": "shard" } }' | jq -c '{took, documents_found}' } for a in {1..100}; do text_eq not_text_eq done ```
1 parent 4f048d8 commit 6fb106d

File tree

53 files changed

+1335
-171
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1335
-171
lines changed

docs/changelog/126641.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 126641
2+
summary: Push more `==`s on text fields to lucene
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

docs/changelog/127199.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 127199
2+
summary: Disable a bugged commit
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 127197

docs/changelog/127355.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127355
2+
summary: '`text ==` and `text !=` pushdown'
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

docs/changelog/128156.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 128156
2+
summary: Push more ==s on text fields to lucene (backport)
3+
area: ES|QL
4+
type: feature
5+
issues: []

server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -986,6 +986,21 @@ public boolean canUseSyntheticSourceDelegateForQuerying() {
986986
&& syntheticSourceDelegate.isIndexed();
987987
}
988988

989+
/**
990+
* Returns true if the delegate sub-field can be used for querying only (ie. isIndexed must be true)
991+
*/
992+
public boolean canUseSyntheticSourceDelegateForQueryingEquality(String str) {
993+
if (syntheticSourceDelegate == null
994+
// Can't push equality to an index if there isn't an index
995+
|| syntheticSourceDelegate.isIndexed() == false
996+
// ESQL needs docs values to push equality
997+
|| syntheticSourceDelegate.hasDocValues() == false) {
998+
return false;
999+
}
1000+
// Can't push equality if the field we're checking for is so big we'd ignore it.
1001+
return str.length() <= syntheticSourceDelegate.ignoreAbove();
1002+
}
1003+
9891004
@Override
9901005
public BlockLoader blockLoader(BlockLoaderContext blContext) {
9911006
if (canUseSyntheticSourceDelegateForLoading()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.qa.single_node;
9+
10+
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
11+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
12+
13+
import org.elasticsearch.client.Request;
14+
import org.elasticsearch.client.Response;
15+
import org.elasticsearch.client.ResponseException;
16+
import org.elasticsearch.common.collect.Iterators;
17+
import org.elasticsearch.test.ListMatcher;
18+
import org.elasticsearch.test.MapMatcher;
19+
import org.elasticsearch.test.TestClustersThreadFilter;
20+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
21+
import org.elasticsearch.test.rest.ESRestTestCase;
22+
import org.elasticsearch.xcontent.XContentType;
23+
import org.elasticsearch.xpack.esql.AssertWarnings;
24+
import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase;
25+
import org.hamcrest.Matcher;
26+
import org.junit.ClassRule;
27+
28+
import java.io.IOException;
29+
import java.util.ArrayList;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.regex.Pattern;
33+
import java.util.stream.Stream;
34+
35+
import static org.elasticsearch.test.ListMatcher.matchesList;
36+
import static org.elasticsearch.test.MapMatcher.assertMap;
37+
import static org.elasticsearch.test.MapMatcher.matchesMap;
38+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.entityToMap;
39+
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.requestObjectBuilder;
40+
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.runEsql;
41+
import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.commonProfile;
42+
import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.fixTypesOnProfile;
43+
import static org.hamcrest.Matchers.anyOf;
44+
import static org.hamcrest.Matchers.equalTo;
45+
import static org.hamcrest.Matchers.instanceOf;
46+
import static org.hamcrest.Matchers.startsWith;
47+
48+
/**
49+
* Tests for pushing queries to lucene.
50+
*/
51+
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
52+
public class PushQueriesIT extends ESRestTestCase {
53+
@ClassRule
54+
public static ElasticsearchCluster cluster = Clusters.testCluster();
55+
56+
@ParametersFactory(argumentFormatting = "%1s")
57+
public static List<Object[]> args() {
58+
return Stream.of("auto", "text", "match_only_text", "semantic_text").map(s -> new Object[] { s }).toList();
59+
}
60+
61+
private final String type;
62+
63+
public PushQueriesIT(String type) {
64+
this.type = type;
65+
}
66+
67+
public void testEquality() throws IOException {
68+
String value = "v".repeat(between(0, 256));
69+
String esqlQuery = """
70+
FROM test
71+
| WHERE test == "%value"
72+
""";
73+
String luceneQuery = switch (type) {
74+
case "text", "auto" -> "#test.keyword:%value -_ignored:test.keyword";
75+
case "match_only_text" -> "*:*";
76+
case "semantic_text" -> "FieldExistsQuery [field=_primary_term]";
77+
default -> throw new UnsupportedOperationException("unknown type [" + type + "]");
78+
};
79+
boolean filterInCompute = switch (type) {
80+
case "text", "auto" -> false;
81+
case "match_only_text", "semantic_text" -> true;
82+
default -> throw new UnsupportedOperationException("unknown type [" + type + "]");
83+
};
84+
testPushQuery(value, esqlQuery, List.of(luceneQuery), filterInCompute, true);
85+
}
86+
87+
public void testEqualityTooBigToPush() throws IOException {
88+
String value = "a".repeat(between(257, 1000));
89+
String esqlQuery = """
90+
FROM test
91+
| WHERE test == "%value"
92+
""";
93+
String luceneQuery = switch (type) {
94+
case "text", "auto", "match_only_text" -> "*:*";
95+
case "semantic_text" -> "FieldExistsQuery [field=_primary_term]";
96+
default -> throw new UnsupportedOperationException("unknown type [" + type + "]");
97+
};
98+
testPushQuery(value, esqlQuery, List.of(luceneQuery), true, true);
99+
}
100+
101+
/**
102+
* Turns into an {@code IN} which isn't currently pushed.
103+
*/
104+
public void testEqualityOrTooBig() throws IOException {
105+
String value = "v".repeat(between(0, 256));
106+
String tooBig = "a".repeat(between(257, 1000));
107+
String esqlQuery = """
108+
FROM test
109+
| WHERE test == "%value" OR test == "%tooBig"
110+
""".replace("%tooBig", tooBig);
111+
String luceneQuery = switch (type) {
112+
case "text", "auto", "match_only_text" -> "*:*";
113+
case "semantic_text" -> "FieldExistsQuery [field=_primary_term]";
114+
default -> throw new UnsupportedOperationException("unknown type [" + type + "]");
115+
};
116+
testPushQuery(value, esqlQuery, List.of(luceneQuery), true, true);
117+
}
118+
119+
public void testEqualityOrOther() throws IOException {
120+
String value = "v".repeat(between(0, 256));
121+
String esqlQuery = """
122+
FROM test
123+
| WHERE test == "%value" OR foo == 2
124+
""";
125+
String luceneQuery = switch (type) {
126+
case "text", "auto" -> "(#test.keyword:%value -_ignored:test.keyword) foo:[2 TO 2]";
127+
case "match_only_text" -> "*:*";
128+
case "semantic_text" -> "FieldExistsQuery [field=_primary_term]";
129+
default -> throw new UnsupportedOperationException("unknown type [" + type + "]");
130+
};
131+
boolean filterInCompute = switch (type) {
132+
case "text", "auto" -> false;
133+
case "match_only_text", "semantic_text" -> true;
134+
default -> throw new UnsupportedOperationException("unknown type [" + type + "]");
135+
};
136+
testPushQuery(value, esqlQuery, List.of(luceneQuery), filterInCompute, true);
137+
}
138+
139+
public void testEqualityAndOther() throws IOException {
140+
String value = "v".repeat(between(0, 256));
141+
String esqlQuery = """
142+
FROM test
143+
| WHERE test == "%value" AND foo == 1
144+
""";
145+
List<String> luceneQueryOptions = switch (type) {
146+
case "text", "auto" -> List.of("#(#test.keyword:%value -_ignored:test.keyword) #foo:[1 TO 1]");
147+
case "match_only_text" -> List.of("foo:[1 TO 1]");
148+
case "semantic_text" ->
149+
/*
150+
* single_value_match is here because there are extra documents hiding in the index
151+
* that don't have the `foo` field.
152+
*/
153+
List.of("#foo:[1 TO 1] #single_value_match(foo)", "foo:[1 TO 1]");
154+
default -> throw new UnsupportedOperationException("unknown type [" + type + "]");
155+
};
156+
boolean filterInCompute = switch (type) {
157+
case "text", "auto" -> false;
158+
case "match_only_text", "semantic_text" -> true;
159+
default -> throw new UnsupportedOperationException("unknown type [" + type + "]");
160+
};
161+
testPushQuery(value, esqlQuery, luceneQueryOptions, filterInCompute, true);
162+
}
163+
164+
public void testInequality() throws IOException {
165+
String value = "v".repeat(between(0, 256));
166+
String esqlQuery = """
167+
FROM test
168+
| WHERE test != "%different_value"
169+
""";
170+
String luceneQuery = switch (type) {
171+
case "text", "auto" -> "(-test.keyword:%different_value #*:*) _ignored:test.keyword";
172+
case "match_only_text" -> "*:*";
173+
case "semantic_text" -> "FieldExistsQuery [field=_primary_term]";
174+
default -> throw new UnsupportedOperationException("unknown type [" + type + "]");
175+
};
176+
testPushQuery(value, esqlQuery, List.of(luceneQuery), true, true);
177+
}
178+
179+
public void testInequalityTooBigToPush() throws IOException {
180+
String value = "a".repeat(between(257, 1000));
181+
String esqlQuery = """
182+
FROM test
183+
| WHERE test != "%value"
184+
""";
185+
String luceneQuery = switch (type) {
186+
case "text", "auto", "match_only_text" -> "*:*";
187+
case "semantic_text" -> "FieldExistsQuery [field=_primary_term]";
188+
default -> throw new UnsupportedOperationException("unknown type [" + type + "]");
189+
};
190+
testPushQuery(value, esqlQuery, List.of(luceneQuery), true, false);
191+
}
192+
193+
public void testCaseInsensitiveEquality() throws IOException {
194+
String value = "a".repeat(between(0, 256));
195+
String esqlQuery = """
196+
FROM test
197+
| WHERE TO_LOWER(test) == "%value"
198+
""";
199+
String luceneQuery = switch (type) {
200+
case "text", "auto", "match_only_text" -> "*:*";
201+
case "semantic_text" -> "FieldExistsQuery [field=_primary_term]";
202+
default -> throw new UnsupportedOperationException("unknown type [" + type + "]");
203+
};
204+
testPushQuery(value, esqlQuery, List.of(luceneQuery), true, true);
205+
}
206+
207+
private void testPushQuery(String value, String esqlQuery, List<String> luceneQueryOptions, boolean filterInCompute, boolean found)
208+
throws IOException {
209+
indexValue(value);
210+
String differentValue = randomValueOtherThan(value, () -> randomAlphaOfLength(value.isEmpty() ? 1 : value.length()));
211+
212+
String replacedQuery = esqlQuery.replaceAll("%value", value).replaceAll("%different_value", differentValue);
213+
RestEsqlTestCase.RequestObjectBuilder builder = requestObjectBuilder().query(replacedQuery + "\n| KEEP test");
214+
builder.profile(true);
215+
Map<String, Object> result = runEsql(builder, new AssertWarnings.NoWarnings(), RestEsqlTestCase.Mode.SYNC);
216+
assertResultMap(
217+
result,
218+
getResultMatcher(result).entry("profile", matchesMap().entry("drivers", instanceOf(List.class))),
219+
matchesList().item(matchesMap().entry("name", "test").entry("type", "text")),
220+
equalTo(found ? List.of(List.of(value)) : List.of())
221+
);
222+
Matcher<String> luceneQueryMatcher = anyOf(
223+
() -> Iterators.map(
224+
luceneQueryOptions.iterator(),
225+
(String s) -> equalTo(s.replaceAll("%value", value).replaceAll("%different_value", differentValue))
226+
)
227+
);
228+
229+
@SuppressWarnings("unchecked")
230+
List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers");
231+
for (Map<String, Object> p : profiles) {
232+
fixTypesOnProfile(p);
233+
assertThat(p, commonProfile());
234+
List<String> sig = new ArrayList<>();
235+
@SuppressWarnings("unchecked")
236+
List<Map<String, Object>> operators = (List<Map<String, Object>>) p.get("operators");
237+
for (Map<String, Object> o : operators) {
238+
sig.add(checkOperatorProfile(o, luceneQueryMatcher));
239+
}
240+
String description = p.get("task_description").toString();
241+
switch (description) {
242+
case "data" -> {
243+
ListMatcher matcher = matchesList().item("LuceneSourceOperator").item("ValuesSourceReaderOperator");
244+
if (filterInCompute) {
245+
matcher = matcher.item("FilterOperator").item("LimitOperator");
246+
}
247+
matcher = matcher.item("ProjectOperator").item("ExchangeSinkOperator");
248+
assertMap(sig, matcher);
249+
}
250+
case "node_reduce" -> {
251+
if (sig.contains("LimitOperator")) {
252+
// TODO figure out why this is sometimes here and sometimes not
253+
assertMap(sig, matchesList().item("ExchangeSourceOperator").item("LimitOperator").item("ExchangeSinkOperator"));
254+
} else {
255+
assertMap(sig, matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator"));
256+
}
257+
}
258+
case "final" -> assertMap(
259+
sig,
260+
matchesList().item("ExchangeSourceOperator").item("LimitOperator").item("ProjectOperator").item("OutputOperator")
261+
);
262+
default -> throw new IllegalArgumentException("can't match " + description);
263+
}
264+
}
265+
}
266+
267+
private void indexValue(String value) throws IOException {
268+
try {
269+
// Delete the index if it has already been created.
270+
client().performRequest(new Request("DELETE", "test"));
271+
} catch (ResponseException e) {
272+
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
273+
throw e;
274+
}
275+
}
276+
277+
Request createIndex = new Request("PUT", "test");
278+
String json = """
279+
{
280+
"settings": {
281+
"index": {
282+
"number_of_shards": 1
283+
}
284+
}""";
285+
if (false == "auto".equals(type)) {
286+
json += """
287+
,
288+
"mappings": {
289+
"properties": {
290+
"test": {
291+
"type": "%type",
292+
"fields": {
293+
"keyword": {
294+
"type": "keyword",
295+
"ignore_above": 256
296+
}
297+
}
298+
}
299+
}
300+
}
301+
}""".replace("%type", type);
302+
}
303+
json += "}";
304+
createIndex.setJsonEntity(json);
305+
Response createResponse = client().performRequest(createIndex);
306+
assertThat(
307+
entityToMap(createResponse.getEntity(), XContentType.JSON),
308+
matchesMap().entry("shards_acknowledged", true).entry("index", "test").entry("acknowledged", true)
309+
);
310+
311+
Request bulk = new Request("POST", "/_bulk");
312+
bulk.addParameter("refresh", "");
313+
bulk.setJsonEntity(String.format("""
314+
{"create":{"_index":"test"}}
315+
{"test":"%s","foo":1}
316+
""", value));
317+
Response bulkResponse = client().performRequest(bulk);
318+
assertThat(entityToMap(bulkResponse.getEntity(), XContentType.JSON), matchesMap().entry("errors", false).extraOk());
319+
}
320+
321+
private static final Pattern TO_NAME = Pattern.compile("\\[.+", Pattern.DOTALL);
322+
323+
private static String checkOperatorProfile(Map<String, Object> o, Matcher<String> query) {
324+
String name = (String) o.get("operator");
325+
name = TO_NAME.matcher(name).replaceAll("");
326+
if (name.equals("LuceneSourceOperator")) {
327+
MapMatcher expectedOp = matchesMap().entry("operator", startsWith(name))
328+
.entry("status", matchesMap().entry("processed_queries", List.of(query)).extraOk());
329+
assertMap(o, expectedOp);
330+
}
331+
return name;
332+
}
333+
334+
@Override
335+
protected String getTestRestCluster() {
336+
return cluster.getHttpAddresses();
337+
}
338+
339+
@Override
340+
protected boolean preserveClusterUponCompletion() {
341+
// Preserve the cluser to speed up the semantic_text tests
342+
return true;
343+
}
344+
}

0 commit comments

Comments
 (0)