Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/136104.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136104
summary: Add support for Full Text Functions for Lookup Join
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -861,11 +861,22 @@ private Map<String, Object> lookupExplosion(
}
}
if (lookupEntries != lookupEntriesToKeep) {
// add a filter to reduce the number of matches
// we add both a Lucene pushable filter and a non-pushable filter
// this is to make sure that even if there are non-pushable filters the pushable filters is still applied
query.append(" | WHERE ABS(filter_key) > -1 AND filter_key < ").append(lookupEntriesToKeep);

boolean applyAsExpressionJoinFilter = expressionBasedJoin && randomBoolean();
// we randomly add the filter after the join or as part of the join
// in both cases we should have the same amount of results
if (applyAsExpressionJoinFilter == false) {
// add a filter after the join to reduce the number of matches
// we add both a Lucene pushable filter and a non-pushable filter
// this is to make sure that even if there are non-pushable filters the pushable filters is still applied
query.append(" | WHERE ABS(filter_key) > -1 AND filter_key < ").append(lookupEntriesToKeep);
} else {
// apply the filter as part of the join
// then we filter out the rows that do not match the filter after
// so the number of rows is the same as in the field based join case
// and can get the same number of rows for verification purposes
query.append(" AND filter_key < ").append(lookupEntriesToKeep);
query.append(" | WHERE filter_key IS NOT NULL ");
}
}
query.append(" | STATS COUNT(location) | LIMIT 100\"}");
return responseAsMap(query(query.toString(), null));
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ private PhysicalPlan buildGreaterThanFilter(long value) {
return new FragmentExec(filter);
}

private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, PhysicalPlan filters) throws IOException {
private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, PhysicalPlan pushedDownFilter) throws IOException {
String[] fieldMappers = new String[keyTypes.size() * 2];
for (int i = 0; i < keyTypes.size(); i++) {
fieldMappers[2 * i] = "key" + i;
Expand Down Expand Up @@ -283,17 +283,8 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
client().admin().cluster().prepareHealth(TEST_REQUEST_TIMEOUT).setWaitForGreenStatus().get();

Predicate<Integer> filterPredicate = l -> true;
if (filters instanceof FragmentExec fragmentExec) {
if (fragmentExec.fragment() instanceof Filter filter
&& filter.condition() instanceof GreaterThan gt
&& gt.left() instanceof FieldAttribute fa
&& fa.name().equals("l")
&& gt.right() instanceof Literal lit) {
long value = ((Number) lit.value()).longValue();
filterPredicate = l -> l > value;
} else {
fail("Unsupported filter type in test baseline generation: " + filters);
}
if (pushedDownFilter instanceof FragmentExec fragmentExec && fragmentExec.fragment() instanceof Filter filter) {
filterPredicate = getPredicateFromFilter(filter);
}

int docCount = between(10, 1000);
Expand Down Expand Up @@ -396,6 +387,16 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
new EsField("rkey" + i, keyTypes.get(i), Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
);
joinOnConditions.add(new Equals(Source.EMPTY, leftAttr, rightAttr));
// randomly decide to apply the filter as additional join on filter instead of pushed down filter
boolean applyAsJoinOnCondition = EsqlCapabilities.Cap.LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION_V2.isEnabled()
? randomBoolean()
: false;
if (applyAsJoinOnCondition
&& pushedDownFilter instanceof FragmentExec fragmentExec
&& fragmentExec.fragment() instanceof Filter filter) {
joinOnConditions.add(filter.condition());
pushedDownFilter = null;
}
}
}
// the matchFields are shared for both types of join
Expand All @@ -412,7 +413,7 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
"lookup",
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))),
Source.EMPTY,
filters,
pushedDownFilter,
Predicates.combineAnd(joinOnConditions)
);
DriverContext driverContext = driverContext();
Expand Down Expand Up @@ -478,6 +479,19 @@ protected void start(Driver driver, ActionListener<Void> driverListener) {
}
}

private static Predicate<Integer> getPredicateFromFilter(Filter filter) {
if (filter.condition() instanceof GreaterThan gt
&& gt.left() instanceof FieldAttribute fa
&& fa.name().equals("l")
&& gt.right() instanceof Literal lit) {
long value = ((Number) lit.value()).longValue();
return l -> l > value;
} else {
fail("Unsupported filter type in test baseline generation: " + filter);
}
return null;
}

