Skip to content

Commit 22d9709

Browse files
pan3793peter-toth
andcommitted
[SPARK-53738][SQL] PlannedWrite should preserve custom sort order when query output contains literal
### What changes were proposed in this pull request? This PR fixes a bug in `plannedWrite`, where the `query` has a literal output of the partition column. ``` CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k); INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i; ``` The evaluation of `FileFormatWriter.orderingMatched` fails because `SortOrder(Literal)` is eliminated by `EliminateSorts`. The [idea](#52474 (comment)) is to expose and keep "constant order" expressions from `child.outputOrdering` ### Why are the changes needed? `V1Writes` will override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect. SPARK-53707 fixes the issue accidentally(and partially) by adding a `Project` of query in `V1Writes`. Before SPARK-53707 ``` Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false +- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282] +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet ``` After SPARK-53707 ``` Project [i#284, j#285, 0 AS k#290] +- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false +- Project [i#284, j#285] +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet ``` This PR fixes the issue thoroughly, with a new UT added. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT is added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52474 from pan3793/SPARK-53738. Lead-authored-by: Cheng Pan <[email protected]> Co-authored-by: Peter Toth <[email protected]> Co-authored-by: Cheng Pan <[email protected]> Signed-off-by: Peter Toth <[email protected]>
1 parent 011b2b8 commit 22d9709

File tree

16 files changed

+179
-34
lines changed

16 files changed

