17
17
18
18
package org .apache .spark .sql .execution .adaptive
19
19
20
- import org .apache .spark .sql .catalyst .analysis .UnresolvedAttribute
21
20
import org .apache .spark .sql .catalyst .expressions .{Ascending , SortOrder }
22
21
import org .apache .spark .sql .catalyst .plans .Inner
23
22
import org .apache .spark .sql .catalyst .plans .physical .HashPartitioning
@@ -29,32 +28,39 @@ import org.apache.spark.sql.test.SharedSQLContext
29
28
30
29
class QueryStageTest extends SharedSQLContext {
31
30
32
- test(" Replaces ShuffleExchangeExec/BroadcastExchangeExec with reuse disabled " ) {
31
+ test(" Adaptive Query Execution repartitions " ) {
33
32
val plan = createMergeJoinPlan(100 , 100 )
34
33
35
- val resultQueryStage = ResultQueryStage (plan)
34
+ val resultQueryStage = PlanQueryStage .apply( new SQLConf ) (plan)
36
35
37
- resultQueryStage.execute()
36
+ val rdd = resultQueryStage.execute()
37
+ assert(rdd.getNumPartitions == 0 )
38
38
}
39
39
40
40
def createMergeJoinPlan (leftNum : Int , rightNum : Int ): SortMergeJoinExec = {
41
+ val leftRangeExec = RangeExec (
42
+ org.apache.spark.sql.catalyst.plans.logical.Range (1 , leftNum, 1 , 1 ))
43
+ val leftOutput = leftRangeExec.output(0 )
41
44
val left = SortExec (
42
- Seq (SortOrder (UnresolvedAttribute ( " blah " ) , Ascending )),
45
+ Seq (SortOrder (leftOutput , Ascending )),
43
46
true ,
44
47
ShuffleExchangeExec (
45
- HashPartitioning (Seq (UnresolvedAttribute ( " blah " ) ), 100 ),
46
- RangeExec (org.apache.spark.sql.catalyst.plans.logical. Range ( 1 , leftNum, 1 , 1 )) ))
48
+ HashPartitioning (Seq (leftOutput ), 100 ),
49
+ leftRangeExec ))
47
50
51
+ val rightRangeExec = RangeExec (
52
+ org.apache.spark.sql.catalyst.plans.logical.Range (1 , rightNum, 1 , 1 ))
53
+ val rightOutput = rightRangeExec.output(0 )
48
54
val right = SortExec (
49
- Seq (SortOrder (UnresolvedAttribute ( " blah " ) , Ascending )),
55
+ Seq (SortOrder (rightOutput , Ascending )),
50
56
true ,
51
57
ShuffleExchangeExec (
52
- HashPartitioning (Seq (UnresolvedAttribute ( " blah " ) ), 100 ),
53
- RangeExec (org.apache.spark.sql.catalyst.plans.logical. Range ( 1 , rightNum, 1 , 1 )) ))
58
+ HashPartitioning (Seq (rightOutput ), 100 ),
59
+ rightRangeExec ))
54
60
55
61
SortMergeJoinExec (
56
- Seq (UnresolvedAttribute ( " blah " ) ),
57
- Seq (UnresolvedAttribute ( " blah " ) ),
62
+ Seq (leftOutput ),
63
+ Seq (rightOutput ),
58
64
Inner ,
59
65
None ,
60
66
left,
0 commit comments