/**
* Creates a {@link BigArrays} that tracks releases but doesn't throw circuit breaking exceptions.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,27 @@ public void testMatchWithLookupJoin() {
);
}

public void testMatchWithLookupJoinOnMatch() {
var query = """
FROM test
| rename id as id_left
| LOOKUP JOIN test_lookup ON id_left == id and MATCH(lookup_content, "fox")
| WHERE id > 0
""";
try (var resp = run(query)) {
assertColumnNames(resp.columns(), List.of("content", "id_left", "id", "lookup_content"));
assertColumnTypes(resp.columns(), List.of("text", "integer", "integer", "text"));
// Should return rows where lookup_content matches "fox" (ids 1 and 6)
assertValues(
resp.values(),
List.of(
List.of("This is a brown fox", 1, 1, "This is a brown fox"),
List.of("The quick brown fox jumps over the lazy dog", 6, 6, "The quick brown fox jumps over the lazy dog")
)
);
}
}

static void createAndPopulateIndex(Consumer<String[]> ensureYellow) {
var indexName = "test";
var client = client().admin().indices();
Expand Down Expand Up @@ -341,5 +362,19 @@ static void createAndPopulateLookupIndex(IndicesAdminClient client, String looku
.setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.mode", "lookup"))
.setMapping("id", "type=integer", "lookup_content", "type=text");
assertAcked(createRequest);

// Populate the lookup index with test data
client().prepareBulk()
.add(new IndexRequest(lookupIndexName).id("1").source("id", 1, "lookup_content", "This is a brown fox"))
.add(new IndexRequest(lookupIndexName).id("2").source("id", 2, "lookup_content", "This is a brown dog"))
.add(new IndexRequest(lookupIndexName).id("3").source("id", 3, "lookup_content", "This dog is really brown"))
.add(
new IndexRequest(lookupIndexName).id("4")
.source("id", 4, "lookup_content", "The dog is brown but this document is very very long")
)
.add(new IndexRequest(lookupIndexName).id("5").source("id", 5, "lookup_content", "There is also a white cat"))
.add(new IndexRequest(lookupIndexName).id("6").source("id", 6, "lookup_content", "The quick brown fox jumps over the lazy dog"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,11 @@ public enum Cap {
INLINE_STATS_SUPPORTS_REMOTE(INLINESTATS_V11.enabled),

INLINE_STATS_WITH_UNION_TYPES_IN_STUB_RELATION(INLINE_STATS.enabled),
/**
* Lookup join with Full Text Function or other Lucene Pushable condition
* to be applied to the lookup index used
*/
LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION_V2,

