Skip to content

Commit 7f82362

Browse files
Add more UTs
1 parent abfe672 commit 7f82362

File tree

4 files changed

+109
-42
lines changed

4 files changed

+109
-42
lines changed

docs/changelog/132889.yaml

Lines changed: 0 additions & 6 deletions
This file was deleted.

test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -698,26 +698,43 @@ private Map<String, Object> fetchMvLongs() throws IOException {
698698
public void testLookupExplosion() throws IOException {
699699
int sensorDataCount = 400;
700700
int lookupEntries = 10000;
701-
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1);
701+
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries);
702702
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
703703
}
704704

705705
public void testLookupExplosionManyFields() throws IOException {
706706
int sensorDataCount = 400;
707707
int lookupEntries = 1000;
708708
int joinFieldsCount = 990;
709-
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount);
709+
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries);
710710
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
711711
}
712712

713713
public void testLookupExplosionManyMatchesManyFields() throws IOException {
714714
// 1500, 10000 is enough locally, but some CI machines need more.
715-
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 30));
715+
int lookupEntries = 10000;
716+
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 30, lookupEntries));
716717
}
717718

718719
public void testLookupExplosionManyMatches() throws IOException {
719720
// 1500, 10000 is enough locally, but some CI machines need more.
720-
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 1));
721+
int lookupEntries = 10000;
722+
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries));
723+
}
724+
725+
public void testLookupExplosionManyMatchesFiltered() throws IOException {
726+
// This test will only work with the expanding join optimization
727+
// that pushes the filter to the right side of the lookup.
728+
// Without the optimization, it will fail with circuit_breaking_exception
729+
// lookupEntries % reductionFactor must be 0 to ensure that the number of matches is reduced
730+
int sensorDataCount = 10000;
731+
int lookupEntries = 10000;
732+
int reductionFactor = 1000; // reduce the number of matches by this factor
733+
// lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value
734+
assertTrue(0 == lookupEntries % reductionFactor);
735+
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor);
736+
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor))));
737+
721738
}
722739

723740
public void testLookupExplosionNoFetch() throws IOException {
@@ -744,7 +761,8 @@ public void testLookupExplosionBigStringManyMatches() throws IOException {
744761
assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1));
745762
}
746763

