Skip to content

Commit f451b2a

Browse files
iwanttobepowerfulmihaibudiu
authored andcommitted
[CALCITE-7379] LHS correlated variables are shadowed by nullable RHS outputs in LEFT JOIN
1 parent 6fab1a1 commit f451b2a

File tree

3 files changed

+990
-50
lines changed

3 files changed

+990
-50
lines changed

core/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java

Lines changed: 236 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -959,18 +959,19 @@ private RelNode rewriteScalarAggregate(Aggregate oldRel,
959959

960960
// Build join conditions
961961
final Map<Integer, RexNode> newProjectMap = new HashMap<>();
962-
final List<RexNode> conditions = new ArrayList<>();
963962
for (Map.Entry<CorDef, Integer> corDefOutput : corDefOutputs.entrySet()) {
964963
final CorDef corDef = corDefOutput.getKey();
965964
final int leftPos = requireNonNull(valueGenCorDefOutputs.get(corDef));
966965
final int rightPos = corDefOutput.getValue();
967966
final RelDataType leftType = valueGen.getRowType().getFieldList().get(leftPos).getType();
968-
final RelDataType rightType = newRel.getRowType().getFieldList().get(rightPos).getType();
969967
final RexNode leftRef = new RexInputRef(leftPos, leftType);
970-
final RexNode rightRef = new RexInputRef(valueGenFieldCount + rightPos, rightType);
971-
conditions.add(relBuilder.isNotDistinctFrom(leftRef, rightRef));
972968
newProjectMap.put(valueGenFieldCount + rightPos, leftRef);
973969
}
970+
971+
final List<RexNode> conditions =
972+
buildCorDefJoinConditions(valueGenCorDefOutputs, corDefOutputs,
973+
valueGen, newRel, relBuilder);
974+
974975
final RexNode joinCond = RexUtil.composeConjunction(relBuilder.getRexBuilder(), conditions);
975976

976977
// Build [08] LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[left])
@@ -1277,21 +1278,13 @@ private static void shiftMapping(Map<Integer, Integer> mapping, int startIndex,
12771278

12781279
// Build join conditions: for each CorDef of this branch that belongs
12791280
// to the current outFrameCorrId, equate valueGen(col) with branch(col).
1280-
final List<RexNode> conditions = new ArrayList<>();
1281-
for (Map.Entry<CorDef, Integer> e : frame.corDefOutputs.entrySet()) {
1282-
final CorDef corDef = e.getKey();
1283-
final int leftPos = requireNonNull(valueGenCorDefOutputs.get(corDef));
1284-
final int rightPos = e.getValue();
1285-
final RelDataType leftType = valueGen.getRowType().getFieldList().get(leftPos).getType();
1286-
final RelDataType rightType = frame.r.getRowType().getFieldList().get(rightPos).getType();
1287-
final RexNode leftRef = new RexInputRef(leftPos, leftType);
1288-
final RexNode rightRef = new RexInputRef(valueGenFieldCount + rightPos, rightType);
1289-
conditions.add(relBuilder.isNotDistinctFrom(leftRef, rightRef));
1290-
}
1281+
final List<RexNode> conditions =
1282+
buildCorDefJoinConditions(valueGenCorDefOutputs, frame.corDefOutputs,
1283+
valueGen, frame.r, relBuilder);
12911284
final RexNode joinCondition =
12921285
RexUtil.composeConjunction(relBuilder.getRexBuilder(), conditions);
12931286
RelNode join = relBuilder.push(valueGen).push(frame.r)
1294-
.join(JoinRelType.INNER, joinCondition).build();
1287+
.join(JoinRelType.INNER, joinCondition).build();
12951288

12961289
final List<RelDataTypeField> joinFields = join.getRowType().getFieldList();
12971290

@@ -1935,10 +1928,16 @@ private static boolean isWidening(RelDataType type, RelDataType type1) {
19351928
return decorrelateRel((RelNode) rel, isCorVarDefined, parentPropagatesNullValues);
19361929
}
19371930
//
1938-
// Rewrite logic:
1931+
// For other join types (INNER, LEFT, RIGHT, FULL):
19391932
//
1940-
// 1. rewrite join condition.
1941-
// 2. map output positions and produce corVars if any.
1933+
// 1. Decorrelates the left and right inputs recursively.
1934+
// 2. Ensures that required correlated variables are present in the inputs, adding
1935+
// value generators if necessary (e.g., for the nullable side of an outer join).
1936+
// 3. Constructs a new join condition that includes the original condition and
1937+
// equality conditions for the correlated variables.
1938+
// 4. For {@link JoinRelType#FULL}, adds a projection on top of the join to coalesce
1939+
// correlated variables that might be null on one side due to the join nature.
1940+
// 5. Updates the output mapping to reflect the new join structure.
19421941
//
19431942

19441943
final RelNode oldLeft = rel.getInput(0);
@@ -1952,53 +1951,178 @@ private static boolean isWidening(RelDataType type, RelDataType type1) {
19521951
return null;
19531952
}
19541953

1954+
// 1. Collect all CorRefs involved
1955+
final CorelMap localCorelMap = new CorelMapBuilder().build(rel);
1956+
final List<CorRef> corVarList = new ArrayList<>(localCorelMap.mapRefRelToCorRef.values());
1957+
Collections.sort(corVarList);
1958+
1959+
// 2. Ensure CorVars are present in inputs (adding ValueGenerators if needed)
19551960
Frame newLeftFrame = leftFrame;
1956-
boolean joinConditionContainsFieldAccess = RexUtil.containsFieldAccess(rel.getCondition());
1957-
if (joinConditionContainsFieldAccess && isCorVarDefined) {
1958-
final CorelMap localCorelMap = new CorelMapBuilder().build(rel);
1959-
final List<CorRef> corVarList = new ArrayList<>(localCorelMap.mapRefRelToCorRef.values());
1960-
Collections.sort(corVarList);
1961+
Frame newRightFrame = rightFrame;
1962+
final NavigableMap<CorDef, Integer> leftCorDefOutputs = new TreeMap<>();
1963+
final NavigableMap<CorDef, Integer> rightCorDefOutputs = new TreeMap<>();
1964+
boolean generatesNullsOnRight = rel.getJoinType().generatesNullsOnRight();
1965+
boolean generatesNullsOnLeft = rel.getJoinType().generatesNullsOnLeft();
1966+
1967+
if (isCorVarDefined) {
1968+
// ensure CorVars are present in left input
1969+
if (generatesNullsOnRight || RexUtil.containsFieldAccess(rel.getCondition())) {
1970+
newLeftFrame = supplyMissingCorVars(oldLeft, leftFrame, corVarList, leftCorDefOutputs);
1971+
rightCorDefOutputs.putAll(rightFrame.corDefOutputs);
1972+
}
1973+
// ensure CorVars are present in right input
1974+
if (generatesNullsOnLeft) {
1975+
newRightFrame = supplyMissingCorVars(oldRight, rightFrame, corVarList, rightCorDefOutputs);
1976+
leftCorDefOutputs.putAll(leftFrame.corDefOutputs);
1977+
}
1978+
} else {
1979+
leftCorDefOutputs.putAll(leftFrame.corDefOutputs);
1980+
rightCorDefOutputs.putAll(rightFrame.corDefOutputs);
1981+
}
1982+
1983+
// 3. Build Join Conditions
1984+
final List<RexNode> joinConditions = new ArrayList<>();
1985+
RexNode originalCond = decorrelateExpr(castNonNull(currentRel), map, cm, rel.getCondition());
1986+
if (!originalCond.isAlwaysTrue()) {
1987+
joinConditions.add(originalCond);
1988+
}
19611989

1962-
final NavigableMap<CorDef, Integer> corDefOutputs = new TreeMap<>();
1963-
newLeftFrame = createFrameWithValueGenerator(oldLeft, leftFrame, corVarList, corDefOutputs);
1990+
if (generatesNullsOnLeft || generatesNullsOnRight) {
1991+
List<RexNode> conds =
1992+
buildCorDefJoinConditions(leftCorDefOutputs, rightCorDefOutputs,
1993+
newLeftFrame.r, newRightFrame.r, relBuilder);
1994+
joinConditions.addAll(conds);
19641995
}
19651996

1997+
RexNode finalCondition = joinConditions.isEmpty()
1998+
? relBuilder.literal(true)
1999+
: RexUtil.composeConjunction(relBuilder.getRexBuilder(), joinConditions);
2000+
19662001
RelNode newJoin = relBuilder
19672002
.push(newLeftFrame.r)
1968-
.push(rightFrame.r)
1969-
.join(rel.getJoinType(),
1970-
decorrelateExpr(castNonNull(currentRel), map, cm, rel.getCondition()),
1971-
ImmutableSet.of())
2003+
.push(newRightFrame.r)
2004+
.join(rel.getJoinType(), finalCondition, ImmutableSet.of())
19722005
.build();
19732006

1974-
// Create the mapping between the output of the old correlation rel
1975-
// and the new join rel
1976-
Map<Integer, Integer> mapOldToNewOutputs = new HashMap<>();
1977-
1978-
int oldLeftFieldCount = oldLeft.getRowType().getFieldCount();
2007+
// 4. Handle Full Join Projections (Coalesce)
2008+
NavigableMap<CorDef, Integer> corDefOutputs = new TreeMap<>(newLeftFrame.corDefOutputs);
19792009
int newLeftFieldCount = newLeftFrame.r.getRowType().getFieldCount();
2010+
if (rel.getJoinType() == JoinRelType.FULL && isCorVarDefined) {
2011+
//
2012+
// SELECT
2013+
// d.dname,
2014+
// (
2015+
// SELECT COUNT(sub.empno)
2016+
// FROM (
2017+
// SELECT * FROM emp e2 WHERE e2.deptno = d.deptno
2018+
// ) sub
2019+
// FULL JOIN emp e
2020+
// ON sub.mgr = e.mgr
2021+
// ) as matched_subordinate_count
2022+
// FROM dept d
2023+
// order by d.dname;
2024+
//
2025+
// LogicalJoin(condition=[=($3, $11)], joinType=[full])
2026+
// LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], ...)
2027+
// LogicalFilter(condition=[=($7, $cor0.DEPTNO)])
2028+
// LogicalTableScan(table=[[scott, EMP]])
2029+
// LogicalTableScan(table=[[scott, EMP]])
2030+
//
2031+
// convert to:
2032+
//
2033+
// LogicalProject(_cor_$cor0_0=[COALESCE($8, $17)], EMPNO=[$0])
2034+
// LogicalJoin(condition=[AND(=($3, $12), IS NOT DISTINCT FROM($8, $17))], joinType=[full])
2035+
// LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], ...)
2036+
// LogicalFilter(condition=[IS NOT NULL($7)])
2037+
// LogicalTableScan(table=[[scott, EMP]])
2038+
// LogicalJoin(condition=[true], joinType=[inner])
2039+
// LogicalTableScan(table=[[scott, EMP]])
2040+
// LogicalProject(DEPTNO=[$0])
2041+
// LogicalTableScan(table=[[scott, DEPT]])
2042+
List<RelDataTypeField> joinFields = newJoin.getRowType().getFieldList();
2043+
2044+
// 4.1. Pass through existing fields
2045+
final PairList<RexNode, String> projects = PairList.of();
2046+
for (int i = 0; i < joinFields.size(); i++) {
2047+
RexInputRef.add2(projects, i, joinFields);
2048+
}
2049+
2050+
// 4.2. Build Coalesced CorVars
2051+
NavigableMap<CorDef, Integer> mergedCorDefOutputs = new TreeMap<>(corDefOutputs);
2052+
int projectedIndex = joinFields.size();
2053+
boolean appended = false;
2054+
2055+
for (CorRef corRef : corVarList) {
2056+
CorDef corDef = corRef.def();
2057+
2058+
Integer leftPos = leftCorDefOutputs.get(corDef);
2059+
Integer rightPos = rightCorDefOutputs.get(corDef);
2060+
2061+
// If missing on both sides, nothing to coalesce or project
2062+
if (leftPos == null && rightPos == null) {
2063+
continue;
2064+
}
2065+
2066+
// Create references
2067+
RexNode leftRef = null;
2068+
if (leftPos != null) {
2069+
leftRef = new RexInputRef(leftPos, joinFields.get(leftPos).getType());
2070+
}
2071+
2072+
RexNode rightRef = null;
2073+
if (rightPos != null) {
2074+
// Right side indices are offset by the left field count in the join
2075+
int actualRightIndex = rightPos + newLeftFieldCount;
2076+
rightRef = new RexInputRef(actualRightIndex, joinFields.get(actualRightIndex).getType());
2077+
}
2078+
2079+
// Determine the expression
2080+
RexNode expr;
2081+
if (leftRef == null) {
2082+
expr = rightRef;
2083+
} else if (rightRef == null) {
2084+
expr = leftRef;
2085+
} else {
2086+
// Both exist, create COALESCE
2087+
expr = relBuilder.call(SqlStdOperatorTable.COALESCE, leftRef, rightRef);
2088+
}
19802089

2090+
String name = "_cor_" + corDef.corr.getName() + "_" + corDef.field;
2091+
projects.add(requireNonNull(expr, "expr"), name);
2092+
mergedCorDefOutputs.put(corDef, projectedIndex++);
2093+
appended = true;
2094+
}
2095+
2096+
if (appended) {
2097+
newJoin = relBuilder.push(newJoin)
2098+
.projectNamed(projects.leftList(), projects.rightList(), true)
2099+
.build();
2100+
corDefOutputs.clear();
2101+
corDefOutputs.putAll(mergedCorDefOutputs);
2102+
}
2103+
} else {
2104+
// Standard output mapping for non-Full Join (or Full Join without CorVars)
2105+
// Right input positions are shifted.
2106+
for (Map.Entry<CorDef, Integer> entry : newRightFrame.corDefOutputs.entrySet()) {
2107+
final int shifted = entry.getValue() + newLeftFieldCount;
2108+
if (rel.getJoinType().generatesNullsOnRight()) {
2109+
corDefOutputs.putIfAbsent(entry.getKey(), shifted);
2110+
} else {
2111+
corDefOutputs.put(entry.getKey(), shifted);
2112+
}
2113+
}
2114+
}
2115+
2116+
// 5. Output Mapping
2117+
int oldLeftFieldCount = oldLeft.getRowType().getFieldCount();
19812118
int oldRightFieldCount = oldRight.getRowType().getFieldCount();
1982-
//noinspection AssertWithSideEffects
1983-
assert rel.getRowType().getFieldCount()
1984-
== oldLeftFieldCount + oldRightFieldCount;
19852119

1986-
// Left input positions are not changed.
1987-
mapOldToNewOutputs.putAll(newLeftFrame.oldToNewOutputs);
1988-
// Right input positions are shifted by newLeftFieldCount.
2120+
Map<Integer, Integer> mapOldToNewOutputs = new HashMap<>(newLeftFrame.oldToNewOutputs);
19892121
for (int i = 0; i < oldRightFieldCount; i++) {
19902122
mapOldToNewOutputs.put(i + oldLeftFieldCount,
1991-
requireNonNull(rightFrame.oldToNewOutputs.get(i)) + newLeftFieldCount);
2123+
requireNonNull(newRightFrame.oldToNewOutputs.get(i)) + newLeftFieldCount);
19922124
}
19932125

1994-
final NavigableMap<CorDef, Integer> corDefOutputs =
1995-
new TreeMap<>(newLeftFrame.corDefOutputs);
1996-
// Right input positions are shifted by newLeftFieldCount.
1997-
for (Map.Entry<CorDef, Integer> entry
1998-
: rightFrame.corDefOutputs.entrySet()) {
1999-
corDefOutputs.put(entry.getKey(),
2000-
entry.getValue() + newLeftFieldCount);
2001-
}
20022126
return register(rel, newJoin, mapOldToNewOutputs, corDefOutputs);
20032127
}
20042128

@@ -3719,6 +3843,68 @@ private static boolean isFieldNotNullRecursive(RelNode rel, int index) {
37193843
}
37203844
}
37213845

