Skip to content

Commit 5960686

Browse files
maryannxuegatorsmile
authored andcommitted
[SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning
## What changes were proposed in this pull request? Right now the calculation of SortMergeJoinExec's outputOrdering relies on the fact that its children have already been sorted on the join keys, while this is often not true until EnsureRequirements has been applied. So we ended up not getting the correct outputOrdering during physical planning stage before Sort nodes are added to the children. For example, J = {A join B on key1 = key2} 1. if A is NOT ordered on key1 ASC, J's outputOrdering should include "key1 ASC" 2. if A is ordered on key1 ASC, J's outputOrdering should include "key1 ASC" 3. if A is ordered on key1 ASC, with sameOrderExp=c1, J's outputOrdering should include "key1 ASC, sameOrderExp=c1" So to fix this I changed the behavior of <code>getKeyOrdering(keys, childOutputOrdering)</code> to: 1. If the childOutputOrdering satisfies (is a superset of) the required child ordering => childOutputOrdering 2. Otherwise => required child ordering In addition, I organized the logic for deciding the relationship between two orderings into SparkPlan, so that it can be reused by EnsureRequirements and SortMergeJoinExec, and potentially other classes. ## How was this patch tested? Added new test cases. Passed all integration tests. Author: maryannxue <[email protected]> Closes apache#19281 from maryannxue/spark-21998.
1 parent 5ac9685 commit 5960686

File tree

4 files changed

+102
-21
lines changed

4 files changed

+102
-21
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,29 @@ object SortOrder {
9696
sameOrderExpressions: Set[Expression] = Set.empty): SortOrder = {
9797
new SortOrder(child, direction, direction.defaultNullOrdering, sameOrderExpressions)
9898
}
99+
100+
/**
101+
* Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
102+
*
103+
* SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
104+
* or of A's prefix. Here are examples of ordering A satisfying ordering B:
105+
* <ul>
106+
* <li>ordering A is [x, y] and ordering B is [x]</li>
107+
* <li>ordering A is [x(sameOrderExpressions=x1)] and ordering B is [x1]</li>
108+
* <li>ordering A is [x(sameOrderExpressions=x1), y] and ordering B is [x1]</li>
109+
* </ul>
110+
*/
111+
def orderingSatisfies(ordering1: Seq[SortOrder], ordering2: Seq[SortOrder]): Boolean = {
112+
if (ordering2.isEmpty) {
113+
true
114+
} else if (ordering2.length > ordering1.length) {
115+
false
116+
} else {
117+
ordering2.zip(ordering1).forall {
118+
case (o2, o1) => o1.satisfies(o2)
119+
}
120+
}
121+
}
99122
}
100123

101124
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -234,24 +234,11 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
234234

235235
// Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
236236
children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
237-
if (requiredOrdering.nonEmpty) {
238-
// If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
239-
val orderingMatched = if (requiredOrdering.length > child.outputOrdering.length) {
240-
false
241-
} else {
242-
requiredOrdering.zip(child.outputOrdering).forall {
243-
case (requiredOrder, childOutputOrder) =>
244-
childOutputOrder.satisfies(requiredOrder)
245-
}
246-
}
247-
248-
if (!orderingMatched) {
249-
SortExec(requiredOrdering, global = false, child = child)
250-
} else {
251-
child
252-
}
253-
} else {
237+
// If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
238+
if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
254239
child
240+
} else {
241+
SortExec(requiredOrdering, global = false, child = child)
255242
}
256243
}
257244

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,22 @@ case class SortMergeJoinExec(
102102
}
103103