747-
private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount) throws IOException {
764+
private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount, int lookupEntriesToKeep)
765+
throws IOException {
748766
try {
749767
lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount);
750768
StringBuilder query = startQuery();
@@ -755,7 +773,10 @@ private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntri
755773
}
756774
query.append("id").append(i);
757775
}
758-
query.append(" | STATS COUNT(location)\"}");
776+
if (lookupEntries != lookupEntriesToKeep) {
777+
query.append(" | WHERE filter_key < ").append(lookupEntriesToKeep);
778+
}
779+
query.append(" | STATS COUNT(location) | LIMIT 100\"}");
759780
return responseAsMap(query(query.toString(), null));
760781
} finally {
761782
deleteIndex("sensor_data");
@@ -1038,7 +1059,8 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
10381059
createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
10391060
}
10401061
createIndexBuilder.append("""
1041-
"location": { "type": "geo_point" }
1062+
"location": { "type": "geo_point" },
1063+
"filter_key": { "type": "integer" }
10421064
}
10431065
}""");
10441066
CreateIndexResponse response = createIndex(
@@ -1058,7 +1080,7 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
10581080
data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, sensor));
10591081
}
10601082
data.append(String.format(Locale.ROOT, """
1061-
"location": "POINT(%s)"}\n""", location.apply(sensor)));
1083+
"location": "POINT(%s)", "filter_key": %d}\n""", location.apply(sensor), i));
10621084
if (i % docsPerBulk == docsPerBulk - 1) {
10631085
bulk("sensor_lookup", data.toString());
10641086
data.setLength(0);

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.compute.data.Block;
2424
import org.elasticsearch.compute.data.BlockFactory;
2525
import org.elasticsearch.compute.data.LongBlock;
26-
import org.elasticsearch.compute.data.LongVector;
2726
import org.elasticsearch.compute.lucene.DataPartitioning;
2827
import org.elasticsearch.compute.lucene.LuceneSliceQueue;
2928
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
@@ -54,12 +53,15 @@
5453
import org.elasticsearch.threadpool.ThreadPool;
5554
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
5655
import org.elasticsearch.xpack.esql.core.expression.Alias;
56+
import org.elasticsearch.xpack.esql.core.expression.Expression;
5757
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
58+
import org.elasticsearch.xpack.esql.core.expression.Literal;
5859
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
5960
import org.elasticsearch.xpack.esql.core.tree.Source;
6061
import org.elasticsearch.xpack.esql.core.type.DataType;
6162
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator;
6263
import org.elasticsearch.xpack.esql.enrich.MatchConfig;
64+
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan;
6365
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
6466
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
6567
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -75,6 +77,7 @@
7577
import java.util.Map;
7678
import java.util.Set;
7779
import java.util.concurrent.CopyOnWriteArrayList;
80+
import java.util.function.Predicate;
7881

7982
import static org.elasticsearch.test.ListMatcher.matchesList;
8083
import static org.elasticsearch.test.MapMatcher.assertMap;
@@ -83,13 +86,14 @@
8386

8487
public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
8588
public void testKeywordKey() throws IOException {
86-
runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" } }));
89+
runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" } }), null);
8790
}
8891

8992
public void testJoinOnTwoKeys() throws IOException {
9093
runLookup(
9194
List.of(DataType.KEYWORD, DataType.LONG),
92-
new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" }, new Long[] { 12L, 33L, 1L, 42L } })
95+
new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" }, new Long[] { 12L, 33L, 1L, 42L } }),
96+
null
9397
);
9498
}
9599

@@ -101,7 +105,8 @@ public void testJoinOnThreeKeys() throws IOException {
101105
new String[] { "aa", "bb", "cc", "dd" },
102106
new Long[] { 12L, 33L, 1L, 42L },
103107
new String[] { "one", "two", "three", "four" }, }
104-
)
108+
),
109+
null
105110
);
106111
}
107112

@@ -114,25 +119,35 @@ public void testJoinOnFourKeys() throws IOException {
114119
new Long[] { 12L, 33L, 1L, 42L },
115120
new String[] { "one", "two", "three", "four" },
116121
new Integer[] { 1, 2, 3, 4 }, }
117-
)
122+
),
123+
buildGreaterThanFilter(1L)
118124
);
119125
}
120126

121127
public void testLongKey() throws IOException {
122-
runLookup(List.of(DataType.LONG), new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }));
128+
runLookup(
129+
List.of(DataType.LONG),
130+
new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }),
131+
buildGreaterThanFilter(0L)
132+
);
123133
}
124134

125135
/**
126136
* LOOKUP multiple results match.
127137
*/
128138
public void testLookupIndexMultiResults() throws IOException {
129-
runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }));
139+
runLookup(
140+
List.of(DataType.KEYWORD),
141+
new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }),
142+
buildGreaterThanFilter(-1L)
143+
);
130144
}
131145

132146
public void testJoinOnTwoKeysMultiResults() throws IOException {
133147
runLookup(
134148
List.of(DataType.KEYWORD, DataType.LONG),
135-
new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" }, new Long[] { 12L, 1L, 1L, 42L } })
149+
new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" }, new Long[] { 12L, 1L, 1L, 42L } }),
150+
null
136151
);
137152
}
138153

@@ -144,12 +159,13 @@ public void testJoinOnThreeKeysMultiResults() throws IOException {
144159
new String[] { "aa", "bb", "bb", "dd" },
145160
new Long[] { 12L, 1L, 1L, 42L },
146161
new String[] { "one", "two", "two", "four" } }
147-
)
162+
),
163+
null
148164
);
149165
}
150166

151167
interface PopulateIndices {
152-
void populate(int docCount, List<String> expected) throws IOException;
168+
void populate(int docCount, List<String> expected, Predicate<Integer> filter) throws IOException;
153169
}
154170

155171
class UsingSingleLookupTable implements PopulateIndices {
@@ -171,7 +187,7 @@ class UsingSingleLookupTable implements PopulateIndices {
171187
}
172188

173189
@Override
174-
public void populate(int docCount, List<String> expected) {
190+
public void populate(int docCount, List<String> expected, Predicate<Integer> filter) {
175191
List<IndexRequestBuilder> docs = new ArrayList<>();
176192
int numFields = lookupData.length;
177193
int numRows = lookupData[0].length;
@@ -190,8 +206,13 @@ public void populate(int docCount, List<String> expected) {
190206
} else {
191207
keyString = String.join(",", key.stream().map(String::valueOf).toArray(String[]::new));
192208
}
193-
for (Integer match : matches.get(key)) {
194-
expected.add(keyString + ":" + match);
209+
List<Integer> filteredMatches = matches.get(key).stream().filter(filter).toList();
210+
if (filteredMatches.isEmpty()) {
211+
expected.add(keyString + ":null");
212+
} else {
213+
for (Integer match : filteredMatches) {
214+
expected.add(keyString + ":" + match);
215+
}
195216
}
196217
}
197218
for (int i = 0; i < numRows; i++) {
@@ -207,7 +228,21 @@ public void populate(int docCount, List<String> expected) {
207228
}
208229
}
209230

210-
private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices) throws IOException {
231+
Expression buildGreaterThanFilter(long value) {
232+
FieldAttribute filterAttribute = new FieldAttribute(
233+
Source.EMPTY,
234+
"l",
235+
new org.elasticsearch.xpack.esql.core.type.EsField(
236+
"l",
237+
org.elasticsearch.xpack.esql.core.type.DataType.LONG,
238+
java.util.Collections.emptyMap(),
239+
true
240+
)
241+
);
242+
return new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
243+
}
244+
245+
private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, Expression filter) throws IOException {
211246
String[] fieldMappers = new String[keyTypes.size() * 2];
212247
for (int i = 0; i < keyTypes.size(); i++) {
213248
fieldMappers[2 * i] = "key" + i;
@@ -236,9 +271,22 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices)
236271

237272
client().admin().cluster().prepareHealth(TEST_REQUEST_TIMEOUT).setWaitForGreenStatus().get();
238273

274+
Predicate<Integer> filterPredicate = l -> true;
275+
if (filter != null) {
276+
if (filter instanceof GreaterThan gt
277+
&& gt.left() instanceof FieldAttribute fa
278+
&& fa.name().equals("l")
279+
&& gt.right() instanceof Literal lit) {
280+
long value = ((Number) lit.value()).longValue();
281+
filterPredicate = l -> l > value;
282+
} else {
283+
fail("Unsupported filter type in test baseline generation: " + filter);
284+
}
285+
}
286+
239287
int docCount = between(10, 1000);
240288
List<String> expected = new ArrayList<>(docCount);
241-
populateIndices.populate(docCount, expected);
289+
populateIndices.populate(docCount, expected, filterPredicate);
242290

243291
/*
244292
* Find the data node hosting the only shard of the source index.
@@ -330,7 +378,7 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices)
330378
"lookup",
331379
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))),
332380
Source.EMPTY,
333-
null
381+
filter
334382
);
335383
DriverContext driverContext = driverContext();
336384
try (
@@ -344,7 +392,7 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices)
344392
for (int i = 0; i < keyTypes.size(); i++) {
345393
keyBlocks.add(page.getBlock(i + 1));
346394
}
347-
LongVector loadedBlock = page.<LongBlock>getBlock(keyTypes.size() + 1).asVector();
395+
LongBlock loadedBlock = page.<LongBlock>getBlock(keyTypes.size() + 1);
348396
for (int p = 0; p < page.getPositionCount(); p++) {
349397
StringBuilder result = new StringBuilder();
350398
for (int j = 0; j < keyBlocks.size(); j++) {
@@ -360,7 +408,11 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices)
360408
}
361409

362410
}
363-
result.append(":" + loadedBlock.getLong(p));
411+
if (loadedBlock.isNull(p)) {
412+
result.append(":null");
413+
} else {
414+
result.append(":" + loadedBlock.getLong(loadedBlock.getFirstValueIndex(p)));
415+
}
364416
results.add(result.toString());
365417
}
366418
} finally {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -173,18 +173,17 @@ private static LogicalPlan pushDownPastJoin(Filter filter, Join join) {
173173
// So we end up applying the right filters twice, once on the right side and once on top of the join
174174
// This will result in major performance optimization when the lookup join is expanding
175175
// and applying the right filters reduces the expansion significantly.
176-
// For example, consider a lookup join where the right side is a 1Bln rows index with the value join value of 1.
177-
// We have 10 rows on the left side with the value join value of 1.
178-
// and there is a filter on the right side that filters out all rows on another column
179-
// If we push the filter down to the right side, we will have 10 rows after the join (there were no matches)
180-
// If we do not push the filter down to the right side, we will have 10 * 1Bln rows after the join (all rows matched)
181-
// as the join is expanding.
182-
// They would be filtered out in the next operator, but it is too late, as we already expanded the join
183-
// In other cases, we might not get any performance benefit of this optimization,
184-
// especially when the selectivity of the filter pushed down is very high or the join is not expanding.
176+
// For example, consider an expanding lookup join of 100,000 rows table with 10,000 lookup table
177+
// with filter of selectivity 0.1% on the right side(keeps 10 out of 10,000 rows of the lookup table).
178+
// In the non-optimized version the filter is not pushed to the right, and we get an explosion of records.
179+
// We have 100,000 x10,000 = 1,000,000,000 rows after the join without the optimization.
180+
// Then we filter then out to only 1,000,000 rows.
181+
// With the optimization we apply the filter early so after the expanding join we only have 1,000,000 rows.
182+
// This reduced max number of rows used by a factor of 1,000
185183

186184
// In the future, once we have inner join support, it is usually possible to convert the lookup join into an inner join
187-
// and then we don't need to reapply the filters on top of the join.
185+
// This would allow us to not reapply the filters pushed to the right side again above the join,
186+
// as the inner join would only return rows that match on both sides.
188187
}
189188
if (optimizationApplied) {
190189
// if we pushed down some filters, we need to update the filters to reapply above the join

0 commit comments

Comments
 (0)