|
17 | 17 |
|
18 | 18 | package org.apache.spark.sql.execution.adaptive
|
19 | 19 |
|
20 |
| -import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder} |
21 |
| -import org.apache.spark.sql.catalyst.plans.Inner |
22 | 20 | import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
|
23 |
| -import org.apache.spark.sql.execution.{RangeExec, SortExec} |
| 21 | +import org.apache.spark.sql.execution.RangeExec |
24 | 22 | import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
|
25 |
| -import org.apache.spark.sql.execution.joins.SortMergeJoinExec |
26 | 23 | import org.apache.spark.sql.internal.SQLConf
|
27 | 24 | import org.apache.spark.sql.test.SharedSQLContext
|
28 | 25 |
|
class QueryStageTest extends SharedSQLContext {

  test("Adaptive Query Execution repartitions") {
    // Partition count requested by the shuffle before adaptive planning runs.
    val originalNumPartitions = 100

    // Minimal physical plan: a small Range scan feeding a hash-partitioned
    // shuffle that requests far more partitions than the data warrants.
    val plan = {
      val rangeExec = RangeExec(
        org.apache.spark.sql.catalyst.plans.logical.Range(1, 1000, 1, 1))

      ShuffleExchangeExec(
        HashPartitioning(rangeExec.output, originalNumPartitions),
        rangeExec)
    }

    // Without adaptive planning, the shuffle keeps its requested partitioning.
    assert(plan.execute().getNumPartitions == originalNumPartitions)
    // After wrapping the plan in query stages, the shuffle output is coalesced.
    // NOTE(review): assumes a default SQLConf coalesces this tiny dataset to
    // exactly one partition — TODO confirm against PlanQueryStage's behavior.
    assert(PlanQueryStage.apply(new SQLConf)(plan).execute().getNumPartitions == 1)
  }
}
|
0 commit comments