Skip to content

Commit e3bbdd7

Browse files
Bugfix
1 parent f24c821 commit e3bbdd7

File tree

4 files changed

+112
-56
lines changed

4 files changed

+112
-56
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinGeneralExpressionIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ private void loadSingleDataset(
224224
}
225225

226226
private EsqlQueryResponse runQuery(String query) {
227-
return run(syncEsqlQueryRequest().query(query));
227+
return run(syncEsqlQueryRequest(query));
228228
}
229229

230230
/**

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,15 @@
7979
import org.elasticsearch.xpack.esql.core.expression.Expression;
8080
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
8181
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
82+
import org.elasticsearch.xpack.esql.core.expression.NameId;
8283
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
83-
import org.elasticsearch.xpack.esql.core.expression.TypedAttribute;
8484
import org.elasticsearch.xpack.esql.core.tree.Source;
8585
import org.elasticsearch.xpack.esql.core.type.DataType;
8686
import org.elasticsearch.xpack.esql.core.type.EsField;
8787
import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
8888
import org.elasticsearch.xpack.esql.expression.predicate.Predicates;
89+
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
90+
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
8991
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
9092
import org.elasticsearch.xpack.esql.planner.Layout;
9193
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -94,7 +96,6 @@
9496
import java.io.IOException;
9597
import java.util.ArrayList;
9698
import java.util.Collections;
97-
import java.util.HashMap;
9899
import java.util.HashSet;
99100
import java.util.List;
100101
import java.util.Map;
@@ -586,38 +587,46 @@ private List<MatchConfig> collectLeftSideFieldsToBroadcast(LookupEnrichQueryGene
586587
List<Expression> postJoinFilterExpressions = postJoinFilterable.getPostJoinFilter();
587588
if (postJoinFilterExpressions.isEmpty() == false) {
588589
LookupFromIndexService.TransportRequest lookupRequest = (LookupFromIndexService.TransportRequest) request;
589-
// Build a map of matchField names for quick lookup
590-
Map<String, MatchConfig> matchFieldsByName = new HashMap<>();
591-
for (MatchConfig matchField : lookupRequest.getMatchFields()) {
592-
matchFieldsByName.put(matchField.fieldName(), matchField);
593-
}
594-
// Build a set of extractFields names to avoid adding duplicates
595-
Set<String> extractFieldNames = new HashSet<>();
590+
// Build a set of extractFields NameIDs to avoid adding duplicates
591+
Set<NameId> extractFieldNameIds = new HashSet<>();
596592
for (NamedExpression extractField : request.extractFields) {
597-
extractFieldNames.add(extractField.name());
593+
extractFieldNameIds.add(extractField.id());
594+
}
595+
// Collect right-side field NameIDs from EsRelation in rightPreJoinPlan
596+
Set<NameId> rightSideFieldNameIds = new HashSet<>();
597+
if (lookupRequest.getRightPreJoinPlan() instanceof FragmentExec fragmentExec) {
598+
fragmentExec.fragment().forEachDown(EsRelation.class, esRelation -> {
599+
for (Attribute attr : esRelation.output()) {
600+
rightSideFieldNameIds.add(attr.id());
601+
}
602+
});
598603
}
599-
// Extract FieldAttributes from post-join filter expressions and check if they are in matchFields
600-
Map<String, TypedAttribute> fieldsToBroadcast = new HashMap<>();
604+
605+
// Track which NameIDs we've already added to avoid duplicates
606+
Set<NameId> addedNameIds = new HashSet<>();
607+
// Traverse filter expressions and match attributes to MatchConfigs by NameID
608+
// Exclude right-side fields and fields already in extractFields
601609
for (Expression filterExpr : postJoinFilterExpressions) {
602610
for (Attribute attr : filterExpr.references()) {
603-
if (attr instanceof TypedAttribute typedAttribute) {
604-
String fieldName = attr.name();
605-
// Check if this field is in matchFields (left-side field with a channel)
606-
if (matchFieldsByName.containsKey(fieldName)) {
607-
fieldsToBroadcast.put(fieldName, typedAttribute);
611+
NameId nameId = attr.id();
612+
// Skip if right-side field, already in extractFields, or already found a match for
613+
if (rightSideFieldNameIds.contains(nameId)
614+
|| extractFieldNameIds.contains(nameId)
615+
|| addedNameIds.contains(nameId)) {
616+
continue;
617+
}
618+
// Find the corresponding MatchConfig for this attribute
619+
// we do match by just name
620+
// we made sure the same attribute is not on the right side with the checks above
621+
for (MatchConfig matchField : lookupRequest.getMatchFields()) {
622+
if (attr.name().equals(matchField.fieldName())) {
623+
builder.append(attr);
624+
allLeftFieldsToBroadcast.add(matchField);
625+
addedNameIds.add(nameId);
626+
break;
608627
}
609628
}
610-
}
611-
}
612-
// Add matching fields to layout and broadcast list, but skip if already in extractFields
613-
for (Map.Entry<String, TypedAttribute> entry : fieldsToBroadcast.entrySet()) {
614-
String fieldName = entry.getKey();
615-
TypedAttribute typedAttribute = entry.getValue();
616-
MatchConfig matchField = matchFieldsByName.get(fieldName);
617-
// Only add if not already in extractFields to avoid duplicates
618-
if (extractFieldNames.contains(fieldName) == false) {
619-
builder.append(typedAttribute);
620-
allLeftFieldsToBroadcast.add(matchField);
629+
621630
}
622631
}
623632
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java

Lines changed: 70 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.xpack.esql.capabilities.TranslationAware;
2626
import org.elasticsearch.xpack.esql.core.expression.Attribute;
2727
import org.elasticsearch.xpack.esql.core.expression.Expression;
28+
import org.elasticsearch.xpack.esql.core.expression.NameId;
2829
import org.elasticsearch.xpack.esql.core.type.DataType;
2930
import org.elasticsearch.xpack.esql.expression.predicate.Predicates;
3031
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals;
@@ -65,6 +66,7 @@ public class ExpressionQueryList implements LookupEnrichQueryGenerator, PostJoin
6566
private final SearchExecutionContext context;
6667
private final AliasFilter aliasFilter;
6768
private final LucenePushdownPredicates lucenePushdownPredicates;
69+
private final Set<NameId> rightSideFieldNameIds;
6870
private List<Expression> postJoinFilter;
6971
private int inputPagePositionCount = -1;
7072

@@ -82,10 +84,23 @@ private ExpressionQueryList(
8284
SearchContextStats.from(List.of(context)),
8385
new EsqlFlags(clusterService.getClusterSettings())
8486
);
87+
this.rightSideFieldNameIds = collectRightSideFieldNameIds(rightPreJoinPlan);
8588
postJoinFilter = new ArrayList<>();
8689
buildPreJoinFilter(rightPreJoinPlan, clusterService);
8790
}
8891

92+
private static Set<NameId> collectRightSideFieldNameIds(PhysicalPlan rightPreJoinPlan) {
93+
Set<NameId> rightSideFieldNameIds = new HashSet<>();
94+
if (rightPreJoinPlan != null) {
95+
rightPreJoinPlan.forEachDown(EsSourceExec.class, esSourceExec -> {
96+
for (Attribute attr : esSourceExec.output()) {
97+
rightSideFieldNameIds.add(attr.id());
98+
}
99+
});
100+
}
101+
return rightSideFieldNameIds;
102+
}
103+
89104
/**
90105
* Creates a new {@link ExpressionQueryList} for a field-based join.
91106
* A field-based join is a join where the join conditions are based on the equality of fields from the left and right datasets.
@@ -155,17 +170,12 @@ private void buildJoinOnForExpressionJoin(
155170
this.inputPagePositionCount = inputPage.getPositionCount();
156171
List<Expression> expressions = Predicates.splitAnd(joinOnConditions);
157172

158-
// Build set of left-side field names for categorization
159-
Set<String> leftSideFieldNames = new HashSet<>();
160-
for (MatchConfig matchField : matchFields) {
161-
leftSideFieldNames.add(matchField.fieldName());
162-
}
163-
164173
// Split expressions into left-only, right-only, and mixed
174+
// Anything not in right-side fields is left-side
165175
List<Expression> leftOnlyExpressions = new ArrayList<>();
166176
List<Expression> rightOnlyExpressions = new ArrayList<>();
167177
List<Expression> mixedExpressions = new ArrayList<>();
168-
splitExpressionsBySide(expressions, leftSideFieldNames, leftOnlyExpressions, rightOnlyExpressions, mixedExpressions);
178+
splitExpressionsBySide(expressions, rightSideFieldNameIds, leftOnlyExpressions, rightOnlyExpressions, mixedExpressions);
169179

170180
// Process mixed expressions - try as left-right binary comparison first
171181
// If that fails, add to post-join filter
@@ -191,7 +201,7 @@ private void buildJoinOnForExpressionJoin(
191201

192202
private void splitExpressionsBySide(
193203
List<Expression> expressions,
194-
Set<String> leftSideFieldNames,
204+
Set<NameId> rightSideFieldNameIds,
195205
List<Expression> leftOnlyExpressions,
196206
List<Expression> rightOnlyExpressions,
197207
List<Expression> mixedExpressions
@@ -204,10 +214,11 @@ private void splitExpressionsBySide(
204214
boolean hasRightSide = false;
205215

206216
for (Attribute attr : allAttributes) {
207-
if (leftSideFieldNames.contains(attr.name())) {
208-
hasLeftSide = true;
209-
} else {
217+
NameId nameId = attr.id();
218+
if (rightSideFieldNameIds.contains(nameId)) {
210219
hasRightSide = true;
220+
} else {
221+
hasLeftSide = true;
211222
}
212223
}
213224

@@ -222,19 +233,13 @@ private void splitExpressionsBySide(
222233
}
223234

224235
private boolean applyAsRightSidePushableFilter(Expression filter, List<MatchConfig> matchFields) {
225-
// First check if this filter only references right-side attributes
226-
// Right-side attributes are those NOT in matchFields (which are left-side fields)
227-
Set<String> leftSideFieldNames = new HashSet<>();
228-
for (MatchConfig matchField : matchFields) {
229-
leftSideFieldNames.add(matchField.fieldName());
230-
}
231236
// Check if any attribute in the filter expression tree is from the left side
232237
// We need to traverse the entire expression tree, not just top-level references,
233238
// because some functions may have attributes nested in their children
234239
List<Attribute> allAttributes = new ArrayList<>();
235240
filter.forEachDown(Attribute.class, allAttributes::add);
236241
for (Attribute attr : allAttributes) {
237-
if (leftSideFieldNames.contains(attr.name())) {
242+
if (rightSideFieldNameIds.contains(attr.id()) == false) {
238243
// This filter references a left-side attribute, so it cannot be pushed to Lucene
239244
return false;
240245
}
@@ -257,20 +262,58 @@ private boolean applyAsRightSidePushableFilter(Expression filter, List<MatchConf
257262
return false;
258263
}
259264

265+
/**
266+
* Reorients a binary comparison so that the left side is from the input page and the right side is from the lookup index.
267+
* Returns the comparison as-is if already correctly oriented, or a swapped version if needed.
268+
* Returns null if both attributes are from the same side (can't be reoriented).
269+
*/
270+
private static EsqlBinaryComparison reorientBinaryComparison(EsqlBinaryComparison binaryComparison, Set<NameId> rightSideFieldNameIds) {
271+
if (binaryComparison.left() instanceof Attribute leftAttr && binaryComparison.right() instanceof Attribute rightAttr) {
272+
// Determine which attribute is from the right side (lookup index)
273+
boolean leftIsRightSide = rightSideFieldNameIds.contains(leftAttr.id());
274+
boolean rightIsRightSide = rightSideFieldNameIds.contains(rightAttr.id());
275+
276+
// We need exactly one attribute from the right side and one from the left side
277+
if (leftIsRightSide == rightIsRightSide) {
278+
// Both are from the same side, can't process as left-right comparison
279+
return null;
280+
}
281+
282+
if (rightIsRightSide) {
283+
// Original orientation is correct: left is from input, right is from lookup
284+
return binaryComparison;
285+
} else {
286+
// Need to swap: original left is from lookup, original right is from input
287+
// Swap the comparison and flip the operator if needed
288+
return (EsqlBinaryComparison) binaryComparison.swapLeftAndRight();
289+
}
290+
}
291+
return null;
292+
}
293+
260294
private boolean applyAsLeftRightBinaryComparison(
261295
Expression expr,
262296
List<MatchConfig> matchFields,
263297
Page inputPage,
264298
ClusterService clusterService,
265299
Warnings warnings
266300
) {
267-
if (expr instanceof EsqlBinaryComparison binaryComparison
268-
&& binaryComparison.left() instanceof Attribute leftAttribute
269-
&& binaryComparison.right() instanceof Attribute rightAttribute) {
270-
// the left side comes from the page that was sent to the lookup node
271-
// the right side is the field from the lookup index
272-
// check if the left side is in the matchFields
273-
// if it is its corresponding page is the corresponding number in inputPage
301+
if (expr instanceof EsqlBinaryComparison binaryComparison) {
302+
// Reorient the comparison so that left is from input page and right is from lookup index
303+
EsqlBinaryComparison orientedComparison = reorientBinaryComparison(binaryComparison, rightSideFieldNameIds);
304+
if (orientedComparison == null) {
305+
// Can't reorient (both attributes from same side)
306+
return false;
307+
}
308+
309+
// After reorientation, left is from input page and right is from lookup index
310+
Attribute leftAttribute = (Attribute) orientedComparison.left();
311+
Attribute rightAttribute = (Attribute) orientedComparison.right();
312+
313+
// The left side comes from the page that was sent to the lookup node
314+
// The right side is the field from the lookup index
315+
// Check if the left side is in the matchFields
316+
// If it is its corresponding page is the corresponding number in inputPage
274317
Block block = null;
275318
DataType dataType = null;
276319
for (int i = 0; i < matchFields.size(); i++) {
@@ -285,7 +328,7 @@ private boolean applyAsLeftRightBinaryComparison(
285328
// special handle Equals operator
286329
// TermQuery is faster than BinaryComparisonQueryList, as it does less work per row
287330
// so here we reuse the existing logic from field based join to build a termQueryList for Equals
288-
if (binaryComparison instanceof Equals) {
331+
if (orientedComparison instanceof Equals) {
289332
QueryList termQueryForEquals = termQueryList(rightFieldType, context, aliasFilter, block, dataType).onlySingleValues(
290333
warnings,
291334
"LOOKUP JOIN encountered multi-value"
@@ -297,7 +340,7 @@ private boolean applyAsLeftRightBinaryComparison(
297340
rightFieldType,
298341
context,
299342
block,
300-
binaryComparison,
343+
orientedComparison,
301344
clusterService,
302345
aliasFilter,
303346
warnings

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,10 @@ public List<MatchConfig> getMatchFields() {
291291
return matchFields;
292292
}
293293

294+
public PhysicalPlan getRightPreJoinPlan() {
295+
return rightPreJoinPlan;
296+
}
297+
294298
@Override
295299
public void writeTo(StreamOutput out) throws IOException {
296300
super.writeTo(out);

0 commit comments

Comments
 (0)