Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
bcde6be
LookupJoin prejoin filter POC WIP
julian-elastic Jul 28, 2025
d24dab3
Get basic case with translatable filters to work
julian-elastic Jul 29, 2025
9603a7c
Fix failing UTs
julian-elastic Aug 13, 2025
e73996f
Fix failing UTs part 2
julian-elastic Aug 14, 2025
278877a
Add additional checks for right pushable filters
julian-elastic Aug 14, 2025
ec9817d
Merge branch 'main' into lookupPrefilter_v2
julian-elastic Aug 15, 2025
a59d0de
Update docs/changelog/132889.yaml
julian-elastic Aug 15, 2025
6e6e28e
Switch to storing the filter in Join
julian-elastic Aug 15, 2025
018b40d
Switch to storing the filter in Join
julian-elastic Aug 15, 2025
2ee02a1
bugfix
julian-elastic Aug 16, 2025
abfe672
Limit the change to pushable filters only, make the filter optional
julian-elastic Aug 19, 2025
7f82362
Add more UTs
julian-elastic Aug 20, 2025
ba9ab52
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 20, 2025
2aa2b49
Clean up, add more UTs
julian-elastic Aug 20, 2025
0e7b3ed
Update docs/changelog/133166.yaml
julian-elastic Aug 20, 2025
66c126f
Fix a bug where a mix of pushable and non-pushable filters resulted i…
julian-elastic Aug 21, 2025
84d2dcb
Address code review comments, add UTs
julian-elastic Aug 22, 2025
6b41aa9
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 22, 2025
d32cdc4
Fix formatting for UT
julian-elastic Aug 22, 2025
68d319b
Switch to storing the optional filter in the RHS of the Join
julian-elastic Aug 26, 2025
5405235
Address code review feedback
julian-elastic Aug 26, 2025
03796e0
Fix merge errors
julian-elastic Aug 27, 2025
c15df0a
Address a missed comment
julian-elastic Aug 27, 2025
63018a7
Switch to passing local logical plan to lookup node
julian-elastic Aug 27, 2025
76b4042
Switch to passing local logical plan to lookup node
julian-elastic Aug 28, 2025
4205693
Address more code review feedback
julian-elastic Aug 28, 2025
5a2b2fd
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 28, 2025
3c39e90
fix failing UT
julian-elastic Aug 28, 2025
35121eb
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 28, 2025
40c6d7a
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 28, 2025
242455f
Address more code review comments
julian-elastic Aug 28, 2025
0fd03d7
WIP POC for expression join
julian-elastic Aug 21, 2025
cfae068
Change Parser to allow AND of expressions
julian-elastic Sep 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .idea/runConfigurations/Debug_Elasticsearch__node_2_.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions .idea/runConfigurations/Debug_Elasticsearch__node_3_.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions docs/changelog/133166.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 133166
summary: Improve Expanding Lookup Join performance by pushing a filter to the right
side of the lookup join
area: ES|QL
type: enhancement
issues: []
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_SAMPLE_OPERATOR_STATUS = def(9_127_0_00);
public static final TransportVersion ALLOCATION_DECISION_NOT_PREFERRED = def(9_145_0_00);
public static final TransportVersion ESQL_QUALIFIERS_IN_ATTRIBUTES = def(9_146_0_00);
public static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = def(9_147_0_00);
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_EXPRESSION = def(9_148_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public Query regexpQuery(
}

/**
* Returns a Lucine pushable Query for the current field
* Returns a Lucene pushable Query for the current field
* For now can only be AutomatonQuery or MatchAllDocsQuery() or MatchNoDocsQuery()
*/
public Query automatonQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,26 +698,42 @@ private Map<String, Object> fetchMvLongs() throws IOException {
public void testLookupExplosion() throws IOException {
int sensorDataCount = 400;
int lookupEntries = 10000;
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
}

public void testLookupExplosionManyFields() throws IOException {
int sensorDataCount = 400;
int lookupEntries = 1000;
int joinFieldsCount = 990;
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
}

public void testLookupExplosionManyMatchesManyFields() throws IOException {
// 1500, 10000 is enough locally, but some CI machines need more.
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 30));
int lookupEntries = 10000;
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 30, lookupEntries));
}

public void testLookupExplosionManyMatches() throws IOException {
// 1500, 10000 is enough locally, but some CI machines need more.
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 1));
int lookupEntries = 10000;
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries));
}

public void testLookupExplosionManyMatchesFiltered() throws IOException {
    // Depends on the expanding join optimization that pushes the filter
    // to the right side of the lookup. When that optimization is missing,
    // the query fails with circuit_breaking_exception instead.
    final int sensorDataCount = 10000;
    final int lookupEntries = 10000;
    // factor by which the number of matches is reduced
    final int reductionFactor = 1000;
    // the expected row count below is only exact when lookupEntries is a multiple of reductionFactor
    assertTrue(lookupEntries % reductionFactor == 0);
    // passed to lookupExplosion as the number of lookup entries to keep
    final int lookupEntriesToKeep = lookupEntries / reductionFactor;
    final int expectedCount = sensorDataCount * lookupEntries / reductionFactor;
    Map<?, ?> response = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntriesToKeep);
    assertMap(response, matchesMap().extraOk().entry("values", List.of(List.of(expectedCount))));
}

