18
18
package org .apache .spark .sql .execution .adaptive
19
19
20
20
import org .apache .spark .sql .catalyst .analysis .UnresolvedAttribute
21
- import org .apache .spark .sql .catalyst .expressions .{Ascending , SortOrder }
22
21
import org .apache .spark .sql .catalyst .plans .Inner
23
22
import org .apache .spark .sql .catalyst .plans .physical .HashPartitioning
24
- import org .apache .spark .sql .execution .{ RangeExec , SortExec }
23
+ import org .apache .spark .sql .execution .RangeExec
25
24
import org .apache .spark .sql .execution .exchange .ShuffleExchangeExec
26
- import org .apache .spark .sql .execution .joins .SortMergeJoinExec
25
+ import org .apache .spark .sql .execution .joins .{ BuildRight , ShuffledHashJoinExec }
27
26
import org .apache .spark .sql .internal .SQLConf
28
27
import org .apache .spark .sql .test .SharedSQLContext
29
28
@@ -53,7 +52,7 @@ class PlanQueryStageTest extends SharedSQLContext {
53
52
conf.setConfString(" spark.sql.exchange.reuse" , " true" )
54
53
55
54
val planQueryStage = PlanQueryStage (conf)
56
- val newPlan = planQueryStage(createMergeJoinPlan (100 , 100 ))
55
+ val newPlan = planQueryStage(createJoinExec (100 , 100 ))
57
56
58
57
val collected = newPlan.collect {
59
58
case e : ShuffleQueryStageInput => e.childStage
@@ -68,7 +67,7 @@ class PlanQueryStageTest extends SharedSQLContext {
68
67
conf.setConfString(" spark.sql.exchange.reuse" , " true" )
69
68
70
69
val planQueryStage = PlanQueryStage (conf)
71
- val newPlan = planQueryStage(createMergeJoinPlan (100 , 101 ))
70
+ val newPlan = planQueryStage(createJoinExec (100 , 101 ))
72
71
73
72
val collected = newPlan.collect {
74
73
case e : ShuffleQueryStageInput => e.childStage
@@ -78,25 +77,20 @@ class PlanQueryStageTest extends SharedSQLContext {
78
77
assert(! collected(0 ).eq(collected(1 )))
79
78
}
80
79
81
- def createMergeJoinPlan (leftNum : Int , rightNum : Int ): SortMergeJoinExec = {
82
- val left = SortExec (
83
- Seq (SortOrder (UnresolvedAttribute (" blah" ), Ascending )),
84
- true ,
85
- ShuffleExchangeExec (
86
- HashPartitioning (Seq (UnresolvedAttribute (" blah" )), 100 ),
87
- RangeExec (org.apache.spark.sql.catalyst.plans.logical.Range (1 , leftNum, 1 , 1 ))))
88
-
89
- val right = SortExec (
90
- Seq (SortOrder (UnresolvedAttribute (" blah" ), Ascending )),
91
- true ,
92
- ShuffleExchangeExec (
93
- HashPartitioning (Seq (UnresolvedAttribute (" blah" )), 100 ),
94
- RangeExec (org.apache.spark.sql.catalyst.plans.logical.Range (1 , rightNum, 1 , 1 ))))
95
-
96
- SortMergeJoinExec (
80
+ def createJoinExec (leftNum : Int , rightNum : Int ): ShuffledHashJoinExec = {
81
+ val left = ShuffleExchangeExec (
82
+ HashPartitioning (Seq (UnresolvedAttribute (" blah" )), 100 ),
83
+ RangeExec (org.apache.spark.sql.catalyst.plans.logical.Range (1 , leftNum, 1 , 1 )))
84
+
85
+ val right = ShuffleExchangeExec (
86
+ HashPartitioning (Seq (UnresolvedAttribute (" blah" )), 100 ),
87
+ RangeExec (org.apache.spark.sql.catalyst.plans.logical.Range (1 , rightNum, 1 , 1 )))
88
+
89
+ ShuffledHashJoinExec (
97
90
Seq (UnresolvedAttribute (" blah" )),
98
91
Seq (UnresolvedAttribute (" blah" )),
99
92
Inner ,
93
+ BuildRight ,
100
94
None ,
101
95
left,
102
96
right)
0 commit comments