Skip to content
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
bcde6be
LookupJoin prejoin filter POC WIP
julian-elastic Jul 28, 2025
d24dab3
Get basic case with translatable filters to work
julian-elastic Jul 29, 2025
9603a7c
Fix failing UTs
julian-elastic Aug 13, 2025
e73996f
Fix failing UTs part 2
julian-elastic Aug 14, 2025
278877a
Add additional checks for right pushable filters
julian-elastic Aug 14, 2025
ec9817d
Merge branch 'main' into lookupPrefilter_v2
julian-elastic Aug 15, 2025
a59d0de
Update docs/changelog/132889.yaml
julian-elastic Aug 15, 2025
6e6e28e
Switch to storing the filter in Join
julian-elastic Aug 15, 2025
018b40d
Switch to storing the filter in Join
julian-elastic Aug 15, 2025
2ee02a1
bugfix
julian-elastic Aug 16, 2025
abfe672
Limit the change to pushable filters only, make the filter optional
julian-elastic Aug 19, 2025
7f82362
Add more UTs
julian-elastic Aug 20, 2025
ba9ab52
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 20, 2025
2aa2b49
Clean up, add more UTs
julian-elastic Aug 20, 2025
0e7b3ed
Update docs/changelog/133166.yaml
julian-elastic Aug 20, 2025
66c126f
Fix a bug where a mix of pushable and non-pushable filters resulted i…
julian-elastic Aug 21, 2025
84d2dcb
Address code review comments, add UTs
julian-elastic Aug 22, 2025
6b41aa9
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 22, 2025
d32cdc4
Fix formatting for UT
julian-elastic Aug 22, 2025
68d319b
Switch to storing the optional filter in the RHS of the Join
julian-elastic Aug 26, 2025
5405235
Address code review feedback
julian-elastic Aug 26, 2025
03796e0
Fix merge errors
julian-elastic Aug 27, 2025
c15df0a
Address a missed comment
julian-elastic Aug 27, 2025
63018a7
Switch to passing local logical plan to lookup node
julian-elastic Aug 27, 2025
76b4042
Switch to passing local logical plan to lookup node
julian-elastic Aug 28, 2025
4205693
Address more code review feedback
julian-elastic Aug 28, 2025
5a2b2fd
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 28, 2025
3c39e90
fix failing UT
julian-elastic Aug 28, 2025
35121eb
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 28, 2025
40c6d7a
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 28, 2025
242455f
Address more code review comments
julian-elastic Aug 28, 2025
f5cb543
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 2, 2025
f7ff90e
Address code review comments
julian-elastic Sep 2, 2025
3116545
Address code review comments, part 2
julian-elastic Sep 2, 2025
7a8af28
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 3, 2025
c0733f7
Address more code review comments
julian-elastic Sep 3, 2025
166130f
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 3, 2025
0550dae
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 3, 2025
328af0a
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/133166.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 133166
summary: Improve Expanding Lookup Join performance by pushing a filter to the right
side of the lookup join
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_LOOKUP_OPERATOR_EMITTED_ROWS = def(9_144_0_00);
public static final TransportVersion ALLOCATION_DECISION_NOT_PREFERRED = def(9_145_0_00);
public static final TransportVersion ESQL_QUALIFIERS_IN_ATTRIBUTES = def(9_146_0_00);
public static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = def(9_147_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,26 +698,42 @@ private Map<String, Object> fetchMvLongs() throws IOException {
public void testLookupExplosion() throws IOException {
int sensorDataCount = 400;
int lookupEntries = 10000;
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
}

public void testLookupExplosionManyFields() throws IOException {
int sensorDataCount = 400;
int lookupEntries = 1000;
int joinFieldsCount = 990;
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
}

public void testLookupExplosionManyMatchesManyFields() throws IOException {
// 1500, 10000 is enough locally, but some CI machines need more.
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 30));
int lookupEntries = 10000;
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 30, lookupEntries));
}

public void testLookupExplosionManyMatches() throws IOException {
// 1500, 10000 is enough locally, but some CI machines need more.
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 1));
int lookupEntries = 10000;
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries));
}

public void testLookupExplosionManyMatchesFiltered() throws IOException {
// This test will only work with the expanding join optimization
// that pushes the filter to the right side of the lookup.
// Without the optimization, it will fail with circuit_breaking_exception
int sensorDataCount = 10000;
int lookupEntries = 10000;
int reductionFactor = 1000; // reduce the number of matches by this factor
// lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value
assertTrue(0 == lookupEntries % reductionFactor);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor))));

}

public void testLookupExplosionNoFetch() throws IOException {
Expand All @@ -744,7 +760,8 @@ public void testLookupExplosionBigStringManyMatches() throws IOException {
assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1));
}

private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount) throws IOException {
private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount, int lookupEntriesToKeep)
throws IOException {
try {
lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount);
StringBuilder query = startQuery();
Expand All @@ -755,7 +772,14 @@ private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntri
}
query.append("id").append(i);
}
query.append(" | STATS COUNT(location)\"}");
if (lookupEntries != lookupEntriesToKeep) {
// add a filter to reduce the number of matches
// we add both a Lucine pushable filter and a non-pushable filter
// this is to make sure that even if there are non-pushable filters the pushable filters is still applied
query.append(" | WHERE ABS(filter_key) > -1 AND filter_key < ").append(lookupEntriesToKeep);

}
query.append(" | STATS COUNT(location) | LIMIT 100\"}");
return responseAsMap(query(query.toString(), null));
} finally {
deleteIndex("sensor_data");
Expand Down Expand Up @@ -1038,7 +1062,8 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
}
createIndexBuilder.append("""
"location": { "type": "geo_point" }
"location": { "type": "geo_point" },
"filter_key": { "type": "integer" }
}
}""");
CreateIndexResponse response = createIndex(
Expand All @@ -1058,7 +1083,7 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, sensor));
}
data.append(String.format(Locale.ROOT, """
"location": "POINT(%s)"}\n""", location.apply(sensor)));
"location": "POINT(%s)", "filter_key": %d}\n""", location.apply(sensor), i));
if (i % docsPerBulk == docsPerBulk - 1) {
bulk("sensor_lookup", data.toString());
data.setLength(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ Page buildPage(int positions, IntVector.Builder positionsBuilder, IntVector.Buil
return page;
}

private Query nextQuery() {
private Query nextQuery() throws IOException {
++queryPosition;
while (isFinished() == false) {
Query query = queryList.getQuery(queryPosition);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected final Operator.OperatorFactory simple() {
/**
* Makes sure the description of {@link #simple} matches the {@link #expectedDescriptionOfSimple}.
*/
public final void testSimpleDescription() {
public void testSimpleDescription() {
Operator.OperatorFactory factory = simple();
String description = factory.describe();
assertThat(description, expectedDescriptionOfSimple());
Expand Down
Loading