diff --git a/docs/changelog/133166.yaml b/docs/changelog/133166.yaml new file mode 100644 index 0000000000000..eadd1b2bad8be --- /dev/null +++ b/docs/changelog/133166.yaml @@ -0,0 +1,6 @@ +pr: 133166 +summary: Improve Expanding Lookup Join performance by pushing a filter to the right + side of the lookup join +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 641312f10ac4c..5ffc32339d8cf 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -357,6 +357,7 @@ static TransportVersion def(int id) { public static final TransportVersion STREAMS_ENDPOINT_PARAM_RESTRICTIONS = def(9_148_0_00); public static final TransportVersion RESOLVE_INDEX_MODE_FILTER = def(9_149_0_00); public static final TransportVersion SEMANTIC_QUERY_MULTIPLE_INFERENCE_IDS = def(9_150_0_00); + public static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = def(9_151_0_00); /* * STOP! READ THIS FIRST! 
No, really, diff --git a/server/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java b/server/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java index 24be16d3ee6df..77a13865f8d7c 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java @@ -387,7 +387,7 @@ public Query regexpQuery( } /** - * Returns a Lucine pushable Query for the current field + * Returns a Lucene pushable Query for the current field * For now can only be AutomatonQuery or MatchAllDocsQuery() or MatchNoDocsQuery() */ public Query automatonQuery( diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java index b463202221dc1..2f7b21fa3564b 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java @@ -698,7 +698,7 @@ private Map fetchMvLongs() throws IOException { public void testLookupExplosion() throws IOException { int sensorDataCount = 400; int lookupEntries = 10000; - Map map = lookupExplosion(sensorDataCount, lookupEntries, 1); + Map map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries); assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); } @@ -706,18 +706,34 @@ public void testLookupExplosionManyFields() throws IOException { int sensorDataCount = 400; int lookupEntries = 1000; int joinFieldsCount = 990; - Map map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount); + Map map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries); assertMap(map, 
matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); } public void testLookupExplosionManyMatchesManyFields() throws IOException { // 1500, 10000 is enough locally, but some CI machines need more. - assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 30)); + int lookupEntries = 10000; + assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 30, lookupEntries)); } public void testLookupExplosionManyMatches() throws IOException { // 1500, 10000 is enough locally, but some CI machines need more. - assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 1)); + int lookupEntries = 10000; + assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries)); + } + + public void testLookupExplosionManyMatchesFiltered() throws IOException { + // This test will only work with the expanding join optimization + // that pushes the filter to the right side of the lookup. + // Without the optimization, it will fail with circuit_breaking_exception + int sensorDataCount = 10000; + int lookupEntries = 10000; + int reductionFactor = 1000; // reduce the number of matches by this factor + // lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value + assertTrue(0 == lookupEntries % reductionFactor); + Map map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor); + assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor)))); + } public void testLookupExplosionNoFetch() throws IOException { @@ -744,7 +760,8 @@ public void testLookupExplosionBigStringManyMatches() throws IOException { assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1)); } - private Map lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount) throws IOException { + private Map lookupExplosion(int sensorDataCount, 
int lookupEntries, int joinFieldsCount, int lookupEntriesToKeep) + throws IOException { try { lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount); StringBuilder query = startQuery(); @@ -755,7 +772,14 @@ private Map lookupExplosion(int sensorDataCount, int lookupEntri } query.append("id").append(i); } - query.append(" | STATS COUNT(location)\"}"); + if (lookupEntries != lookupEntriesToKeep) { + // add a filter to reduce the number of matches + // we add both a Lucene pushable filter and a non-pushable filter + // this is to make sure that even if there are non-pushable filters the pushable filters is still applied + query.append(" | WHERE ABS(filter_key) > -1 AND filter_key < ").append(lookupEntriesToKeep); + + } + query.append(" | STATS COUNT(location) | LIMIT 100\"}"); return responseAsMap(query(query.toString(), null)); } finally { deleteIndex("sensor_data"); @@ -1038,7 +1062,8 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction queryLists; - - public ExpressionQueryList(List queryLists) { - if (queryLists.size() < 2) { - throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists"); - } - this.queryLists = queryLists; - } - - @Override - public Query getQuery(int position) { - BooleanQuery.Builder builder = new BooleanQuery.Builder(); - for (QueryList queryList : queryLists) { - Query q = queryList.getQuery(position); - if (q == null) { - // if any of the matchFields are null, it means there is no match for this position - // A AND NULL is always NULL, so we can skip this position - return null; - } - builder.add(q, BooleanClause.Occur.FILTER); - } - return builder.build(); - } - - @Override - public int getPositionCount() { - int positionCount = queryLists.get(0).getPositionCount(); - for (QueryList queryList : queryLists) { - if (queryList.getPositionCount() != positionCount) { - throw new IllegalStateException( - "All QueryLists must have the same position count, expected: " - + 
positionCount - + ", but got: " - + queryList.getPositionCount() - ); - } - } - return positionCount; - } -} diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AnyOperatorTestCase.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AnyOperatorTestCase.java index e1a2110111d1e..5bffb1c61f30e 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AnyOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AnyOperatorTestCase.java @@ -87,7 +87,7 @@ protected final Operator.OperatorFactory simple() { /** * Makes sure the description of {@link #simple} matches the {@link #expectedDescriptionOfSimple}. */ - public final void testSimpleDescription() { + public void testSimpleDescription() { Operator.OperatorFactory factory = simple(); String description = factory.describe(); assertThat(description, expectedDescriptionOfSimple()); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 3cbef5f9655c0..2aa88f47a0a96 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -5285,3 +5285,311 @@ null | null | bar2 | null | null null | null | corge | null | null null | null | fred | null | null ; + + +lookupJoinWithPushableFilterOnRight +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE other2 > 5000 +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. 
Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +; + +lookupJoinWithPushableFilterOnRightOneField +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int +| WHERE other2 > 5000 +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +; + +lookupJoinWithTwoPushableFiltersOnRight +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE other2 > 5000 +| WHERE other1 like "*ta" +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +; + +lookupJoinWithCoalesceFilterOnRight +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE COALESCE(other1, "zeta") == "zeta" +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +[1, 19, 21] | null | zyx | null | null +4 | David | qux | zeta | 6000 +9 | null | waldo | null | null +10 | null | fred | null | null +15 | null | bar2 | null | null +[17, 18] | null | xyz | null | null +null | null | plugh | null | null +; + +lookupJoinWithIsNullFilterOnRight +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE other1 IS NULL +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +[1, 19, 21] | null | zyx | null | null +9 | null | waldo | null | null +10 | null | fred | null | null +15 | null | bar2 | null | null +[17, 18] | null | xyz | null | null +null | null | plugh | null | null +; + +lookupJoinWithIsNullJoinKey +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON name_str +| WHERE name_str IS NULL +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +null | null | corge | null | null +; + +lookupJoinWithMixLeftAndRightFilters +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE other2 > 5000 AND (extra1 == 
"qux" OR extra1 == "foo2") AND other1 like ("*ta", "*ron") +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +14 | Nina | foo2 | omicron | 15000 +; + +lookupJoinWithMixLeftAndRightFiltersNotPushableToLucene +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE ABS(other2) > 5000 AND (extra1 == "qux" OR extra1 == "foo2") AND other1 like ("*ta", "*ron") +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +14 | Nina | foo2 | omicron | 15000 +; + + +lookupJoinWithMixJoinAndNonJoinColumnsNotPushable +required_capability: join_lookup_v12 +required_capability: lookup_join_on_multiple_fields + +FROM multi_column_joinable +| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool +| WHERE ABS(other2) > id_int + 5000 +| KEEP id_int, name_str, extra1, other1, other2 +| SORT id_int, name_str, extra1, other1, other2 +| LIMIT 20 +; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value +warning:Line 3:23: evaluation of [id_int + 5000] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 3:23: java.lang.IllegalArgumentException: single-value function encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +4 | David | qux | zeta | 6000 +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +; + + +lookupJoinWithMixJoinAndNonJoinColumnsPushable + required_capability: join_lookup_v12 + required_capability: lookup_join_on_multiple_fields + + FROM multi_column_joinable + | LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool + | WHERE other2 > id_int + 5000 + | KEEP id_int, name_str, extra1, other1, other2 + | SORT id_int, name_str, extra1, other1, other2 + | LIMIT 20 + ; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value +warning:Line 3:18: evaluation of [id_int + 5000] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 3:18: java.lang.IllegalArgumentException: single-value function encountered multi-value + + id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer + 4 | David | qux | zeta | 6000 + 5 | Eve | quux | eta | 7000 + 5 | Eve | quux | theta | 8000 + 6 | null | corge | iota | 9000 + 7 | Grace | grault | kappa | 10000 + 8 | Hank | garply | lambda | 11000 + 12 | Liam | xyzzy | nu | 13000 + 13 | Mia | thud | xi | 14000 + 14 | Nina | foo2 | omicron | 15000 + ; + +lookupJoinWithMixPushableAndUnpushableFilters + required_capability: join_lookup_v12 + required_capability: lookup_join_on_multiple_fields + + FROM multi_column_joinable + | LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool + | WHERE other2 > id_int + 5000 AND (extra1 == "qux" OR extra1 == "zyx") AND other1 like "*ta" AND ABS(other2) > 5500 + | KEEP id_int, name_str, extra1, other1, other2 + | SORT id_int, name_str, extra1, other1, other2 + | LIMIT 20 + ; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. +warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value +warning:Line 3:18: evaluation of [id_int + 5000] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 3:18: java.lang.IllegalArgumentException: single-value function encountered multi-value + + id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer + 4 | David | qux | zeta | 6000 + ; + + lookupJoinWithJoinAttrFilter + required_capability: join_lookup_v12 + required_capability: lookup_join_on_multiple_fields + + FROM multi_column_joinable + | LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool + | WHERE id_int > 7 + | KEEP id_int, name_str, extra1, other1, other2 + | SORT id_int, name_str, extra1, other1, other2 + | LIMIT 20 + ; + +warning:Line 3:9: evaluation of [id_int > 7] failed, treating result as null. Only first 20 failures recorded. +warning:Line 3:9: java.lang.IllegalArgumentException: single-value function encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +8 | Hank | garply | lambda | 11000 +9 | null | waldo | null | null +10 | null | fred | null | null +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +15 | null | bar2 | null | null +; + + +lookupJoinWithExpressionOfOtherFields + required_capability: join_lookup_v12 + required_capability: lookup_join_on_multiple_fields + + FROM multi_column_joinable + | LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool + | WHERE ABS(other2) > LENGTH(other1)*1000 + 2000 + | KEEP id_int, name_str, extra1, other1, other2 + | SORT id_int, name_str, extra1, other1, other2 + | LIMIT 20 + ; + +warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded. 
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value + +id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer +5 | Eve | quux | eta | 7000 +5 | Eve | quux | theta | 8000 +6 | null | corge | iota | 9000 +7 | Grace | grault | kappa | 10000 +8 | Hank | garply | lambda | 11000 +12 | Liam | xyzzy | nu | 13000 +13 | Mia | thud | xi | 14000 +14 | Nina | foo2 | omicron | 15000 +; diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index dc8a14abf1e12..4056572d38b17 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -22,7 +22,6 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.LongBlock; -import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneOperator; import org.elasticsearch.compute.lucene.LuceneSliceQueue; @@ -36,6 +35,7 @@ import org.elasticsearch.compute.test.BlockTestUtils; import org.elasticsearch.compute.test.TestDriverFactory; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; @@ -54,12 +54,20 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; 
+import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.core.type.EsField; import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator; import org.elasticsearch.xpack.esql.enrich.MatchConfig; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan; +import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.Filter; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; @@ -75,6 +83,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Predicate; import static org.elasticsearch.test.ListMatcher.matchesList; import static org.elasticsearch.test.MapMatcher.assertMap; @@ -83,13 +92,14 @@ public class LookupFromIndexIT extends AbstractEsqlIntegTestCase { public void testKeywordKey() throws IOException { - runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" } })); + runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" } }), null); } public void testJoinOnTwoKeys() throws IOException { runLookup( List.of(DataType.KEYWORD, DataType.LONG), - new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" }, new Long[] { 12L, 33L, 1L, 42L } }) + new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" }, new Long[] { 12L, 33L, 1L, 42L } }), + null ); } @@ 
-101,7 +111,8 @@ public void testJoinOnThreeKeys() throws IOException { new String[] { "aa", "bb", "cc", "dd" }, new Long[] { 12L, 33L, 1L, 42L }, new String[] { "one", "two", "three", "four" }, } - ) + ), + null ); } @@ -114,25 +125,35 @@ public void testJoinOnFourKeys() throws IOException { new Long[] { 12L, 33L, 1L, 42L }, new String[] { "one", "two", "three", "four" }, new Integer[] { 1, 2, 3, 4 }, } - ) + ), + buildGreaterThanFilter(1L) ); } public void testLongKey() throws IOException { - runLookup(List.of(DataType.LONG), new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } })); + runLookup( + List.of(DataType.LONG), + new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }), + buildGreaterThanFilter(0L) + ); } /** * LOOKUP multiple results match. */ public void testLookupIndexMultiResults() throws IOException { - runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } })); + runLookup( + List.of(DataType.KEYWORD), + new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }), + buildGreaterThanFilter(-1L) + ); } public void testJoinOnTwoKeysMultiResults() throws IOException { runLookup( List.of(DataType.KEYWORD, DataType.LONG), - new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" }, new Long[] { 12L, 1L, 1L, 42L } }) + new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" }, new Long[] { 12L, 1L, 1L, 42L } }), + null ); } @@ -144,12 +165,13 @@ public void testJoinOnThreeKeysMultiResults() throws IOException { new String[] { "aa", "bb", "bb", "dd" }, new Long[] { 12L, 1L, 1L, 42L }, new String[] { "one", "two", "two", "four" } } - ) + ), + null ); } interface PopulateIndices { - void populate(int docCount, List expected) throws IOException; + void populate(int docCount, List expected, Predicate filter) throws IOException; } class UsingSingleLookupTable implements 
PopulateIndices { @@ -171,7 +193,7 @@ class UsingSingleLookupTable implements PopulateIndices { } @Override - public void populate(int docCount, List expected) { + public void populate(int docCount, List expected, Predicate filter) { List docs = new ArrayList<>(); int numFields = lookupData.length; int numRows = lookupData[0].length; @@ -190,8 +212,13 @@ public void populate(int docCount, List expected) { } else { keyString = String.join(",", key.stream().map(String::valueOf).toArray(String[]::new)); } - for (Integer match : matches.get(key)) { - expected.add(keyString + ":" + match); + List filteredMatches = matches.get(key).stream().filter(filter).toList(); + if (filteredMatches.isEmpty()) { + expected.add(keyString + ":null"); + } else { + for (Integer match : filteredMatches) { + expected.add(keyString + ":" + match); + } } } for (int i = 0; i < numRows; i++) { @@ -207,7 +234,19 @@ public void populate(int docCount, List expected) { } } - private void runLookup(List keyTypes, PopulateIndices populateIndices) throws IOException { + private PhysicalPlan buildGreaterThanFilter(long value) { + FieldAttribute filterAttribute = new FieldAttribute( + Source.EMPTY, + "l", + new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE) + ); + Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG)); + EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), List.of()); + Filter filter = new Filter(Source.EMPTY, esRelation, greaterThan); + return new FragmentExec(filter); + } + + private void runLookup(List keyTypes, PopulateIndices populateIndices, PhysicalPlan filters) throws IOException { String[] fieldMappers = new String[keyTypes.size() * 2]; for (int i = 0; i < keyTypes.size(); i++) { fieldMappers[2 * i] = "key" + i; @@ -236,9 +275,23 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices) 
client().admin().cluster().prepareHealth(TEST_REQUEST_TIMEOUT).setWaitForGreenStatus().get(); + Predicate filterPredicate = l -> true; + if (filters instanceof FragmentExec fragmentExec) { + if (fragmentExec.fragment() instanceof Filter filter + && filter.condition() instanceof GreaterThan gt + && gt.left() instanceof FieldAttribute fa + && fa.name().equals("l") + && gt.right() instanceof Literal lit) { + long value = ((Number) lit.value()).longValue(); + filterPredicate = l -> l > value; + } else { + fail("Unsupported filter type in test baseline generation: " + filters); + } + } + int docCount = between(10, 1000); List expected = new ArrayList<>(docCount); - populateIndices.populate(docCount, expected); + populateIndices.populate(docCount, expected, filterPredicate); /* * Find the data node hosting the only shard of the source index. @@ -331,7 +384,8 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices) "lookup", "lookup", List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))), - Source.EMPTY + Source.EMPTY, + filters ); DriverContext driverContext = driverContext(); try ( @@ -345,7 +399,7 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices) for (int i = 0; i < keyTypes.size(); i++) { keyBlocks.add(page.getBlock(i + 1)); } - LongVector loadedBlock = page.getBlock(keyTypes.size() + 1).asVector(); + LongBlock loadedBlock = page.getBlock(keyTypes.size() + 1); for (int p = 0; p < page.getPositionCount(); p++) { StringBuilder result = new StringBuilder(); for (int j = 0; j < keyBlocks.size(); j++) { @@ -361,7 +415,11 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices) } } - result.append(":" + loadedBlock.getLong(p)); + if (loadedBlock.isNull(p)) { + result.append(":null"); + } else { + result.append(":" + loadedBlock.getLong(loadedBlock.getFirstValueIndex(p))); + } results.add(result.toString()); } } finally { diff --git 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java new file mode 100644 index 0000000000000..47d137bbbfe83 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java @@ -0,0 +1,130 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.enrich; + +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.Query; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; +import org.elasticsearch.compute.operator.lookup.QueryList; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.xpack.esql.capabilities.TranslationAware; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.expression.predicate.Predicates; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates; +import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.plugin.EsqlFlags; +import org.elasticsearch.xpack.esql.stats.SearchContextStats; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER; + +/** + * A {@link 
LookupEnrichQueryGenerator} that combines multiple conditions into a single query list. + * Each query in the resulting query will be a conjunction of all queries from the input lists at the same position. + * In addition, we support an optional pre-join filter that will be applied to all queries if it is pushable. + * If the pre-join filter cannot be pushed down to Lucene, it will be ignored. + */ +public class ExpressionQueryList implements LookupEnrichQueryGenerator { + private final List queryLists; + private final List preJoinFilters = new ArrayList<>(); + private final SearchExecutionContext context; + + public ExpressionQueryList( + List queryLists, + SearchExecutionContext context, + PhysicalPlan rightPreJoinPlan, + ClusterService clusterService + ) { + if (queryLists.size() < 2 && (rightPreJoinPlan instanceof FilterExec == false)) { + throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists or a pre-join filter"); + } + this.queryLists = queryLists; + this.context = context; + buildPreJoinFilter(rightPreJoinPlan, clusterService); + } + + private void addToPreJoinFilters(QueryBuilder query) { + try { + if (query != null) { + preJoinFilters.add(query.toQuery(context)); + } + } catch (IOException e) { + throw new UncheckedIOException("Error while building query for PreJoinFilters filter", e); + } + } + + private void buildPreJoinFilter(PhysicalPlan rightPreJoinPlan, ClusterService clusterService) { + if (rightPreJoinPlan instanceof FilterExec filterExec) { + List candidateRightHandFilters = Predicates.splitAnd(filterExec.condition()); + LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from( + SearchContextStats.from(List.of(context)), + new EsqlFlags(clusterService.getClusterSettings()) + ); + for (Expression filter : candidateRightHandFilters) { + if (filter instanceof TranslationAware translationAware) { + if 
(TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) { + addToPreJoinFilters(translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder()); + } + } + // If the filter is not translatable we will not apply it for now + // as performance testing showed no performance improvement. + // We can revisit this in the future if needed, once we have more optimized workflow in place. + // The filter is optional, so it is OK to ignore it if it cannot be translated. + } + } else if (rightPreJoinPlan != null && rightPreJoinPlan instanceof EsSourceExec == false) { + throw new IllegalStateException( + "The right side of a LookupJoinExec can only be a FilterExec on top of an EsSourceExec or an EsSourceExec, but got: " + + rightPreJoinPlan + ); + } + } + + @Override + public Query getQuery(int position) { + BooleanQuery.Builder builder = new BooleanQuery.Builder(); + for (QueryList queryList : queryLists) { + Query q = queryList.getQuery(position); + if (q == null) { + // if any of the matchFields are null, it means there is no match for this position + // A AND NULL is always NULL, so we can skip this position + return null; + } + builder.add(q, BooleanClause.Occur.FILTER); + } + // also attach the pre-join filter if it exists + for (Query preJoinFilter : preJoinFilters) { + builder.add(preJoinFilter, BooleanClause.Occur.FILTER); + } + return builder.build(); + } + + @Override + public int getPositionCount() { + int positionCount = queryLists.get(0).getPositionCount(); + for (QueryList queryList : queryLists) { + if (queryList.getPositionCount() != positionCount) { + throw new IllegalArgumentException( + "All QueryLists must have the same position count, expected: " + + positionCount + + ", but got: " + + queryList.getPositionCount() + ); + } + } + return positionCount; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java index 641f3420bbd78..c7cc98152c92c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java @@ -26,6 +26,7 @@ import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import java.io.IOException; import java.util.ArrayList; @@ -47,7 +48,8 @@ public record Factory( String lookupIndexPattern, String lookupIndex, List loadFields, - Source source + Source source, + PhysicalPlan rightPreJoinPlan ) implements OperatorFactory { @Override public String describe() { @@ -61,6 +63,7 @@ public String describe() { .append(" inputChannel=") .append(matchField.channel()); } + stringBuilder.append(" right_pre_join_plan=").append(rightPreJoinPlan == null ? "null" : rightPreJoinPlan.toString()); stringBuilder.append("]"); return stringBuilder.toString(); } @@ -77,7 +80,8 @@ public Operator get(DriverContext driverContext) { lookupIndexPattern, lookupIndex, loadFields, - source + source, + rightPreJoinPlan ); } } @@ -90,7 +94,8 @@ public Operator get(DriverContext driverContext) { private final List loadFields; private final Source source; private long totalRows = 0L; - private List matchFields; + private final List matchFields; + private final PhysicalPlan rightPreJoinPlan; /** * Total number of pages emitted by this {@link Operator}. 
*/ @@ -114,7 +119,8 @@ public LookupFromIndexOperator( String lookupIndexPattern, String lookupIndex, List loadFields, - Source source + Source source, + PhysicalPlan rightPreJoinPlan ) { super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests); this.matchFields = matchFields; @@ -125,6 +131,7 @@ public LookupFromIndexOperator( this.lookupIndex = lookupIndex; this.loadFields = loadFields; this.source = source; + this.rightPreJoinPlan = rightPreJoinPlan; } @Override @@ -151,7 +158,8 @@ protected void performAsync(Page inputPage, ActionListener listener newMatchFields, new Page(inputBlockArray), loadFields, - source + source, + rightPreJoinPlan ); lookupService.lookupAsync( request, @@ -211,6 +219,8 @@ public String toString() { .append(" inputChannel=") .append(matchField.channel()); } + + stringBuilder.append(" right_pre_join_plan=").append(rightPreJoinPlan == null ? "null" : rightPreJoinPlan.toString()); stringBuilder.append("]"); return stringBuilder.toString(); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java index 4cfa7adaede81..e81deb588249f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.esql.enrich; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -21,7 +23,6 @@ import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Warnings; -import 
org.elasticsearch.compute.operator.lookup.ExpressionQueryList; import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.QueryList; import org.elasticsearch.core.Releasables; @@ -39,6 +40,10 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper; import java.io.IOException; import java.util.ArrayList; @@ -53,6 +58,7 @@ */ public class LookupFromIndexService extends AbstractLookupService { public static final String LOOKUP_ACTION_NAME = EsqlQueryAction.NAME + "/lookup_from_index"; + private static final Logger logger = LogManager.getLogger(LookupFromIndexService.class); public LookupFromIndexService( ClusterService clusterService, @@ -89,7 +95,8 @@ protected TransportRequest transportRequest(LookupFromIndexService.Request reque null, request.extractFields, request.matchFields, - request.source + request.source, + request.rightPreJoinPlan ); } @@ -113,10 +120,32 @@ protected LookupEnrichQueryGenerator queryList( ).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value"); queryLists.add(q); } - if (queryLists.size() == 1) { + + PhysicalPlan physicalPlan = request.rightPreJoinPlan; + physicalPlan = localLookupNodePlanning(physicalPlan); + if (queryLists.size() == 1 && physicalPlan instanceof FilterExec == false) { return queryLists.getFirst(); } - return new ExpressionQueryList(queryLists); + return new ExpressionQueryList(queryLists, context, physicalPlan, clusterService); + + } + + /** + * This function will perform any planning needed on the local node + * For now, we will just do mapping of the logical plan 
to physical plan + * In the future we can also do local physical and logical optimizations. + * We only support a FragmentExec node containing a logical plan or a null plan + * If any other plan is sent we will just return null. This can happen in cases + * where the coordinator is running an older version that does not support + * keeping the plan as Logical Plan inside FragmentExec yet + * In those cases, it is safe to ignore the plan sent and return null + */ + private static PhysicalPlan localLookupNodePlanning(PhysicalPlan physicalPlan) { + if (physicalPlan instanceof FragmentExec fragmentExec) { + LocalMapper localMapper = new LocalMapper(); + return localMapper.map(fragmentExec.fragment()); + } + return null; } @Override @@ -131,6 +160,7 @@ protected AbstractLookupService.LookupResponse readLookupResponse(StreamInput in public static class Request extends AbstractLookupService.Request { private final List matchFields; + private final PhysicalPlan rightPreJoinPlan; Request( String sessionId, @@ -139,10 +169,12 @@ public static class Request extends AbstractLookupService.Request { List matchFields, Page inputPage, List extractFields, - Source source + Source source, + PhysicalPlan rightPreJoinPlan ) { super(sessionId, index, indexPattern, matchFields.get(0).type(), inputPage, extractFields, source); this.matchFields = matchFields; + this.rightPreJoinPlan = rightPreJoinPlan; } } @@ -153,6 +185,7 @@ protected static class TransportRequest extends AbstractLookupService.TransportR ); private final List matchFields; + private final PhysicalPlan rightPreJoinPlan; // Right now we assume that the page contains the same number of blocks as matchFields and that the blocks are in the same order // The channel information inside the MatchConfig, should say the same thing @@ -164,10 +197,12 @@ protected static class TransportRequest extends AbstractLookupService.TransportR Page toRelease, List extractFields, List matchFields, - Source source + Source source, + PhysicalPlan 
rightPreJoinPlan ) { super(sessionId, shardId, indexPattern, inputPage, toRelease, extractFields, source); this.matchFields = matchFields; + this.rightPreJoinPlan = rightPreJoinPlan; } static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException { @@ -213,6 +248,10 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro String sourceText = in.readString(); source = new Source(source.source(), sourceText); } + PhysicalPlan rightPreJoinPlan = null; + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + rightPreJoinPlan = planIn.readOptionalNamedWriteable(PhysicalPlan.class); + } TransportRequest result = new TransportRequest( sessionId, shardId, @@ -221,7 +260,8 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro inputPage, extractFields, matchFields, - source + source, + rightPreJoinPlan ); result.setParentTask(parentTaskId); return result; @@ -264,11 +304,17 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_SOURCE_TEXT)) { out.writeString(source.text()); } + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) { + planOut.writeOptionalNamedWriteable(rightPreJoinPlan); + } } @Override protected String extraDescription() { - return " ,match_fields=" + matchFields.stream().map(x -> x.fieldName().string()).collect(Collectors.joining(", ")); + return " ,match_fields=" + + matchFields.stream().map(x -> x.fieldName().string()).collect(Collectors.joining(", ")) + + ", right_pre_join_plan=" + + (rightPreJoinPlan == null ? 
"null" : rightPreJoinPlan.toString()); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java index 692bab7d653b1..7a38ae14f87e3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java @@ -13,9 +13,12 @@ import org.elasticsearch.xpack.esql.core.expression.AttributeSet; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.core.util.CollectionUtils; import org.elasticsearch.xpack.esql.expression.predicate.Predicates; +import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.Filter; @@ -45,9 +48,14 @@ * * Also combines adjacent filters using a logical {@code AND}. 
*/ -public final class PushDownAndCombineFilters extends OptimizerRules.OptimizerRule { +public final class PushDownAndCombineFilters extends OptimizerRules.ParameterizedOptimizerRule { + + public PushDownAndCombineFilters() { + super(OptimizerRules.TransformDirection.DOWN); + } + @Override - protected LogicalPlan rule(Filter filter) { + protected LogicalPlan rule(Filter filter, LogicalOptimizerContext ctx) { LogicalPlan plan = filter; LogicalPlan child = filter.child(); Expression condition = filter.condition(); @@ -100,7 +108,7 @@ protected LogicalPlan rule(Filter filter) { // TODO: could we do better here about pushing down filters for inlinestats? // See also https://github.com/elastic/elasticsearch/issues/127497 // Push down past INLINESTATS if the condition is on the groupings - return pushDownPastJoin(filter, join); + return pushDownPastJoin(filter, join, ctx.foldCtx()); } // cannot push past a Limit, this could change the tailing result set returned return plan; @@ -127,7 +135,7 @@ private static ScopedFilter scopeFilter(List filters, LogicalPlan le return new ScopedFilter(rest, leftFilters, rightFilters); } - private static LogicalPlan pushDownPastJoin(Filter filter, Join join) { + private static LogicalPlan pushDownPastJoin(Filter filter, Join join, FoldContext foldCtx) { LogicalPlan plan = filter; // pushdown only through LEFT joins // TODO: generalize this for other join types @@ -140,14 +148,66 @@ private static LogicalPlan pushDownPastJoin(Filter filter, Join join) { // 2. filter scoped to the right // 3. 
filter that requires both sides to be evaluated ScopedFilter scoped = scopeFilter(Predicates.splitAnd(filter.condition()), left, right); - // push the left scoped filter down to the left child, keep the rest intact + boolean optimizationApplied = false; + // push the left scoped filter down to the left child if (scoped.leftFilters.size() > 0) { // push the filter down to the left child left = new Filter(left.source(), left, Predicates.combineAnd(scoped.leftFilters)); // update the join with the new left child join = (Join) join.replaceLeft(left); + // we completely applied the left filters, so we can remove them from the scoped filters + scoped = new ScopedFilter(scoped.commonFilters(), List.of(), scoped.rightFilters); + optimizationApplied = true; + } + // push the right scoped filter down to the right child + // We check if each AND component of the filter is already part of the right side filter before we add it + // In the future, this optimization can apply to other types of joins as well such as InlineJoin + // but for now we limit it to LEFT joins only, till filters are supported for other join types + if (scoped.rightFilters().isEmpty() == false) { + List rightPushableFilters = buildRightPushableFilters(scoped.rightFilters(), foldCtx); + if (rightPushableFilters.isEmpty() == false) { + if (join.right() instanceof Filter existingRightFilter) { + // merge the unique AND filter components from rightPushableFilters and existingRightFilter.condition() + + List existingFilters = new ArrayList<>(Predicates.splitAnd(existingRightFilter.condition())); + int sizeBefore = existingFilters.size(); + rightPushableFilters.stream().filter(e -> existingFilters.contains(e) == false).forEach(existingFilters::add); + if (sizeBefore != existingFilters.size()) { + right = existingRightFilter.with(Predicates.combineAnd(existingFilters)); + join = (Join) join.replaceRight(right); + optimizationApplied = true; + } // else nothing needs to be updated + } else { + // create a new 
filter on top of the right child + right = new Filter(right.source(), right, Predicates.combineAnd(rightPushableFilters)); + // update the join with the new right child + join = (Join) join.replaceRight(right); + optimizationApplied = true; + } + } + /* + We still want to reapply the filters that we just applied to the right child, + so we do NOT remove them from scoped; optimizationApplied only records that the join itself was rewritten. + This is because by pushing them on the right side, we filter what rows we get from the right side + But we do not limit the output rows of the join as the rows are kept as not matched on the left side + So we end up applying the right filters twice, once on the right side and once on top of the join + This will result in major performance optimization when the lookup join is expanding + and applying the right filters reduces the expansion significantly. + For example, consider an expanding lookup join of a 100,000-row table with a 10,000-row lookup table + with a filter of selectivity 0.1% on the right side (keeps 10 out of 10,000 rows of the lookup table). + In the non-optimized version the filter is not pushed to the right, and we get an explosion of records. + We have 100,000x10,000 = 1,000,000,000 rows after the join without the optimization. + Then we filter them out to only 1,000,000 rows. + With the optimization we apply the filter early so after the expanding join we only have 1,000,000 rows. + This reduces the max number of rows used by a factor of 1,000 - // keep the remaining filters in place, otherwise return the new join; + In the future, once we have inner join support, it is usually possible to convert the lookup join into an inner join + This would allow us to not reapply the filters pushed to the right side again above the join, + as the inner join would only return rows that match on both sides.
+ */ + } + if (optimizationApplied) { + // if we pushed down some filters, we need to update the filters to reapply above the join Expression remainingFilter = Predicates.combineAnd(CollectionUtils.combine(scoped.commonFilters, scoped.rightFilters)); plan = remainingFilter != null ? filter.with(join, remainingFilter) : join; } @@ -156,6 +216,38 @@ private static LogicalPlan pushDownPastJoin(Filter filter, Join join) { return plan; } + /** + * Builds the right pushable filters for the given expressions. + */ + private static List buildRightPushableFilters(List expressions, FoldContext foldCtx) { + return expressions.stream().filter(x -> isRightPushableFilter(x, foldCtx)).toList(); + } + + /** + * Determines if the given expression can be pushed down to the right side of a join. + * A filter is right pushable if the filter's predicate evaluates to false or null when all fields are set to null + * This rule helps us guard against the case where we don't know if a field is null because: + * 1. the field is null in the source data or + * 2. 
the field is null because there was no match in the join + * If the null could be an issue we just say the filter is not pushable and we avoid this issue + * In this context pushable means that we can push the filter down to the right side of a LEFT join + * We do not check if the filter is pushable to Lucene or not here + */ + private static boolean isRightPushableFilter(Expression filter, FoldContext foldCtx) { + // traverse the filter tree + // replace any reference to an attribute with a null literal + Expression nullifiedFilter = filter.transformUp(Attribute.class, r -> new Literal(r.source(), null, r.dataType())); + // try to fold the filter + // check if the folded filter evaluates to false or null, if yes return true + // pushable WHERE field > 1 (evaluates to null), WHERE field is NOT NULL (evaluates to false) + // not pushable WHERE field is NULL (evaluates to true), WHERE coalesce(field, 10) == 10 (evaluates to true) + if (nullifiedFilter.foldable()) { + Object folded = nullifiedFilter.fold(foldCtx); + return folded == null || Boolean.FALSE.equals(folded); + } + return false; + } + private static Function NO_OP = expression -> expression; private static LogicalPlan maybePushDownPastUnary( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java index 1c3f341321f6f..f75430a64af1d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java @@ -9,6 +9,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockUtils; import 
org.elasticsearch.xpack.esql.core.expression.Alias; @@ -75,6 +76,11 @@ public static LogicalPlan inlineData(InlineJoin target, LocalRelation data) { } } + @Override + protected LogicalPlan getRightToSerialize(StreamOutput out) { + return right(); + } + /** * Replaces the stubbed source with the actual source. * NOTE: this will replace the first {@link StubRelation}s found with the source and the method is meant to be used to replace one node diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java index 3ef755d190cfe..f2f723ff9c112 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.plan.logical.join; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -23,6 +24,7 @@ import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan; import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; +import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker; @@ -130,10 +132,23 @@ public Join(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { source().writeTo(out); out.writeNamedWriteable(left()); - out.writeNamedWriteable(right()); + out.writeNamedWriteable(getRightToSerialize(out)); config.writeTo(out); } + protected LogicalPlan getRightToSerialize(StreamOutput out) { + LogicalPlan 
rightToSerialize = right(); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER) == false) { + // Prior to TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER + // we do not support a filter on top of the right side of the join + // As we consider the filters optional, we remove them here + while (rightToSerialize instanceof Filter filter) { + rightToSerialize = filter.child(); + } + } + return rightToSerialize; + } + @Override public String getWriteableName() { return ENTRY.name; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 69d03f81ada9c..4bac4d4b0aae1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -92,7 +92,9 @@ import org.elasticsearch.xpack.esql.inference.XContentRowEncoder; import org.elasticsearch.xpack.esql.inference.completion.CompletionOperator; import org.elasticsearch.xpack.esql.inference.rerank.RerankOperator; +import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Fork; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.ChangePointExec; import org.elasticsearch.xpack.esql.plan.physical.DissectExec; @@ -105,6 +107,7 @@ import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; import org.elasticsearch.xpack.esql.plan.physical.FilterExec; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.GrokExec; import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec; 
import org.elasticsearch.xpack.esql.plan.physical.LimitExec; @@ -740,8 +743,8 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan } Layout layout = layoutBuilder.build(); - EsQueryExec localSourceExec = (EsQueryExec) join.lookup(); - if (localSourceExec.indexMode() != IndexMode.LOOKUP) { + EsRelation esRelation = findEsRelation(join.lookup()); + if (esRelation == null || esRelation.indexMode() != IndexMode.LOOKUP) { throw new IllegalArgumentException("can't plan [" + join + "]"); } @@ -749,10 +752,10 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan // 1. We've just got one entry - this should be the one relevant to the join, and it should be for this cluster // 2. We have got multiple entries - this means each cluster has its own one, and we should extract one relevant for this cluster Map.Entry entry; - if (localSourceExec.indexNameWithModes().size() == 1) { - entry = localSourceExec.indexNameWithModes().entrySet().iterator().next(); + if (esRelation.indexNameWithModes().size() == 1) { + entry = esRelation.indexNameWithModes().entrySet().iterator().next(); } else { - var maybeEntry = localSourceExec.indexNameWithModes() + var maybeEntry = esRelation.indexNameWithModes() .entrySet() .stream() .filter(e -> RemoteClusterAware.parseClusterAlias(e.getKey()).equals(clusterAlias)) @@ -788,7 +791,6 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan } matchFields.add(new MatchConfig(right, input)); } - return source.with( new LookupFromIndexOperator.Factory( matchFields, @@ -796,15 +798,26 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan parentTask, context.queryPragmas().enrichMaxWorkers(), ctx -> lookupFromIndexService, - localSourceExec.indexPattern(), + esRelation.indexPattern(), indexName, join.addedFields().stream().map(f -> (NamedExpression) f).toList(), - join.source() + join.source(), + join.right() ), layout ); } + private static 
EsRelation findEsRelation(PhysicalPlan node) { + if (node instanceof FragmentExec fragmentExec) { + List esRelations = fragmentExec.fragment().collectFirstChildren(x -> x instanceof EsRelation); + if (esRelations.size() == 1) { + return (EsRelation) esRelations.get(0); + } + } + return null; + } + private PhysicalOperation planLocal(LocalSourceExec localSourceExec, LocalExecutionPlannerContext context) { Layout.Builder layout = new Layout.Builder(); layout.append(localSourceExec.output()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index 6c7162ada2e7f..ef4f605b22d36 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -49,6 +49,7 @@ import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec; import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec; import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; +import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; import org.elasticsearch.xpack.esql.plan.physical.MergeExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper; @@ -63,6 +64,7 @@ import java.util.Set; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.stream.Collectors; import static java.util.Arrays.asList; import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES; @@ -208,8 +210,18 @@ public static PhysicalPlan localPlan( ) { final LocalMapper localMapper = new LocalMapper(); var isCoordPlan = new Holder<>(Boolean.TRUE); + Set lookupJoinExecRightChildren = plan.collect(LookupJoinExec.class::isInstance) + .stream() + .map(x -> ((LookupJoinExec) x).right()) + 
.collect(Collectors.toSet()); var localPhysicalPlan = plan.transformUp(FragmentExec.class, f -> { + if (lookupJoinExecRightChildren.contains(f)) { + // Do not optimize the right child of a lookup join exec + // The data node does not have the right stats to perform the optimization because the stats are on the lookup node + // Also we only ship logical plans across the network, so the plan needs to remain logical + return f; + } isCoordPlan.set(Boolean.FALSE); var optimizedFragment = logicalOptimizer.localOptimize(f.fragment()); var physicalFragment = localMapper.map(optimizedFragment); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java index a1671bffc5c25..c0ac137f8b2c8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java @@ -14,6 +14,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.LeafPlan; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; @@ -22,7 +23,9 @@ import org.elasticsearch.xpack.esql.plan.logical.join.Join; import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; +import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec; import 
org.elasticsearch.xpack.esql.plan.physical.LimitExec; import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; @@ -97,25 +100,49 @@ private PhysicalPlan mapBinary(BinaryPlan binary) { } PhysicalPlan left = map(binary.left()); - PhysicalPlan right = map(binary.right()); - // if the right is data we can use a hash join directly - if (right instanceof LocalSourceExec localData) { - return new HashJoinExec( - join.source(), - left, - localData, - config.matchFields(), - config.leftFields(), - config.rightFields(), - join.rightOutputFields() - ); + if (binary.right() instanceof LocalRelation) { + PhysicalPlan right = map(binary.right()); + if (right instanceof LocalSourceExec localData) { + return new HashJoinExec( + join.source(), + left, + localData, + config.matchFields(), + config.leftFields(), + config.rightFields(), + join.rightOutputFields() + ); + } else { + throw new EsqlIllegalArgumentException("Unsupported right plan for join [" + binary.right().nodeName() + "]"); + } + } + EsRelation rightRelation = null; + if (binary.right() instanceof EsRelation esRelation) { + rightRelation = esRelation; + } else if (binary.right() instanceof Filter filter && filter.child() instanceof EsRelation esRelation) { + rightRelation = esRelation; } - if (right instanceof EsSourceExec source && source.indexMode() == IndexMode.LOOKUP) { - return new LookupJoinExec(join.source(), left, right, config.leftFields(), config.rightFields(), join.rightOutputFields()); + if (rightRelation == null) { + throw new EsqlIllegalArgumentException("Unsupported right plan for lookup join [" + binary.right().nodeName() + "]"); } + if (rightRelation.indexMode() != IndexMode.LOOKUP) { + throw new EsqlIllegalArgumentException( + "To perform a lookup join with index [" + rightRelation.indexPattern() + "], it must be a in lookup index mode" + ); + } + // we want to do local physical planning on the lookup node eventually for the right side of the lookup join + // so here we will wrap the logical 
plan with a FragmentExec and keep it as is + FragmentExec fragmentExec = new FragmentExec(binary.right()); + return new LookupJoinExec( + join.source(), + left, + fragmentExec, + config.leftFields(), + config.rightFields(), + join.rightOutputFields() + ); } - return MapperUtils.unsupported(binary); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index 22a5c3b403f3f..f515f6df91e1b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Fork; import org.elasticsearch.xpack.esql.plan.logical.LeafPlan; import org.elasticsearch.xpack.esql.plan.logical.Limit; @@ -231,16 +232,35 @@ private PhysicalPlan mapBinary(BinaryPlan bp) { join.rightOutputFields() ); } - if (right instanceof FragmentExec fragment - && fragment.fragment() instanceof EsRelation relation - && relation.indexMode() == IndexMode.LOOKUP) { - return new LookupJoinExec(join.source(), left, right, config.leftFields(), config.rightFields(), join.rightOutputFields()); + if (right instanceof FragmentExec fragment) { + boolean isIndexModeLookup = isIndexModeLookup(fragment); + if (isIndexModeLookup) { + return new LookupJoinExec( + join.source(), + left, + right, + config.leftFields(), + config.rightFields(), + join.rightOutputFields() + ); + } } } - return MapperUtils.unsupported(bp); } + private static boolean isIndexModeLookup(FragmentExec fragment) { + // we support 2 cases: + // EsRelation in index_mode=lookup + boolean 
isIndexModeLookup = fragment.fragment() instanceof EsRelation relation && relation.indexMode() == IndexMode.LOOKUP; + // or Filter(EsRelation) in index_mode=lookup + isIndexModeLookup = isIndexModeLookup + || fragment.fragment() instanceof Filter filter + && filter.child() instanceof EsRelation relation + && relation.indexMode() == IndexMode.LOOKUP; + return isIndexModeLookup; + } + private PhysicalPlan mapFork(Fork fork) { return new MergeExec(fork.source(), fork.children().stream().map(child -> map(child)).toList(), fork.output()); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlFlags.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlFlags.java index a7eacf383fc73..a25ae9a274786 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlFlags.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlFlags.java @@ -11,6 +11,14 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import java.util.List; + +/** + * Class holding all the flags that can be used to change behavior for certain features in ESQL. + * The flags are backed by {@link Setting}s so they can be dynamically changed. + * When adding a new flag, make sure to add it to {@link #ALL_ESQL_FLAGS_SETTINGS} + * so it gets registered and unit tests can pass. 
+ */ public class EsqlFlags { public static final Setting ESQL_STRING_LIKE_ON_INDEX = Setting.boolSetting( "esql.query.string_like_on_index", @@ -40,6 +48,9 @@ public class EsqlFlags { Setting.Property.Dynamic ); + // this is only used for testing purposes right now + public static List> ALL_ESQL_FLAGS_SETTINGS = List.of(ESQL_STRING_LIKE_ON_INDEX, ESQL_ROUNDTO_PUSHDOWN_THRESHOLD); + private final boolean stringLikeOnIndex; private final int roundToPushdownThreshold; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java index 5f3d22957731a..0c31581e514d0 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java @@ -28,9 +28,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BytesRefBlock; -import org.elasticsearch.compute.data.BytesRefVector; import org.elasticsearch.compute.data.IntBlock; -import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; @@ -45,6 +43,7 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperServiceTestCase; @@ -62,12 +61,21 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import 
org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; +import org.elasticsearch.xpack.esql.core.tree.Location; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThan; +import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.Filter; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; +import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.hamcrest.Matcher; import org.junit.After; @@ -76,6 +84,8 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -87,6 +97,7 @@ public class LookupFromIndexOperatorTests extends AsyncOperatorTestCase { private static final int LOOKUP_SIZE = 1000; + private static final int LESS_THAN_VALUE = -40; private final ThreadPool threadPool = threadPool(); private final Directory lookupIndexDirectory = newDirectory(); private final List releasables = new ArrayList<>(); @@ -135,12 +146,17 @@ protected void assertSimpleOutput(List input, List results) { for (Page r : results) { assertThat(r.getBlockCount(), equalTo(numberOfJoinColumns + 2)); LongVector match = r.getBlock(0).asVector(); - BytesRefVector lkwd = r.getBlock(numberOfJoinColumns).asVector(); - IntVector lint = r.getBlock(numberOfJoinColumns + 
1).asVector(); + BytesRefBlock lkwdBlock = r.getBlock(numberOfJoinColumns); + IntBlock lintBlock = r.getBlock(numberOfJoinColumns + 1); for (int p = 0; p < r.getPositionCount(); p++) { long m = match.getLong(p); - assertThat(lkwd.getBytesRef(p, new BytesRef()).utf8ToString(), equalTo("l" + m)); - assertThat(lint.getInt(p), equalTo((int) -m)); + if (m > Math.abs(LESS_THAN_VALUE)) { + assertThat(lkwdBlock.getBytesRef(lkwdBlock.getFirstValueIndex(p), new BytesRef()).utf8ToString(), equalTo("l" + m)); + assertThat(lintBlock.getInt(lintBlock.getFirstValueIndex(p)), equalTo((int) -m)); + } else { + assertTrue("at " + p, lkwdBlock.isNull(p)); + assertTrue("at " + p, lintBlock.isNull(p)); + } } } } @@ -162,6 +178,7 @@ protected Operator.OperatorFactory simple(SimpleOptions options) { FieldAttribute.FieldName matchField = new FieldAttribute.FieldName("match" + i); matchFields.add(new MatchConfig(matchField, i, inputDataType)); } + return new LookupFromIndexOperator.Factory( matchFields, sessionId, @@ -171,8 +188,25 @@ protected Operator.OperatorFactory simple(SimpleOptions options) { lookupIndex, lookupIndex, loadFields, - Source.EMPTY + Source.EMPTY, + buildLessThanFilter(LESS_THAN_VALUE) + ); + } + + private FragmentExec buildLessThanFilter(int value) { + FieldAttribute filterAttribute = new FieldAttribute( + Source.EMPTY, + "lint", + new EsField("lint", DataType.INTEGER, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE) + ); + Expression lessThan = new LessThan( + new Source(new Location(0, 0), "lint < " + value), + filterAttribute, + new Literal(Source.EMPTY, value, DataType.INTEGER) ); + EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), List.of()); + Filter filter = new Filter(Source.EMPTY, esRelation, lessThan); + return new FragmentExec(filter); } @Override @@ -180,20 +214,50 @@ protected Matcher expectedDescriptionOfSimple() { return expectedToStringOfSimple(); } + @Override + public void testSimpleDescription() { 
+ Operator.OperatorFactory factory = simple(); + String description = factory.describe(); + assertThat(description, expectedDescriptionOfSimple()); + try (Operator op = factory.get(driverContext())) { + // we use a special pattern here because the description can contain new lines for the right_pre_join_plan + String pattern = "^\\w*\\[[\\s\\S]*\\]$"; + assertThat(description, matchesPattern(pattern)); + } + } + @Override protected Matcher expectedToStringOfSimple() { StringBuilder sb = new StringBuilder(); - sb.append("LookupOperator\\[index=idx load_fields=\\[lkwd\\{r}#\\d+, lint\\{r}#\\d+]"); + sb.append("LookupOperator\\[index=idx load_fields=\\[lkwd\\{r}#\\d+, lint\\{r}#\\d+] "); for (int i = 0; i < numberOfJoinColumns; i++) { - sb.append(" input_type=LONG match_field=match").append(i).append(" inputChannel=").append(i); + sb.append("input_type=LONG match_field=match").append(i).append(" inputChannel=").append(i).append(" "); } - sb.append("]"); + // Accept either the legacy physical plan rendering (FilterExec/EsQueryExec) or the new FragmentExec rendering + sb.append("right_pre_join_plan=(?:"); + // Legacy pattern + sb.append("FilterExec\\[lint\\{f}#\\d+ < ") + .append(LESS_THAN_VALUE) + .append( + "\\[INTEGER]]\\n\\\\_EsQueryExec\\[test], indexMode\\[lookup],\\s*(?:query\\[\\]|\\[\\])?,?\\s*" + + "limit\\[\\],?\\s*sort\\[(?:\\[\\])?\\]\\s*estimatedRowSize\\[null\\]\\s*queryBuilderAndTags \\[(?:\\[\\]\\])\\]" + ); + sb.append("|"); + // New FragmentExec pattern + sb.append("FragmentExec\\[filter=null, estimatedRowSize=\\d+, reducer=\\[\\], fragment=\\[<>\\n") + .append("Filter\\[lint\\{f}#\\d+ < ") + .append(LESS_THAN_VALUE) + .append("\\[INTEGER]]\\n") + .append("\\\\_EsRelation\\[test]\\[LOOKUP]\\[\\]<>\\]\\]\\]"); + sb.append(")"); return matchesPattern(sb.toString()); } private LookupFromIndexService lookupService(DriverContext mainContext) { boolean beCranky = mainContext.bigArrays().breakerService() instanceof CrankyCircuitBreakerService; DiscoveryNode 
localNode = DiscoveryNodeUtils.create("node", "node"); + var builtInClusterSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + builtInClusterSettings.addAll(EsqlFlags.ALL_ESQL_FLAGS_SETTINGS); ClusterService clusterService = ClusterServiceUtils.createClusterService( threadPool, localNode, @@ -202,7 +266,7 @@ private LookupFromIndexService lookupService(DriverContext mainContext) { .put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_SIZE_SETTING, ByteSizeValue.ofKb(0)) .put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_MAX_SIZE_SETTING, ByteSizeValue.ofKb(0)) .build(), - ClusterSettings.createBuiltInClusterSettings() + new ClusterSettings(Settings.EMPTY, builtInClusterSettings) ); IndicesService indicesService = mock(IndicesService.class); IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java index d1bb7aeaa166a..16ab4181ccb34 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java @@ -74,6 +74,7 @@ import org.elasticsearch.xpack.esql.optimizer.rules.logical.ExtractAggregateCommonFilter; import org.elasticsearch.xpack.esql.parser.ParsingException; import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.DissectExec; @@ -85,6 +86,7 @@ import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; 
import org.elasticsearch.xpack.esql.plan.physical.FilterExec; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.GrokExec; import org.elasticsearch.xpack.esql.plan.physical.LimitExec; import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; @@ -1030,21 +1032,23 @@ public void testMissingFieldsPurgesTheJoinLocallyThroughCommands() { } /* - * LimitExec[1000[INTEGER]] + * LimitExec[1000[INTEGER],12] * \_AggregateExec[[language_code{r}#12],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#12],FINAL,[language_code{r}#12 * , $$c$count{r}#32, $$c$seen{r}#33],12] * \_ExchangeExec[[language_code{r}#12, $$c$count{r}#32, $$c$seen{r}#33],true] * \_AggregateExec[[language_code{r}#12],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#12],INITIAL,[language_code{r}# - * 12, $$c$count{r}#34, $$c$seen{r}#35],12] + * 12, $$c$count{r}#34, $$c$seen{r}#35],12] * \_LookupJoinExec[[language_code{r}#12],[language_code{f}#29],[]] - * |_GrokExec[first_name{f}#19,Parser[pattern=%{NUMBER:language_code:int}, grok=org.elasticsearch.grok.Grok@764e5109],[languag - * e_code{r}#12]] + * |_GrokExec[first_name{f}#19,Parser[pattern=%{NUMBER:language_code:int}, grok=org.elasticsearch.grok.Grok@177d8fd5],[languag + * e_code{r}#12]] * | \_MvExpandExec[emp_no{f}#18,emp_no{r}#31] * | \_ProjectExec[[emp_no{f}#18, languages{r}#21 AS language_code#7, first_name{f}#19]] * | \_FieldExtractExec[emp_no{f}#18, first_name{f}#19]<[],[]> * | \_EvalExec[[null[INTEGER] AS languages#21]] - * | \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#36], limit[], sort[] estimatedRowSize[66] - * \_EsQueryExec[languages_lookup], indexMode[lookup], query[][_doc{f}#37], limit[], sort[] estimatedRowSize[4] + * | \_EsQueryExec[test], indexMode[standard], [_doc{f}#36], limit[], sort[] estimatedRowSize[66] + * queryBuilderAndTags [[QueryBuilderAndTags{queryBuilder=[null], tags=[]}]] + * \_FragmentExec[filter=null, estimatedRowSize=0, 
reducer=[], fragment=[<> + * EsRelation[languages_lookup][LOOKUP][language_code{f}#29]<>]] */ public void testMissingFieldsNotPurgingTheJoinLocally() { var stats = EsqlTestUtils.statsForMissingField("languages"); @@ -1072,22 +1076,26 @@ public void testMissingFieldsNotPurgingTheJoinLocally() { var extract = as(project.child(), FieldExtractExec.class); var eval = as(extract.child(), EvalExec.class); var source = as(eval.child(), EsQueryExec.class); - var right = as(join.right(), EsQueryExec.class); + var right = as(join.right(), FragmentExec.class); + var relation = as(right.fragment(), EsRelation.class); } /* - * LimitExec[1000[INTEGER]] + * LimitExec[1000[INTEGER],62] * \_LookupJoinExec[[language_code{r}#6],[language_code{f}#23],[language_name{f}#24]] - * |_LimitExec[1000[INTEGER]] + * |_LimitExec[1000[INTEGER],12] * | \_AggregateExec[[languages{f}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#10, languages{f}#15 AS language_code#6],FINAL,[language - * s{f}#15, $$c$count{r}#25, $$c$seen{r}#26],62] + * s{f}#15, $$c$count{r}#25, $$c$seen{r}#26],62] * | \_ExchangeExec[[languages{f}#15, $$c$count{r}#25, $$c$seen{r}#26],true] * | \_AggregateExec[[languages{r}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#10, languages{r}#15 AS language_code#6],INITIAL, - * [languages{r}#15, $$c$count{r}#27, $$c$seen{r}#28],12] + * [languages{r}#15, $$c$count{r}#27, $$c$seen{r}#28],12] + * ges{r}#15, $$c$count{r}#27, $$c$seen{r}#28],12] * | \_FieldExtractExec[emp_no{f}#12]<[],[]> * | \_EvalExec[[null[INTEGER] AS languages#15]] - * | \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#29], limit[], sort[] estimatedRowSize[12] - * \_EsQueryExec[languages_lookup], indexMode[lookup], query[][_doc{f}#30], limit[], sort[] estimatedRowSize[4] + * | \_EsQueryExec[test], indexMode[standard], [_doc{f}#29], limit[], sort[] estimatedRowSize[12] + * queryBuilderAndTags [[QueryBuilderAndTags{queryBuilder=[null], tags=[]}]] + * \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<> 
+ * EsRelation[languages_lookup][LOOKUP][language_code{f}#23, language_name{f}#24]<>]] */ public void testMissingFieldsDoesNotPurgeTheJoinOnCoordinator() { var stats = EsqlTestUtils.statsForMissingField("languages"); @@ -1118,9 +1126,10 @@ public void testMissingFieldsDoesNotPurgeTheJoinOnCoordinator() { assertThat(source.indexPattern(), is("test")); assertThat(source.indexMode(), is(IndexMode.STANDARD)); - source = as(join.right(), EsQueryExec.class); - assertThat(source.indexPattern(), is("languages_lookup")); - assertThat(source.indexMode(), is(IndexMode.LOOKUP)); + var right = as(join.right(), FragmentExec.class); + var relation = as(right.fragment(), EsRelation.class); + assertThat(relation.indexPattern(), is("languages_lookup")); + assertThat(relation.indexMode(), is(IndexMode.LOOKUP)); } /* diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index c3c4a9e3f1038..9964cbb89b99b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -7067,13 +7067,14 @@ public void testLookupJoinPushDownFilterOnLeftSideField() { * Expects * *
<pre>{@code
-     * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16,
-     *          languages{f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]]
+     * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, languages
+     * {f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]]
      * \_Limit[1000[INTEGER],false]
-     *   \_Filter[language_name{f}#19 == [45 6e 67 6c 69 73 68][KEYWORD]]
-     *     \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18]]
+     *   \_Filter[language_name{f}#19 == English[KEYWORD]]
+     *     \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18],false]
      *       |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..]
-     *       \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19]
+     *       \_Filter[language_name{f}#19 == English[KEYWORD]]
+     *         \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19]
      * }</pre>
*/ public void testLookupJoinPushDownDisabledForLookupField() { @@ -7102,7 +7103,10 @@ public void testLookupJoinPushDownDisabledForLookupField() { assertThat(join.config().type(), equalTo(JoinTypes.LEFT)); var leftRel = as(join.left(), EsRelation.class); - var rightRel = as(join.right(), EsRelation.class); + var rightFilter = as(join.right(), Filter.class); + assertEquals("language_name == \"English\"", rightFilter.condition().toString()); + var joinRightEsRelation = as(rightFilter.child(), EsRelation.class); + } /** @@ -7111,14 +7115,15 @@ public void testLookupJoinPushDownDisabledForLookupField() { * Expects * *
<pre>{@code
-     * Project[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17,
-     *          languages{f}#11 AS language_code#4, last_name{f}#12, long_noidx{f}#18, salary{f}#13, language_name{f}#20]]
+     * Project[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17, languages
+     * {f}#11 AS language_code#4, last_name{f}#12, long_noidx{f}#18, salary{f}#13, language_name{f}#20]]
      * \_Limit[1000[INTEGER],false]
-     *   \_Filter[language_name{f}#20 == [45 6e 67 6c 69 73 68][KEYWORD]]
-     *     \_Join[LEFT,[languages{f}#11],[languages{f}#11],[language_code{f}#19]]
+     *   \_Filter[language_name{f}#20 == English[KEYWORD]]
+     *     \_Join[LEFT,[languages{f}#11],[languages{f}#11],[language_code{f}#19],false]
      *       |_Filter[emp_no{f}#8 > 1[INTEGER]]
      *       | \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..]
-     *       \_EsRelation[languages_lookup][LOOKUP][language_code{f}#19, language_name{f}#20]
+     *       \_Filter[language_name{f}#20 == English[KEYWORD]]
+     *         \_EsRelation[languages_lookup][LOOKUP][language_code{f}#19, language_name{f}#20]
      * }</pre>
*/ public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightField() { @@ -7155,7 +7160,9 @@ public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightFiel assertThat(literal.value(), equalTo(1)); var leftRel = as(filter.child(), EsRelation.class); - var rightRel = as(join.right(), EsRelation.class); + var rightFilter = as(join.right(), Filter.class); + assertEquals("language_name == \"English\"", rightFilter.condition().toString()); + var rightRel = as(rightFilter.child(), EsRelation.class); } /** diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java index f3d1107983628..11c64a82e3f57 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java @@ -9,41 +9,56 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; import org.elasticsearch.xpack.esql.expression.function.fulltext.Match; import org.elasticsearch.xpack.esql.expression.function.scalar.math.Pow; +import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce; 
import org.elasticsearch.xpack.esql.expression.function.scalar.string.regex.RLike; import org.elasticsearch.xpack.esql.expression.function.scalar.string.regex.WildcardLike; import org.elasticsearch.xpack.esql.expression.predicate.Predicates; import org.elasticsearch.xpack.esql.expression.predicate.logical.And; +import org.elasticsearch.xpack.esql.expression.predicate.logical.Not; +import org.elasticsearch.xpack.esql.expression.predicate.logical.Or; +import org.elasticsearch.xpack.esql.expression.predicate.nulls.IsNull; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThanOrEqual; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThan; +import org.elasticsearch.xpack.esql.optimizer.AbstractLogicalPlanOptimizerTests; +import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.Filter; +import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.inference.Completion; import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; +import org.elasticsearch.xpack.esql.plan.logical.join.Join; +import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig; +import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static 
java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.FIVE; import static org.elasticsearch.xpack.esql.EsqlTestUtils.FOUR; import static org.elasticsearch.xpack.esql.EsqlTestUtils.ONE; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.SIX; import static org.elasticsearch.xpack.esql.EsqlTestUtils.THREE; import static org.elasticsearch.xpack.esql.EsqlTestUtils.TWO; import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; @@ -56,9 +71,13 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.rlike; import static org.elasticsearch.xpack.esql.EsqlTestUtils.wildcardLike; import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.mock; -public class PushDownAndCombineFiltersTests extends ESTestCase { +public class PushDownAndCombineFiltersTests extends AbstractLogicalPlanOptimizerTests { + + private final LogicalOptimizerContext optimizerContext = new LogicalOptimizerContext(null, FoldContext.small()); public void testCombineFilters() { EsRelation relation = relation(); @@ -68,7 +87,10 @@ public void testCombineFilters() { Filter fa = new Filter(EMPTY, relation, conditionA); Filter fb = new Filter(EMPTY, fa, conditionB); - assertEquals(new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB)), new PushDownAndCombineFilters().apply(fb)); + assertEquals( + new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB)), + new PushDownAndCombineFilters().apply(fb, optimizerContext) + ); } public void testCombineFiltersLikeRLike() { @@ -79,7 +101,10 @@ public void testCombineFiltersLikeRLike() { Filter fa = new Filter(EMPTY, relation, conditionA); Filter fb = new Filter(EMPTY, fa, conditionB); - assertEquals(new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB)), new PushDownAndCombineFilters().apply(fb)); + 
assertEquals( + new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB)), + new PushDownAndCombineFilters().apply(fb, optimizerContext) + ); } public void testPushDownFilter() { @@ -93,7 +118,7 @@ public void testPushDownFilter() { Filter fb = new Filter(EMPTY, keep, conditionB); Filter combinedFilter = new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB)); - assertEquals(new EsqlProject(EMPTY, combinedFilter, projections), new PushDownAndCombineFilters().apply(fb)); + assertEquals(new EsqlProject(EMPTY, combinedFilter, projections), new PushDownAndCombineFilters().apply(fb, optimizerContext)); } public void testPushDownFilterPastRenamingProject() { @@ -111,7 +136,7 @@ public void testPushDownFilterPastRenamingProject() { LessThan bRenamedLessThanTwo = lessThanOf(bRenamed.toAttribute(), TWO); Filter filter = new Filter(EMPTY, project, Predicates.combineAnd(List.of(aRenamedTwiceGreaterThanOne, bRenamedLessThanTwo))); - LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter); + LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext); Project optimizedProject = as(optimized, Project.class); assertEquals(optimizedProject.projections(), project.projections()); @@ -184,7 +209,7 @@ public void testPushDownFilterOnAliasInEval() { Filter filter = new Filter(EMPTY, eval, Predicates.combineAnd(conditions)); - LogicalPlan plan = new PushDownAndCombineFilters().apply(filter); + LogicalPlan plan = new PushDownAndCombineFilters().apply(filter, optimizerContext); if (numNonPushable > 0) { Filter optimizedFilter = as(plan, Filter.class); @@ -216,7 +241,7 @@ public void testPushDownLikeRlikeFilter() { Filter fb = new Filter(EMPTY, keep, conditionB); Filter combinedFilter = new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB)); - assertEquals(new EsqlProject(EMPTY, combinedFilter, projections), new PushDownAndCombineFilters().apply(fb)); + assertEquals(new EsqlProject(EMPTY, combinedFilter, 
projections), new PushDownAndCombineFilters().apply(fb, optimizerContext)); } // from ... | where a > 1 | stats count(1) by b | where count(1) >= 3 and b < 2 @@ -244,7 +269,7 @@ public void testSelectivelyPushDownFilterPastFunctionAgg() { ), aggregateCondition ); - assertEquals(expected, new PushDownAndCombineFilters().apply(fb)); + assertEquals(expected, new PushDownAndCombineFilters().apply(fb, optimizerContext)); } // from ... | where a > 1 | COMPLETION completion = "some prompt" WITH { "inferenceId' : "inferenceId" } | where b < 2 and @@ -284,7 +309,7 @@ public void testPushDownFilterPastCompletion() { conditionCompletion ); - assertEquals(expectedOptimizedPlan, new PushDownAndCombineFilters().apply(filterB)); + assertEquals(expectedOptimizedPlan, new PushDownAndCombineFilters().apply(filterB, optimizerContext)); } // from ... | where a > 1 | RERANK "query" ON title WITH { "inference_id" : "inferenceId" } | where b < 2 and _score > 1 @@ -317,7 +342,7 @@ public void testPushDownFilterPastRerank() { scoreCondition ); - assertEquals(expectedOptimizedPlan, new PushDownAndCombineFilters().apply(filterB)); + assertEquals(expectedOptimizedPlan, new PushDownAndCombineFilters().apply(filterB, optimizerContext)); } private static Completion completion(LogicalPlan child) { @@ -348,4 +373,471 @@ private static EsRelation relation() { private static EsRelation relation(List fieldAttributes) { return new EsRelation(EMPTY, randomIdentifier(), randomFrom(IndexMode.values()), Map.of(), fieldAttributes); } + + public void testPushDownFilterPastLeftJoinWithPushable() { + Join join = createLeftJoin(); + EsRelation left = (EsRelation) join.left(); + FieldAttribute c = (FieldAttribute) join.right().output().get(0); + + // Pushable filter + Expression pushableCondition = greaterThanOf(c, ONE); + Filter filter = new Filter(EMPTY, join, pushableCondition); + LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext); + // The filter should still be on top + 
Filter topFilter = as(optimized, Filter.class); + assertEquals(pushableCondition, topFilter.condition()); + Join optimizedJoin = as(topFilter.child(), Join.class); + assertEquals(left, optimizedJoin.left()); + Filter rightFilter = as(optimizedJoin.right(), Filter.class); + assertEquals(pushableCondition, rightFilter.condition()); + } + + public void testPushDownFilterPastLeftJoinWithExistingFilter() { + Join join = createLeftJoin(); + EsRelation left = (EsRelation) join.left(); + FieldAttribute c = (FieldAttribute) join.right().output().get(0); + + // Existing filter on the right side + Expression existingCondition = lessThanOf(c, FIVE); + Filter existingFilter = new Filter(EMPTY, join.right(), existingCondition); + join = (Join) join.replaceRight(existingFilter); + + // Pushable filter + Expression pushableCondition = greaterThanOf(c, ONE); + Filter filter = new Filter(EMPTY, join, pushableCondition); + LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext); + // The filter should still be on top + Filter topFilter = as(optimized, Filter.class); + assertEquals(pushableCondition, topFilter.condition()); + Join optimizedJoin = as(topFilter.child(), Join.class); + assertEquals(left, optimizedJoin.left()); + Filter rightFilter = as(optimizedJoin.right(), Filter.class); + + // The new condition should be merged with the existing one + Expression combinedCondition = new And(EMPTY, existingCondition, pushableCondition); + assertEquals(combinedCondition, rightFilter.condition()); + + // try to apply the filter again, the plan should not change + // this verifies that the rule is idempotent + // and we will not get in an infinite loop pushing the same filter over and over + optimized = new PushDownAndCombineFilters().apply(optimized, optimizerContext); + + topFilter = as(optimized, Filter.class); + assertEquals(pushableCondition, topFilter.condition()); + optimizedJoin = as(topFilter.child(), Join.class); + assertEquals(left, 
optimizedJoin.left()); + rightFilter = as(optimizedJoin.right(), Filter.class); + + // The new condition should be merged with the existing one + assertEquals(combinedCondition, rightFilter.condition()); + + } + + public void testDoNotPushDownExistingFilterAgain() { + Join join = createLeftJoin(); + EsRelation left = (EsRelation) join.left(); + FieldAttribute c = (FieldAttribute) join.right().output().get(0); + + // Existing filter on the right side + Expression existingCondition = greaterThanOf(c, ONE); + Filter existingFilter = new Filter(EMPTY, join.right(), existingCondition); + join = (Join) join.replaceRight(existingFilter); + + // A filter on top with the same condition + Filter filter = new Filter(EMPTY, join, existingCondition); + LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext); + + // The filter should still be on top + Filter topFilter = as(optimized, Filter.class); + assertEquals(existingCondition, topFilter.condition()); + + Join optimizedJoin = as(topFilter.child(), Join.class); + assertEquals(left, optimizedJoin.left()); + + // The right side should be the original filter, unchanged. 
+ Filter rightFilter = as(optimizedJoin.right(), Filter.class); + assertEquals(existingFilter, rightFilter); + assertEquals(existingCondition, rightFilter.condition()); + } + + public void testPushDownFilterPastLeftJoinWithExistingFilterCalledTwice() { + Join join = createLeftJoin(); + EsRelation left = (EsRelation) join.left(); + FieldAttribute c = (FieldAttribute) join.right().output().get(0); + + // Pushable filter + Expression pushableCondition = greaterThanOf(c, ONE); + Filter filter = new Filter(EMPTY, join, pushableCondition); + + // First optimization + LogicalPlan optimizedOnce = new PushDownAndCombineFilters().apply(filter, optimizerContext); + + // Second optimization + LogicalPlan optimizedTwice = new PushDownAndCombineFilters().apply(optimizedOnce, optimizerContext); + + // The filter should still be on top + Filter topFilter = as(optimizedTwice, Filter.class); + assertEquals(pushableCondition, topFilter.condition()); + + Join optimizedJoin = as(topFilter.child(), Join.class); + assertEquals(left, optimizedJoin.left()); + + // The right side should have the filter, but not duplicated. 
+ Filter rightFilter = as(optimizedJoin.right(), Filter.class); + assertEquals(pushableCondition, rightFilter.condition()); + } + + public void testPushDownFilterPastLeftJoinWithNonPushable() { + Join join = createLeftJoin(); + FieldAttribute c = (FieldAttribute) join.right().output().get(0); + + // Non-pushable filter + Expression nonPushableCondition = new IsNull(EMPTY, c); + Filter filter = new Filter(EMPTY, join, nonPushableCondition); + LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext); + // No optimization should be applied, the plan should be the same + assertEquals(filter, optimized); + // And the join inside should not have candidate filters + Join innerJoin = as(as(optimized, Filter.class).child(), Join.class); + assertFalse(innerJoin.right() instanceof Filter); + } + + public void testPushDownFilterPastLeftJoinWithPartiallyPushableAnd() { + Join join = createLeftJoin(); + EsRelation left = (EsRelation) join.left(); + FieldAttribute c = (FieldAttribute) join.right().output().get(0); + + Expression pushableCondition = greaterThanOf(c, ONE); + Expression nonPushableCondition = new IsNull(EMPTY, c); + + // Partially pushable filter + Expression partialCondition = new And(EMPTY, pushableCondition, nonPushableCondition); + Filter filter = new Filter(EMPTY, join, partialCondition); + LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext); + Filter topFilter = as(optimized, Filter.class); + // The top filter condition should be the original one + assertEquals(partialCondition, topFilter.condition()); + Join optimizedJoin = as(topFilter.child(), Join.class); + assertEquals(left, optimizedJoin.left()); + Filter rightFilter = as(optimizedJoin.right(), Filter.class); + // Only the pushable part should be a candidate + assertEquals(pushableCondition, rightFilter.condition()); + } + + public void testPushDownFilterPastLeftJoinWithOr() { + Join join = createLeftJoin(); + FieldAttribute c = 
(FieldAttribute) join.right().output().get(0); + + Expression pushableCondition = greaterThanOf(c, ONE); + Expression nonPushableCondition = new IsNull(EMPTY, c); + + // OR of pushable and non-pushable filter + Expression orCondition = new Or(EMPTY, pushableCondition, nonPushableCondition); + Filter filter = new Filter(EMPTY, join, orCondition); + LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext); + // No optimization should be applied, the plan should be the same + assertEquals(filter, optimized); + // And the join inside should not have candidate filters + Join innerJoin = as(filter.child(), Join.class); + assertFalse(innerJoin.right() instanceof Filter); + } + + public void testPushDownFilterPastLeftJoinWithNotButStillPushable() { + Join join = createLeftJoin(); + FieldAttribute c = (FieldAttribute) join.right().output().get(0); + + Expression pushableCondition = greaterThanOf(c, ONE); + + // negation of pushable filter, in this case it remains pushable + Expression negationOfPushableCondition = new Not(EMPTY, pushableCondition); + Filter filter = new Filter(EMPTY, join, negationOfPushableCondition); + LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext); + Filter topFilter = as(optimized, Filter.class); + assertEquals(negationOfPushableCondition, topFilter.condition()); + Join optimizedJoin = as(topFilter.child(), Join.class); + Filter rightFilter = as(optimizedJoin.right(), Filter.class); + assertEquals(negationOfPushableCondition, rightFilter.condition()); + } + + public void testPushDownFilterPastLeftJoinWithNotNonPushable() { + Join join = createLeftJoin(); + FieldAttribute c = (FieldAttribute) join.right().output().get(0); + + Expression nonPushableCondition = new IsNull(EMPTY, c); + + // negation of non-pushable filter makes it pushable + Expression negationOfNonPushableCondition = new Not(EMPTY, nonPushableCondition); + Filter filter = new Filter(EMPTY, join, 
negationOfNonPushableCondition); + LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext); + Filter topFilter = as(optimized, Filter.class); + assertEquals(negationOfNonPushableCondition, topFilter.condition()); + Join optimizedJoin = as(topFilter.child(), Join.class); + Filter rightFilter = as(optimizedJoin.right(), Filter.class); + assertEquals(negationOfNonPushableCondition, rightFilter.condition()); + } + + public void testPushDownFilterPastLeftJoinWithComplexMix() { + // Setup + FieldAttribute a = getFieldAttribute("a"); + FieldAttribute c = getFieldAttribute("c"); + FieldAttribute d = getFieldAttribute("d"); + FieldAttribute e = getFieldAttribute("e"); + FieldAttribute f = getFieldAttribute("f"); + FieldAttribute g = getFieldAttribute("g"); + EsRelation left = relation(List.of(a, getFieldAttribute("b"))); + EsRelation right = relation(List.of(c, d, e, f, g)); + JoinConfig joinConfig = new JoinConfig(JoinTypes.LEFT, List.of(a), List.of(a), List.of(c)); + Join join = new Join(EMPTY, left, right, joinConfig); + + // Predicates + Expression p1 = greaterThanOf(c, ONE); // pushable + Expression p2 = new Not(EMPTY, new IsNull(EMPTY, d)); // pushable (d IS NOT NULL) + Expression p3 = lessThanOf(e, THREE); // pushable + Expression p4 = rlike(f, "pat"); // pushable + Expression p5 = new Not(EMPTY, new IsNull(EMPTY, g)); // pushable (g IS NOT NULL) + Expression p6 = greaterThanOf(c, TWO); // pushable + Expression p7 = lessThanOf(d, FOUR); // pushable + Expression p8 = greaterThanOf(e, FIVE); // pushable + + Expression np1 = new IsNull(EMPTY, c); // non-pushable (c IS NULL) + Expression np2 = new Equals(EMPTY, new Coalesce(EMPTY, d, List.of(SIX)), SIX); // non-pushable + + // Build a complex condition + // np2 AND ((p1 AND p2 AND p3 AND p4 AND p5) AND (np1 OR (p6 AND p7) OR (p8 AND np2))) AND p1 AND p6 + Expression pushableBranch = Predicates.combineAnd(List.of(p1, p2, p3, p4, p5)); + Expression nonPushableBranch = new Or(EMPTY, np1, new 
Or(EMPTY, new And(EMPTY, p6, p7), new And(EMPTY, p8, np2))); + Expression complexCondition = new And(EMPTY, pushableBranch, nonPushableBranch); + complexCondition = Predicates.combineAnd(List.of(np2, complexCondition, p1, p6)); + + Filter filter = new Filter(EMPTY, join, complexCondition); + LogicalPlan optimized = new PushDownAndCombineFilters().apply(filter, optimizerContext); + + // The top filter with the original condition should remain, but the structure of the AND tree might have changed. + // So, we flatten the conditions and compare them as a set. + Filter topFilter = as(optimized, Filter.class); + Set actualTopPredicates = new HashSet<>(Predicates.splitAnd(topFilter.condition())); + Set expectedTopPredicates = new HashSet<>(List.of(p1, p2, p3, p4, p5, nonPushableBranch, np2, p1, p6)); + assertEquals(expectedTopPredicates, actualTopPredicates); + + // The pushable part of the filter should be added as a candidate to the join + Join optimizedJoin = as(topFilter.child(), Join.class); + assertEquals(left, optimizedJoin.left()); + Filter rightFilter = as(optimizedJoin.right(), Filter.class); + Set actualPushable = new HashSet<>(Predicates.splitAnd(rightFilter.condition())); + Set expectedPushable = new HashSet<>(List.of(p1, p2, p3, p4, p5, p6)); + assertEquals(expectedPushable, actualPushable); + } + + /** + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, languages + * {f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Limit[1000[INTEGER],false] + * \_Filter[ISNULL(language_name{f}#19)] + * \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18]] + * |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] 
+ * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testDoNotPushDownIsNullFilterPastLookupJoin() { + var plan = plan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name IS NULL + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var join = as(filter.child(), Join.class); + assertThat(join.right(), instanceOf(EsRelation.class)); + } + + /** + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, languages + * {f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Limit[1000[INTEGER],false] + * \_Filter[language_name{f}#19 > a[KEYWORD]] + * \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18],false] + * |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] 
+ * \_Filter[language_name{f}#19 > a[KEYWORD]] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testPushDownGreaterThanFilterPastLookupJoin() { + var plan = plan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name > "a" + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var join = as(filter.child(), Join.class); + var rightFilter = as(join.right(), Filter.class); + assertThat(rightFilter.condition().toString(), is("language_name > \"a\"")); + } + + /** + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, languages + * {f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Limit[1000[INTEGER],false] + * \_Filter[COALESCE(language_name{f}#19,a[KEYWORD]) == a[KEYWORD]] + * \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18]] + * |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] 
+ * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testDoNotPushDownCoalesceFilterPastLookupJoin() { + var plan = plan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE COALESCE(language_name, "a") == "a" + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var join = as(filter.child(), Join.class); + assertThat(join.right(), instanceOf(EsRelation.class)); + } + + /** + * + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, languages + * {f}#10 AS language_code#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Limit[1000[INTEGER],false] + * \_Filter[ISNOTNULL(language_name{f}#19)] + * \_Join[LEFT,[languages{f}#10],[languages{f}#10],[language_code{f}#18],false] + * |_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] 
+ * \_Filter[ISNOTNULL(language_name{f}#19)] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testPushDownIsNotNullFilterPastLookupJoin() { + var plan = plan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name IS NOT NULL + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + var join = as(filter.child(), Join.class); + var rightFilter = as(join.right(), Filter.class); + assertThat(rightFilter.condition().toString(), is("language_name IS NOT NULL")); + } + + /** + * Project[[_meta_field{f}#17, emp_no{f}#11, first_name{f}#12, gender{f}#13, hire_date{f}#18, job{f}#19, job.raw{f}#20, languages + * {f}#14 AS language_code#4, last_name{f}#15, long_noidx{f}#21, salary{f}#16, language_name{f}#23]] + * \_Limit[1000[INTEGER],false] + * \_Filter[ISNOTNULL(language_name{f}#23) AND language_name{f}#23 > a[KEYWORD] AND LIKE(language_name{f}#23, "*b", false) + * AND COALESCE(language_name{f}#23,c[KEYWORD]) == c[KEYWORD] AND RLIKE(language_name{f}#23, "f.*", false)] + * \_Join[LEFT,[languages{f}#14],[languages{f}#14],[language_code{f}#22]] + * |_EsRelation[test][_meta_field{f}#17, emp_no{f}#11, first_name{f}#12, ..] 
+ * \_Filter[ISNOTNULL(language_name{f}#23) AND language_name{f}#23 > a[KEYWORD] AND LIKE(language_name{f}#23, "*b", false) + * AND RLIKE(language_name{f}#23, "f.*", false)] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#22, language_name{f}#23] + */ + public void testPushDownMultipleWhere() { + var plan = plan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name IS NOT NULL + | WHERE language_name > "a" + | WHERE language_name LIKE "*b" + | WHERE COALESCE(language_name, "c") == "c" + | WHERE language_name RLIKE "f.*" + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + var topFilter = as(limit.child(), Filter.class); + + // Verify the top-level filter contains all 5 original conditions combined + Set expectedAllFilters = Set.of( + "language_name IS NOT NULL", + "language_name > \"a\"", + "language_name LIKE \"*b\"", + "COALESCE(language_name, \"c\") == \"c\"", + "language_name RLIKE \"f.*\"" + ); + + Set actualAllFilters = new HashSet<>(Predicates.splitAnd(topFilter.condition()).stream().map(Object::toString).toList()); + assertEquals(expectedAllFilters, actualAllFilters); + + // Verify the join is below the top-level filter + var join = as(topFilter.child(), Join.class); + + // Verify a new filter with only the pushable predicates has been pushed to the right side of the join + var rightFilter = as(join.right(), Filter.class); + Set expectedPushedFilters = Set.of( + "language_name IS NOT NULL", + "language_name > \"a\"", + "language_name LIKE \"*b\"", + "language_name RLIKE \"f.*\"" + ); + Set actualPushedFilters = new HashSet<>( + Predicates.splitAnd(rightFilter.condition()).stream().map(Object::toString).toList() + ); + assertEquals(expectedPushedFilters, actualPushedFilters); + } + + /** + * + * Project[[$$languages$temp_name$32{r$}#33 AS language_code#4, salary{f}#13, language_name{f}#20, _meta_field{f}#27, emp + * _no{f}#21, 
first_name{f}#22, gender{f}#23, hire_date{f}#28, job{f}#29, job.raw{f}#30, languages{f}#24, + * last_name{f}#25, long_noidx{f}#31]] + * \_Limit[1000[INTEGER],true] + * \_Join[LEFT,[salary{f}#13],[salary{f}#13],[salary{f}#26]] + * |_Eval[[languages{f}#11 AS $$languages$temp_name$32#33]] + * | \_Limit[1000[INTEGER],false] + * | \_Filter[language_name{f}#20 > a[KEYWORD]] + * | \_Join[LEFT,[languages{f}#11],[languages{f}#11],[language_code{f}#19]] + * | |_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..] + * | \_Filter[language_name{f}#20 > a[KEYWORD]] + * | \_EsRelation[languages_lookup][LOOKUP][language_code{f}#19, language_name{f}#20] + * \_EsRelation[test_lookup][LOOKUP][_meta_field{f}#27, emp_no{f}#21, first_name{f}#22, ..] + */ + public void testPushDownFilterPastTwoLookupJoins() { + var plan = plan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | LOOKUP JOIN test_lookup ON salary + | WHERE language_name > "a" + """); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + + // The filter is pushed down past the top join, so a Join is now at the top of the plan after the limit + var topJoin = as(limit.child(), Join.class); + assertThat(topJoin.right(), instanceOf(EsRelation.class)); // No filter on the top lookup join's right side + + // Traverse down the left side of the top join to find the filter and the bottom join + var eval = as(topJoin.left(), Eval.class); + var innerLimit = as(eval.child(), Limit.class); + var topFilter = as(innerLimit.child(), Filter.class); + assertThat(topFilter.condition().toString(), is("language_name > \"a\"")); + + // make sure that the filter was pushed to the right side of the bottom join + var bottomJoin = as(topFilter.child(), Join.class); + var rightFilter = as(bottomJoin.right(), Filter.class); + assertThat(rightFilter.condition().toString(), is("language_name > \"a\"")); + } + + private Join createLeftJoin() 
{ + FieldAttribute a = getFieldAttribute("a"); + FieldAttribute b = getFieldAttribute("b"); + FieldAttribute c = getFieldAttribute("c"); + EsRelation left = relation(List.of(a, b)); + EsRelation right = relation(List.of(c, b)); + + JoinConfig joinConfig = new JoinConfig(JoinTypes.LEFT, List.of(b), List.of(a, b), List.of(b, c)); + return new Join(EMPTY, left, right, joinConfig); + } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java index 79c484c916af5..beef7ee4857a3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java @@ -29,11 +29,13 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.math.RoundTo; import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizerTests; import org.elasticsearch.xpack.esql.optimizer.TestPlannerOptimizer; +import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.LimitExec; import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; import org.elasticsearch.xpack.esql.plan.physical.MergeExec; @@ -295,7 +297,24 @@ public void testDateTruncBucketTransformToQueryAndTagsWithOtherPushdownFunctions } } - // 
ReplaceRoundToWithQueryAndTags does not support lookup joins yet + /** + * ReplaceRoundToWithQueryAndTags does not support lookup joins yet + * LimitExec[1000[INTEGER],16] + * \_AggregateExec[[x{r}#8],[COUNT(*[KEYWORD],true[BOOLEAN]) AS count(*)#9, x{r}#8],FINAL,[x{r}#8, $$count(*)$count{r}#34, $$count(* + * )$seen{r}#35],16] + * \_ExchangeExec[[x{r}#8, $$count(*)$count{r}#34, $$count(*)$seen{r}#35],true] + * \_AggregateExec[[x{r}#8],[COUNT(*[KEYWORD],true[BOOLEAN]) AS count(*)#9, x{r}#8],INITIAL,[x{r}#8, $$count(*)$count{r}#36, $$count + * (*)$seen{r}#37],16] + * \_EvalExec[[ROUNDTO(date{f}#15,1697760000000[DATETIME],1697846400000[DATETIME],1697932800000[DATETIME],1698019200000[DATE + * TIME]) AS x#8]] + * \_FieldExtractExec[date{f}#15] + * \_LookupJoinExec[[integer{f}#21],[language_code{f}#32],[]] + * |_FieldExtractExec[integer{f}#21] + * | \_EsQueryExec[test], indexMode[standard], [_doc{f}#38], limit[], sort[] estimatedRowSize[24] + * queryBuilderAndTags [[QueryBuilderAndTags{queryBuilder=[null], tags=[]}]] + * \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[ + * EsRelation[languages_lookup][LOOKUP][language_code{f}#32]]] + */ public void testDateTruncBucketNotTransformToQueryAndTagsWithLookupJoin() { for (String dateHistogram : dateHistograms) { String query = LoggerMessageFormat.format(null, """ @@ -341,14 +360,9 @@ public void testDateTruncBucketNotTransformToQueryAndTagsWithLookupJoin() { assertTrue(queryBuilder.tags().isEmpty()); assertNull(esQueryExec.query()); // rhs of lookup join - esQueryExec = as(lookupJoinExec.right(), EsQueryExec.class); - assertEquals("languages_lookup", esQueryExec.indexPattern()); - queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); - assertEquals(1, queryBuilderAndTags.size()); - queryBuilder = queryBuilderAndTags.get(0); - assertNull(queryBuilder.query()); - assertTrue(queryBuilder.tags().isEmpty()); - assertNull(esQueryExec.query()); + FragmentExec fragmentExec = as(lookupJoinExec.right(), 
FragmentExec.class); + EsRelation esRelation = as(fragmentExec.fragment(), EsRelation.class); + assertTrue(esRelation.toString().contains("EsRelation[languages_lookup][LOOKUP]")); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java index ac831d36f1533..c2a012cd8e853 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java @@ -53,6 +53,7 @@ protected final NamedWriteableRegistry getNamedWriteableRegistry() { entries.addAll(new SearchModule(Settings.EMPTY, List.of()).getNamedWriteables()); // Query builders entries.add(Add.ENTRY); // Used by the eval tests entries.add(AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.ENTRY); + entries.add(LookupJoinExec.ENTRY); return new NamedWriteableRegistry(entries); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExecSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExecSerializationTests.java new file mode 100644 index 0000000000000..0fedf104f1df1 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExecSerializationTests.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */
+
+package org.elasticsearch.xpack.esql.plan.physical;
+
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Serialization tests for {@link LookupJoinExec}.
+ */
+public class LookupJoinExecSerializationTests extends AbstractPhysicalPlanSerializationTests<LookupJoinExec> {
+    /**
+     * Builds a {@link LookupJoinExec} from a random source, two random child plans and three
+     * independently generated field lists (left, right and added fields).
+     *
+     * @param depth forwarded to {@code randomChild} for both the left child and the lookup side
+     * @return the randomly populated plan node
+     */
+    public static LookupJoinExec randomLookupJoinExec(int depth) {
+        Source source = randomSource();
+        PhysicalPlan child = randomChild(depth);
+        PhysicalPlan lookup = randomChild(depth);
+        List<Attribute> leftFields = randomFields();
+        List<Attribute> rightFields = randomFields();
+        List<Attribute> addedFields = randomFields();
+        return new LookupJoinExec(source, child, lookup, leftFields, rightFields, addedFields);
+    }
+
+    /**
+     * Single place that decides how the field lists of a test instance are generated;
+     * delegates to {@code randomFieldAttributes(1, 5, false)}.
+     */
+    private static List<Attribute> randomFields() {
+        return randomFieldAttributes(1, 5, false);
+    }
+
+    @Override
+    protected LookupJoinExec createTestInstance() {
+        return randomLookupJoinExec(0);
+    }
+
+    /**
+     * Returns a copy of {@code instance} in which exactly one of the five constructor
+     * arguments (child, lookup, left fields, right fields, added fields) has been replaced
+     * by a value that differs from the instance's current one. The source is kept as is.
+     */
+    @Override
+    protected LookupJoinExec mutateInstance(LookupJoinExec instance) throws IOException {
+        PhysicalPlan child = instance.left();
+        PhysicalPlan lookup = instance.lookup();
+        // Seed every property from the instance itself: randomValueOtherThan must compare against
+        // the value the instance really holds, and the properties that are not selected below
+        // have to stay untouched so that the mutation is attributable to a single property.
+        List<Attribute> leftFields = instance.leftFields();
+        List<Attribute> rightFields = instance.rightFields();
+        List<Attribute> addedFields = instance.addedFields();
+        switch (between(0, 4)) {
+            case 0 -> child = randomValueOtherThan(child, () -> randomChild(0));
+            case 1 -> lookup = randomValueOtherThan(lookup, () -> randomChild(0));
+            case 2 -> leftFields = randomValueOtherThan(leftFields, LookupJoinExecSerializationTests::randomFields);
+            case 3 -> rightFields = randomValueOtherThan(rightFields, LookupJoinExecSerializationTests::randomFields);
+            case 4 -> addedFields = randomValueOtherThan(addedFields, LookupJoinExecSerializationTests::randomFields);
+            default -> throw new UnsupportedOperationException();
+        }
+        return new LookupJoinExec(instance.source(), child, lookup, leftFields, rightFields, addedFields);
+    }
+
+    @Override
+    protected boolean alwaysEmptySource() {
+        return true;
+    }
+}