Skip to content

Commit b5120c5

Browse files
Improve Expanding Lookup Join performance by pushing a filter to the right side of the lookup join (#133166)
Improve Expanding Lookup Join performance by pushing a filter to the right side of the lookup join.
1 parent dfe555c commit b5120c5

File tree

26 files changed

+1560
-196
lines changed

26 files changed

+1560
-196
lines changed

docs/changelog/133166.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 133166
2+
summary: Improve Expanding Lookup Join performance by pushing a filter to the right
3+
side of the lookup join
4+
area: ES|QL
5+
type: enhancement
6+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ static TransportVersion def(int id) {
357357
public static final TransportVersion STREAMS_ENDPOINT_PARAM_RESTRICTIONS = def(9_148_0_00);
358358
public static final TransportVersion RESOLVE_INDEX_MODE_FILTER = def(9_149_0_00);
359359
public static final TransportVersion SEMANTIC_QUERY_MULTIPLE_INFERENCE_IDS = def(9_150_0_00);
360+
public static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = def(9_151_0_00);
360361

361362
/*
362363
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ public Query regexpQuery(
387387
}
388388

389389
/**
390-
* Returns a Lucine pushable Query for the current field
390+
* Returns a Lucene pushable Query for the current field.
391391
* For now can only be AutomatonQuery or MatchAllDocsQuery() or MatchNoDocsQuery()
392392
*/
393393
public Query automatonQuery(

test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -698,26 +698,42 @@ private Map<String, Object> fetchMvLongs() throws IOException {
698698
public void testLookupExplosion() throws IOException {
699699
int sensorDataCount = 400;
700700
int lookupEntries = 10000;
701-
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1);
701+
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries);
702702
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
703703
}
704704

705705
public void testLookupExplosionManyFields() throws IOException {
706706
int sensorDataCount = 400;
707707
int lookupEntries = 1000;
708708
int joinFieldsCount = 990;
709-
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount);
709+
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries);
710710
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
711711
}
712712

713713
public void testLookupExplosionManyMatchesManyFields() throws IOException {
714714
// 1500, 10000 is enough locally, but some CI machines need more.
715-
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 30));
715+
int lookupEntries = 10000;
716+
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 30, lookupEntries));
716717
}
717718

718719
public void testLookupExplosionManyMatches() throws IOException {
719720
// 1500, 10000 is enough locally, but some CI machines need more.
720-
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 1));
721+
int lookupEntries = 10000;
722+
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries));
723+
}
724+
725+
public void testLookupExplosionManyMatchesFiltered() throws IOException {
726+
// This test will only work with the expanding join optimization
727+
// that pushes the filter to the right side of the lookup.
728+
// Without the optimization, it will fail with circuit_breaking_exception
729+
int sensorDataCount = 10000;
730+
int lookupEntries = 10000;
731+
int reductionFactor = 1000; // reduce the number of matches by this factor
732+
// lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value
733+
assertTrue(0 == lookupEntries % reductionFactor);
734+
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor);
735+
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor))));
736+
721737
}
722738

723739
public void testLookupExplosionNoFetch() throws IOException {
@@ -744,7 +760,8 @@ public void testLookupExplosionBigStringManyMatches() throws IOException {
744760
assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1));
745761
}
746762

747-
private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount) throws IOException {
763+
private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount, int lookupEntriesToKeep)
764+
throws IOException {
748765
try {
749766
lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount);
750767
StringBuilder query = startQuery();
@@ -755,7 +772,14 @@ private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntri
755772
}
756773
query.append("id").append(i);
757774
}
758-
query.append(" | STATS COUNT(location)\"}");
775+
if (lookupEntries != lookupEntriesToKeep) {
776+
// add a filter to reduce the number of matches
777+
// we add both a Lucene pushable filter and a non-pushable filter
778+
// this is to make sure that even if there are non-pushable filters, the pushable filters are still applied
779+
query.append(" | WHERE ABS(filter_key) > -1 AND filter_key < ").append(lookupEntriesToKeep);
780+
781+
}
782+
query.append(" | STATS COUNT(location) | LIMIT 100\"}");
759783
return responseAsMap(query(query.toString(), null));
760784
} finally {
761785
deleteIndex("sensor_data");
@@ -1038,7 +1062,8 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
10381062
createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
10391063
}
10401064
createIndexBuilder.append("""
1041-
"location": { "type": "geo_point" }
1065+
"location": { "type": "geo_point" },
1066+
"filter_key": { "type": "integer" }
10421067
}
10431068
}""");
10441069
CreateIndexResponse response = createIndex(
@@ -1058,7 +1083,7 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
10581083
data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, sensor));
10591084
}
10601085
data.append(String.format(Locale.ROOT, """
1061-
"location": "POINT(%s)"}\n""", location.apply(sensor)));
1086+
"location": "POINT(%s)", "filter_key": %d}\n""", location.apply(sensor), i));
10621087
if (i % docsPerBulk == docsPerBulk - 1) {
10631088
bulk("sensor_lookup", data.toString());
10641089
data.setLength(0);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/ExpressionQueryList.java

Lines changed: 0 additions & 61 deletions
This file was deleted.

x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AnyOperatorTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ protected final Operator.OperatorFactory simple() {
8787
/**
8888
* Makes sure the description of {@link #simple} matches the {@link #expectedDescriptionOfSimple}.
8989
*/
90-
public final void testSimpleDescription() {
90+
public void testSimpleDescription() {
9191
Operator.OperatorFactory factory = simple();
9292
String description = factory.describe();
9393
assertThat(description, expectedDescriptionOfSimple());

0 commit comments

Comments
 (0)