/**
* Support TS command in non-snapshot builds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.analysis.AnalyzerRules.ParameterizedAnalyzerRule;
import org.elasticsearch.xpack.esql.capabilities.TranslationAware;
import org.elasticsearch.xpack.esql.common.Failure;
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
import org.elasticsearch.xpack.esql.core.expression.Alias;
Expand Down Expand Up @@ -102,6 +103,7 @@
import org.elasticsearch.xpack.esql.index.IndexResolution;
import org.elasticsearch.xpack.esql.inference.ResolvedInference;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogateExpressions;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates;
import org.elasticsearch.xpack.esql.parser.ParsingException;
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
Expand Down Expand Up @@ -162,6 +164,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE;
import static org.elasticsearch.xpack.esql.capabilities.TranslationAware.translatable;
import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE;
import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN;
import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME;
Expand Down Expand Up @@ -724,50 +727,78 @@ private LogicalPlan resolveLookup(Lookup l, List<Attribute> childrenOutput) {

private List<Expression> resolveJoinFiltersAndSwapIfNeeded(
List<Expression> filters,
AttributeSet leftOutput,
AttributeSet rightOutput
AttributeSet leftChildOutput,
AttributeSet rightChildOutput,
List<Attribute> leftJoinKeysToPopulate,
List<Attribute> rightJoinKeysToPopulate
) {
if (filters.isEmpty()) {
return emptyList();
}
List<Attribute> childrenOutput = new ArrayList<>(leftOutput);
childrenOutput.addAll(rightOutput);
List<Attribute> childrenOutput = new ArrayList<>(leftChildOutput);
childrenOutput.addAll(rightChildOutput);

List<Expression> resolvedFilters = new ArrayList<>(filters.size());
for (Expression filter : filters) {
Expression filterResolved = filter.transformUp(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput));
resolvedFilters.add(resolveAndOrientJoinCondition(filterResolved, leftOutput, rightOutput));
Expression result = resolveAndOrientJoinCondition(
filterResolved,
leftChildOutput,
rightChildOutput,
leftJoinKeysToPopulate,
rightJoinKeysToPopulate
);
resolvedFilters.add(result);
}
return resolvedFilters;
}

private Expression resolveAndOrientJoinCondition(Expression condition, AttributeSet leftOutput, AttributeSet rightOutput) {
private Expression resolveAndOrientJoinCondition(
Expression condition,
AttributeSet leftChildOutput,
AttributeSet rightChildOutput,
List<Attribute> leftJoinKeysToPopulate,
List<Attribute> rightJoinKeysToPopulate
) {
if (condition instanceof EsqlBinaryComparison comp
&& comp.left() instanceof Attribute leftAttr
&& comp.right() instanceof Attribute rightAttr) {

boolean leftIsFromLeft = leftOutput.contains(leftAttr);
boolean rightIsFromRight = rightOutput.contains(rightAttr);
boolean leftIsFromLeft = leftChildOutput.contains(leftAttr);
boolean rightIsFromRight = rightChildOutput.contains(rightAttr);

if (leftIsFromLeft && rightIsFromRight) {
leftJoinKeysToPopulate.add(leftAttr);
rightJoinKeysToPopulate.add(rightAttr);
return comp; // Correct orientation
}

boolean leftIsFromRight = rightOutput.contains(leftAttr);
boolean rightIsFromLeft = leftOutput.contains(rightAttr);
boolean leftIsFromRight = rightChildOutput.contains(leftAttr);
boolean rightIsFromLeft = leftChildOutput.contains(rightAttr);

if (leftIsFromRight && rightIsFromLeft) {
leftJoinKeysToPopulate.add(rightAttr);
rightJoinKeysToPopulate.add(leftAttr);
return comp.swapLeftAndRight(); // Swapped orientation
}
}
return handleRightOnlyPushableFilter(condition, rightChildOutput);
}

private Expression handleRightOnlyPushableFilter(Expression condition, AttributeSet rightChildOutput) {
if (isCompletelyRightSideAndTranslationAware(condition, rightChildOutput)) {
// The condition is completely on the right side and is translation aware, so it can be (potentially) pushed down
return condition;
} else {
// The condition cannot be used in the join on clause for now
// It is not a binary comparison between left and right attributes
// It is not using fields from the right side only and translation aware
return new UnresolvedAttribute(
condition.source(),
"unsupported",
"Join condition must be between one attribute on the left side and "
+ "one attribute on the right side of the join, but found: "
+ condition.sourceText()
"Unsupported join filter expression:" + condition.sourceText()
);
}
return condition; // Not a binary comparison between two attributes, no change needed.
}

private Join resolveLookupJoin(LookupJoin join) {
Expand All @@ -787,37 +818,22 @@ private Join resolveLookupJoin(LookupJoin join) {
List<Attribute> leftKeys = new ArrayList<>();
List<Attribute> rightKeys = new ArrayList<>();
List<Expression> resolvedFilters = new ArrayList<>();
Expression joinOnConditions = null;
if (join.config().joinOnConditions() != null) {
resolvedFilters = resolveJoinFiltersAndSwapIfNeeded(
Predicates.splitAnd(join.config().joinOnConditions()),
join.left().outputSet(),
join.right().outputSet()
join.right().outputSet(),
leftKeys,
rightKeys
);
// build leftKeys and rightKeys using the correct side of the resolvedFilters.
// resolveJoinFiltersAndSwapIfNeeded already put the left and right on the correct side
for (Expression expression : resolvedFilters) {
if (expression instanceof EsqlBinaryComparison binaryComparison
&& binaryComparison.left() instanceof Attribute leftAttribute
&& binaryComparison.right() instanceof Attribute rightAttribute) {
leftKeys.add(leftAttribute);
rightKeys.add(rightAttribute);
} else {
UnresolvedAttribute errorAttribute = new UnresolvedAttribute(
expression.source(),
"unsupported",
"Unsupported join filter expression:" + expression.sourceText()
);
return join.withConfig(new JoinConfig(type, singletonList(errorAttribute), emptyList(), null));

}
}
joinOnConditions = Predicates.combineAndWithSource(resolvedFilters, join.config().joinOnConditions().source());
} else {
// resolve the using columns against the left and the right side then assemble the new join config
leftKeys = resolveUsingColumns(join.config().leftFields(), join.left().output(), "left");
rightKeys = resolveUsingColumns(join.config().rightFields(), join.right().output(), "right");
}

config = new JoinConfig(type, leftKeys, rightKeys, Predicates.combineAnd(resolvedFilters));
config = new JoinConfig(type, leftKeys, rightKeys, joinOnConditions);
return new LookupJoin(join.source(), join.left(), join.right(), config, join.isRemote());
} else {
// everything else is unsupported for now
Expand All @@ -827,6 +843,33 @@ private Join resolveLookupJoin(LookupJoin join) {
}
}

private boolean isCompletelyRightSideAndTranslationAware(Expression expression, AttributeSet rightOutputSet) {
// Check if all references in the expression are from the right side
boolean isCompletelyRightSide = rightOutputSet.containsAll(expression.references());

if (isCompletelyRightSide == false) {
return false;
}

// Check if the expression and all its subexpressions implement TranslationAware
// and are translatable to Lucene
return isTranslationAware(expression);
}

private boolean isTranslationAware(Expression expression) {
// Check if the expression itself implements TranslationAware
if (expression instanceof TranslationAware == false) {
return false;
}

// Check if the expression is translatable
TranslationAware.Translatable translatable = translatable(expression, LucenePushdownPredicates.DEFAULT);
if (translatable == TranslationAware.Translatable.NO) {
return false;
}
return true;
}

private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) {
// we align the outputs of the sub plans such that they have the same columns
boolean changed = false;
Expand Down
Loading