@@ -1781,98 +1781,92 @@ private static boolean isWidening(RelDataType type, RelDataType type1) {
17811781 }
17821782
17831783 RexNode cond = decorrelateExpr (castNonNull (currentRel ), map , cm , rel .getCondition ());
1784- boolean contained = RexUtil .containsFieldAccess (cond );
1785- int groupKeySize = 0 ;
1786- RelNode join = null ;
1784+ int valueGenFieldCount = 0 ;
1785+ RelNode newLeft = null ;
17871786 RexNode replacedCond = null ;
1788- final NavigableMap <CorDef , Integer > corDefOutputs = new TreeMap <>();
1789-
1790- if (contained && isCorVarDefined ) {
1791- final Pair <CorrelationId , Frame > outerFramePair = requireNonNull (this .frameStack .peek ());
1792- final CorrelationId outerFrameCorrId = outerFramePair .left ;
1793- final Frame outerFrame = outerFramePair .right ;
1794-
1795- // Collect CorDef from decorrelated condition.
1796- RexUtil .FieldAccessFinder finder = new RexUtil .FieldAccessFinder ();
1797- cond .accept (finder );
1798- List <RexFieldAccess > correlatedKeyList = finder .getFieldAccessList ();
1799- ImmutableBitSet .Builder corFieldBuilder = ImmutableBitSet .builder ();
1800-
1801- for (RexFieldAccess rfa : correlatedKeyList ) {
1802- RexCorrelVariable correlVariable = (RexCorrelVariable ) rfa .getReferenceExpr ();
1803- if (correlVariable .id .equals (outerFrameCorrId )) {
1804- int idx = rfa .getField ().getIndex ();
1805- int newIdx = requireNonNull (outerFrame .oldToNewOutputs .get (idx ));
1806- corFieldBuilder .set (newIdx );
1807- }
1808- }
1809-
1810- ImmutableBitSet groupSet = corFieldBuilder .build ();
1811- groupKeySize = groupSet .cardinality ();
1812-
1813- // Build LogicalAggregate to obtain the distinct set of corVar from outerFrame.
1814- relBuilder .push (outerFrame .r ).aggregate (relBuilder .groupKey (groupSet ));
1815- join = relBuilder .push (leftFrame .r )
1816- .join (JoinRelType .INNER , relBuilder .literal (true )).build ();
1787+ final NavigableMap <CorDef , Integer > valueGenCorDefOutputs = new TreeMap <>();
1788+ final Pair <CorrelationId , Frame > outerFramePair = requireNonNull (this .frameStack .peek ());
1789+ final CorrelationId outerFrameCorrId = outerFramePair .left ;
1790+ final List <CorRef > corVarList = new ArrayList <>();
1791+ for (CorRef ref : cm .mapRefRelToCorRef .get (rel )) {
1792+ if (ref != null && ref .corr .equals (outerFrameCorrId )) {
1793+ corVarList .add (ref );
1794+ }
1795+ }
1796+ Collections .sort (corVarList );
1797+
1798+ if (!corVarList .isEmpty ()) {
1799+ final RelNode valueGen = createValueGenerator (corVarList , 0 , valueGenCorDefOutputs );
1800+ requireNonNull (valueGen , "valueGen" );
1801+ valueGenFieldCount = valueGen .getRowType ().getFieldCount ();
1802+ newLeft = relBuilder
1803+ .push (valueGen )
1804+ .push (leftFrame .r )
1805+ .join (JoinRelType .INNER , relBuilder .literal (true ))
1806+ .build ();
18171807
18181808 final Map <RexFieldAccess , RexInputRef > replacementMap = new HashMap <>();
1819- for (RexFieldAccess rfa : correlatedKeyList ) {
1820- RexCorrelVariable correlVariable = (RexCorrelVariable ) rfa .getReferenceExpr ();
1821- if (correlVariable .id .equals (outerFrameCorrId )) {
1822- int idx = rfa .getField ().getIndex ();
1823- int newIdx = requireNonNull (outerFrame .oldToNewOutputs .get (idx ));
1824- int pos = groupSet .indexOf (newIdx );
1825- corDefOutputs .put (new CorDef (outerFrameCorrId , newIdx ), pos );
1826-
1827- RelDataType type = outerFrame .r .getRowType ().getFieldList ().get (newIdx ).getType ();
1828- RexInputRef replace = new RexInputRef (pos , type );
1829- replacementMap .put (rfa , replace );
1809+ RexUtil .FieldAccessFinder finder = new RexUtil .FieldAccessFinder ();
1810+ rel .getCondition ().accept (finder );
1811+ for (RexFieldAccess rfa : finder .getFieldAccessList ()) {
1812+ final RexNode ref = rfa .getReferenceExpr ();
1813+ if (ref instanceof RexCorrelVariable ) {
1814+ final RexCorrelVariable corrVar = (RexCorrelVariable ) ref ;
1815+ if (corrVar .id .equals (outerFrameCorrId )) {
1816+ final CorDef def = new CorDef (corrVar .id , rfa .getField ().getIndex ());
1817+ final Integer pos = valueGenCorDefOutputs .get (def );
1818+ if (pos != null ) {
1819+ final RelDataType type = valueGen .getRowType ().getFieldList ().get (pos ).getType ();
1820+ replacementMap .put (rfa , new RexInputRef (pos , type ));
1821+ }
1822+ }
18301823 }
18311824 }
18321825
1833- final RexNode condShifted = RexUtil .shift (cond , groupKeySize );
1826+ RexNode condShifted = RexUtil .shift (cond , valueGenFieldCount );
18341827 replacedCond = RexUtil .replaceRexFieldAccessToInputRef (condShifted , replacementMap );
18351828 }
18361829
18371830 RelNode newJoin = relBuilder
1838- .push (contained ? requireNonNull (join , "join " ) : leftFrame .r )
1831+ .push (! corVarList . isEmpty () ? requireNonNull (newLeft , "newLeft " ) : leftFrame .r )
18391832 .push (rightFrame .r )
18401833 .join (rel .getJoinType (),
1841- contained ? requireNonNull (replacedCond , "replacedCond" ) : cond ,
1834+ ! corVarList . isEmpty () ? requireNonNull (replacedCond , "replacedCond" ) : cond ,
18421835 ImmutableSet .of ())
18431836 .build ();
18441837
18451838 // Create the mapping between the output of the old correlation rel
18461839 // and the new join rel
18471840 Map <Integer , Integer > mapOldToNewOutputs = new HashMap <>();
18481841
1842+ final NavigableMap <CorDef , Integer > corDefOutputs = new TreeMap <>();
1843+ corDefOutputs .putAll (valueGenCorDefOutputs );
1844+
18491845 int oldLeftFieldCount = oldLeft .getRowType ().getFieldCount ();
1850- int newLeftFieldCount = leftFrame .r .getRowType ().getFieldCount () + groupKeySize ;
1846+ int newLeftFieldCount = leftFrame .r .getRowType ().getFieldCount () + valueGenFieldCount ;
18511847
18521848 int oldRightFieldCount = oldRight .getRowType ().getFieldCount ();
18531849 //noinspection AssertWithSideEffects
18541850 assert rel .getRowType ().getFieldCount ()
18551851 == oldLeftFieldCount + oldRightFieldCount ;
18561852
1857- // Left input positions are shifted by groupKeySize .
1853+ // Left input positions are shifted by valueGenFieldCount .
18581854 for (Map .Entry <Integer , Integer > entry : leftFrame .oldToNewOutputs .entrySet ()) {
1859- mapOldToNewOutputs .put (entry .getKey (), groupKeySize + entry .getValue ());
1855+ mapOldToNewOutputs .put (entry .getKey (), valueGenFieldCount + entry .getValue ());
18601856 }
18611857 // Right input positions are shifted by newLeftFieldCount.
18621858 for (int i = 0 ; i < oldRightFieldCount ; i ++) {
18631859 mapOldToNewOutputs .put (i + oldLeftFieldCount ,
18641860 requireNonNull (rightFrame .oldToNewOutputs .get (i )) + newLeftFieldCount );
18651861 }
18661862
1867- // Left input positions are shifted by groupKeySize .
1863+ // Left input positions are shifted by valueGenFieldCount .
18681864 for (Map .Entry <CorDef , Integer > entry : leftFrame .corDefOutputs .entrySet ()) {
1869- corDefOutputs .put (entry .getKey (), groupKeySize + entry .getValue ());
1865+ corDefOutputs .put (entry .getKey (), valueGenFieldCount + entry .getValue ());
18701866 }
18711867 // Right input positions are shifted by newLeftFieldCount.
1872- for (Map .Entry <CorDef , Integer > entry
1873- : rightFrame .corDefOutputs .entrySet ()) {
1874- corDefOutputs .put (entry .getKey (),
1875- entry .getValue () + newLeftFieldCount );
1868+ for (Map .Entry <CorDef , Integer > entry : rightFrame .corDefOutputs .entrySet ()) {
1869+ corDefOutputs .put (entry .getKey (), entry .getValue () + newLeftFieldCount );
18761870 }
18771871 return register (rel , newJoin , mapOldToNewOutputs , corDefOutputs );
18781872 }
0 commit comments