@@ -574,9 +574,7 @@ protected RexNode removeCorrelationExpr(
574574 }
575575
576576 if (isCorVarDefined && (rel .fetch != null || rel .offset != null )) {
577- if (rel .fetch != null
578- && rel .offset == null
579- && RexLiteral .intValue (rel .fetch ) == 1 ) {
577+ if (rel .fetch != null && rel .offset == null ) {
580578 return decorrelateFetchOneSort (rel , frame );
581579 }
582580 // Can not decorrelate if the sort has per-correlate-key attributes like
@@ -1099,25 +1097,48 @@ private static void shiftMapping(Map<Integer, Integer> mapping, int startIndex,
10991097 for (RelDataTypeField field : sort .getRowType ().getFieldList ()) {
11001098 final int newIdx =
11011099 requireNonNull (frame .oldToNewOutputs .get (field .getIndex ()));
1102-
1103- RelBuilder .AggCall aggCall =
1104- relBuilder .aggregateCall (SqlStdOperatorTable .FIRST_VALUE ,
1105- RexInputRef .of (newIdx , fieldList ));
1106-
1107- // Convert each field from the sorted output to a window function that partitions by
1108- // correlated variables, orders by the collation, and return the first_value.
1109- RexNode winCall = aggCall .over ()
1110- .orderBy (sortExprs )
1111- .partitionBy (corVarProjects .leftList ())
1112- .toRex ();
11131100 mapOldToNewOutputs .put (newProjExprs .size (), newProjExprs .size ());
1114- newProjExprs .add (winCall , field .getName ());
1101+ newProjExprs .add (RexInputRef . of ( newIdx , fieldList ) , field .getName ());
11151102 }
11161103 newProjExprs .addAll (corVarProjects );
1117- RelNode result = relBuilder .push (frame .r )
1118- .project (newProjExprs .leftList (), newProjExprs .rightList ())
1119- .distinct ().build ();
11201104
1105+ relBuilder .push (frame .r );
1106+
1107+ RexNode rowNumberCall = relBuilder .aggregateCall (SqlStdOperatorTable .ROW_NUMBER )
1108+ .over ()
1109+ .partitionBy (corVarProjects .leftList ())
1110+ .orderBy (sortExprs )
1111+ .toRex ();
1112+ newProjExprs .add (rowNumberCall , "rn" ); // Add the row number column
1113+ relBuilder .project (newProjExprs .leftList (), newProjExprs .rightList ());
1114+
1115+ List <RexNode > conditions = new ArrayList <>();
1116+ if (sort .offset != null ) {
1117+ RexNode greaterThenLowerBound =
1118+ relBuilder .call (
1119+ SqlStdOperatorTable .GREATER_THAN ,
1120+ relBuilder .field (newProjExprs .size () - 1 ),
1121+ sort .offset );
1122+ conditions .add (greaterThenLowerBound );
1123+ }
1124+ if (sort .fetch != null ) {
1125+ RexNode upperBound = sort .offset == null
1126+ ? sort .fetch
1127+ : relBuilder .call (SqlStdOperatorTable .PLUS , sort .offset , sort .fetch );
1128+ RexNode lessThenOrEqualUpperBound =
1129+ relBuilder .call (
1130+ SqlStdOperatorTable .LESS_THAN_OR_EQUAL ,
1131+ relBuilder .field (newProjExprs .size () - 1 ),
1132+ upperBound );
1133+ conditions .add (lessThenOrEqualUpperBound );
1134+ }
1135+
1136+ RelNode result ;
1137+ if (!conditions .isEmpty ()) {
1138+ result = relBuilder .filter (conditions ).build ();
1139+ } else {
1140+ result = relBuilder .build ();
1141+ }
11211142 return register (sort , result , mapOldToNewOutputs , corDefOutputs );
11221143 }
11231144
0 commit comments