3846+
/**
3847+
* Ensures that the correlated variables in {@code allCorDefs} are present
3848+
* in the output of the frame.
3849+
* If any are missing, it creates a value generator to produce them and joins it with the frame.
3850+
*
3851+
* @param oldInput The original input RelNode
3852+
* @param frame The current frame for the input
3853+
* @param corVarList List of all correlated variables
3854+
* @param corDefOutputs Map to populate with the output positions of the correlated variables
3855+
* @return A new Frame with all required correlated variables, or the original frame
3856+
* if all were present
3857+
*/
3858+
private Frame supplyMissingCorVars(RelNode oldInput, Frame frame,
3859+
List<CorRef> corVarList, NavigableMap<CorDef, Integer> corDefOutputs) {
3860+
final ImmutableSortedSet<CorDef> haves = frame.corDefOutputs.keySet();
3861+
if (hasAll(corVarList, haves)) {
3862+
corDefOutputs.putAll(frame.corDefOutputs);
3863+
return frame;
3864+
}
3865+
3866+
final List<CorRef> miss = new ArrayList<>();
3867+
for (CorRef r : corVarList) {
3868+
if (!haves.contains(r.def())) {
3869+
miss.add(r);
3870+
}
3871+
}
3872+
3873+
return createFrameWithValueGenerator(oldInput, frame, miss, corDefOutputs);
3874+
}
3875+
3876+
/**
3877+
* Builds join conditions to equate correlated variables that are present in both left
3878+
* and right inputs.
3879+
*
3880+
* @param leftCorDefOutputs Map of CorDefs to output positions in the left input
3881+
* @param rightCorDefOutputs Map of CorDefs to output positions in the right input
3882+
* @param leftRel The left input RelNode
3883+
* @param rightRel The right input RelNode
3884+
* @param relBuilder RelBuilder for creating expressions
3885+
* @return A list of join conditions (IS NOT DISTINCT FROM) for matching correlated variables
3886+
*/
3887+
private List<RexNode> buildCorDefJoinConditions(
3888+
NavigableMap<CorDef, Integer> leftCorDefOutputs,
3889+
NavigableMap<CorDef, Integer> rightCorDefOutputs,
3890+
RelNode leftRel, RelNode rightRel, RelBuilder relBuilder) {
3891+
List<RexNode> joinConditions = new ArrayList<>();
3892+
int leftFieldCount = leftRel.getRowType().getFieldCount();
3893+
for (Map.Entry<CorDef, Integer> leftEntry : leftCorDefOutputs.entrySet()) {
3894+
CorDef corDef = leftEntry.getKey();
3895+
if (rightCorDefOutputs.containsKey(corDef)) {
3896+
int leftPos = leftEntry.getValue();
3897+
int rightPos = rightCorDefOutputs.get(corDef);
3898+
final RelDataType leftType = leftRel.getRowType().getFieldList().get(leftPos).getType();
3899+
final RelDataType rightType = rightRel.getRowType().getFieldList().get(rightPos).getType();
3900+
final RexNode leftRef = new RexInputRef(leftPos, leftType);
3901+
final RexNode rightRef = new RexInputRef(leftFieldCount + rightPos, rightType);
3902+
joinConditions.add(relBuilder.isNotDistinctFrom(leftRef, rightRef));
3903+
}
3904+
}
3905+
return joinConditions;
3906+
}
3907+
37223908
// -------------------------------------------------------------------------
37233909
// Getter/Setter
37243910
// -------------------------------------------------------------------------

0 commit comments

Comments
 (0)