Skip to content

Commit 2cdf742

Browse files
Bugfix
1 parent f24c821 commit 2cdf742

File tree

3 files changed

+43
-29
lines changed

3 files changed

+43
-29
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinGeneralExpressionIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ private void loadSingleDataset(
224224
}
225225

226226
private EsqlQueryResponse runQuery(String query) {
227-
return run(syncEsqlQueryRequest().query(query));
227+
return run(syncEsqlQueryRequest(query));
228228
}
229229

230230
/**

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,15 @@
7979
import org.elasticsearch.xpack.esql.core.expression.Expression;
8080
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
8181
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
82+
import org.elasticsearch.xpack.esql.core.expression.NameId;
8283
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
83-
import org.elasticsearch.xpack.esql.core.expression.TypedAttribute;
8484
import org.elasticsearch.xpack.esql.core.tree.Source;
8585
import org.elasticsearch.xpack.esql.core.type.DataType;
8686
import org.elasticsearch.xpack.esql.core.type.EsField;
8787
import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
8888
import org.elasticsearch.xpack.esql.expression.predicate.Predicates;
89+
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
90+
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
8991
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
9092
import org.elasticsearch.xpack.esql.planner.Layout;
9193
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -94,7 +96,6 @@
9496
import java.io.IOException;
9597
import java.util.ArrayList;
9698
import java.util.Collections;
97-
import java.util.HashMap;
9899
import java.util.HashSet;
99100
import java.util.List;
100101
import java.util.Map;
@@ -586,38 +587,47 @@ private List<MatchConfig> collectLeftSideFieldsToBroadcast(LookupEnrichQueryGene
586587
List<Expression> postJoinFilterExpressions = postJoinFilterable.getPostJoinFilter();
587588
if (postJoinFilterExpressions.isEmpty() == false) {
588589
LookupFromIndexService.TransportRequest lookupRequest = (LookupFromIndexService.TransportRequest) request;
589-
// Build a map of matchField names for quick lookup
590-
Map<String, MatchConfig> matchFieldsByName = new HashMap<>();
591-
for (MatchConfig matchField : lookupRequest.getMatchFields()) {
592-
matchFieldsByName.put(matchField.fieldName(), matchField);
593-
}
594-
// Build a set of extractFields names to avoid adding duplicates
595-
Set<String> extractFieldNames = new HashSet<>();
590+
// Build a set of extractFields NameIDs to avoid adding duplicates
591+
Set<NameId> extractFieldNameIds = new HashSet<>();
596592
for (NamedExpression extractField : request.extractFields) {
597-
extractFieldNames.add(extractField.name());
593+
extractFieldNameIds.add(extractField.id());
594+
}
595+
// Collect right-side field NameIDs from EsRelation in rightPreJoinPlan
596+
Set<NameId> rightSideFieldNameIds = new HashSet<>();
597+
if (lookupRequest.getRightPreJoinPlan() != null) {
598+
if (lookupRequest.getRightPreJoinPlan() instanceof FragmentExec fragmentExec) {
599+
fragmentExec.fragment().forEachDown(EsRelation.class, esRelation -> {
600+
for (Attribute attr : esRelation.output()) {
601+
rightSideFieldNameIds.add(attr.id());
602+
}
603+
});
604+
}
598605
}
599-
// Extract FieldAttributes from post-join filter expressions and check if they are in matchFields
600-
Map<String, TypedAttribute> fieldsToBroadcast = new HashMap<>();
606+
// Track which NameIDs we've already added to avoid duplicates
607+
Set<NameId> addedNameIds = new HashSet<>();
608+
// Traverse filter expressions and match attributes to MatchConfigs by NameID
609+
// Exclude right-side fields and fields already in extractFields
601610
for (Expression filterExpr : postJoinFilterExpressions) {
602611
for (Attribute attr : filterExpr.references()) {
603-
if (attr instanceof TypedAttribute typedAttribute) {
604-
String fieldName = attr.name();
605-
// Check if this field is in matchFields (left-side field with a channel)
606-
if (matchFieldsByName.containsKey(fieldName)) {
607-
fieldsToBroadcast.put(fieldName, typedAttribute);
612+
NameId nameId = attr.id();
613+
// Skip if right-side field, already in extractFields, or already found a match for
614+
if (rightSideFieldNameIds.contains(nameId)
615+
|| extractFieldNameIds.contains(nameId)
616+
|| addedNameIds.contains(nameId)) {
617+
continue;
618+
}
619+
// Find the corresponding MatchConfig for this attribute
620+
// we do match by just name
621+
// we made sure the same attribute is not on the right side with the checks above
622+
for (MatchConfig matchField : lookupRequest.getMatchFields()) {
623+
if (attr.name().equals(matchField.fieldName())) {
624+
builder.append(attr);
625+
allLeftFieldsToBroadcast.add(matchField);
626+
addedNameIds.add(nameId);
627+
break;
608628
}
609629
}
610-
}
611-
}
612-
// Add matching fields to layout and broadcast list, but skip if already in extractFields
613-
for (Map.Entry<String, TypedAttribute> entry : fieldsToBroadcast.entrySet()) {
614-
String fieldName = entry.getKey();
615-
TypedAttribute typedAttribute = entry.getValue();
616-
MatchConfig matchField = matchFieldsByName.get(fieldName);
617-
// Only add if not already in extractFields to avoid duplicates
618-
if (extractFieldNames.contains(fieldName) == false) {
619-
builder.append(typedAttribute);
620-
allLeftFieldsToBroadcast.add(matchField);
630+
621631
}
622632
}
623633
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,10 @@ public List<MatchConfig> getMatchFields() {
291291
return matchFields;
292292
}
293293

294+
public PhysicalPlan getRightPreJoinPlan() {
295+
return rightPreJoinPlan;
296+
}
297+
294298
@Override
295299
public void writeTo(StreamOutput out) throws IOException {
296300
super.writeTo(out);

0 commit comments

Comments
 (0)