diff --git a/docs/changelog/133166.yaml b/docs/changelog/133166.yaml new file mode 100644 index 0000000000000..eadd1b2bad8be --- /dev/null +++ b/docs/changelog/133166.yaml @@ -0,0 +1,6 @@ +pr: 133166 +summary: Improve Expanding Lookup Join performance by pushing a filter to the right + side of the lookup join +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 1338339b01173..f5bed66d707bc 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -368,6 +368,7 @@ static TransportVersion def(int id) { public static final TransportVersion DATA_STREAM_WRITE_INDEX_ONLY_SETTINGS = def(9_142_0_00); public static final TransportVersion SCRIPT_RESCORER = def(9_143_0_00); public static final TransportVersion ESQL_LOOKUP_OPERATOR_EMITTED_ROWS = def(9_144_0_00); + public static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = def(9_145_0_00); /* * STOP! READ THIS FIRST! 
No, really, diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java index b463202221dc1..a85620ee29b3d 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java @@ -698,7 +698,7 @@ private Map fetchMvLongs() throws IOException { public void testLookupExplosion() throws IOException { int sensorDataCount = 400; int lookupEntries = 10000; - Map map = lookupExplosion(sensorDataCount, lookupEntries, 1); + Map map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries); assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); } @@ -706,18 +706,34 @@ public void testLookupExplosionManyFields() throws IOException { int sensorDataCount = 400; int lookupEntries = 1000; int joinFieldsCount = 990; - Map map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount); + Map map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries); assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); } public void testLookupExplosionManyMatchesManyFields() throws IOException { // 1500, 10000 is enough locally, but some CI machines need more. - assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 30)); + int lookupEntries = 10000; + assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 30, lookupEntries)); } public void testLookupExplosionManyMatches() throws IOException { // 1500, 10000 is enough locally, but some CI machines need more. 
- assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 1)); + int lookupEntries = 10000; + assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries)); + } + + public void testLookupExplosionManyMatchesFiltered() throws IOException { + // This test will only work with the expanding join optimization + // that pushes the filter to the right side of the lookup. + // Without the optimization, it will fail with circuit_breaking_exception + int sensorDataCount = 10000; + int lookupEntries = 10000; + int reductionFactor = 1000; // reduce the number of matches by this factor + // lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value + assertTrue(0 == lookupEntries % reductionFactor); + Map map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor); + assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor)))); + } public void testLookupExplosionNoFetch() throws IOException { @@ -744,7 +760,8 @@ public void testLookupExplosionBigStringManyMatches() throws IOException { assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1)); } - private Map lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount) throws IOException { + private Map lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount, int lookupEntriesToKeep) + throws IOException { try { lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount); StringBuilder query = startQuery(); @@ -755,7 +772,14 @@ private Map lookupExplosion(int sensorDataCount, int lookupEntri } query.append("id").append(i); } - query.append(" | STATS COUNT(location)\"}"); + if (lookupEntries != lookupEntriesToKeep) { + // add a filter to reduce the number of matches + // we add both a Lucene pushable filter and a non-pushable filter + // this is to make sure that even if there are 
non-pushable filters the pushable filters are still applied + query.append(" | WHERE ABS(filter_key) > -1 AND filter_key < ").append(lookupEntriesToKeep); + + } + query.append(" | STATS COUNT(location) | LIMIT 100\"}"); return responseAsMap(query(query.toString(), null)); } finally { deleteIndex("sensor_data"); @@ -1038,7 +1062,8 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction queryLists; - - public ExpressionQueryList(List queryLists) { - if (queryLists.size() < 2) { - throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists"); - } - this.queryLists = queryLists; - } - - @Override - public Query getQuery(int position) { - BooleanQuery.Builder builder = new BooleanQuery.Builder(); - for (QueryList queryList : queryLists) { - Query q = queryList.getQuery(position); - if (q == null) { - // if any of the matchFields are null, it means there is no match for this position - // A AND NULL is always NULL, so we can skip this position - return null; - } - builder.add(q, BooleanClause.Occur.FILTER); - } - return builder.build(); - } - - @Override - public int getPositionCount() { - int positionCount = queryLists.get(0).getPositionCount(); - for (QueryList queryList : queryLists) { - if (queryList.getPositionCount() != positionCount) { - throw new IllegalStateException( - "All QueryLists must have the same position count, expected: " - + positionCount - + ", but got: " - + queryList.getPositionCount() - ); - } - } - return positionCount; - } -} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java index cf581d9e83b43..b71e1373df859 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java +++ 
b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java @@ -10,6 +10,8 @@ import org.apache.lucene.search.Query; import org.elasticsearch.core.Nullable; +import java.io.IOException; + /** * An interface to generates queries for the lookup and enrich operators. * This interface is used to retrieve queries based on a position index. @@ -20,7 +22,7 @@ public interface LookupEnrichQueryGenerator { * Returns the query at the given position. */ @Nullable - Query getQuery(int position); + Query getQuery(int position) throws IOException; /** * Returns the number of queries in this generator diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java index 4d93ab63aaa5d..075d6205c8ecb 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java @@ -89,7 +89,7 @@ public int getPositionCount() { public abstract QueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage); @Override - public final Query getQuery(int position) { + public final Query getQuery(int position) throws IOException { final int valueCount = block.getValueCount(position); if (onlySingleValueParams != null && valueCount != 1) { if (valueCount > 1) { @@ -125,7 +125,7 @@ public final Query getQuery(int position) { * Returns the query at the given position. 
*/ @Nullable - abstract Query doGetQuery(int position, int firstValueIndex, int valueCount); + public abstract Query doGetQuery(int position, int firstValueIndex, int valueCount) throws IOException; private Query wrapSingleValueQuery(Query query) { assert onlySingleValueParams != null : "Requested to wrap single value query without single value params"; @@ -159,13 +159,8 @@ private Query wrapSingleValueQuery(Query query) { * using only the {@link ElementType} of the {@link Block} to determine the * query. */ - public static QueryList rawTermQueryList( - MappedFieldType field, - SearchExecutionContext searchExecutionContext, - AliasFilter aliasFilter, - Block block - ) { - IntFunction blockToJavaObject = switch (block.elementType()) { + public static IntFunction createBlockValueReader(Block block) { + return switch (block.elementType()) { case BOOLEAN -> { BooleanBlock booleanBlock = (BooleanBlock) block; yield booleanBlock::getBoolean; @@ -196,7 +191,20 @@ public static QueryList rawTermQueryList( case AGGREGATE_METRIC_DOUBLE -> throw new IllegalArgumentException("can't read values from [aggregate metric double] block"); case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]"); }; - return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, blockToJavaObject); + } + + /** + * Returns a list of term queries for the given field and the input block + * using only the {@link ElementType} of the {@link Block} to determine the + * query. 
+ */ + public static QueryList rawTermQueryList( + MappedFieldType field, + SearchExecutionContext searchExecutionContext, + AliasFilter aliasFilter, + Block block + ) { + return new TermQueryList(field, searchExecutionContext, aliasFilter, block, null, createBlockValueReader(block)); } /** @@ -297,7 +305,7 @@ public TermQueryList onlySingleValues(Warnings warnings, String multiValueWarnin } @Override - Query doGetQuery(int position, int firstValueIndex, int valueCount) { + public Query doGetQuery(int position, int firstValueIndex, int valueCount) { return switch (valueCount) { case 0 -> null; case 1 -> field.termQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext); @@ -360,7 +368,7 @@ public DateNanosQueryList onlySingleValues(Warnings warnings, String multiValueW } @Override - Query doGetQuery(int position, int firstValueIndex, int valueCount) { + public Query doGetQuery(int position, int firstValueIndex, int valueCount) { return switch (valueCount) { case 0 -> null; case 1 -> dateFieldType.equalityQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext); @@ -412,7 +420,7 @@ public GeoShapeQueryList onlySingleValues(Warnings warnings, String multiValueWa } @Override - Query doGetQuery(int position, int firstValueIndex, int valueCount) { + public Query doGetQuery(int position, int firstValueIndex, int valueCount) { return switch (valueCount) { case 0 -> null; case 1 -> shapeQuery.apply(firstValueIndex); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 8694505411291..5866776131cbc 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -2006,6 +2006,100 @@ language_code_float:integer | language_name:keyword 127 | max_byte ; +lookupWithExpressionEquals +required_capability: join_lookup_v12 
+required_capability: lookup_join_on_expression + +FROM employees +| WHERE emp_no == 10001 +| LOOKUP JOIN languages_lookup ON languages == language_code +| KEEP emp_no, languages, language_code, language_name +; + +emp_no:integer | languages:integer | language_code:integer | language_name:keyword +10001 | 2 | 2 | French +; + +lookupWithExpressionNotEquals +required_capability: join_lookup_v12 +required_capability: lookup_join_on_expression + +FROM employees +| WHERE emp_no == 10001 +| LOOKUP JOIN languages_lookup ON languages != language_code +| KEEP emp_no, languages, language_code, language_name +| SORT language_code +; + +emp_no:integer | languages:integer | language_code:integer | language_name:keyword +10001 | 2 | 1 | English +10001 | 2 | 3 | Spanish +10001 | 2 | 4 | German +; + +lookupWithExpressionGreater +required_capability: join_lookup_v12 +required_capability: lookup_join_on_expression + +FROM employees +| WHERE emp_no == 10001 +| LOOKUP JOIN languages_lookup ON languages > language_code +| KEEP emp_no, languages, language_code, language_name +; + +emp_no:integer | languages:integer | language_code:integer | language_name:keyword +10001 | 2 | 1 | English +; + +lookupWithExpressionGreaterOrEquals +required_capability: join_lookup_v12 +required_capability: lookup_join_on_expression + +FROM employees +| WHERE emp_no == 10001 +| LOOKUP JOIN languages_lookup ON languages >= language_code +| KEEP emp_no, languages, language_code, language_name +| SORT language_code +; + +emp_no:integer | languages:integer | language_code:integer | language_name:keyword +10001 | 2 | 1 | English +10001 | 2 | 2 | French +; + +lookupWithExpressionLess +required_capability: join_lookup_v12 +required_capability: lookup_join_on_expression + +FROM employees +| WHERE emp_no == 10001 +| LOOKUP JOIN languages_lookup ON languages < language_code +| KEEP emp_no, languages, language_code, language_name +| SORT language_code +; + +emp_no:integer | languages:integer | language_code:integer | 
language_name:keyword +10001 | 2 | 3 | Spanish +10001 | 2 | 4 | German +; + +lookupWithExpressionLessOrEquals +required_capability: join_lookup_v12 +required_capability: lookup_join_on_expression + +FROM employees +| WHERE emp_no == 10001 +| LOOKUP JOIN languages_lookup ON languages <= language_code +| KEEP emp_no, languages, language_code, language_name +| SORT language_code +; + +emp_no:integer | languages:integer | language_code:integer | language_name:keyword +10001 | 2 | 2 | French +10001 | 2 | 3 | Spanish +10001 | 2 | 4 | German +; + byteJoinDouble required_capability: join_lookup_v12 required_capability: lookup_join_on_mixed_numeric_fields @@ -5149,3 +5243,200 @@ null | null | bar2 | null | null null | null | corge | null | null null | null | fred | null | null ; + + +lookupJoinWithPushableFilterOnRight +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE other2 > 5000 +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +; + +lookupJoinWithTwoPushableFiltersOnRight +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE other2 > 5000 +| WHERE other1 like "*ta" +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +; + +lookupJoinWithMixLeftAndRightFilters +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE other2 > 5000 AND (extra1 == "qux" OR extra1 == "foo2") AND other1 like ("*ta", "*ron") +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +14 | Nina | foo2 | omicron | 15000 +; + +lookupJoinWithMixLeftAndRightFiltersNotPushableToLucene +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE ABS(other2) > 5000 AND (extra1 == "qux" OR extra1 == "foo2") AND other1 like ("*ta", "*ron") +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +14 | Nina | foo2 | omicron | 15000 +; + + +lookupJoinWithMixJoinAndNonJoinColumnsNotPushable +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE ABS(other2) > id_int + 5000 +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup 
ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value +warning:Line 3:23: evaluation of [id_int + 5000] failed, treating result as null. Only first 20 failures recorded. +warning:Line 3:23: java.lang.IllegalArgumentException: single-value function encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +; + + +lookupJoinWithMixJoinAndNonJoinColumnsPushable + required_capability: join_lookup_v12 + required_capability: lookup_join_on_multiple_fields + + FROM multi_column_joinable + | LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool + | WHERE other2 > id_int + 5000 + | KEEP id_int, name_str, extra1, other1, other2 + | SORT id_int, name_str, extra1, other1, other2 + | LIMIT 20 + ; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value +warning:Line 3:18: evaluation of [id_int + 5000] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 3:18: java.lang.IllegalArgumentException: single-value function encountered multi-value + + id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer + 4 | David | qux | zeta | 6000 + 5 | Eve | quux | eta | 7000 + 5 | Eve | quux | theta | 8000 + 6 | null | corge | iota | 9000 + 7 | Grace | grault | kappa | 10000 + 8 | Hank | garply | lambda | 11000 + 12 | Liam | xyzzy | nu | 13000 + 13 | Mia | thud | xi | 14000 + 14 | Nina | foo2 | omicron | 15000 + ; + + lookupJoinWithJoinAttrFilter + required_capability: join_lookup_v12 + required_capability: lookup_join_on_multiple_fields + + FROM multi_column_joinable + | LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool + | WHERE id_int > 7 + | KEEP id_int, name_str, extra1, other1, other2 + | SORT id_int, name_str, extra1, other1, other2 + | LIMIT 20 + ; + +warning:Line 3:9: evaluation of [id_int > 7] failed, treating result as null. Only first 20 failures recorded. +warning:Line 3:9: java.lang.IllegalArgumentException: single-value function encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +8 | Hank | garply | lambda | 11000 +9 | null | waldo | null | null +10 | null | fred | null | null +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +15 | null | bar2 | null | null +; + + +lookupJoinWithExpressionOfOtherFields + required_capability: join_lookup_v12 + required_capability: lookup_join_on_multiple_fields + + FROM multi_column_joinable + | LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool + | WHERE ABS(other2) > LENGTH(other1)*1000 + 2000 + | KEEP id_int, name_str, extra1, other1, other2 + | SORT id_int, name_str, extra1, other1, other2 + | LIMIT 20 + ; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +; diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index dc8a14abf1e12..ab42603f7e3ed 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -22,7 +22,6 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.LongBlock; -import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneOperator; import org.elasticsearch.compute.lucene.LuceneSliceQueue; @@ -54,12 +53,16 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.core.type.EsField; import 
org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator; import org.elasticsearch.xpack.esql.enrich.MatchConfig; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; @@ -75,6 +78,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Predicate; import static org.elasticsearch.test.ListMatcher.matchesList; import static org.elasticsearch.test.MapMatcher.assertMap; @@ -83,13 +87,14 @@ public class LookupFromIndexIT extends AbstractEsqlIntegTestCase { public void testKeywordKey() throws IOException { - runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" } })); + runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" } }), null); } public void testJoinOnTwoKeys() throws IOException { runLookup( List.of(DataType.KEYWORD, DataType.LONG), - new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" }, new Long[] { 12L, 33L, 1L, 42L } }) + new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" }, new Long[] { 12L, 33L, 1L, 42L } }), + null ); } @@ -101,7 +106,8 @@ public void testJoinOnThreeKeys() throws IOException { new String[] { "aa", "bb", "cc", "dd" }, new Long[] { 12L, 33L, 1L, 42L }, new String[] { "one", "two", "three", "four" }, } - ) + ), + null ); } @@ -114,25 +120,35 @@ public void testJoinOnFourKeys() throws IOException { new Long[] { 12L, 33L, 1L, 42L }, new String[] { "one", "two", "three", "four" }, new Integer[] { 1, 2, 3, 4 }, } - ) + ), + buildGreaterThanFilter(1L) ); } public void testLongKey() throws IOException { - runLookup(List.of(DataType.LONG), new 
UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } })); + runLookup( + List.of(DataType.LONG), + new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }), + buildGreaterThanFilter(0L) + ); } /** * LOOKUP multiple results match. */ public void testLookupIndexMultiResults() throws IOException { - runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } })); + runLookup( + List.of(DataType.KEYWORD), + new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }), + buildGreaterThanFilter(-1L) + ); } public void testJoinOnTwoKeysMultiResults() throws IOException { runLookup( List.of(DataType.KEYWORD, DataType.LONG), - new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" }, new Long[] { 12L, 1L, 1L, 42L } }) + new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" }, new Long[] { 12L, 1L, 1L, 42L } }), + null ); } @@ -144,12 +160,13 @@ public void testJoinOnThreeKeysMultiResults() throws IOException { new String[] { "aa", "bb", "bb", "dd" }, new Long[] { 12L, 1L, 1L, 42L }, new String[] { "one", "two", "two", "four" } } - ) + ), + null ); } interface PopulateIndices { - void populate(int docCount, List expected) throws IOException; + void populate(int docCount, List expected, Predicate filter) throws IOException; } class UsingSingleLookupTable implements PopulateIndices { @@ -171,7 +188,7 @@ class UsingSingleLookupTable implements PopulateIndices { } @Override - public void populate(int docCount, List expected) { + public void populate(int docCount, List expected, Predicate filter) { List docs = new ArrayList<>(); int numFields = lookupData.length; int numRows = lookupData[0].length; @@ -190,8 +207,13 @@ public void populate(int docCount, List expected) { } else { keyString = String.join(",", key.stream().map(String::valueOf).toArray(String[]::new)); } - for (Integer match : matches.get(key)) 
{ - expected.add(keyString + ":" + match); + List filteredMatches = matches.get(key).stream().filter(filter).toList(); + if (filteredMatches.isEmpty()) { + expected.add(keyString + ":null"); + } else { + for (Integer match : filteredMatches) { + expected.add(keyString + ":" + match); + } } } for (int i = 0; i < numRows; i++) { @@ -207,7 +229,17 @@ public void populate(int docCount, List expected) { } } - private void runLookup(List keyTypes, PopulateIndices populateIndices) throws IOException { + private List buildGreaterThanFilter(long value) { + FieldAttribute filterAttribute = new FieldAttribute( + Source.EMPTY, + "l", + new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE) + ); + Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG)); + return List.of(greaterThan); + } + + private void runLookup(List keyTypes, PopulateIndices populateIndices, List filters) throws IOException { String[] fieldMappers = new String[keyTypes.size() * 2]; for (int i = 0; i < keyTypes.size(); i++) { fieldMappers[2 * i] = "key" + i; @@ -236,9 +268,23 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices) client().admin().cluster().prepareHealth(TEST_REQUEST_TIMEOUT).setWaitForGreenStatus().get(); + Predicate filterPredicate = l -> true; + if (filters != null) { + if (filters.size() == 1 + && filters.get(0) instanceof GreaterThan gt + && gt.left() instanceof FieldAttribute fa + && fa.name().equals("l") + && gt.right() instanceof Literal lit) { + long value = ((Number) lit.value()).longValue(); + filterPredicate = l -> l > value; + } else { + fail("Unsupported filter type in test baseline generation: " + filters); + } + } + int docCount = between(10, 1000); List expected = new ArrayList<>(docCount); - populateIndices.populate(docCount, expected); + populateIndices.populate(docCount, expected, filterPredicate); /* * Find the data node hosting the only shard of the 
source index. @@ -331,7 +377,8 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices) "lookup", "lookup", List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))), - Source.EMPTY + Source.EMPTY, + filters ); DriverContext driverContext = driverContext(); try ( @@ -345,7 +392,7 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices) for (int i = 0; i < keyTypes.size(); i++) { keyBlocks.add(page.getBlock(i + 1)); } - LongVector loadedBlock = page.getBlock(keyTypes.size() + 1).asVector(); + LongBlock loadedBlock = page.getBlock(keyTypes.size() + 1); for (int p = 0; p < page.getPositionCount(); p++) { StringBuilder result = new StringBuilder(); for (int j = 0; j < keyBlocks.size(); j++) { @@ -361,7 +408,11 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices) } } - result.append(":" + loadedBlock.getLong(p)); + if (loadedBlock.isNull(p)) { + result.append(":null"); + } else { + result.append(":" + loadedBlock.getLong(loadedBlock.getFirstValueIndex(p))); + } results.add(result.toString()); } } finally { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index b23f8e60fd88d..eaf4ee49bafe8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -85,6 +85,7 @@ import org.elasticsearch.xpack.esql.expression.function.vector.VectorFunction; import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.DateTimeArithmeticOperation; import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.EsqlArithmeticOperation; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison; import 
org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.In; import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.index.IndexResolution; @@ -704,10 +705,25 @@ private LogicalPlan resolveLookup(Lookup l, List childrenOutput) { return l; } + private List resolveJoinFilters(List filters, List leftOutput, List rightOutput) { + if (filters.isEmpty()) { + return emptyList(); + } + List childrenOutput = new ArrayList<>(leftOutput); + childrenOutput.addAll(rightOutput); + + List resolvedFilters = new ArrayList<>(filters.size()); + for (Expression filter : filters) { + resolvedFilters.add(filter.transformUp(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput))); + } + return resolvedFilters; + } + private Join resolveLookupJoin(LookupJoin join) { JoinConfig config = join.config(); // for now, support only (LEFT) USING clauses JoinType type = config.type(); + // rewrite the join into an equi-join between the field with the same name between left and right if (type instanceof UsingJoinType using) { List cols = using.columns(); @@ -727,12 +743,33 @@ private Join resolveLookupJoin(LookupJoin join) { ); return join.withConfig(new JoinConfig(type, singletonList(errorAttribute), emptyList(), emptyList())); } - // resolve the using columns against the left and the right side then assemble the new join config - List leftKeys = resolveUsingColumns(cols, join.left().output(), "left"); - List rightKeys = resolveUsingColumns(cols, join.right().output(), "right"); + List leftKeys = new ArrayList<>(); + List rightKeys = new ArrayList<>(); + List resolvedFilters = new ArrayList<>(); + List matchKeys; + if (join.candidateRightHandFilters().isEmpty() == false) { + resolvedFilters = resolveJoinFilters(join.candidateRightHandFilters(), join.left().output(), join.right().output()); + // build leftKeys and rightKeys using the left side of the resolvedFilters. 
+ for (Expression expression : resolvedFilters) { + if (expression instanceof EsqlBinaryComparison binaryComparison) { + leftKeys.add((Attribute) binaryComparison.left()); + rightKeys.add((Attribute) binaryComparison.right()); + } else { + throw new EsqlIllegalArgumentException("Unsupported join filter expression: " + expression); + } + } + Set matchKeysSet = new HashSet<>(leftKeys); + matchKeysSet.addAll(rightKeys); + matchKeys = new ArrayList<>(matchKeysSet); + } else { + // resolve the using columns against the left and the right side then assemble the new join config + leftKeys = resolveUsingColumns(cols, join.left().output(), "left"); + rightKeys = resolveUsingColumns(cols, join.right().output(), "right"); + matchKeys = leftKeys; + } - config = new JoinConfig(coreJoin, leftKeys, leftKeys, rightKeys); - join = new LookupJoin(join.source(), join.left(), join.right(), config, join.isRemote()); + config = new JoinConfig(coreJoin, matchKeys, leftKeys, rightKeys); + return new LookupJoin(join.source(), join.left(), join.right(), config, join.isRemote(), resolvedFilters); } else if (type != JoinTypes.LEFT) { // everything else is unsupported for now // LEFT can only happen by being mapped from a USING above. 
So we need to exclude this as well because this rule can be run @@ -741,6 +778,7 @@ private Join resolveLookupJoin(LookupJoin join) { // add error message return join.withConfig(new JoinConfig(type, singletonList(errorAttribute), emptyList(), emptyList())); } + return join; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index b52c6472f962a..c834b538c5e86 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -558,6 +558,10 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen this.source = source; } + public Page getInputPage() { + return inputPage; + } + @Override public final String[] indices() { return new String[] { indexPattern }; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/BinaryComparisonQueryList.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/BinaryComparisonQueryList.java new file mode 100644 index 0000000000000..c7c9fc87374be --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/BinaryComparisonQueryList.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.enrich; + +import org.apache.lucene.search.Query; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.operator.Warnings; +import org.elasticsearch.compute.operator.lookup.QueryList; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates; +import org.elasticsearch.xpack.esql.planner.TranslatorHandler; +import org.elasticsearch.xpack.esql.plugin.EsqlFlags; +import org.elasticsearch.xpack.esql.stats.SearchContextStats; + +import java.io.IOException; +import java.util.List; +import java.util.function.IntFunction; + +public class BinaryComparisonQueryList extends QueryList { + private final EsqlBinaryComparison binaryComparison; + private final IntFunction blockValueReader; + private final SearchExecutionContext searchExecutionContext; + private final LucenePushdownPredicates lucenePushdownPredicates; + + public BinaryComparisonQueryList( + MappedFieldType field, + SearchExecutionContext searchExecutionContext, + Block block, + EsqlBinaryComparison binaryComparison, + ClusterService clusterService, + AliasFilter aliasFilter + ) { + super(field, searchExecutionContext, aliasFilter, block, null); + // swap left and right if the field is on the right + // We get a filter in the form left_expr >= right_expr + // here we will swap it to right_expr <= left_expr + // and later in doGetQuery we will replace left_expr with the value from the block + this.binaryComparison = (EsqlBinaryComparison) binaryComparison.swapLeftAndRight(); + this.blockValueReader = 
QueryList.createBlockValueReader(block); + this.searchExecutionContext = searchExecutionContext; + lucenePushdownPredicates = LucenePushdownPredicates.from( + SearchContextStats.from(List.of(searchExecutionContext)), + new EsqlFlags(clusterService.getClusterSettings()) + ); + } + + @Override + public QueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage) { + throw new UnsupportedOperationException(); + } + + @Override + public Query doGetQuery(int position, int firstValueIndex, int valueCount) throws IOException { + if (valueCount == 0) { + return null; + } + Object value = blockValueReader.apply(firstValueIndex); + // create a new comparison with the value from the block as a literal + EsqlBinaryComparison comparison = binaryComparison.getFunctionType() + .buildNewInstance( + binaryComparison.source(), + binaryComparison.left(), + new Literal(binaryComparison.right().source(), value, binaryComparison.right().dataType()) + ); + + return comparison.asQuery(lucenePushdownPredicates, TranslatorHandler.TRANSLATOR_HANDLER) + .toQueryBuilder() + .toQuery(searchExecutionContext); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java new file mode 100644 index 0000000000000..9a0e647469b92 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java @@ -0,0 +1,171 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.enrich; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.Query; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; +import org.elasticsearch.compute.operator.lookup.QueryList; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.xpack.esql.capabilities.TranslationAware; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates; +import org.elasticsearch.xpack.esql.plugin.EsqlFlags; +import org.elasticsearch.xpack.esql.stats.SearchContextStats; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER; + +/** + * A {@link LookupEnrichQueryGenerator} that combines multiple {@link QueryList}s into a single query. + * Each query in the resulting query will be a conjunction of all queries from the input lists at the same position. + * In the future we can extend this to support more complex expressions, such as disjunctions or negations. 
+ */ +public class ExpressionQueryList implements LookupEnrichQueryGenerator { + private static final Logger logger = LogManager.getLogger(ExpressionQueryList.class); + private List queryLists; + private final List preJoinFilters = new ArrayList<>(); + private final SearchExecutionContext context; + boolean isExpressionJoin = false; + private final AliasFilter aliasFilter; + + public ExpressionQueryList( + List queryLists, + SearchExecutionContext context, + List candidateRightHandFilters, + ClusterService clusterService, + LookupFromIndexService.TransportRequest request, + AliasFilter aliasFilter + ) { + if (queryLists.size() < 2 && (candidateRightHandFilters == null || candidateRightHandFilters.isEmpty())) { + throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists"); + } + this.queryLists = new ArrayList<>(); + this.context = context; + this.aliasFilter = aliasFilter; + buildPrePostJoinFilter(candidateRightHandFilters, clusterService, request); + if (isExpressionJoin == false) { + this.queryLists.addAll(queryLists); + } + + } + + private void buildPrePostJoinFilter( + List candidateRightHandFilters, + ClusterService clusterService, + LookupFromIndexService.TransportRequest request + ) { + if (candidateRightHandFilters == null || candidateRightHandFilters.isEmpty()) { + return; // no filters to apply + } + for (Expression filter : candidateRightHandFilters) { + try { + if (filter instanceof TranslationAware translationAware) { + LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from( + SearchContextStats.from(List.of(context)), + new EsqlFlags(clusterService.getClusterSettings()) + ); + if (TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) { + preJoinFilters.add( + translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder().toQuery(context) + ); + } else { + applyFilterWithQueryList(filter, request, clusterService); + } + } 
else { + applyFilterWithQueryList(filter, request, clusterService); + } + // If the filter is neither translatable nor usable by applyFilterWithQueryList we will not apply it for now + // as performance testing showed no performance improvement. + // We can revisit this in the future if needed, once we have more optimized workflow in place. + // The filter is optional, so it is OK to ignore it if it cannot be translated. + } catch (IOException e) { + // as the filter is optional an error in its application will be ignored + logger.error(() -> "Failed to translate optional pre-join filter: [" + filter + "]", e); + } + } + } + + private void applyFilterWithQueryList( + Expression filter, + LookupFromIndexService.TransportRequest request, + ClusterService clusterService + ) { + if (filter instanceof EsqlBinaryComparison binaryComparison) { + // the left side comes from the page that was sent to the lookup node + // the right side is the field from the lookup index + // check if the left side is in the request.getMatchFields() + // if it is, its block is the one at the same index in request.getInputPage() + Expression left = binaryComparison.left(); + if (left instanceof Attribute leftAttribute) { + for (int i = 0; i < request.getMatchFields().size(); i++) { + if (request.getMatchFields().get(i).fieldName().string().equals(leftAttribute.name())) { + Block block = request.getInputPage().getBlock(i); + Expression right = binaryComparison.right(); + if (right instanceof Attribute rightAttribute) { + MappedFieldType fieldType = context.getFieldType(rightAttribute.name()); + if (fieldType != null) { + isExpressionJoin = true; + queryLists.add( + new BinaryComparisonQueryList(fieldType, context, block, binaryComparison, clusterService, aliasFilter) + ); + } + } + } + } + } + } + } + + @Override + public Query getQuery(int position) throws IOException { + BooleanQuery.Builder builder = new BooleanQuery.Builder(); + for (QueryList queryList : queryLists) { + Query q = queryList.getQuery(position); + if (q == 
null) { + // if any of the matchFields are null, it means there is no match for this position + // A AND NULL is always NULL, so we can skip this position + return null; + } + builder.add(q, BooleanClause.Occur.FILTER); + } + // also attach the pre-join filter if it exists + for (Query preJoinFilter : preJoinFilters) { + builder.add(preJoinFilter, BooleanClause.Occur.FILTER); + } + return builder.build(); + } + + @Override + public int getPositionCount() { + int positionCount = queryLists.get(0).getPositionCount(); + for (QueryList queryList : queryLists) { + if (queryList.getPositionCount() != positionCount) { + throw new IllegalArgumentException( + "All QueryLists must have the same position count, expected: " + + positionCount + + ", but got: " + + queryList.getPositionCount() + ); + } + } + return positionCount; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java index 6f2f37119210e..aaeeaace353a4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java @@ -24,6 +24,7 @@ import org.elasticsearch.core.Releasables; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; @@ -34,6 +35,7 @@ import java.util.Objects; import java.util.Optional; import java.util.function.Function; +import java.util.stream.Collectors; // TODO rename package public final class LookupFromIndexOperator extends AsyncOperator { @@ -47,7 +49,8 @@ public record Factory( String lookupIndexPattern, String lookupIndex, List loadFields, - Source source 
+ Source source, + List candidateRightHandFilters ) implements OperatorFactory { @Override public String describe() { @@ -61,6 +64,8 @@ public String describe() { .append(" inputChannel=") .append(matchField.channel()); } + stringBuilder.append(" optional_filter=") + .append(candidateRightHandFilters.stream().map(Expression::toString).collect(Collectors.joining(", "))); stringBuilder.append("]"); return stringBuilder.toString(); } @@ -77,7 +82,8 @@ public Operator get(DriverContext driverContext) { lookupIndexPattern, lookupIndex, loadFields, - source + source, + candidateRightHandFilters ); } } @@ -90,7 +96,8 @@ public Operator get(DriverContext driverContext) { private final List loadFields; private final Source source; private long totalRows = 0L; - private List matchFields; + private final List matchFields; + private final List candidateRightHandFilters; /** * Total number of pages emitted by this {@link Operator}. */ @@ -114,7 +121,8 @@ public LookupFromIndexOperator( String lookupIndexPattern, String lookupIndex, List loadFields, - Source source + Source source, + List candidateRightHandFilters ) { super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests); this.matchFields = matchFields; @@ -125,6 +133,7 @@ public LookupFromIndexOperator( this.lookupIndex = lookupIndex; this.loadFields = loadFields; this.source = source; + this.candidateRightHandFilters = candidateRightHandFilters != null ? 
candidateRightHandFilters : new ArrayList<>(); } @Override @@ -151,7 +160,8 @@ protected void performAsync(Page inputPage, ActionListener listener newMatchFields, new Page(inputBlockArray), loadFields, - source + source, + candidateRightHandFilters ); lookupService.lookupAsync( request, @@ -211,6 +221,8 @@ public String toString() { .append(" inputChannel=") .append(matchField.channel()); } + stringBuilder.append(" optional_filter=") + .append(candidateRightHandFilters.stream().map(Expression::toString).collect(Collectors.joining(", "))); stringBuilder.append("]"); return stringBuilder.toString(); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java index 3eab54a7e0efc..4bcfd665585aa 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java @@ -20,7 +20,6 @@ import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Warnings; -import org.elasticsearch.compute.operator.lookup.ExpressionQueryList; import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.QueryList; import org.elasticsearch.core.Releasables; @@ -32,6 +31,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.action.EsqlQueryAction; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; @@ -88,7 +88,8 @@ protected TransportRequest 
transportRequest(LookupFromIndexService.Request reque null, request.extractFields, request.matchFields, - request.source + request.source, + request.candidateRightHandFilters ); } @@ -112,10 +113,11 @@ protected LookupEnrichQueryGenerator queryList( ).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value"); queryLists.add(q); } - if (queryLists.size() == 1) { + if (queryLists.size() == 1 && (request.candidateRightHandFilters == null || request.candidateRightHandFilters.isEmpty())) { return queryLists.getFirst(); } - return new ExpressionQueryList(queryLists); + return new ExpressionQueryList(queryLists, context, request.candidateRightHandFilters, clusterService, request, aliasFilter); + } @Override @@ -130,6 +132,7 @@ protected AbstractLookupService.LookupResponse readLookupResponse(StreamInput in public static class Request extends AbstractLookupService.Request { private final List matchFields; + private final List candidateRightHandFilters; Request( String sessionId, @@ -138,15 +141,19 @@ public static class Request extends AbstractLookupService.Request { List matchFields, Page inputPage, List extractFields, - Source source + Source source, + List candidateRightHandFilters ) { super(sessionId, index, indexPattern, matchFields.get(0).type(), inputPage, extractFields, source); this.matchFields = matchFields; + this.candidateRightHandFilters = candidateRightHandFilters != null ? 
candidateRightHandFilters : new ArrayList<>(); + ; } } protected static class TransportRequest extends AbstractLookupService.TransportRequest { private final List matchFields; + private final List candidateRightHandFilters; // Right now we assume that the page contains the same number of blocks as matchFields and that the blocks are in the same order // The channel information inside the MatchConfig, should say the same thing @@ -158,10 +165,12 @@ protected static class TransportRequest extends AbstractLookupService.TransportR Page toRelease, List extractFields, List matchFields, - Source source + Source source, + List candidateRightHandFilters ) { super(sessionId, shardId, indexPattern, inputPage, toRelease, extractFields, source); this.matchFields = matchFields; + this.candidateRightHandFilters = candidateRightHandFilters != null ? candidateRightHandFilters : new ArrayList<>(); } static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException { @@ -207,6 +216,12 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro String sourceText = in.readString(); source = new Source(source.source(), sourceText); } + List candidateRightHandFilters = new ArrayList<>(); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + candidateRightHandFilters = planIn.readNamedWriteableCollectionAsList(Expression.class); + } else { + candidateRightHandFilters = new ArrayList<>(); + } TransportRequest result = new TransportRequest( sessionId, shardId, @@ -215,12 +230,17 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro inputPage, extractFields, matchFields, - source + source, + candidateRightHandFilters ); result.setParentTask(parentTaskId); return result; } + public List getMatchFields() { + return matchFields; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -258,11 +278,18 @@ public void 
writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_SOURCE_TEXT)) { out.writeString(source.text()); } + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + planOut.writeNamedWriteableCollection(candidateRightHandFilters); + } + // otherwise we will not send the candidateRightHandFilters, as it is optional that is OK } @Override protected String extraDescription() { - return " ,match_fields=" + matchFields.stream().map(x -> x.fieldName().string()).collect(Collectors.joining(", ")); + return " ,match_fields=" + + matchFields.stream().map(x -> x.fieldName().string()).collect(Collectors.joining(", ")) + + ", optional_filter=" + + candidateRightHandFilters.stream().map(Expression::toString).collect(Collectors.joining(", ")); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MatchConfig.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MatchConfig.java index 616c6710eff48..4690c22d52d42 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MatchConfig.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MatchConfig.java @@ -28,10 +28,10 @@ public MatchConfig(FieldAttribute.FieldName fieldName, int channel, DataType typ this.type = type; } - public MatchConfig(FieldAttribute match, Layout.ChannelAndType input) { + public MatchConfig(FieldAttribute.FieldName fieldName, Layout.ChannelAndType input) { // TODO: Using exactAttribute was supposed to handle TEXT fields with KEYWORD subfields - but we don't allow these in lookup // indices, so the call to exactAttribute looks redundant now. 
- this(match.exactAttribute().fieldName(), input.channel(), input.type()); + this(fieldName, input.channel(), input.type()); } public MatchConfig(StreamInput in) throws IOException { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java index 692bab7d653b1..6d77d8f6ca32e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java @@ -13,9 +13,12 @@ import org.elasticsearch.xpack.esql.core.expression.AttributeSet; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.core.util.CollectionUtils; import org.elasticsearch.xpack.esql.expression.predicate.Predicates; +import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.Filter; @@ -45,9 +48,14 @@ * * Also combines adjacent filters using a logical {@code AND}. 
*/ -public final class PushDownAndCombineFilters extends OptimizerRules.OptimizerRule { +public final class PushDownAndCombineFilters extends OptimizerRules.ParameterizedOptimizerRule { + + public PushDownAndCombineFilters() { + super(OptimizerRules.TransformDirection.DOWN); + } + @Override - protected LogicalPlan rule(Filter filter) { + protected LogicalPlan rule(Filter filter, LogicalOptimizerContext ctx) { LogicalPlan plan = filter; LogicalPlan child = filter.child(); Expression condition = filter.condition(); @@ -100,7 +108,7 @@ protected LogicalPlan rule(Filter filter) { // TODO: could we do better here about pushing down filters for inlinestats? // See also https://github.com/elastic/elasticsearch/issues/127497 // Push down past INLINESTATS if the condition is on the groupings - return pushDownPastJoin(filter, join); + return pushDownPastJoin(filter, join, ctx.foldCtx()); } // cannot push past a Limit, this could change the tailing result set returned return plan; @@ -127,7 +135,7 @@ private static ScopedFilter scopeFilter(List filters, LogicalPlan le return new ScopedFilter(rest, leftFilters, rightFilters); } - private static LogicalPlan pushDownPastJoin(Filter filter, Join join) { + private static LogicalPlan pushDownPastJoin(Filter filter, Join join, FoldContext foldCtx) { LogicalPlan plan = filter; // pushdown only through LEFT joins // TODO: generalize this for other join types @@ -140,14 +148,53 @@ private static LogicalPlan pushDownPastJoin(Filter filter, Join join) { // 2. filter scoped to the right // 3. 
filter that requires both sides to be evaluated ScopedFilter scoped = scopeFilter(Predicates.splitAnd(filter.condition()), left, right); - // push the left scoped filter down to the left child, keep the rest intact + boolean optimizationApplied = false; + // push the left scoped filter down to the left child if (scoped.leftFilters.size() > 0) { // push the filter down to the left child left = new Filter(left.source(), left, Predicates.combineAnd(scoped.leftFilters)); // update the join with the new left child join = (Join) join.replaceLeft(left); + // we completely applied the left filters, so we can remove them from the scoped filters + scoped = new ScopedFilter(scoped.commonFilters(), List.of(), scoped.rightFilters); + optimizationApplied = true; + } + // push the right scoped filter down to the right child + // We don't want to execute this rule more than once per join, so we check if we + // already pushed some filters to the right, and we only apply the rule if join.candidateRightHandFilters() is empty + if (scoped.rightFilters().isEmpty() == false && join.candidateRightHandFilters().isEmpty()) { + List rightPushableFilters = buildRightPushableFilters(scoped.rightFilters(), foldCtx); + if (rightPushableFilters.isEmpty() == false) { + List filters = new ArrayList<>(join.candidateRightHandFilters()); + if (filters.containsAll(rightPushableFilters) == false) { + filters.addAll(rightPushableFilters); + join = join.withCandidateRightHandFilters(filters); + optimizationApplied = true; + } + } + /* + We still want to reapply the filters that we just applied to the right child, + so we do NOT remove them from scoped (optimizationApplied is set above only so the plan is rebuilt on the updated join). 
+ This is because by pushing them on the right side, we filter what rows we get from the right side + But we do not limit the output rows of the join as the rows are kept as not matched on the left side + So we end up applying the right filters twice, once on the right side and once on top of the join + This will result in major performance optimization when the lookup join is expanding + and applying the right filters reduces the expansion significantly. + For example, consider an expanding lookup join of 100,000 rows table with 10,000 lookup table + with filter of selectivity 0.1% on the right side (keeps 10 out of 10,000 rows of the lookup table). + In the non-optimized version the filter is not pushed to the right, and we get an explosion of records. + We have 100,000x10,000 = 1,000,000,000 rows after the join without the optimization. + Then we filter them out to only 1,000,000 rows. + With the optimization we apply the filter early so after the expanding join we only have 1,000,000 rows. + This reduced max number of rows used by a factor of 1,000 - // keep the remaining filters in place, otherwise return the new join; + In the future, once we have inner join support, it is usually possible to convert the lookup join into an inner join + This would allow us to not reapply the filters pushed to the right side again above the join, + as the inner join would only return rows that match on both sides. + */ + } + if (optimizationApplied) { + // if we pushed down some filters, we need to update the filters to reapply above the join Expression remainingFilter = Predicates.combineAnd(CollectionUtils.combine(scoped.commonFilters, scoped.rightFilters)); plan = remainingFilter != null ? filter.with(join, remainingFilter) : join; } @@ -156,6 +203,32 @@ private static LogicalPlan pushDownPastJoin(Filter filter, Join join) { return plan; } + /** + * Builds the right pushable filters for the given expressions. 
+ */ + private static List buildRightPushableFilters(List expressions, FoldContext foldCtx) { + return expressions.stream().filter(x -> isRightPushableFilter(x, foldCtx)).toList(); + } + + /** + * Determines if the given expression can be pushed down to the right side of a join. + * A filter is right pushable if the filter's predicate evaluates to false or null when all fields are set to null + */ + private static boolean isRightPushableFilter(Expression filter, FoldContext foldCtx) { + // traverse the filter tree + // replace any reference to an attribute with a null literal + Expression nullifiedFilter = filter.transformUp(Attribute.class, r -> new Literal(r.source(), null, r.dataType())); + // try to fold the filter + // check if the folded filter evaluates to false or null, if yes return true + // pushable WHERE field > 1 (evaluates to null), WHERE field is NOT NULL (evaluates to false) + // not pushable WHERE field is NULL (evaluates to true), WHERE coalesce(field, 10) == 10 (evaluates to true) + if (nullifiedFilter.foldable()) { + Object folded = nullifiedFilter.fold(foldCtx); + return folded == null || Boolean.FALSE.equals(folded); + } + return false; + } + private static Function NO_OP = expression -> expression; private static LogicalPlan maybePushDownPastUnary( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index 3e5e64f729675..f4bae94e65666 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -46,6 +46,7 @@ import org.elasticsearch.xpack.esql.expression.function.UnresolvedFunction; import org.elasticsearch.xpack.esql.expression.function.aggregate.Sum; import org.elasticsearch.xpack.esql.expression.function.aggregate.SummationMode; +import 
org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison; import org.elasticsearch.xpack.esql.plan.IndexPattern; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.ChangePoint; @@ -627,12 +628,17 @@ public PlanFactory visitJoinCommand(EsqlBaseParser.JoinCommandContext ctx) { // ON only with qualified names var predicates = expressions(condition.joinPredicate()); List joinFields = new ArrayList<>(predicates.size()); + List joinExpressions = new ArrayList<>(predicates.size()); for (var f : predicates) { // verify each field is an unresolved attribute if (f instanceof UnresolvedAttribute ua) { joinFields.add(ua); } else { - throw new ParsingException(f.source(), "JOIN ON clause only supports fields at the moment, found [{}]", f.sourceText()); + if (f instanceof EsqlBinaryComparison comparison) { + joinFields.add((Attribute) comparison.left()); + joinExpressions.add(f); + } + // throw new ParsingException(f.source(), "JOIN ON clause only supports fields at the moment, found [{}]", f.sourceText()); } } @@ -663,7 +669,7 @@ public PlanFactory visitJoinCommand(EsqlBaseParser.JoinCommandContext ctx) { if (hasRemotes && EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled() == false) { throw new ParsingException(source, "remote clusters are not supported with LOOKUP JOIN"); } - return new LookupJoin(source, p, right, joinFields, hasRemotes); + return new LookupJoin(source, p, right, joinFields, hasRemotes, joinExpressions); }; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java index 653722ec131f8..f3a2160546222 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java @@ 
-7,12 +7,14 @@ package org.elasticsearch.xpack.esql.plan.logical.join; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; @@ -152,7 +154,7 @@ public InlineJoin( List leftFields, List rightFields ) { - super(source, left, right, type, matchFields, leftFields, rightFields); + super(source, left, right, type, matchFields, leftFields, rightFields, false, null); } private static InlineJoin readFrom(StreamInput in) throws IOException { @@ -161,6 +163,10 @@ private static InlineJoin readFrom(StreamInput in) throws IOException { LogicalPlan left = in.readNamedWriteable(LogicalPlan.class); LogicalPlan right = in.readNamedWriteable(LogicalPlan.class); JoinConfig config = new JoinConfig(in); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + // Join#writeTo emits the filters with writeNamedWriteableCollection, so consume (and discard) them with the matching collection reader + in.readNamedWriteableCollectionAsList(Expression.class); + } return new InlineJoin(source, left, replaceStub(left, right), config); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java index 30d67ab2cbd73..09f46aa87f2b1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java @@ -7,6 +7,7 @@ package 
org.elasticsearch.xpack.esql.plan.logical.join; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -15,6 +16,7 @@ import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.AttributeSet; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; @@ -91,15 +93,24 @@ public class Join extends BinaryPlan implements PostAnalysisVerificationAware, S private List lazyOutput; // Does this join involve remote indices? This is relevant only on the coordinating node, thus transient. private transient boolean isRemote = false; + private final List candidateRightHandFilters; public Join(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config) { - this(source, left, right, config, false); + this(source, left, right, config, false, null); } - public Join(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config, boolean isRemote) { + public Join( + Source source, + LogicalPlan left, + LogicalPlan right, + JoinConfig config, + boolean isRemote, + List candidateRightHandFilters + ) { super(source, left, right); this.config = config; this.isRemote = isRemote; + this.candidateRightHandFilters = (candidateRightHandFilters != null) ? 
candidateRightHandFilters : new ArrayList<>(); } public Join( @@ -109,15 +120,24 @@ public Join( JoinType type, List matchFields, List leftFields, - List rightFields + List rightFields, + boolean isRemote, + List candidateRightHandFilters ) { super(source, left, right); this.config = new JoinConfig(type, matchFields, leftFields, rightFields); + this.isRemote = isRemote; + this.candidateRightHandFilters = (candidateRightHandFilters != null) ? candidateRightHandFilters : new ArrayList<>(); } public Join(StreamInput in) throws IOException { super(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(LogicalPlan.class), in.readNamedWriteable(LogicalPlan.class)); this.config = new JoinConfig(in); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + this.candidateRightHandFilters = in.readNamedWriteableCollectionAsList(Expression.class); + } else { + this.candidateRightHandFilters = new ArrayList<>(); + } } @Override @@ -126,6 +146,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeNamedWriteable(left()); out.writeNamedWriteable(right()); config.writeTo(out); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + out.writeNamedWriteableCollection(candidateRightHandFilters()); + } + // as the candidateRightHandFilters are optional it is OK to not write them if the node does not support it + // The query will still work and produce correct data, but performance might be worse } @Override @@ -149,7 +174,9 @@ protected NodeInfo info() { config.type(), config.matchFields(), config.leftFields(), - config.rightFields() + config.rightFields(), + isRemote(), + candidateRightHandFilters() ); } @@ -201,8 +228,13 @@ public static List computeOutput(List leftOutput, List rightOutputWithoutMatchFields = rightOutput.stream().filter(attr -> rightKeys.contains(attr) == false).toList(); + // AttributeSet rightKeys = AttributeSet.of(config.rightFields()); + Set 
leftAttrNames = config.leftFields().stream().map(Attribute::name).collect(java.util.stream.Collectors.toSet()); + // as we can do (left_key > right_key) now as join condition, we want to preserve the right_key, + // unless it is also a key on the left side, but we need to do name equality only + List rightOutputWithoutMatchFields = rightOutput.stream() + .filter(attr -> leftAttrNames.contains(attr.name()) == false) + .toList(); output = mergeOutputAttributes(rightOutputWithoutMatchFields, leftOutput); } else { throw new IllegalArgumentException(joinType.joinName() + " unsupported"); @@ -247,17 +279,25 @@ public boolean resolved() { } public Join withConfig(JoinConfig config) { - return new Join(source(), left(), right(), config, isRemote); + return new Join(source(), left(), right(), config, isRemote, candidateRightHandFilters()); + } + + public List candidateRightHandFilters() { + return this.candidateRightHandFilters; + } + + public Join withCandidateRightHandFilters(List candidateRightHandFilters) { + return new Join(source(), left(), right(), config(), isRemote, candidateRightHandFilters); } @Override public Join replaceChildren(LogicalPlan left, LogicalPlan right) { - return new Join(source(), left, right, config, isRemote); + return new Join(source(), left, right, config, isRemote, candidateRightHandFilters()); } @Override public int hashCode() { - return Objects.hash(config, left(), right(), isRemote); + return Objects.hash(config, left(), right(), isRemote, candidateRightHandFilters); } @Override @@ -273,7 +313,8 @@ public boolean equals(Object obj) { return config.equals(other.config) && Objects.equals(left(), other.left()) && Objects.equals(right(), other.right()) - && isRemote == other.isRemote; + && isRemote == other.isRemote + && Objects.equals(candidateRightHandFilters, other.candidateRightHandFilters); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java index 20913e0e27ce7..076398bf913be 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.esql.capabilities.TelemetryAware; import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.plan.logical.Limit; @@ -29,12 +30,45 @@ */ public class LookupJoin extends Join implements SurrogateLogicalPlan, TelemetryAware, PostAnalysisVerificationAware { - public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, List joinFields, boolean isRemote) { - this(source, left, right, new UsingJoinType(LEFT, joinFields), emptyList(), emptyList(), emptyList(), isRemote); + public LookupJoin( + Source source, + LogicalPlan left, + LogicalPlan right, + List joinFields, + boolean isRemote, + List candidateRightHandFilters + ) { + this( + source, + left, + right, + new UsingJoinType(LEFT, joinFields), + emptyList(), + emptyList(), + emptyList(), + isRemote, + candidateRightHandFilters + ); } - public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, List joinFields) { - this(source, left, right, new UsingJoinType(LEFT, joinFields), emptyList(), emptyList(), emptyList(), false); + public LookupJoin( + Source source, + LogicalPlan left, + LogicalPlan right, + List joinFields, + List candidateRightHandFilters + ) { + this( + source, + left, + right, + new UsingJoinType(LEFT, joinFields), + emptyList(), + emptyList(), + emptyList(), + false, + candidateRightHandFilters + ); } public LookupJoin( @@ -45,17 
+79,31 @@ public LookupJoin( List joinFields, List leftFields, List rightFields, - boolean isRemote + boolean isRemote, + List candidateRightHandFilters ) { - this(source, left, right, new JoinConfig(type, joinFields, leftFields, rightFields), isRemote); + this(source, left, right, new JoinConfig(type, joinFields, leftFields, rightFields), isRemote, candidateRightHandFilters); } - public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig joinConfig) { - this(source, left, right, joinConfig, false); + public LookupJoin( + Source source, + LogicalPlan left, + LogicalPlan right, + JoinConfig joinConfig, + List candidateRightHandFilters + ) { + this(source, left, right, joinConfig, false, candidateRightHandFilters); } - public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig joinConfig, boolean isRemote) { - super(source, left, right, joinConfig, isRemote); + public LookupJoin( + Source source, + LogicalPlan left, + LogicalPlan right, + JoinConfig joinConfig, + boolean isRemote, + List candidateRightHandFilters + ) { + super(source, left, right, joinConfig, isRemote, candidateRightHandFilters); } /** @@ -64,12 +112,12 @@ public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig @Override public LogicalPlan surrogate() { // TODO: decide whether to introduce USING or just basic ON semantics - keep the ordering out for now - return new Join(source(), left(), right(), config(), isRemote()); + return new Join(source(), left(), right(), config(), isRemote(), candidateRightHandFilters()); } @Override public Join replaceChildren(LogicalPlan left, LogicalPlan right) { - return new LookupJoin(source(), left, right, config(), isRemote()); + return new LookupJoin(source(), left, right, config(), isRemote(), candidateRightHandFilters()); } @Override @@ -83,7 +131,8 @@ protected NodeInfo info() { config().matchFields(), config().leftFields(), config().rightFields(), - isRemote() + isRemote(), + 
candidateRightHandFilters() ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java index 2aff38993aa98..f1a119798c450 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java @@ -7,11 +7,13 @@ package org.elasticsearch.xpack.esql.plan.physical; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.AttributeSet; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; @@ -37,6 +39,7 @@ public class LookupJoinExec extends BinaryExec implements EstimatesRowSize { * the right hand side by a {@link EsQueryExec}, and thus lose the information of which fields we'll get from the lookup index. */ private final List addedFields; + private final List candidateRightHandFilters; private List lazyOutput; public LookupJoinExec( @@ -45,12 +48,14 @@ public LookupJoinExec( PhysicalPlan lookup, List leftFields, List rightFields, - List addedFields + List addedFields, + List candidateRightHandFilters ) { super(source, left, lookup); this.leftFields = leftFields; this.rightFields = rightFields; this.addedFields = addedFields; + this.candidateRightHandFilters = (candidateRightHandFilters != null) ? 
candidateRightHandFilters : new ArrayList<>(); } private LookupJoinExec(StreamInput in) throws IOException { @@ -58,6 +63,11 @@ private LookupJoinExec(StreamInput in) throws IOException { this.leftFields = in.readNamedWriteableCollectionAsList(Attribute.class); this.rightFields = in.readNamedWriteableCollectionAsList(Attribute.class); this.addedFields = in.readNamedWriteableCollectionAsList(Attribute.class); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + this.candidateRightHandFilters = in.readNamedWriteableCollectionAsList(Expression.class); + } else { + this.candidateRightHandFilters = new ArrayList<>(); // For versions before the field was added, we default to an empty list + } } @Override @@ -66,6 +76,15 @@ public void writeTo(StreamOutput out) throws IOException { out.writeNamedWriteableCollection(leftFields); out.writeNamedWriteableCollection(rightFields); out.writeNamedWriteableCollection(addedFields); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + out.writeNamedWriteableCollection(getCandidateRightHandFilters()); + } + // as the candidateRightHandFilters are optional it is OK to not write them if the node does not support it + // it will still work, but performance might be worse + } + + public List getCandidateRightHandFilters() { + return candidateRightHandFilters; } @Override @@ -136,12 +155,12 @@ public AttributeSet rightReferences() { @Override public LookupJoinExec replaceChildren(PhysicalPlan left, PhysicalPlan right) { - return new LookupJoinExec(source(), left, right, leftFields, rightFields, addedFields); + return new LookupJoinExec(source(), left, right, leftFields, rightFields, addedFields, candidateRightHandFilters); } @Override protected NodeInfo info() { - return NodeInfo.create(this, LookupJoinExec::new, left(), right(), leftFields, rightFields, addedFields); + return NodeInfo.create(this, LookupJoinExec::new, left(), right(), leftFields, 
rightFields, addedFields, candidateRightHandFilters); } @Override @@ -156,11 +175,14 @@ public boolean equals(Object o) { return false; } LookupJoinExec other = (LookupJoinExec) o; - return leftFields.equals(other.leftFields) && rightFields.equals(other.rightFields) && addedFields.equals(other.addedFields); + return leftFields.equals(other.leftFields) + && rightFields.equals(other.rightFields) + && addedFields.equals(other.addedFields) + && Objects.equals(candidateRightHandFilters, other.candidateRightHandFilters); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), leftFields, rightFields, addedFields); + return Objects.hash(super.hashCode(), leftFields, rightFields, addedFields, candidateRightHandFilters); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 5ae15f4c0e844..bd7043c9e056f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -786,9 +786,23 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan if (input == null) { throw new IllegalArgumentException("can't plan [" + join + "][" + left + "]"); } - matchFields.add(new MatchConfig(right, input)); + // we support 2 types of joins: Field name joins and Expression joins + // for Field name join, we do not ship any join on expression. + // we built the Lucene query on the field name that is passed in the MatchConfig.fieldName + // so for Field name we need to pass the attribute name from the right side, because that is needed to build the query + // For expression joins, we pass an expression such as left_id > right_id. 
+ // So in this case we pass in left_id as the field name, because that is what we are shipping to the lookup node + // The lookup node will replace that name, with the actual values for each row and perform the lookup join + // We need to pass the left name, because we need to know what data we have shipped. + // It is not acceptable to just use the left or right side of the operator because the same field can be joined multiple times + // e.g. LOOKUP JOIN ON left_id < right_id_1 and left_id >= right_id_2 + // we want to be able to optimize this in the future and only ship the left_id once + FieldAttribute.FieldName fieldName = right.exactAttribute().fieldName(); + if (isExpressionJoin(join)) { + fieldName = new FieldAttribute.FieldName(left.name()); + } + matchFields.add(new MatchConfig(fieldName, input)); } - return source.with( new LookupFromIndexOperator.Factory( matchFields, @@ -799,12 +813,17 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan localSourceExec.indexPattern(), indexName, join.addedFields().stream().map(f -> (NamedExpression) f).toList(), - join.source() + join.source(), + join.getCandidateRightHandFilters() ), layout ); } + private boolean isExpressionJoin(LookupJoinExec join) { + return join.getCandidateRightHandFilters().isEmpty() == false; + } + private PhysicalOperation planLocal(LocalSourceExec localSourceExec, LocalExecutionPlannerContext context) { Layout.Builder layout = new Layout.Builder(); layout.append(localSourceExec.output()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java index a1671bffc5c25..247f718a8fdd6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java @@ -112,7 +112,15 @@ private 
PhysicalPlan mapBinary(BinaryPlan binary) { ); } if (right instanceof EsSourceExec source && source.indexMode() == IndexMode.LOOKUP) { - return new LookupJoinExec(join.source(), left, right, config.leftFields(), config.rightFields(), join.rightOutputFields()); + return new LookupJoinExec( + join.source(), + left, + right, + config.leftFields(), + config.rightFields(), + join.rightOutputFields(), + join.candidateRightHandFilters() + ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index aac1d58e5f7f1..a6d7e29654bdf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -229,7 +229,15 @@ private PhysicalPlan mapBinary(BinaryPlan bp) { if (right instanceof FragmentExec fragment && fragment.fragment() instanceof EsRelation relation && relation.indexMode() == IndexMode.LOOKUP) { - return new LookupJoinExec(join.source(), left, right, config.leftFields(), config.rightFields(), join.rightOutputFields()); + return new LookupJoinExec( + join.source(), + left, + right, + config.leftFields(), + config.rightFields(), + join.rightOutputFields(), + join.candidateRightHandFilters() + ); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java index 5f3d22957731a..16d4bd3a4d52d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java @@ -28,9 +28,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import 
org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BytesRefBlock; -import org.elasticsearch.compute.data.BytesRefVector; import org.elasticsearch.compute.data.IntBlock; -import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; @@ -62,12 +60,18 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; +import org.elasticsearch.xpack.esql.core.tree.Location; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThan; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; +import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.hamcrest.Matcher; import org.junit.After; @@ -76,6 +80,8 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -87,6 +93,7 @@ public class LookupFromIndexOperatorTests extends AsyncOperatorTestCase { private static final int LOOKUP_SIZE = 1000; + private static final int LESS_THAN_VALUE = -40; private final ThreadPool threadPool = threadPool(); private final Directory lookupIndexDirectory = newDirectory(); private 
final List releasables = new ArrayList<>(); @@ -135,12 +142,17 @@ protected void assertSimpleOutput(List input, List results) { for (Page r : results) { assertThat(r.getBlockCount(), equalTo(numberOfJoinColumns + 2)); LongVector match = r.getBlock(0).asVector(); - BytesRefVector lkwd = r.getBlock(numberOfJoinColumns).asVector(); - IntVector lint = r.getBlock(numberOfJoinColumns + 1).asVector(); + BytesRefBlock lkwdBlock = r.getBlock(numberOfJoinColumns); + IntBlock lintBlock = r.getBlock(numberOfJoinColumns + 1); for (int p = 0; p < r.getPositionCount(); p++) { long m = match.getLong(p); - assertThat(lkwd.getBytesRef(p, new BytesRef()).utf8ToString(), equalTo("l" + m)); - assertThat(lint.getInt(p), equalTo((int) -m)); + if (m > Math.abs(LESS_THAN_VALUE)) { + assertThat(lkwdBlock.getBytesRef(lkwdBlock.getFirstValueIndex(p), new BytesRef()).utf8ToString(), equalTo("l" + m)); + assertThat(lintBlock.getInt(lintBlock.getFirstValueIndex(p)), equalTo((int) -m)); + } else { + assertTrue("at " + p, lkwdBlock.isNull(p)); + assertTrue("at " + p, lintBlock.isNull(p)); + } } } } @@ -162,6 +174,7 @@ protected Operator.OperatorFactory simple(SimpleOptions options) { FieldAttribute.FieldName matchField = new FieldAttribute.FieldName("match" + i); matchFields.add(new MatchConfig(matchField, i, inputDataType)); } + return new LookupFromIndexOperator.Factory( matchFields, sessionId, @@ -171,8 +184,23 @@ protected Operator.OperatorFactory simple(SimpleOptions options) { lookupIndex, lookupIndex, loadFields, - Source.EMPTY + Source.EMPTY, + buildLessThanFilter(LESS_THAN_VALUE) + ); + } + + private List buildLessThanFilter(int value) { + FieldAttribute filterAttribute = new FieldAttribute( + Source.EMPTY, + "lint", + new EsField("lint", DataType.INTEGER, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE) + ); + Expression lessThan = new LessThan( + new Source(new Location(0, 0), "lint < " + value), + filterAttribute, + new Literal(Source.EMPTY, value, DataType.INTEGER) ); + 
return List.of(lessThan); } @Override @@ -187,13 +215,15 @@ protected Matcher expectedToStringOfSimple() { for (int i = 0; i < numberOfJoinColumns; i++) { sb.append(" input_type=LONG match_field=match").append(i).append(" inputChannel=").append(i); } - sb.append("]"); + sb.append(" optional_filter=lint < ").append(LESS_THAN_VALUE).append("]"); return matchesPattern(sb.toString()); } private LookupFromIndexService lookupService(DriverContext mainContext) { boolean beCranky = mainContext.bigArrays().breakerService() instanceof CrankyCircuitBreakerService; DiscoveryNode localNode = DiscoveryNodeUtils.create("node", "node"); + var builtInClusterSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + builtInClusterSettings.add(EsqlFlags.ESQL_STRING_LIKE_ON_INDEX); ClusterService clusterService = ClusterServiceUtils.createClusterService( threadPool, localNode, @@ -201,8 +231,9 @@ private LookupFromIndexService lookupService(DriverContext mainContext) { // Reserve 0 bytes in the sub-driver so we are more likely to hit the cranky breaker in it. 
.put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_SIZE_SETTING, ByteSizeValue.ofKb(0)) .put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_MAX_SIZE_SETTING, ByteSizeValue.ofKb(0)) + .put(EsqlFlags.ESQL_STRING_LIKE_ON_INDEX.getKey(), true) .build(), - ClusterSettings.createBuiltInClusterSettings() + new ClusterSettings(Settings.EMPTY, builtInClusterSettings) ); IndicesService indicesService = mock(IndicesService.class); IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index b6d66d673ea7a..a405ef2b77629 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -1382,7 +1382,7 @@ public void testPushdownLimitsPastLeftJoin() { var joinConfig = new JoinConfig(JoinTypes.LEFT, List.of(), List.of(), List.of()); var join = switch (randomIntBetween(0, 2)) { case 0 -> new Join(EMPTY, leftChild, rightChild, joinConfig); - case 1 -> new LookupJoin(EMPTY, leftChild, rightChild, joinConfig); + case 1 -> new LookupJoin(EMPTY, leftChild, rightChild, joinConfig, false, null); case 2 -> new InlineJoin(EMPTY, leftChild, rightChild, joinConfig); default -> throw new IllegalArgumentException(); }; @@ -1894,6 +1894,104 @@ public void testCombineOrderByThroughFilter() { as(filter.child(), EsRelation.class); } + /** + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang + * uages{f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Limit[1000[INTEGER],false] + * \_Filter[ISNULL(language_name{f}#19)] + * 
\_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18]] + * |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testDoNotPushDownIsNullFilterPastLookupJoin() { + var plan = plan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name IS NULL + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var join = as(filter.child(), Join.class); + assertThat(join.right(), instanceOf(EsRelation.class)); + } + + /** + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang + * uages{f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Limit[1000[INTEGER],false] + * \_Filter[language_name{f}#19 > a[KEYWORD]] + * \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18],false,language_name{f}#19 > a[KEYWORD]] + * |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] 
+ * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testPushDownGreaterThanFilterPastLookupJoin() { + var plan = plan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name > "a" + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var join = as(filter.child(), Join.class); + var right = as(join.right(), EsRelation.class); + assertThat(join.candidateRightHandFilters().toString(), is("[language_name > \"a\"]")); + } + + /** + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang + * uages{f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Limit[1000[INTEGER],false] + * \_Filter[COALESCE(language_name{f}#19,a[KEYWORD]) == a[KEYWORD]] + * \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18]] + * |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] 
+ * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testDoNotPushDownCoalesceFilterPastLookupJoin() { + var plan = plan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE COALESCE(language_name, "a") == "a" + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var join = as(filter.child(), Join.class); + assertThat(join.right(), instanceOf(EsRelation.class)); + } + + /** + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, languages{f}#10 + * AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Limit[1000[INTEGER],false] + * \_Filter[ISNOTNULL(language_name{f}#19)] + * \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18],false,ISNOTNULL(language_name{f}#19)] + * |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testPushDownIsNotNullFilterPastLookupJoin() { + var plan = plan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name IS NOT NULL + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var join = as(filter.child(), Join.class); + var right = as(join.right(), EsRelation.class); + assertThat(join.candidateRightHandFilters().toString(), is("[language_name IS NOT NULL]")); + } + /** * Expected *
{@code
@@ -7050,11 +7148,11 @@ public void testLookupJoinPushDownFilterOnLeftSideField() {
      * Expects
      *
      * 
{@code
-     * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16,
-     *          languages{f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]]
+     * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16,
+     *          languages{f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]]
      * \_Limit[1000[INTEGER],false]
-     *   \_Filter[language_name{f}#19 == [45 6e 67 6c 69 73 68][KEYWORD]]
-     *     \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18]]
+     *   \_Filter[language_name{f}#19 == English[KEYWORD]]
+     *     \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18],false,language_name{f}#19 == English[KEYWORD]]
      *       |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..]
      *       \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19]
      * }
@@ -7085,7 +7183,9 @@ public void testLookupJoinPushDownDisabledForLookupField() { assertThat(join.config().type(), equalTo(JoinTypes.LEFT)); var leftRel = as(join.left(), EsRelation.class); - var rightRel = as(join.right(), EsRelation.class); + assertEquals("[language_name == \"English\"]", join.candidateRightHandFilters().toString()); + var joinRightEsRelation = as(join.right(), EsRelation.class); + } /** @@ -7094,11 +7194,11 @@ public void testLookupJoinPushDownDisabledForLookupField() { * Expects * *
{@code
-     * Project[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17,
-     *          languages{f}#11 AS language_code#4, last_name{f}#12, long_noidx{f}#18, salary{f}#13, language_name{f}#20]]
+     * Project[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17,
+     *          languages{f}#11 AS language_code#4, last_name{f}#12, long_noidx{f}#18, salary{f}#13, language_name{f}#20]]
      * \_Limit[1000[INTEGER],false]
-     *   \_Filter[language_name{f}#20 == [45 6e 67 6c 69 73 68][KEYWORD]]
-     *     \_Join[LEFT,[languages{f}#11],[languages{f}#11],[language_code{f}#19]]
+     *   \_Filter[language_name{f}#20 == English[KEYWORD]]
+     *     \_Join[LEFT,[languages{f}#11],[languages{f}#11],[language_code{f}#19],false,language_name{f}#20 == English[KEYWORD]]
      *       |_Filter[emp_no{f}#8 > 1[INTEGER]]
      *       | \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..]
      *       \_EsRelation[languages_lookup][LOOKUP][language_code{f}#19, language_name{f}#20]
@@ -7138,6 +7238,7 @@ public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightFiel
         assertThat(literal.value(), equalTo(1));
 
         var leftRel = as(filter.child(), EsRelation.class);
+        assertEquals("[language_name == \"English\"]", join.candidateRightHandFilters().toString());
         var rightRel = as(join.right(), EsRelation.class);
     }
 
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java
index f3d1107983628..27e0e63e0102f 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java
@@ -14,18 +14,25 @@
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
+import org.elasticsearch.xpack.esql.core.expression.FoldContext;
 import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
 import org.elasticsearch.xpack.esql.expression.function.fulltext.Match;
 import org.elasticsearch.xpack.esql.expression.function.scalar.math.Pow;
+import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
 import org.elasticsearch.xpack.esql.expression.function.scalar.string.regex.RLike;
 import org.elasticsearch.xpack.esql.expression.function.scalar.string.regex.WildcardLike;
 import org.elasticsearch.xpack.esql.expression.predicate.Predicates;
 import org.elasticsearch.xpack.esql.expression.predicate.logical.And;
+import org.elasticsearch.xpack.esql.expression.predicate.logical.Not;
+import org.elasticsearch.xpack.esql.expression.predicate.logical.Or;
+import org.elasticsearch.xpack.esql.expression.predicate.nulls.IsNull;
+import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals;
 import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan;
 import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThanOrEqual;
 import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThan;
+import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
 import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
 import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
 import org.elasticsearch.xpack.esql.plan.logical.Eval;
@@ -34,16 +41,23 @@
 import org.elasticsearch.xpack.esql.plan.logical.Project;
 import org.elasticsearch.xpack.esql.plan.logical.inference.Completion;
 import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
+import org.elasticsearch.xpack.esql.plan.logical.join.Join;
+import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
+import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
 import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.FIVE;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.FOUR;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.ONE;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.SIX;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.THREE;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.TWO;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
@@ -60,6 +74,8 @@
 
 public class PushDownAndCombineFiltersTests extends ESTestCase {
 
+    private final LogicalOptimizerContext optimizerContext = new LogicalOptimizerContext(null, FoldContext.small());
+
     public void testCombineFilters() {
         EsRelation relation = relation();
         GreaterThan conditionA = greaterThanOf(getFieldAttribute("a"), ONE);
@@ -68,7 +84,10 @@ public void testCombineFilters() {
         Filter fa = new Filter(EMPTY, relation, conditionA);
         Filter fb = new Filter(EMPTY, fa, conditionB);
 
-        assertEquals(new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB)), new PushDownAndCombineFilters().apply(fb));
+        assertEquals(
+            new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB)),
+            new PushDownAndCombineFilters().apply(fb, optimizerContext)
+        );
     }
 
     public void testCombineFiltersLikeRLike() {
@@ -79,7 +98,10 @@ public void testCombineFiltersLikeRLike() {
         Filter fa = new Filter(EMPTY, relation, conditionA);
         Filter fb = new Filter(EMPTY, fa, conditionB);
 
-        assertEquals(new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB)), new PushDownAndCombineFilters().apply(fb));
+        assertEquals(
+            new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB)),
+            new PushDownAndCombineFilters().apply(fb, optimizerContext)
+        );
     }
 
     public void testPushDownFilter() {
@@ -93,7 +115,7 @@ public void testPushDownFilter() {
         Filter fb = new Filter(EMPTY, keep, conditionB);
 
         Filter combinedFilter = new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB));
-        assertEquals(new EsqlProject(EMPTY, combinedFilter, projections), new PushDownAndCombineFilters().apply(fb));
+        assertEquals(new EsqlProject(EMPTY, combinedFilter, projections), new PushDownAndCombineFilters().apply(fb, optimizerContext));
     }
 
     public void testPushDownFilterPastRenamingProject() {
@@ -111,7 +133,7 @@ public void testPushDownFilterPastRenamingProject() {
         LessThan bRenamedLessThanTwo = lessThanOf(bRenamed.toAttribute(), TWO);
         Filter filter = new Filter(EMPTY, project, Predicates.combineAnd(List.of(aRenamedTwiceGreaterThanOne, bRenamedLessThanTwo)));
 
-        LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter);
+        LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext);
 
         Project optimizedProject = as(optimized, Project.class);
         assertEquals(optimizedProject.projections(), project.projections());
@@ -184,7 +206,7 @@ public void testPushDownFilterOnAliasInEval() {
 
                 Filter filter = new Filter(EMPTY, eval, Predicates.combineAnd(conditions));
 
-                LogicalPlan plan = new PushDownAndCombineFilters().apply(filter);
+                LogicalPlan plan = new PushDownAndCombineFilters().apply(filter, optimizerContext);
 
                 if (numNonPushable > 0) {
                     Filter optimizedFilter = as(plan, Filter.class);
@@ -216,7 +238,7 @@ public void testPushDownLikeRlikeFilter() {
         Filter fb = new Filter(EMPTY, keep, conditionB);
 
         Filter combinedFilter = new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB));
-        assertEquals(new EsqlProject(EMPTY, combinedFilter, projections), new PushDownAndCombineFilters().apply(fb));
+        assertEquals(new EsqlProject(EMPTY, combinedFilter, projections), new PushDownAndCombineFilters().apply(fb, optimizerContext));
     }
 
     // from ... | where a > 1 | stats count(1) by b | where count(1) >= 3 and b < 2
@@ -244,7 +266,7 @@ public void testSelectivelyPushDownFilterPastFunctionAgg() {
             ),
             aggregateCondition
         );
-        assertEquals(expected, new PushDownAndCombineFilters().apply(fb));
+        assertEquals(expected, new PushDownAndCombineFilters().apply(fb, optimizerContext));
     }
 
     // from ... | where a > 1 | COMPLETION completion = "some prompt" WITH { "inferenceId' : "inferenceId" } | where b < 2 and
@@ -284,7 +306,7 @@ public void testPushDownFilterPastCompletion() {
             conditionCompletion
         );
 
-        assertEquals(expectedOptimizedPlan, new PushDownAndCombineFilters().apply(filterB));
+        assertEquals(expectedOptimizedPlan, new PushDownAndCombineFilters().apply(filterB, optimizerContext));
     }
 
     // from ... | where a > 1 | RERANK "query" ON title WITH { "inference_id" : "inferenceId" } | where b < 2 and _score > 1
@@ -317,7 +339,7 @@ public void testPushDownFilterPastRerank() {
             scoreCondition
         );
 
-        assertEquals(expectedOptimizedPlan, new PushDownAndCombineFilters().apply(filterB));
+        assertEquals(expectedOptimizedPlan, new PushDownAndCombineFilters().apply(filterB, optimizerContext));
     }
 
     private static Completion completion(LogicalPlan child) {
@@ -348,4 +370,172 @@ private static EsRelation relation() {
     private static EsRelation relation(List fieldAttributes) {
         return new EsRelation(EMPTY, randomIdentifier(), randomFrom(IndexMode.values()), Map.of(), fieldAttributes);
     }
+
+    public void testPushDownFilterPastLeftJoinWithPushable() {
+        Join join = createLeftJoin();
+        EsRelation left = (EsRelation) join.left();
+        FieldAttribute c = (FieldAttribute) join.right().output().get(0);
+
+        // "c > ONE" only references c, the first output attribute of the join's right side
+        Expression pushableCondition = greaterThanOf(c, ONE);
+        Filter filter = new Filter(EMPTY, join, pushableCondition);
+        LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext);
+        // The Filter stays on top unchanged; the join must list the same condition as its only right-hand candidate
+        Filter topFilter = as(optimized, Filter.class);
+        assertEquals(pushableCondition, topFilter.condition());
+        Join optimizedJoin = as(topFilter.child(), Join.class);
+        assertEquals(left, optimizedJoin.left());
+        assertEquals(1, optimizedJoin.candidateRightHandFilters().size());
+        assertEquals(pushableCondition, optimizedJoin.candidateRightHandFilters().get(0));
+    }
+
+    public void testPushDownFilterPastLeftJoinWithNonPushable() {
+        Join join = createLeftJoin();
+        FieldAttribute c = (FieldAttribute) join.right().output().get(0);
+
+        // IS NULL on a right-side attribute must not become a candidate (presumably since unmatched LEFT join rows are null)
+        Expression nonPushableCondition = new IsNull(EMPTY, c);
+        Filter filter = new Filter(EMPTY, join, nonPushableCondition);
+        LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext);
+        // The rule must return a plan equal to its input
+        assertEquals(filter, optimized);
+        // ... and the join below the filter must carry no candidate right-hand filters
+        Join innerJoin = as(as(optimized, Filter.class).child(), Join.class);
+        assertTrue(innerJoin.candidateRightHandFilters().isEmpty());
+    }
+
+    public void testPushDownFilterPastLeftJoinWithPartiallyPushableAnd() {
+        Join join = createLeftJoin();
+        EsRelation left = (EsRelation) join.left();
+        FieldAttribute c = (FieldAttribute) join.right().output().get(0);
+
+        Expression pushableCondition = greaterThanOf(c, ONE);
+        Expression nonPushableCondition = new IsNull(EMPTY, c);
+
+        // (c > ONE) AND (c IS NULL): a conjunction where only the first conjunct is expected to be a candidate
+        Expression partialCondition = new And(EMPTY, pushableCondition, nonPushableCondition);
+        Filter filter = new Filter(EMPTY, join, partialCondition);
+        LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext);
+        Filter topFilter = as(optimized, Filter.class);
+        // The top filter must keep the whole original conjunction
+        assertEquals(partialCondition, topFilter.condition());
+        Join optimizedJoin = as(topFilter.child(), Join.class);
+        assertEquals(left, optimizedJoin.left());
+        assertEquals(1, optimizedJoin.candidateRightHandFilters().size());
+        // Only the c > ONE conjunct may show up as a candidate right-hand filter
+        assertEquals(pushableCondition, optimizedJoin.candidateRightHandFilters().get(0));
+    }
+
+    public void testPushDownFilterPastLeftJoinWithOr() {
+        Join join = createLeftJoin();
+        FieldAttribute c = (FieldAttribute) join.right().output().get(0);
+
+        Expression pushableCondition = greaterThanOf(c, ONE);
+        Expression nonPushableCondition = new IsNull(EMPTY, c);
+
+        // (c > ONE) OR (c IS NULL): a disjunction with one non-pushable side must not yield any candidate
+        Expression orCondition = new Or(EMPTY, pushableCondition, nonPushableCondition);
+        Filter filter = new Filter(EMPTY, join, orCondition);
+        LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext);
+        // The rule must return a plan equal to its input
+        assertEquals(filter, optimized);
+        // ... and the join below the filter must carry no candidate right-hand filters
+        Join innerJoin = as(as(optimized, Filter.class).child(), Join.class);
+        assertTrue(innerJoin.candidateRightHandFilters().isEmpty());
+    }
+
+    public void testPushDownFilterPastLeftJoinWithNotPushable() {
+        Join join = createLeftJoin();
+        FieldAttribute c = (FieldAttribute) join.right().output().get(0);
+
+        Expression pushableCondition = greaterThanOf(c, ONE);
+
+        // NOT(c > ONE): a negated pushable comparison; the whole NOT expression is the expected candidate
+        Expression notPushableCondition = new Not(EMPTY, pushableCondition);
+        Filter filter = new Filter(EMPTY, join, notPushableCondition);
+        LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext);
+        Filter topFilter = as(optimized, Filter.class);
+        assertEquals(notPushableCondition, topFilter.condition());
+        Join optimizedJoin = as(topFilter.child(), Join.class);
+        assertEquals(1, optimizedJoin.candidateRightHandFilters().size());
+        assertEquals(notPushableCondition, optimizedJoin.candidateRightHandFilters().get(0));
+    }
+
+    public void testPushDownFilterPastLeftJoinWithNotNonPushable() {
+        Join join = createLeftJoin();
+        FieldAttribute c = (FieldAttribute) join.right().output().get(0);
+
+        Expression nonPushableCondition = new IsNull(EMPTY, c);
+
+        // NOT(c IS NULL): negating the non-pushable IS NULL is expected to make the whole expression a candidate
+        Expression notNonPushableCondition = new Not(EMPTY, nonPushableCondition);
+        Filter filter = new Filter(EMPTY, join, notNonPushableCondition);
+        LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext);
+        Filter topFilter = as(optimized, Filter.class);
+        assertEquals(notNonPushableCondition, topFilter.condition());
+        Join optimizedJoin = as(topFilter.child(), Join.class);
+        assertEquals(1, optimizedJoin.candidateRightHandFilters().size());
+        assertEquals(notNonPushableCondition, optimizedJoin.candidateRightHandFilters().get(0));
+    }
+
+    public void testPushDownFilterPastLeftJoinWithComplexMix() {
+        // Setup: left relation exposes (a, b), right relation exposes (c, d, e, f, g); the JoinConfig key lists are [a], [a], [c]
+        FieldAttribute a = getFieldAttribute("a");
+        FieldAttribute c = getFieldAttribute("c");
+        FieldAttribute d = getFieldAttribute("d");
+        FieldAttribute e = getFieldAttribute("e");
+        FieldAttribute f = getFieldAttribute("f");
+        FieldAttribute g = getFieldAttribute("g");
+        EsRelation left = relation(List.of(a, getFieldAttribute("b")));
+        EsRelation right = relation(List.of(c, d, e, f, g));
+        JoinConfig joinConfig = new JoinConfig(JoinTypes.LEFT, List.of(a), List.of(a), List.of(c));
+        Join join = new Join(EMPTY, left, right, joinConfig);
+
+        // Predicates (all of them reference right-side attributes only)
+        Expression p1 = greaterThanOf(c, ONE);                                  // pushable
+        Expression p2 = new Not(EMPTY, new IsNull(EMPTY, d));     // pushable (d IS NOT NULL)
+        Expression p3 = lessThanOf(e, THREE);                                   // pushable
+        Expression p4 = rlike(f, "pat");                                   // pushable
+        Expression p5 = new Not(EMPTY, new IsNull(EMPTY, g));     // pushable (g IS NOT NULL)
+        Expression p6 = greaterThanOf(c, TWO);                                  // pushable; used inside the OR and at top level
+        Expression p7 = lessThanOf(d, FOUR);                                    // pushable alone, but only used inside the OR
+        Expression p8 = greaterThanOf(e, FIVE);                                 // pushable alone, but only used inside the OR
+
+        Expression np1 = new IsNull(EMPTY, c);                           // non-pushable (c IS NULL)
+        Expression np2 = new Equals(EMPTY, new Coalesce(EMPTY, d, List.of(SIX)), SIX); // non-pushable
+
+        // Build a complex condition
+        // np2 AND ((p1 AND p2 AND p3 AND p4 AND p5) AND (np1 OR (p6 AND p7) OR (p8 AND np2))) AND p1 AND p6
+        Expression pushableBranch = Predicates.combineAnd(List.of(p1, p2, p3, p4, p5));
+        Expression nonPushableBranch = new Or(EMPTY, np1, new Or(EMPTY, new And(EMPTY, p6, p7), new And(EMPTY, p8, np2)));
+        Expression complexCondition = new And(EMPTY, pushableBranch, nonPushableBranch);
+        complexCondition = Predicates.combineAnd(List.of(np2, complexCondition, p1, p6));
+
+        Filter filter = new Filter(EMPTY, join, complexCondition);
+        LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext);
+
+        // The top filter with the original condition should remain, but the structure of the AND tree might have changed.
+        // So, we flatten the conditions and compare them as typed sets (the repeated p1 collapses in the HashSet).
+        Filter topFilter = as(optimized, Filter.class);
+        Set<Expression> actualTopPredicates = new HashSet<>(Predicates.splitAnd(topFilter.condition()));
+        Set<Expression> expectedTopPredicates = new HashSet<>(List.of(p1, p2, p3, p4, p5, nonPushableBranch, np2, p1, p6));
+        assertEquals(expectedTopPredicates, actualTopPredicates);
+
+        // Only the pushable top-level conjuncts are expected as candidates; the OR branch and np2 contribute nothing
+        Join optimizedJoin = as(topFilter.child(), Join.class);
+        assertEquals(left, optimizedJoin.left());
+        Set<Expression> actualPushable = new HashSet<>(optimizedJoin.candidateRightHandFilters());
+        Set<Expression> expectedPushable = new HashSet<>(List.of(p1, p2, p3, p4, p5, p6));
+        assertEquals(expectedPushable, actualPushable);
+    }
+
+    private Join createLeftJoin() { // fixture: relation(a, b) LEFT-joined with relation(c)
+        FieldAttribute a = getFieldAttribute("a");
+        FieldAttribute c = getFieldAttribute("c");
+        EsRelation left = relation(List.of(a, getFieldAttribute("b")));
+        EsRelation right = relation(List.of(c));
+        // JoinConfig receives the key lists [a], [a], [c]: a comes from the left relation, c from the right one
+        JoinConfig joinConfig = new JoinConfig(JoinTypes.LEFT, List.of(a), List.of(a), List.of(c));
+        return new Join(EMPTY, left, right, joinConfig);
+    }
 }
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java
index e7c08e58e0e1d..bd8b5f77272c2 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java
@@ -4628,13 +4628,19 @@ public void testMixedSingleDoubleParams() {
             String param2 = randomBoolean() ? "?" : "??";
             String param3 = randomBoolean() ? "?" : "??";
             if (param1.equals("?") || param2.equals("?") || param3.equals("?")) {
-                expectError(
-                    LoggerMessageFormat.format(null, "from test | " + command, param1, param2, param3),
-                    List.of(paramAsConstant("f1", "f1"), paramAsConstant("f2", "f2"), paramAsConstant("f3", "f3")),
-                    command.contains("join")
-                        ? "JOIN ON clause only supports fields at the moment"
-                        : "declared as a constant, cannot be used as an identifier"
-                );
+                if (command.contains("join")) {
+                    plan = statement(
+                        LoggerMessageFormat.format(null, "from test | " + command, param1, param2, param3),
+                        new QueryParams(List.of(paramAsConstant("f1", "f1"), paramAsConstant("f2", "f2"), paramAsConstant("f3", "f3")))
+                    );
+                    assertNotNull(plan);
+                } else {
+                    expectError(
+                        LoggerMessageFormat.format(null, "from test | " + command, param1, param2, param3),
+                        List.of(paramAsConstant("f1", "f1"), paramAsConstant("f2", "f2"), paramAsConstant("f3", "f3")),
+                        "declared as a constant, cannot be used as an identifier"
+                    );
+                }
             }
         }
     }
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/CommandLicenseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/CommandLicenseTests.java
index e55a04c03cd1b..c018a8de7107d 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/CommandLicenseTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/CommandLicenseTests.java
@@ -178,7 +178,7 @@ private static LogicalPlan createInstance(Class clazz, Lo
                 return new Sample(source, null, child);
             }
             case "LookupJoin" -> {
-                return new LookupJoin(source, child, child, List.of());
+                return new LookupJoin(source, child, child, List.of(), List.of());
             }
             case "Limit" -> {
                 return new Limit(source, null, child);