Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
bcde6be
LookupJoin prejoin filter POC WIP
julian-elastic Jul 28, 2025
d24dab3
Get basic case with translatable filters to work
julian-elastic Jul 29, 2025
9603a7c
Fix failing UTs
julian-elastic Aug 13, 2025
e73996f
Fix failing UTs part 2
julian-elastic Aug 14, 2025
278877a
Add additional checks for right pushable filters
julian-elastic Aug 14, 2025
ec9817d
Merge branch 'main' into lookupPrefilter_v2
julian-elastic Aug 15, 2025
a59d0de
Update docs/changelog/132889.yaml
julian-elastic Aug 15, 2025
6e6e28e
Switch to storing the filter in Join
julian-elastic Aug 15, 2025
018b40d
Switch to storing the filter in Join
julian-elastic Aug 15, 2025
2ee02a1
bugfix
julian-elastic Aug 16, 2025
abfe672
Limit the change to pushable filters only, make the filter optional
julian-elastic Aug 19, 2025
7f82362
Add more UTs
julian-elastic Aug 20, 2025
ba9ab52
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 20, 2025
2aa2b49
Clean up, add more UTs
julian-elastic Aug 20, 2025
0e7b3ed
Update docs/changelog/133166.yaml
julian-elastic Aug 20, 2025
66c126f
Fix a bug where a mix of pushable and non-pushable filters resulted i…
julian-elastic Aug 21, 2025
84d2dcb
Address code review comments, add UTs
julian-elastic Aug 22, 2025
6b41aa9
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 22, 2025
d32cdc4
Fix formatting for UT
julian-elastic Aug 22, 2025
68d319b
Switch to storing the optional filter in the RHS of the Join
julian-elastic Aug 26, 2025
5405235
Address code review feedback
julian-elastic Aug 26, 2025
03796e0
Fix merge errors
julian-elastic Aug 27, 2025
c15df0a
Address a missed comment
julian-elastic Aug 27, 2025
63018a7
Switch to passing local logical plan to lookup node
julian-elastic Aug 27, 2025
76b4042
Switch to passing local logical plan to lookup node
julian-elastic Aug 28, 2025
4205693
Address more code review feedback
julian-elastic Aug 28, 2025
5a2b2fd
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 28, 2025
3c39e90
fix failing UT
julian-elastic Aug 28, 2025
35121eb
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 28, 2025
40c6d7a
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 28, 2025
242455f
Address more code review comments
julian-elastic Aug 28, 2025
f5cb543
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 2, 2025
f7ff90e
Address code review comments
julian-elastic Sep 2, 2025
3116545
Address code review comments, part 2
julian-elastic Sep 2, 2025
7a8af28
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 3, 2025
c0733f7
Address more code review comments
julian-elastic Sep 3, 2025
166130f
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 3, 2025
0550dae
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 3, 2025
328af0a
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 3 additions & 18 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -355,24 +355,9 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_LLAMA_ADDED = def(9_125_0_00);
public static final TransportVersion SHARD_WRITE_LOAD_IN_CLUSTER_INFO = def(9_126_0_00);
public static final TransportVersion ESQL_SAMPLE_OPERATOR_STATUS = def(9_127_0_00);
public static final TransportVersion ESQL_TOPN_TIMINGS = def(9_128_0_00);
public static final TransportVersion NODE_WEIGHTS_ADDED_TO_NODE_BALANCE_STATS = def(9_129_0_00);
public static final TransportVersion RERANK_SNIPPETS = def(9_130_0_00);
public static final TransportVersion PIPELINE_TRACKING_INFO = def(9_131_0_00);
public static final TransportVersion COMPONENT_TEMPLATE_TRACKING_INFO = def(9_132_0_00);
public static final TransportVersion TO_CHILD_BLOCK_JOIN_QUERY = def(9_133_0_00);
public static final TransportVersion ML_INFERENCE_AI21_COMPLETION_ADDED = def(9_134_0_00);
public static final TransportVersion TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION = def(9_135_0_00);
public static final TransportVersion INDEX_TEMPLATE_TRACKING_INFO = def(9_136_0_00);
public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00);
public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_138_0_00);
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00);
public static final TransportVersion SIMULATE_INGEST_EFFECTIVE_MAPPING = def(9_140_0_00);
public static final TransportVersion RESOLVE_INDEX_MODE_ADDED = def(9_141_0_00);
public static final TransportVersion DATA_STREAM_WRITE_INDEX_ONLY_SETTINGS = def(9_142_0_00);
public static final TransportVersion SCRIPT_RESCORER = def(9_143_0_00);
public static final TransportVersion ESQL_LOOKUP_OPERATOR_EMITTED_ROWS = def(9_144_0_00);
public static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = def(9_145_0_00);
public static final TransportVersion ALLOCATION_DECISION_NOT_PREFERRED = def(9_145_0_00);
public static final TransportVersion ESQL_QUALIFIERS_IN_ATTRIBUTES = def(9_146_0_00);
public static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = def(9_147_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public Query regexpQuery(
}

/**
* Returns a Lucine pushable Query for the current field
* Returns a Lucene pushable Query for the current field
* For now can only be AutomatonQuery or MatchAllDocsQuery() or MatchNoDocsQuery()
*/
public Query automatonQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntri
}
if (lookupEntries != lookupEntriesToKeep) {
// add a filter to reduce the number of matches
// we add both a Lucine pushable filter and a non-pushable filter
// we add both a Lucene pushable filter and a non-pushable filter
// this is to make sure that even if there are non-pushable filters, the pushable filters are still applied
query.append(" | WHERE ABS(filter_key) > -1 AND filter_key < ").append(lookupEntriesToKeep);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ Page buildPage(int positions, IntVector.Builder positionsBuilder, IntVector.Buil
return page;
}

private Query nextQuery() throws IOException {
private Query nextQuery() {
++queryPosition;
while (isFinished() == false) {
Query query = queryList.getQuery(queryPosition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected final Operator.OperatorFactory simple() {
/**
* Makes sure the description of {@link #simple} matches the {@link #expectedDescriptionOfSimple}.
*/
public final void testSimpleDescription() {
public void testSimpleDescription() {
Operator.OperatorFactory factory = simple();
String description = factory.describe();
assertThat(description, expectedDescriptionOfSimple());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5178,6 +5178,33 @@ id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:int
14 | Nina | foo2 | omicron | 15000
;

lookupJoinWithPushableFilterOnRightOneField
required_capability: join_lookup_v12
required_capability: lookup_join_on_multiple_fields

FROM multi_column_joinable
| LOOKUP JOIN multi_column_joinable_lookup ON id_int
| WHERE other2 > 5000
| KEEP id_int, name_str, extra1, other1, other2
| SORT id_int, name_str, extra1, other1, other2
| LIMIT 20
;

warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int] failed, treating result as null. Only first 20 failures recorded.
warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value

id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer
4 | David | qux | zeta | 6000
5 | Eve | quux | eta | 7000
5 | Eve | quux | theta | 8000
6 | null | corge | iota | 9000
7 | Grace | grault | kappa | 10000
8 | Hank | garply | lambda | 11000
12 | Liam | xyzzy | nu | 13000
13 | Mia | thud | xi | 14000
14 | Nina | foo2 | omicron | 15000
;

lookupJoinWithTwoPushableFiltersOnRight
required_capability: join_lookup_v12
required_capability: lookup_join_on_multiple_fields
Expand All @@ -5201,6 +5228,69 @@ id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:int
6 | null | corge | iota | 9000
;

lookupJoinWithCoalesceFilterOnRight
required_capability: join_lookup_v12
required_capability: lookup_join_on_multiple_fields

FROM multi_column_joinable
| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool
| WHERE COALESCE(other1, "zeta") == "zeta"
| KEEP id_int, name_str, extra1, other1, other2
| SORT id_int, name_str, extra1, other1, other2
| LIMIT 20
;

warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded.
warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value

id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer
[1, 19, 21] | null | zyx | null | null
4 | David | qux | zeta | 6000
9 | null | waldo | null | null
10 | null | fred | null | null
15 | null | bar2 | null | null
[17, 18] | null | xyz | null | null
null | null | plugh | null | null
;

lookupJoinWithIsNullFilterOnRight
required_capability: join_lookup_v12
required_capability: lookup_join_on_multiple_fields

FROM multi_column_joinable
| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool
| WHERE other1 IS NULL
| KEEP id_int, name_str, extra1, other1, other2
| SORT id_int
;

warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded.
warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value

id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer
[1, 19, 21] | null | zyx | null | null
9 | null | waldo | null | null
10 | null | fred | null | null
15 | null | bar2 | null | null
[17, 18] | null | xyz | null | null
null | null | plugh | null | null
;

lookupJoinWithIsNullJoinKey
required_capability: join_lookup_v12
required_capability: lookup_join_on_multiple_fields

FROM multi_column_joinable
| LOOKUP JOIN multi_column_joinable_lookup ON name_str
| WHERE name_str IS NULL
| KEEP id_int, name_str, extra1, other1, other2
| SORT id_int
;

id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer
null | null | corge | null | null
;

lookupJoinWithMixLeftAndRightFilters
required_capability: join_lookup_v12
required_capability: lookup_join_on_multiple_fields
Expand Down Expand Up @@ -5294,6 +5384,27 @@ warning:Line 3:18: java.lang.IllegalArgumentException: single-value function enc
13 | Mia | thud | xi | 14000
14 | Nina | foo2 | omicron | 15000
;

lookupJoinWithMixPushableAndUnpushableFilters
required_capability: join_lookup_v12
required_capability: lookup_join_on_multiple_fields

FROM multi_column_joinable
| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool
| WHERE other2 > id_int + 5000 AND (extra1 == "qux" OR extra1 == "zyx") AND other1 like "*ta" AND ABS(other2) > 5500
| KEEP id_int, name_str, extra1, other1, other2
| SORT id_int, name_str, extra1, other1, other2
| LIMIT 20
;

warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded.
warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value
warning:Line 3:18: evaluation of [id_int + 5000] failed, treating result as null. Only first 20 failures recorded.
warning:Line 3:18: java.lang.IllegalArgumentException: single-value function encountered multi-value

id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer
4 | David | qux | zeta | 6000
;

lookupJoinWithJoinAttrFilter
required_capability: join_lookup_v12
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.compute.test.BlockTestUtils;
import org.elasticsearch.compute.test.TestDriverFactory;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
Expand Down Expand Up @@ -63,6 +64,10 @@
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator;
import org.elasticsearch.xpack.esql.enrich.MatchConfig;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand Down Expand Up @@ -229,17 +234,19 @@ public void populate(int docCount, List<String> expected, Predicate<Integer> fil
}
}

private List<Expression> buildGreaterThanFilter(long value) {
private PhysicalPlan buildGreaterThanFilter(long value) {
FieldAttribute filterAttribute = new FieldAttribute(
Source.EMPTY,
"l",
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
);
Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
return List.of(greaterThan);
EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), List.of());
Filter filter = new Filter(Source.EMPTY, esRelation, greaterThan);
return new FragmentExec(filter);
}

private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, List<Expression> filters) throws IOException {
private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, PhysicalPlan filters) throws IOException {
String[] fieldMappers = new String[keyTypes.size() * 2];
for (int i = 0; i < keyTypes.size(); i++) {
fieldMappers[2 * i] = "key" + i;
Expand Down Expand Up @@ -269,9 +276,9 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
client().admin().cluster().prepareHealth(TEST_REQUEST_TIMEOUT).setWaitForGreenStatus().get();

Predicate<Integer> filterPredicate = l -> true;
if (filters != null) {
if (filters.size() == 1
&& filters.get(0) instanceof GreaterThan gt
if (filters instanceof FragmentExec fragmentExec) {
if (fragmentExec.fragment() instanceof Filter filter
&& filter.condition() instanceof GreaterThan gt
&& gt.left() instanceof FieldAttribute fa
&& fa.name().equals("l")
&& gt.right() instanceof Literal lit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.xpack.esql.capabilities.TranslationAware;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.expression.predicate.Predicates;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
import org.elasticsearch.xpack.esql.stats.SearchContextStats;

Expand All @@ -29,9 +34,10 @@
import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER;

/**
* A {@link LookupEnrichQueryGenerator} that combines multiple {@link QueryList}s into a single query.
* A {@link LookupEnrichQueryGenerator} that combines one or more {@link QueryList}s into a single query.
* Each query in the resulting query will be a conjunction of all queries from the input lists at the same position.
* In the future we can extend this to support more complex expressions, such as disjunctions or negations.
* In addition, we support an optional pre-join filter that will be applied to all queries if it is pushable.
* If the pre-join filter cannot be pushed down to Lucene, it will be ignored.
*/
public class ExpressionQueryList implements LookupEnrichQueryGenerator {
private static final Logger logger = LogManager.getLogger(ExpressionQueryList.class);
Expand All @@ -42,43 +48,61 @@ public class ExpressionQueryList implements LookupEnrichQueryGenerator {
public ExpressionQueryList(
List<QueryList> queryLists,
SearchExecutionContext context,
List<Expression> candidateRightHandFilters,
PhysicalPlan rightPreJoinPlan,
ClusterService clusterService
) {
if (queryLists.size() < 2 && (candidateRightHandFilters == null || candidateRightHandFilters.isEmpty())) {
throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists");
if (queryLists.size() < 2 && (rightPreJoinPlan instanceof FilterExec == false)) {
throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists or a pre-join filter");
}
this.queryLists = queryLists;
this.context = context;
buildPrePostJoinFilter(candidateRightHandFilters, clusterService);
buildPreJoinFilter(rightPreJoinPlan, clusterService);
}

private void buildPrePostJoinFilter(List<Expression> candidateRightHandFilters, ClusterService clusterService) {
if (candidateRightHandFilters == null || candidateRightHandFilters.isEmpty()) {
return; // no filters to apply
private void addToPreJoinFilters(org.elasticsearch.index.query.QueryBuilder query) {
try {
if (query != null) {
preJoinFilters.add(query.toQuery(context));
}
} catch (IOException e) {
// as we treat the filter as optional, an error in its application will be ignored
logger.error(() -> "Failed to translate optional pre-join filter: [" + query + "]", e);
}
for (Expression filter : candidateRightHandFilters) {
try {
}

private void buildPreJoinFilter(PhysicalPlan rightPreJoinPlan, ClusterService clusterService) {
if (rightPreJoinPlan instanceof EsQueryExec esQueryExec) {
// this does not happen right now, as we only do local mapping on the lookup node,
// so we have EsSourceExec, not EsQueryExec
if (esQueryExec.query() != null) {
addToPreJoinFilters(esQueryExec.query());
}
} else if (rightPreJoinPlan instanceof FilterExec filterExec) {
List<Expression> candidateRightHandFilters = Predicates.splitAnd(filterExec.condition());
LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from(
SearchContextStats.from(List.of(context)),
new EsqlFlags(clusterService.getClusterSettings())
);
for (Expression filter : candidateRightHandFilters) {
if (filter instanceof TranslationAware translationAware) {
LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from(
SearchContextStats.from(List.of(context)),
new EsqlFlags(clusterService.getClusterSettings())
);
if (TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) {
preJoinFilters.add(
translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder().toQuery(context)
);
addToPreJoinFilters(translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder());
}
}
// If the filter is not translatable we will not apply it for now
// as performance testing showed no performance improvement.
// We can revisit this in the future if needed, once we have more optimized workflow in place.
// The filter is optional, so it is OK to ignore it if it cannot be translated.
} catch (IOException e) {
// as the filter is optional an error in its application will be ignored
logger.error(() -> "Failed to translate optional pre-join filter: [" + filter + "]", e);
}
// call recursively to find other filters that might be present
// either in another FilterExec or in an EsQueryExec
buildPreJoinFilter(filterExec.child(), clusterService);
} else if (rightPreJoinPlan instanceof UnaryExec unaryExec) {
// there can be other nodes in the plan such as FieldExtractExec in the future
buildPreJoinFilter(unaryExec.child(), clusterService);
}
// else we do nothing, as the filters are optional and we don't want to fail the query if there are any errors
// this also covers the case of rightPreJoinPlan being null
}

@Override
Expand Down
Loading