+179
-34
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ package object dsl extends SQLConfHelper {
151151

152152
def asc: SortOrder = SortOrder(expr, Ascending)
153153
def asc_nullsLast: SortOrder = SortOrder(expr, Ascending, NullsLast, Seq.empty)
154+
def const: SortOrder = SortOrder(expr, Constant)
154155
def desc: SortOrder = SortOrder(expr, Descending)
155156
def desc_nullsFirst: SortOrder = SortOrder(expr, Descending, NullsFirst, Seq.empty)
156157
def as(alias: String): NamedExpression = Alias(expr, alias)()

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ case object Descending extends SortDirection {
4545
override def defaultNullOrdering: NullOrdering = NullsLast
4646
}
4747

48+
case object Constant extends SortDirection {
49+
override def sql: String = "CONST"
50+
override def defaultNullOrdering: NullOrdering = NullsFirst
51+
}
52+
4853
case object NullsFirst extends NullOrdering {
4954
override def sql: String = "NULLS FIRST"
5055
}
@@ -69,8 +74,13 @@ case class SortOrder(
6974

7075
override def children: Seq[Expression] = child +: sameOrderExpressions
7176

72-
override def checkInputDataTypes(): TypeCheckResult =
73-
TypeUtils.checkForOrderingExpr(dataType, prettyName)
77+
override def checkInputDataTypes(): TypeCheckResult = {
78+
if (direction == Constant) {
79+
TypeCheckResult.TypeCheckSuccess
80+
} else {
81+
TypeUtils.checkForOrderingExpr(dataType, prettyName)
82+
}
83+
}
7484

7585
override def dataType: DataType = child.dataType
7686
override def nullable: Boolean = child.nullable
@@ -81,8 +91,8 @@ case class SortOrder(
8191
def isAscending: Boolean = direction == Ascending
8292

8393
def satisfies(required: SortOrder): Boolean = {
84-
children.exists(required.child.semanticEquals) &&
85-
direction == required.direction && nullOrdering == required.nullOrdering
94+
children.exists(required.child.semanticEquals) && (direction == Constant ||
95+
direction == required.direction && nullOrdering == required.nullOrdering)
8696
}
8797

8898
override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): SortOrder =
@@ -101,21 +111,38 @@ object SortOrder {
101111
* Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
102112
*
103113
* SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
104-
* or of A's prefix. Here are examples of ordering A satisfying ordering B:
114+
* or of A's prefix, except for SortOrder in B that satisfies any constant SortOrder in A.
115+
*
116+
* Here are examples of ordering A satisfying ordering B:
105117
* <ul>
106118
* <li>ordering A is [x, y] and ordering B is [x]</li>
119+
* <li>ordering A is [z(const), x, y] and ordering B is [x, z]</li>
107120
* <li>ordering A is [x(sameOrderExpressions=x1)] and ordering B is [x1]</li>
108121
* <li>ordering A is [x(sameOrderExpressions=x1), y] and ordering B is [x1]</li>
109122
* </ul>
110123
*/
111-
def orderingSatisfies(ordering1: Seq[SortOrder], ordering2: Seq[SortOrder]): Boolean = {
112-
if (ordering2.isEmpty) {
113-
true
114-
} else if (ordering2.length > ordering1.length) {
124+
def orderingSatisfies(
125+
providedOrdering: Seq[SortOrder], requiredOrdering: Seq[SortOrder]): Boolean = {
126+
if (requiredOrdering.isEmpty) {
127+
return true
128+
}
129+
130+
val (constantProvidedOrdering, nonConstantProvidedOrdering) = providedOrdering.partition {
131+
case SortOrder(_, Constant, _, _) => true
132+
case SortOrder(child, _, _, _) => child.foldable
133+
}
134+
135+
val effectiveRequiredOrdering = requiredOrdering.filterNot { requiredOrder =>
136+
constantProvidedOrdering.exists { providedOrder =>
137+
providedOrder.satisfies(requiredOrder)
138+
}
139+
}
140+
141+
if (effectiveRequiredOrdering.length > nonConstantProvidedOrdering.length) {
115142
false
116143
} else {
117-
ordering2.zip(ordering1).forall {
118-
case (o2, o1) => o1.satisfies(o2)
144+
effectiveRequiredOrdering.zip(nonConstantProvidedOrdering).forall {
145+
case (required, provided) => provided.satisfies(required)
119146
}
120147
}
121148
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
2020
import org.apache.spark.SparkIllegalArgumentException
2121
import org.apache.spark.sql.catalyst.InternalRow
2222
import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, UnresolvedWithinGroup}
23-
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder}
23+
import org.apache.spark.sql.catalyst.expressions.{Ascending, Constant, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder}
2424
import org.apache.spark.sql.catalyst.expressions.Cast.toSQLExpr
2525
import org.apache.spark.sql.catalyst.trees.UnaryLike
2626
import org.apache.spark.sql.catalyst.types.PhysicalDataType
@@ -199,6 +199,8 @@ case class Mode(
199199
this.copy(child = child, reverseOpt = Some(true))
200200
case SortOrder(child, Descending, _, _) =>
201201
this.copy(child = child, reverseOpt = Some(false))
202+
case SortOrder(child, Constant, _, _) =>
203+
this.copy(child = child)
202204
}
203205
case _ => this
204206
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ case class PercentileCont(left: Expression, right: Expression, reverse: Boolean
382382
nodeName, 1, orderingWithinGroup.length)
383383
}
384384
orderingWithinGroup.head match {
385-
case SortOrder(child, Ascending, _, _) => this.copy(left = child)
385+
case SortOrder(child, Ascending | Constant, _, _) => this.copy(left = child)
386386
case SortOrder(child, Descending, _, _) => this.copy(left = child, reverse = true)
387387
}
388388
}
@@ -440,7 +440,7 @@ case class PercentileDisc(
440440
nodeName, 1, orderingWithinGroup.length)
441441
}
442442
orderingWithinGroup.head match {
443-
case SortOrder(expr, Ascending, _, _) => this.copy(child = expr)
443+
case SortOrder(expr, Ascending | Constant, _, _) => this.copy(child = expr)
444444
case SortOrder(expr, Descending, _, _) => this.copy(child = expr, reverse = true)
445445
}
446446
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1919,7 +1919,7 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
19191919
object EliminateSorts extends Rule[LogicalPlan] {
19201920
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(_.containsPattern(SORT)) {
19211921
case s @ Sort(orders, _, child, _) if orders.isEmpty || orders.exists(_.child.foldable) =>
1922-
val newOrders = orders.filterNot(_.child.foldable)
1922+
val newOrders = orders.filterNot(o => o.direction != Constant && o.child.foldable)
19231923
if (newOrders.isEmpty) {
19241924
child
19251925
} else {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans
2020
import scala.collection.mutable
2121

2222
import org.apache.spark.sql.catalyst.SQLConfHelper
23-
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder}
23+
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Constant, Empty2Null, Expression, NamedExpression, SortOrder}
2424
import org.apache.spark.sql.internal.SQLConf
2525

2626
/**
@@ -128,6 +128,8 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
128128
}
129129
}
130130
}
131-
newOrdering.takeWhile(_.isDefined).flatten.toSeq
131+
newOrdering.takeWhile(_.isDefined).flatten.toSeq ++ outputExpressions.collect {
132+
case a @ Alias(child, _) if child.foldable => SortOrder(a.toAttribute, Constant)
133+
}
132134
}
133135
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,8 @@ case class Sort(
912912
override def maxRowsPerPartition: Option[Long] = {
913913
if (global) maxRows else child.maxRowsPerPartition
914914
}
915-
override def outputOrdering: Seq[SortOrder] = order
915+
override def outputOrdering: Seq[SortOrder] =
916+
order ++ child.outputOrdering.filter(_.direction == Constant)
916917
final override val nodePatterns: Seq[TreePattern] = Seq(SORT)
917918
override protected def withNewChildInternal(newChild: LogicalPlan): Sort = copy(child = newChild)
918919
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
4343
val sortOrder = direction match {
4444
case Ascending => BoundReference(0, dataType, nullable = true).asc
4545
case Descending => BoundReference(0, dataType, nullable = true).desc
46+
case Constant => BoundReference(0, dataType, nullable = true).const
4647
}
4748
val expectedCompareResult = direction match {
48-
case Ascending => signum(expected)
49+
case Ascending | Constant => signum(expected)
4950
case Descending => -1 * signum(expected)
5051
}
5152

sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ case class SortExec(
4646

4747
override def output: Seq[Attribute] = child.output
4848

49-
override def outputOrdering: Seq[SortOrder] = sortOrder
49+
override def outputOrdering: Seq[SortOrder] =
50+
sortOrder ++ child.outputOrdering.filter(_.direction == Constant)
5051

5152
// sort performed is local within a given partition so will retain
5253
// child operator's partitioning
@@ -73,15 +74,17 @@ case class SortExec(
7374
* should make it public.
7475
*/
7576
def createSorter(): UnsafeExternalRowSorter = {
77+
val effectiveSortOrder = sortOrder.filterNot(_.direction == Constant)
78+
7679
rowSorter = new ThreadLocal[UnsafeExternalRowSorter]()
7780

7881
val ordering = RowOrdering.create(sortOrder, output)
7982

8083
// The comparator for comparing prefix
81-
val boundSortExpression = BindReferences.bindReference(sortOrder.head, output)
84+
val boundSortExpression = BindReferences.bindReference(effectiveSortOrder.head, output)
8285
val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
8386

84-
val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
87+
val canUseRadixSort = enableRadixSort && effectiveSortOrder.length == 1 &&
8588
SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
8689

8790
// The generator for prefix

sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ object SortPrefixUtils {
6363
PrefixComparators.STRING_DESC_NULLS_FIRST
6464
case Descending =>
6565
PrefixComparators.STRING_DESC
66+
case Constant =>
67+
NoOpPrefixComparator
6668
}
6769
}
6870

@@ -76,6 +78,8 @@ object SortPrefixUtils {
7678
PrefixComparators.BINARY_DESC_NULLS_FIRST
7779
case Descending =>
7880
PrefixComparators.BINARY_DESC
81+
case Constant =>
82+
NoOpPrefixComparator
7983
}
8084
}
8185

@@ -89,6 +93,8 @@ object SortPrefixUtils {
8993
PrefixComparators.LONG_DESC_NULLS_FIRST
9094
case Descending =>
9195
PrefixComparators.LONG_DESC
96+
case Constant =>
97+
NoOpPrefixComparator
9298
}
9399
}
94100

@@ -102,6 +108,8 @@ object SortPrefixUtils {
102108
PrefixComparators.DOUBLE_DESC_NULLS_FIRST
103109
case Descending =>
104110
PrefixComparators.DOUBLE_DESC
111+
case Constant =>
112+
NoOpPrefixComparator
105113
}
106114
}
107115

0 commit comments

Comments
 (0)