104104
/**
105-
* For SMJ, child's output must have been sorted on key or expressions with the same order as
106-
* key, so we can get ordering for key from child's output ordering.
105+
* The utility method to get output ordering for left or right side of the join.
106+
*
107+
* Returns the required ordering for left or right child if childOutputOrdering does not
108+
* satisfy the required ordering; otherwise, which means the child does not need to be sorted
109+
* again, returns the required ordering for this child with extra "sameOrderExpressions" from
110+
* the child's outputOrdering.
107111
*/
108112
private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: Seq[SortOrder])
109113
: Seq[SortOrder] = {
110-
keys.zip(childOutputOrdering).map { case (key, childOrder) =>
111-
SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
114+
val requiredOrdering = requiredOrders(keys)
115+
if (SortOrder.orderingSatisfies(childOutputOrdering, requiredOrdering)) {
116+
keys.zip(childOutputOrdering).map { case (key, childOrder) =>
117+
SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
118+
}
119+
} else {
120+
requiredOrdering
112121
}
113122
}
114123

sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import scala.language.existentials
2424
import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
2525
import org.apache.spark.sql.catalyst.TableIdentifier
2626
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
27+
import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder}
28+
import org.apache.spark.sql.execution.SortExec
2729
import org.apache.spark.sql.execution.joins._
2830
import org.apache.spark.sql.internal.SQLConf
2931
import org.apache.spark.sql.test.SharedSQLContext
@@ -787,4 +789,64 @@ class JoinSuite extends QueryTest with SharedSQLContext {
787789
}
788790
}
789791
}
792+
793+
test("test SortMergeJoin output ordering") {
794+
val joinQueries = Seq(
795+
"SELECT * FROM testData JOIN testData2 ON key = a",
796+
"SELECT * FROM testData t1 JOIN " +
797+
"testData2 t2 ON t1.key = t2.a JOIN testData3 t3 ON t2.a = t3.a",
798+
"SELECT * FROM testData t1 JOIN " +
799+
"testData2 t2 ON t1.key = t2.a JOIN " +
800+
"testData3 t3 ON t2.a = t3.a JOIN " +
801+
"testData t4 ON t1.key = t4.key")
802+
803+
def assertJoinOrdering(sqlString: String): Unit = {
804+
val df = sql(sqlString)
805+
val physical = df.queryExecution.sparkPlan
806+
val physicalJoins = physical.collect {
807+
case j: SortMergeJoinExec => j
808+
}
809+
val executed = df.queryExecution.executedPlan
810+
val executedJoins = executed.collect {
811+
case j: SortMergeJoinExec => j
812+
}
813+
// This only applies to the above tested queries, in which a child SortMergeJoin always
814+
// contains the SortOrder required by its parent SortMergeJoin. Thus, SortExec should never
815+
// appear as parent of SortMergeJoin.
816+
executed.foreach {
817+
case s: SortExec => s.foreach {
818+
case j: SortMergeJoinExec => fail(
819+
s"No extra sort should be added since $j already satisfies the required ordering"
820+
)
821+
case _ =>
822+
}
823+
case _ =>
824+
}
825+
val joinPairs = physicalJoins.zip(executedJoins)
826+
val numOfJoins = sqlString.split(" ").count(_.toUpperCase == "JOIN")
827+
assert(joinPairs.size == numOfJoins)
828+
829+
joinPairs.foreach {
830+
case(join1, join2) =>
831+
val leftKeys = join1.leftKeys
832+
val rightKeys = join1.rightKeys
833+
val outputOrderingPhysical = join1.outputOrdering
834+
val outputOrderingExecuted = join2.outputOrdering
835+
836+
// outputOrdering should always contain join keys
837+
assert(
838+
SortOrder.orderingSatisfies(
839+
outputOrderingPhysical, leftKeys.map(SortOrder(_, Ascending))))
840+
assert(
841+
SortOrder.orderingSatisfies(
842+
outputOrderingPhysical, rightKeys.map(SortOrder(_, Ascending))))
843+
// outputOrdering should be consistent between physical plan and executed plan
844+
assert(outputOrderingPhysical == outputOrderingExecuted,
845+
s"Operator $join1 did not have the same output ordering in the physical plan as in " +
846+
s"the executed plan.")
847+
}
848+
}
849+
850+
joinQueries.foreach(assertJoinOrdering)
851+
}
790852
}

0 commit comments

Comments
 (0)