public void testLookupExplosionNoFetch() throws IOException {
Expand All @@ -744,7 +760,8 @@ public void testLookupExplosionBigStringManyMatches() throws IOException {
assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1));
}

private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount) throws IOException {
private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount, int lookupEntriesToKeep)
throws IOException {
try {
lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount);
StringBuilder query = startQuery();
Expand All @@ -755,7 +772,14 @@ private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntri
}
query.append("id").append(i);
}
query.append(" | STATS COUNT(location)\"}");
if (lookupEntries != lookupEntriesToKeep) {
// add a filter to reduce the number of matches
// we add both a Lucene pushable filter and a non-pushable filter
// this is to make sure that even if there are non-pushable filters the pushable filter is still applied
query.append(" | WHERE ABS(filter_key) > -1 AND filter_key < ").append(lookupEntriesToKeep);

}
query.append(" | STATS COUNT(location) | LIMIT 100\"}");
return responseAsMap(query(query.toString(), null));
} finally {
deleteIndex("sensor_data");
Expand Down Expand Up @@ -1038,7 +1062,8 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
}
createIndexBuilder.append("""
"location": { "type": "geo_point" }
"location": { "type": "geo_point" },
"filter_key": { "type": "integer" }
}
}""");
CreateIndexResponse response = createIndex(
Expand All @@ -1058,7 +1083,7 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, sensor));
}
data.append(String.format(Locale.ROOT, """
"location": "POINT(%s)"}\n""", location.apply(sensor)));
"location": "POINT(%s)", "filter_key": %d}\n""", location.apply(sensor), i));
if (i % docsPerBulk == docsPerBulk - 1) {
bulk("sensor_lookup", data.toString());
data.setLength(0);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public final Query getQuery(int position) {
* Returns the query at the given position.
*/
@Nullable
abstract Query doGetQuery(int position, int firstValueIndex, int valueCount);
public abstract Query doGetQuery(int position, int firstValueIndex, int valueCount);

private Query wrapSingleValueQuery(Query query) {
assert onlySingleValueParams != null : "Requested to wrap single value query without single value params";
Expand Down Expand Up @@ -159,13 +159,8 @@ private Query wrapSingleValueQuery(Query query) {
* using only the {@link ElementType} of the {@link Block} to determine the
* query.
*/
public static QueryList rawTermQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
AliasFilter aliasFilter,
Block block
) {
IntFunction<Object> blockToJavaObject = switch (block.elementType()) {
public static IntFunction<Object> createBlockValueReader(Block block) {
return switch (block.elementType()) {
case BOOLEAN -> {
BooleanBlock booleanBlock = (BooleanBlock) block;
yield booleanBlock::getBoolean;
Expand Down Expand Up @@ -196,7 +191,20 @@ public static QueryList rawTermQueryList(
case AGGREGATE_METRIC_DOUBLE -> throw new IllegalArgumentException("can't read values from [aggregate metric double] block");
case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]");
};
return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, blockToJavaObject);
}

/**
 * Builds the term {@link QueryList} for {@code field} over the values of {@code block}.
 * The value reader is obtained from {@link #createBlockValueReader(Block)}, so only the
 * block's {@link ElementType} decides how values are read when building the queries.
 */
public static QueryList rawTermQueryList(
    MappedFieldType field,
    SearchExecutionContext searchExecutionContext,
    AliasFilter aliasFilter,
    Block block
) {
    IntFunction<Object> blockToJavaObject = createBlockValueReader(block);
    // NOTE(review): the null argument presumably means "no single-value-only params" - confirm against the TermQueryList constructor
    return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, blockToJavaObject);
}

/**
Expand Down Expand Up @@ -297,7 +305,7 @@ public TermQueryList onlySingleValues(Warnings warnings, String multiValueWarnin
}

@Override
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
return switch (valueCount) {
case 0 -> null;
case 1 -> field.termQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext);
Expand Down Expand Up @@ -360,7 +368,7 @@ public DateNanosQueryList onlySingleValues(Warnings warnings, String multiValueW
}

@Override
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
return switch (valueCount) {
case 0 -> null;
case 1 -> dateFieldType.equalityQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext);
Expand Down Expand Up @@ -412,7 +420,7 @@ public GeoShapeQueryList onlySingleValues(Warnings warnings, String multiValueWa
}

@Override
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
public Query doGetQuery(int position, int firstValueIndex, int valueCount) {
return switch (valueCount) {
case 0 -> null;
case 1 -> shapeQuery.apply(firstValueIndex);
Expand Down Expand Up @@ -453,5 +461,5 @@ private IntFunction<Query> shapeQuery() {
}
}

protected record OnlySingleValueParams(Warnings warnings, String multiValueWarningMessage) {}
public record OnlySingleValueParams(Warnings warnings, String multiValueWarningMessage) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected final Operator.OperatorFactory simple() {
/**
* Makes sure the description of {@link #simple} matches the {@link #expectedDescriptionOfSimple}.
*/
public final void testSimpleDescription() {
public void testSimpleDescription() {
Operator.OperatorFactory factory = simple();
String description = factory.describe();
assertThat(description, expectedDescriptionOfSimple());
Expand Down
Loading